Refactor supervisor listener startup and add metrics

* Refactor agent supervisor listener startup and authn/authz to use upstream
  auth delegators to perform for SubjectAccessReview for access to
  metrics.
* Convert spegel and pprof handlers over to new structure.
* Promote bind-address to agent flag to allow setting supervisor bind
  address for both agent and server.
* Promote enable-pprof to agent flag to allow profiling agents. Access
  to the pprof endpoint now requires client cert auth, similar to the
  spegel registry api endpoint.
* Add prometheus metrics handler.

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
This commit is contained in:
Brad Davidson 2024-04-25 01:02:05 +00:00
parent dba30ab21c
commit 4f324da44e
No known key found for this signature in database
GPG Key ID: FFB7A9376A9349B9
17 changed files with 462 additions and 141 deletions

View File

@ -512,12 +512,14 @@ func get(ctx context.Context, envInfo *cmds.Agent, proxy proxy.Proxy) (*config.N
SELinux: envInfo.EnableSELinux,
ContainerRuntimeEndpoint: envInfo.ContainerRuntimeEndpoint,
ImageServiceEndpoint: envInfo.ImageServiceEndpoint,
EnablePProf: envInfo.EnablePProf,
EmbeddedRegistry: controlConfig.EmbeddedRegistry,
FlannelBackend: controlConfig.FlannelBackend,
FlannelIPv6Masq: controlConfig.FlannelIPv6Masq,
FlannelExternalIP: controlConfig.FlannelExternalIP,
EgressSelectorMode: controlConfig.EgressSelectorMode,
ServerHTTPSPort: controlConfig.HTTPSPort,
SupervisorMetrics: controlConfig.SupervisorMetrics,
Token: info.String(),
}
nodeConfig.FlannelIface = flannelIface
@ -580,13 +582,18 @@ func get(ctx context.Context, envInfo *cmds.Agent, proxy proxy.Proxy) (*config.N
nodeConfig.Containerd.Template = filepath.Join(envInfo.DataDir, "agent", "etc", "containerd", "config.toml.tmpl")
nodeConfig.Certificate = servingCert
nodeConfig.AgentConfig.NodeIPs = nodeIPs
listenAddress, _, _, err := util.GetDefaultAddresses(nodeIPs[0])
if err != nil {
return nil, errors.Wrap(err, "cannot configure IPv4/IPv6 node-ip")
if envInfo.BindAddress != "" {
nodeConfig.AgentConfig.ListenAddress = envInfo.BindAddress
} else {
listenAddress, _, _, err := util.GetDefaultAddresses(nodeIPs[0])
if err != nil {
return nil, errors.Wrap(err, "cannot configure IPv4/IPv6 node-ip")
}
nodeConfig.AgentConfig.ListenAddress = listenAddress
}
nodeConfig.AgentConfig.NodeIP = nodeIPs[0].String()
nodeConfig.AgentConfig.ListenAddress = listenAddress
nodeConfig.AgentConfig.NodeIPs = nodeIPs
nodeConfig.AgentConfig.NodeExternalIPs = nodeExternalIPs
// if configured, set NodeExternalIP to the first IPv4 address, for legacy clients
@ -677,6 +684,8 @@ func get(ctx context.Context, envInfo *cmds.Agent, proxy proxy.Proxy) (*config.N
nodeConfig.AgentConfig.ImageCredProvConfig = envInfo.ImageCredProvConfig
nodeConfig.AgentConfig.DisableCCM = controlConfig.DisableCCM
nodeConfig.AgentConfig.DisableNPC = controlConfig.DisableNPC
nodeConfig.AgentConfig.MinTLSVersion = controlConfig.MinTLSVersion
nodeConfig.AgentConfig.CipherSuites = controlConfig.CipherSuites
nodeConfig.AgentConfig.Rootless = envInfo.Rootless
nodeConfig.AgentConfig.PodManifests = filepath.Join(envInfo.DataDir, "agent", DefaultPodManifestPath)
nodeConfig.AgentConfig.ProtectKernelDefaults = envInfo.ProtectKernelDefaults

106
pkg/agent/https/https.go Normal file
View File

@ -0,0 +1,106 @@
package https
import (
"context"
"net/http"
"strconv"
"sync"
"github.com/gorilla/mux"
"github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/generated/clientset/versioned/scheme"
"github.com/k3s-io/k3s/pkg/util"
"github.com/k3s-io/k3s/pkg/version"
"k8s.io/apiserver/pkg/authentication/authenticator"
"k8s.io/apiserver/pkg/authorization/authorizer"
genericapifilters "k8s.io/apiserver/pkg/endpoints/filters"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/options"
)
// RouterFunc provides a hook for components to register additional routes to a request router
type RouterFunc func(ctx context.Context, nodeConfig *config.Node) (*mux.Router, error)
var once sync.Once
var router *mux.Router
var err error
// Start returns a router with authn/authz filters applied.
// The first time it is called, the router is created and a new HTTPS listener is started if the handler is nil.
// Subsequent calls will return the same router.
func Start(ctx context.Context, nodeConfig *config.Node, runtime *config.ControlRuntime) (*mux.Router, error) {
once.Do(func() {
router = mux.NewRouter().SkipClean(true)
config := server.Config{}
if runtime == nil {
// If we do not have an existing handler, set up a new listener
tcp, lerr := util.ListenWithLoopback(ctx, nodeConfig.AgentConfig.ListenAddress, strconv.Itoa(nodeConfig.ServerHTTPSPort))
if lerr != nil {
err = lerr
return
}
serving := options.NewSecureServingOptions()
serving.Listener = tcp
serving.CipherSuites = nodeConfig.AgentConfig.CipherSuites
serving.MinTLSVersion = nodeConfig.AgentConfig.MinTLSVersion
serving.ServerCert = options.GeneratableKeyCert{
CertKey: options.CertKey{
CertFile: nodeConfig.AgentConfig.ServingKubeletCert,
KeyFile: nodeConfig.AgentConfig.ServingKubeletKey,
},
}
if aerr := serving.ApplyTo(&config.SecureServing); aerr != nil {
err = aerr
return
}
} else {
// If we have an existing handler, wrap it
router.NotFoundHandler = runtime.Handler
runtime.Handler = router
}
authn := options.NewDelegatingAuthenticationOptions()
authn.DisableAnonymous = true
authn.SkipInClusterLookup = true
authn.ClientCert = options.ClientCertAuthenticationOptions{
ClientCA: nodeConfig.AgentConfig.ClientCA,
}
authn.RemoteKubeConfigFile = nodeConfig.AgentConfig.KubeConfigKubelet
if applyErr := authn.ApplyTo(&config.Authentication, config.SecureServing, nil); applyErr != nil {
err = applyErr
return
}
authz := options.NewDelegatingAuthorizationOptions()
authz.AlwaysAllowPaths = []string{"/v2", "/debug/pprof", "/v1-" + version.Program + "/p2p"}
authz.RemoteKubeConfigFile = nodeConfig.AgentConfig.KubeConfigKubelet
if applyErr := authz.ApplyTo(&config.Authorization); applyErr != nil {
err = applyErr
return
}
router.Use(filterChain(config.Authentication.Authenticator, config.Authorization.Authorizer))
if config.SecureServing != nil {
_, _, err = config.SecureServing.Serve(router, 0, ctx.Done())
}
})
return router, err
}
// filterChain runs the kubernetes authn/authz filter chain using the mux middleware API
func filterChain(authn authenticator.Request, authz authorizer.Authorizer) mux.MiddlewareFunc {
return func(handler http.Handler) http.Handler {
requestInfoResolver := &apirequest.RequestInfoFactory{}
failedHandler := genericapifilters.Unauthorized(scheme.Codecs)
handler = genericapifilters.WithAuthorization(handler, authz, scheme.Codecs)
handler = genericapifilters.WithAuthentication(handler, authn, failedHandler, nil, nil)
handler = genericapifilters.WithRequestInfo(handler, requestInfoResolver)
handler = genericapifilters.WithCacheControl(handler)
return handler
}
}

View File

@ -19,25 +19,25 @@ import (
"github.com/cloudnativelabs/kube-router/v2/pkg/controllers/netpol"
"github.com/cloudnativelabs/kube-router/v2/pkg/healthcheck"
"github.com/cloudnativelabs/kube-router/v2/pkg/metrics"
krmetrics "github.com/cloudnativelabs/kube-router/v2/pkg/metrics"
"github.com/cloudnativelabs/kube-router/v2/pkg/options"
"github.com/cloudnativelabs/kube-router/v2/pkg/utils"
"github.com/cloudnativelabs/kube-router/v2/pkg/version"
"github.com/coreos/go-iptables/iptables"
"github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/metrics"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
v1core "k8s.io/api/core/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/component-base/metrics/legacyregistry"
)
func init() {
// ensure that kube-router exposes metrics through the same registry used by Kubernetes components
metrics.DefaultRegisterer = legacyregistry.Registerer()
metrics.DefaultGatherer = legacyregistry.DefaultGatherer
krmetrics.DefaultRegisterer = metrics.DefaultRegisterer
krmetrics.DefaultGatherer = metrics.DefaultGatherer
}
// Run creates and starts a new instance of the kube-router network policy controller
@ -156,7 +156,7 @@ func Run(ctx context.Context, nodeConfig *config.Node) error {
}
// Start kube-router metrics controller to avoid complaints about metrics heartbeat missing
mc, err := metrics.NewMetricsController(krConfig)
mc, err := krmetrics.NewMetricsController(krConfig)
if err != nil {
return nil
}
@ -188,13 +188,13 @@ func Run(ctx context.Context, nodeConfig *config.Node) error {
}
// metricsRunCheck is a stub version of mc.Run() that doesn't start up a dedicated http server.
func metricsRunCheck(mc *metrics.Controller, healthChan chan<- *healthcheck.ControllerHeartbeat, stopCh <-chan struct{}, wg *sync.WaitGroup) {
func metricsRunCheck(mc *krmetrics.Controller, healthChan chan<- *healthcheck.ControllerHeartbeat, stopCh <-chan struct{}, wg *sync.WaitGroup) {
t := time.NewTicker(3 * time.Second)
defer wg.Done()
// register metrics for this controller
metrics.BuildInfo.WithLabelValues(runtime.Version(), version.Version).Set(1)
metrics.DefaultRegisterer.MustRegister(metrics.BuildInfo)
krmetrics.BuildInfo.WithLabelValues(runtime.Version(), version.Version).Set(1)
krmetrics.DefaultRegisterer.MustRegister(krmetrics.BuildInfo)
for {
healthcheck.SendHeartBeat(healthChan, "MC")

View File

@ -27,7 +27,9 @@ import (
"github.com/k3s-io/k3s/pkg/daemons/agent"
daemonconfig "github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/daemons/executor"
"github.com/k3s-io/k3s/pkg/metrics"
"github.com/k3s-io/k3s/pkg/nodeconfig"
"github.com/k3s-io/k3s/pkg/profile"
"github.com/k3s-io/k3s/pkg/rootless"
"github.com/k3s-io/k3s/pkg/spegel"
"github.com/k3s-io/k3s/pkg/util"
@ -113,6 +115,18 @@ func run(ctx context.Context, cfg cmds.Agent, proxy proxy.Proxy) error {
}
}
if nodeConfig.SupervisorMetrics {
if err := metrics.DefaultMetrics.Start(ctx, nodeConfig); err != nil {
return errors.Wrap(err, "failed to serve metrics")
}
}
if nodeConfig.EnablePProf {
if err := profile.DefaultProfiler.Start(ctx, nodeConfig); err != nil {
return errors.Wrap(err, "failed to serve pprof")
}
}
if err := setupCriCtlConfig(cfg, nodeConfig); err != nil {
return err
}

View File

@ -11,6 +11,7 @@ import (
daemonconfig "github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/daemons/control/deps"
"github.com/k3s-io/k3s/pkg/metrics"
"github.com/k3s-io/k3s/pkg/util"
"github.com/k3s-io/k3s/pkg/util/services"
"github.com/k3s-io/k3s/pkg/version"
@ -22,18 +23,9 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/component-base/metrics/legacyregistry"
)
var (
// DefaultRegisterer and DefaultGatherer are the implementations of the
// prometheus Registerer and Gatherer interfaces that all metrics operations
// will use. They are variables so that packages that embed this library can
// replace them at runtime, instead of having to pass around specific
// registries.
DefaultRegisterer = legacyregistry.Registerer()
DefaultGatherer = legacyregistry.DefaultGatherer
// Check certificates twice an hour. Kubernetes events have a TTL of 1 hour by default,
// so similar events should be aggregated and refreshed by the event recorder as long
// as they are created within the TTL period.
@ -50,7 +42,7 @@ var (
// Setup starts the certificate expiration monitor
func Setup(ctx context.Context, nodeConfig *daemonconfig.Node, dataDir string) error {
logrus.Debugf("Starting %s with monitoring period %s", controllerName, certCheckInterval)
DefaultRegisterer.MustRegister(certificateExpirationSeconds)
metrics.DefaultRegisterer.MustRegister(certificateExpirationSeconds)
client, err := util.GetClientSet(nodeConfig.AgentConfig.KubeConfigKubelet)
if err != nil {

View File

@ -1,20 +1,22 @@
package agent
import (
"context"
"crypto/tls"
"errors"
"fmt"
"net/http"
"os"
"path/filepath"
"runtime"
"github.com/gorilla/mux"
"github.com/k3s-io/k3s/pkg/agent"
"github.com/k3s-io/k3s/pkg/authenticator"
"github.com/k3s-io/k3s/pkg/agent/https"
"github.com/k3s-io/k3s/pkg/cli/cmds"
"github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/datadir"
k3smetrics "github.com/k3s-io/k3s/pkg/metrics"
"github.com/k3s-io/k3s/pkg/proctitle"
"github.com/k3s-io/k3s/pkg/profile"
"github.com/k3s-io/k3s/pkg/spegel"
"github.com/k3s-io/k3s/pkg/util"
"github.com/k3s-io/k3s/pkg/version"
@ -22,7 +24,6 @@ import (
"github.com/rancher/wrangler/v3/pkg/signals"
"github.com/sirupsen/logrus"
"github.com/urfave/cli"
apiauth "k8s.io/apiserver/pkg/authentication/authenticator"
)
func Run(ctx *cli.Context) error {
@ -108,33 +109,22 @@ func Run(ctx *cli.Context) error {
// Until the agent is run and retrieves config from the server, we won't know
// if the embedded registry is enabled. If it is not enabled, these are not
// used as the registry is never started.
conf := spegel.DefaultRegistry
conf.Bootstrapper = spegel.NewAgentBootstrapper(cfg.ServerURL, cfg.Token, cfg.DataDir)
conf.HandlerFunc = func(conf *spegel.Config, router *mux.Router) error {
// Create and bind a new authenticator using the configured client CA
authArgs := []string{"--client-ca-file=" + conf.ClientCAFile}
auth, err := authenticator.FromArgs(authArgs)
if err != nil {
return err
}
conf.AuthFunc = func() apiauth.Request {
return auth
}
registry := spegel.DefaultRegistry
registry.Bootstrapper = spegel.NewAgentBootstrapper(cfg.ServerURL, cfg.Token, cfg.DataDir)
registry.Router = func(ctx context.Context, nodeConfig *config.Node) (*mux.Router, error) {
return https.Start(ctx, nodeConfig, nil)
}
// Create a new server and listen on the configured port
server := &http.Server{
Handler: router,
Addr: ":" + conf.RegistryPort,
TLSConfig: &tls.Config{
ClientAuth: tls.RequestClientCert,
},
}
go func() {
if err := server.ListenAndServeTLS(conf.ServerCertFile, conf.ServerKeyFile); err != nil && !errors.Is(err, http.ErrServerClosed) {
logrus.Fatalf("registry server failed: %v", err)
}
}()
return nil
// same deal for metrics - these are not used if the extra metrics listener is not enabled.
metrics := k3smetrics.DefaultMetrics
metrics.Router = func(ctx context.Context, nodeConfig *config.Node) (*mux.Router, error) {
return https.Start(ctx, nodeConfig, nil)
}
// and for pprof as well
pprof := profile.DefaultProfiler
pprof.Router = func(ctx context.Context, nodeConfig *config.Node) (*mux.Router, error) {
return https.Start(ctx, nodeConfig, nil)
}
return agent.Run(contextCtx, cfg)

View File

@ -20,6 +20,7 @@ type Agent struct {
LBServerPort int
ResolvConf string
DataDir string
BindAddress string
NodeIP cli.StringSlice
NodeExternalIP cli.StringSlice
NodeName string
@ -36,6 +37,7 @@ type Agent struct {
VPNAuth string
VPNAuthFile string
Debug bool
EnablePProf bool
Rootless bool
RootlessAlreadyUnshared bool
WithNodeID bool
@ -226,6 +228,16 @@ var (
Usage: "(agent/containerd) Disables containerd's fallback default registry endpoint when a mirror is configured for that registry",
Destination: &AgentConfig.ContainerdNoDefault,
}
EnablePProfFlag = &cli.BoolFlag{
Name: "enable-pprof",
Usage: "(experimental) Enable pprof endpoint on supervisor port",
Destination: &AgentConfig.EnablePProf,
}
BindAddressFlag = &cli.StringFlag{
Name: "bind-address",
Usage: "(listener) " + version.Program + " bind address (default: 0.0.0.0)",
Destination: &AgentConfig.BindAddress,
}
)
func NewAgentCommand(action func(ctx *cli.Context) error) cli.Command {
@ -278,6 +290,7 @@ func NewAgentCommand(action func(ctx *cli.Context) error) cli.Command {
DisableDefaultRegistryEndpointFlag,
AirgapExtraRegistryFlag,
NodeIPFlag,
BindAddressFlag,
NodeExternalIPFlag,
ResolvConfFlag,
FlannelIfaceFlag,
@ -286,6 +299,7 @@ func NewAgentCommand(action func(ctx *cli.Context) error) cli.Command {
ExtraKubeletArgs,
ExtraKubeProxyArgs,
// Experimental flags
EnablePProfFlag,
&cli.BoolFlag{
Name: "rootless",
Usage: "(experimental) Run rootless",

View File

@ -48,8 +48,6 @@ type Server struct {
HelmJobImage string
TLSSan cli.StringSlice
TLSSanSecurity bool
BindAddress string
EnablePProf bool
ExtraAPIArgs cli.StringSlice
ExtraEtcdArgs cli.StringSlice
ExtraSchedulerArgs cli.StringSlice
@ -87,6 +85,7 @@ type Server struct {
EncryptSkip bool
SystemDefaultRegistry string
StartupHooks []StartupHook
SupervisorMetrics bool
EtcdSnapshotName string
EtcdDisableSnapshots bool
EtcdExposeMetrics bool
@ -178,11 +177,7 @@ var ServerFlags = []cli.Flag{
VModule,
LogFile,
AlsoLogToStderr,
&cli.StringFlag{
Name: "bind-address",
Usage: "(listener) " + version.Program + " bind address (default: 0.0.0.0)",
Destination: &ServerConfig.BindAddress,
},
BindAddressFlag,
&cli.IntFlag{
Name: "https-listen-port",
Usage: "(listener) HTTPS listen port",
@ -493,9 +488,14 @@ var ServerFlags = []cli.Flag{
},
&cli.BoolFlag{
Name: "embedded-registry",
Usage: "(experimental/components) Enable embedded distributed container registry; requires use of embedded containerd",
Usage: "(experimental/components) Enable embedded distributed container registry; requires use of embedded containerd; when enabled agents will also listen on the supervisor port",
Destination: &ServerConfig.EmbeddedRegistry,
},
&cli.BoolFlag{
Name: "supervisor-metrics",
Usage: "(experimental/components) Enable serving " + version.Program + " internal metrics on the supervisor port; when enabled agents will also listen on the supervisor port",
Destination: &ServerConfig.SupervisorMetrics,
},
NodeNameFlag,
WithNodeIDFlag,
NodeLabels,
@ -534,11 +534,7 @@ var ServerFlags = []cli.Flag{
Destination: &ServerConfig.EncryptSecrets,
},
// Experimental flags
&cli.BoolFlag{
Name: "enable-pprof",
Usage: "(experimental) Enable pprof endpoint on supervisor port",
Destination: &ServerConfig.EnablePProf,
},
EnablePProfFlag,
&cli.BoolFlag{
Name: "rootless",
Usage: "(experimental) Run rootless",

View File

@ -12,13 +12,16 @@ import (
systemd "github.com/coreos/go-systemd/v22/daemon"
"github.com/gorilla/mux"
"github.com/k3s-io/k3s/pkg/agent"
"github.com/k3s-io/k3s/pkg/agent/https"
"github.com/k3s-io/k3s/pkg/agent/loadbalancer"
"github.com/k3s-io/k3s/pkg/cli/cmds"
"github.com/k3s-io/k3s/pkg/clientaccess"
"github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/datadir"
"github.com/k3s-io/k3s/pkg/etcd"
k3smetrics "github.com/k3s-io/k3s/pkg/metrics"
"github.com/k3s-io/k3s/pkg/proctitle"
"github.com/k3s-io/k3s/pkg/profile"
"github.com/k3s-io/k3s/pkg/rootless"
"github.com/k3s-io/k3s/pkg/server"
"github.com/k3s-io/k3s/pkg/spegel"
@ -30,7 +33,6 @@ import (
"github.com/sirupsen/logrus"
"github.com/urfave/cli"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apiserver/pkg/authentication/authenticator"
kubeapiserverflag "k8s.io/component-base/cli/flag"
"k8s.io/kubernetes/pkg/controlplane/apiserver/options"
utilsnet "k8s.io/utils/net"
@ -136,12 +138,11 @@ func run(app *cli.Context, cfg *cmds.Server, leaderControllers server.CustomCont
serverConfig.ControlConfig.ServiceLBNamespace = cfg.ServiceLBNamespace
serverConfig.ControlConfig.SANs = util.SplitStringSlice(cfg.TLSSan)
serverConfig.ControlConfig.SANSecurity = cfg.TLSSanSecurity
serverConfig.ControlConfig.BindAddress = cfg.BindAddress
serverConfig.ControlConfig.BindAddress = cmds.AgentConfig.BindAddress
serverConfig.ControlConfig.SupervisorPort = cfg.SupervisorPort
serverConfig.ControlConfig.HTTPSPort = cfg.HTTPSPort
serverConfig.ControlConfig.APIServerPort = cfg.APIServerPort
serverConfig.ControlConfig.APIServerBindAddress = cfg.APIServerBindAddress
serverConfig.ControlConfig.EnablePProf = cfg.EnablePProf
serverConfig.ControlConfig.ExtraAPIArgs = cfg.ExtraAPIArgs
serverConfig.ControlConfig.ExtraControllerArgs = cfg.ExtraControllerArgs
serverConfig.ControlConfig.ExtraEtcdArgs = cfg.ExtraEtcdArgs
@ -174,6 +175,7 @@ func run(app *cli.Context, cfg *cmds.Server, leaderControllers server.CustomCont
serverConfig.ControlConfig.EncryptSecrets = cfg.EncryptSecrets
serverConfig.ControlConfig.EtcdExposeMetrics = cfg.EtcdExposeMetrics
serverConfig.ControlConfig.EtcdDisableSnapshots = cfg.EtcdDisableSnapshots
serverConfig.ControlConfig.SupervisorMetrics = cfg.SupervisorMetrics
serverConfig.ControlConfig.VLevel = cmds.LogConfig.VLevel
serverConfig.ControlConfig.VModule = cmds.LogConfig.VModule
@ -406,6 +408,7 @@ func run(app *cli.Context, cfg *cmds.Server, leaderControllers server.CustomCont
}
tlsMinVersionArg := getArgValueFromList("tls-min-version", serverConfig.ControlConfig.ExtraAPIArgs)
serverConfig.ControlConfig.MinTLSVersion = tlsMinVersionArg
serverConfig.ControlConfig.TLSMinVersion, err = kubeapiserverflag.TLSVersion(tlsMinVersionArg)
if err != nil {
return errors.Wrap(err, "invalid tls-min-version")
@ -435,6 +438,7 @@ func run(app *cli.Context, cfg *cmds.Server, leaderControllers server.CustomCont
}
serverConfig.ControlConfig.ExtraAPIArgs = append(serverConfig.ControlConfig.ExtraAPIArgs, "tls-cipher-suites="+strings.Join(tlsCipherSuites, ","))
}
serverConfig.ControlConfig.CipherSuites = tlsCipherSuites
serverConfig.ControlConfig.TLSCipherSuites, err = kubeapiserverflag.TLSCipherSuites(tlsCipherSuites)
if err != nil {
return errors.Wrap(err, "invalid tls-cipher-suites")
@ -556,28 +560,36 @@ func run(app *cli.Context, cfg *cmds.Server, leaderControllers server.CustomCont
go getAPIAddressFromEtcd(ctx, serverConfig, agentConfig)
}
// Until the agent is run and retrieves config from the server, we won't know
// if the embedded registry is enabled. If it is not enabled, these are not
// used as the registry is never started.
registry := spegel.DefaultRegistry
registry.Bootstrapper = spegel.NewChainingBootstrapper(
spegel.NewServerBootstrapper(&serverConfig.ControlConfig),
spegel.NewAgentBootstrapper(cfg.ServerURL, token, agentConfig.DataDir),
spegel.NewSelfBootstrapper(),
)
registry.Router = func(ctx context.Context, nodeConfig *config.Node) (*mux.Router, error) {
return https.Start(ctx, nodeConfig, serverConfig.ControlConfig.Runtime)
}
// same deal for metrics - these are not used if the extra metrics listener is not enabled.
metrics := k3smetrics.DefaultMetrics
metrics.Router = func(ctx context.Context, nodeConfig *config.Node) (*mux.Router, error) {
return https.Start(ctx, nodeConfig, serverConfig.ControlConfig.Runtime)
}
// and for pprof as well
pprof := profile.DefaultProfiler
pprof.Router = func(ctx context.Context, nodeConfig *config.Node) (*mux.Router, error) {
return https.Start(ctx, nodeConfig, serverConfig.ControlConfig.Runtime)
}
if cfg.DisableAgent {
agentConfig.ContainerRuntimeEndpoint = "/dev/null"
return agent.RunStandalone(ctx, agentConfig)
}
if cfg.EmbeddedRegistry {
conf := spegel.DefaultRegistry
conf.Bootstrapper = spegel.NewChainingBootstrapper(
spegel.NewServerBootstrapper(&serverConfig.ControlConfig),
spegel.NewAgentBootstrapper(cfg.ServerURL, token, agentConfig.DataDir),
spegel.NewSelfBootstrapper(),
)
conf.HandlerFunc = func(_ *spegel.Config, router *mux.Router) error {
router.NotFoundHandler = serverConfig.ControlConfig.Runtime.Handler
serverConfig.ControlConfig.Runtime.Handler = router
return nil
}
conf.AuthFunc = func() authenticator.Request {
return serverConfig.ControlConfig.Runtime.Authenticator
}
}
return agent.Run(ctx, agentConfig)
}

View File

@ -4,17 +4,16 @@ import (
"context"
"crypto/tls"
"errors"
"fmt"
"io"
"log"
"net"
"net/http"
"net/http/pprof"
"os"
"path/filepath"
"strconv"
"github.com/gorilla/mux"
"github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/util"
"github.com/k3s-io/k3s/pkg/version"
"github.com/rancher/dynamiclistener"
"github.com/rancher/dynamiclistener/factory"
@ -24,7 +23,6 @@ import (
"github.com/rancher/wrangler/v3/pkg/generated/controllers/core"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilsnet "k8s.io/utils/net"
)
// newListener returns a new TCP listener and HTTP request handler using dynamiclistener.
@ -43,11 +41,7 @@ func (c *Cluster) newListener(ctx context.Context) (net.Listener, http.Handler,
os.Remove(filepath.Join(c.config.DataDir, "tls/dynamic-cert.json"))
}
}
ip := c.config.BindAddress
if utilsnet.IsIPv6String(ip) {
ip = fmt.Sprintf("[%s]", ip)
}
tcp, err := dynamiclistener.NewTCPListener(ip, c.config.SupervisorPort)
tcp, err := util.ListenWithLoopback(ctx, c.config.BindAddress, strconv.Itoa(c.config.SupervisorPort))
if err != nil {
return nil, nil, err
}
@ -114,17 +108,6 @@ func (c *Cluster) initClusterAndHTTPS(ctx context.Context) error {
return err
}
if c.config.EnablePProf {
mux := mux.NewRouter().SkipClean(true)
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
mux.PathPrefix("/debug/pprof/").HandlerFunc(pprof.Index)
mux.NotFoundHandler = handler
handler = mux
}
// Create a HTTP server with the registered request handlers, using logrus for logging
server := http.Server{
Handler: handler,

View File

@ -41,6 +41,8 @@ type Node struct {
ImageServiceEndpoint string
NoFlannel bool
SELinux bool
EnablePProf bool
SupervisorMetrics bool
EmbeddedRegistry bool
FlannelBackend string
FlannelConfFile string
@ -128,6 +130,8 @@ type Agent struct {
AirgapExtraRegistry []string
DisableCCM bool
DisableNPC bool
MinTLSVersion string
CipherSuites []string
Rootless bool
ProtectKernelDefaults bool
DisableServiceLB bool
@ -159,6 +163,7 @@ type CriticalControlArgs struct {
EgressSelectorMode string `cli:"egress-selector-mode"`
ServiceIPRange *net.IPNet `cli:"service-cidr"`
ServiceIPRanges []*net.IPNet `cli:"service-cidr"`
SupervisorMetrics bool `cli:"supervisor-metrics"`
}
type Control struct {
@ -191,7 +196,6 @@ type Control struct {
DisableServiceLB bool
Rootless bool
ServiceLBNamespace string
EnablePProf bool
ExtraAPIArgs []string
ExtraControllerArgs []string
ExtraCloudControllerArgs []string
@ -208,8 +212,10 @@ type Control struct {
ClusterResetRestorePath string
EncryptForce bool
EncryptSkip bool
TLSMinVersion uint16
TLSCipherSuites []uint16
MinTLSVersion string
CipherSuites []string
TLSMinVersion uint16 `json:"-"`
TLSCipherSuites []uint16 `json:"-"`
EtcdSnapshotName string `json:"-"`
EtcdDisableSnapshots bool `json:"-"`
EtcdExposeMetrics bool `json:"-"`

45
pkg/metrics/metrics.go Normal file
View File

@ -0,0 +1,45 @@
package metrics
import (
"context"
"errors"
"github.com/gorilla/mux"
"github.com/k3s-io/k3s/pkg/agent/https"
"github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/prometheus/client_golang/prometheus/promhttp"
"k8s.io/component-base/metrics/legacyregistry"
)
// DefaultRegisterer is the implementation of the
// prometheus Registerer interface that all metrics operations
// will use.
var DefaultRegisterer = legacyregistry.Registerer()
// DefaultGatherer is the implementation of the
// prometheus Gatherere interface that all metrics operations
// will use.
var DefaultGatherer = legacyregistry.DefaultGatherer
// DefaultMetrics is the default instance of a Metrics server
var DefaultMetrics = &Config{
Router: func(context.Context, *config.Node) (*mux.Router, error) {
return nil, errors.New("not implemented")
},
}
// Config holds fields for the metrics listener
type Config struct {
// Router will be called to add the metrics API handler to an existing router.
Router https.RouterFunc
}
// Start starts binds the metrics API to an existing HTTP router.
func (c *Config) Start(ctx context.Context, nodeConfig *config.Node) error {
mRouter, err := c.Router(ctx, nodeConfig)
if err != nil {
return err
}
mRouter.Handle("/metrics", promhttp.HandlerFor(DefaultGatherer, promhttp.HandlerOpts{}))
return nil
}

38
pkg/profile/profile.go Normal file
View File

@ -0,0 +1,38 @@
package profile
import (
"context"
"errors"
"net/http/pprof"
"github.com/gorilla/mux"
"github.com/k3s-io/k3s/pkg/agent/https"
"github.com/k3s-io/k3s/pkg/daemons/config"
)
// DefaultProfiler the default instance of a performance profiling server
var DefaultProfiler = &Config{
Router: func(context.Context, *config.Node) (*mux.Router, error) {
return nil, errors.New("not implemented")
},
}
// Config holds fields for the pprof listener
type Config struct {
// Router will be called to add the pprof API handler to an existing router.
Router https.RouterFunc
}
// Start starts binds the pprof API to an existing HTTP router.
func (c *Config) Start(ctx context.Context, nodeConfig *config.Node) error {
mRouter, err := c.Router(ctx, nodeConfig)
if err != nil {
return err
}
mRouter.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
mRouter.HandleFunc("/debug/pprof/profile", pprof.Profile)
mRouter.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
mRouter.HandleFunc("/debug/pprof/trace", pprof.Trace)
mRouter.PathPrefix("/debug/pprof/").HandlerFunc(pprof.Index)
return nil
}

View File

@ -13,13 +13,12 @@ import (
"time"
"github.com/containerd/containerd/remotes/docker"
"github.com/k3s-io/k3s/pkg/agent/https"
"github.com/k3s-io/k3s/pkg/clientaccess"
"github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/version"
"github.com/rancher/dynamiclistener/cert"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/authentication/authenticator"
"k8s.io/apiserver/pkg/authentication/request/union"
"k8s.io/utils/ptr"
"github.com/go-logr/logr"
@ -43,11 +42,8 @@ import (
// DefaultRegistry is the default instance of a Spegel distributed registry
var DefaultRegistry = &Config{
Bootstrapper: NewSelfBootstrapper(),
HandlerFunc: func(_ *Config, _ *mux.Router) error {
return errors.New("not implemented")
},
AuthFunc: func() authenticator.Request {
return union.New(nil)
Router: func(context.Context, *config.Node) (*mux.Router, error) {
return nil, errors.New("not implemented")
},
}
@ -60,9 +56,6 @@ var (
resolveLatestTag = false
)
type authFunc func() authenticator.Request
type handlerFunc func(config *Config, router *mux.Router) error
// Config holds fields for a distributed registry
type Config struct {
ClientCAFile string
@ -89,10 +82,7 @@ type Config struct {
Bootstrapper routing.Bootstrapper
// HandlerFunc will be called to add the registry API handler to an existing router.
HandlerFunc handlerFunc
// Authenticator will be called to retrieve an authenticator used to validate the request to the registry API.
AuthFunc authFunc
Router https.RouterFunc
}
// These values are not currently configurable
@ -235,13 +225,12 @@ func (c *Config) Start(ctx context.Context, nodeConfig *config.Node) error {
// Track images available in containerd and publish via p2p router
go state.Track(ctx, ociClient, router, resolveLatestTag)
mRouter := mux.NewRouter().SkipClean(true)
mRouter.Use(c.authMiddleware())
mRouter.PathPrefix("/v2").Handler(regSvr.Handler)
mRouter.PathPrefix("/v1-" + version.Program + "/p2p").Handler(c.peerInfo())
if err := c.HandlerFunc(c, mRouter); err != nil {
mRouter, err := c.Router(ctx, nodeConfig)
if err != nil {
return err
}
mRouter.PathPrefix("/v2").Handler(regSvr.Handler)
mRouter.PathPrefix("/v1-" + version.Program + "/p2p").Handler(c.peerInfo())
// Wait up to 5 seconds for the p2p network to find peers. This will return
// immediately if the node is bootstrapping from itself.
@ -267,16 +256,3 @@ func (c *Config) peerInfo() http.HandlerFunc {
fmt.Fprintf(resp, "%s/p2p/%s", info.Addrs[0].String(), info.ID.String())
})
}
// authMiddleware calls the configured authenticator to gate access to the registry API
func (c *Config) authMiddleware() mux.MiddlewareFunc {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) {
if _, ok, err := c.AuthFunc().AuthenticateRequest(req); !ok || err != nil {
http.Error(resp, "Unauthorized", http.StatusUnauthorized)
return
}
next.ServeHTTP(resp, req)
})
}
}

View File

@ -1,12 +1,15 @@
package util
import (
"context"
"errors"
"fmt"
"net"
"os"
"strings"
"time"
"github.com/rancher/wrangler/v3/pkg/merr"
"github.com/sirupsen/logrus"
"github.com/urfave/cli"
apinet "k8s.io/apimachinery/pkg/util/net"
@ -319,3 +322,111 @@ func getIPFromInterface(ifaceName string) (string, error) {
return "", fmt.Errorf("can't find ip for interface %s", ifaceName)
}
type multiListener struct {
listeners []net.Listener
closing chan struct{}
conns chan acceptRes
}
type acceptRes struct {
conn net.Conn
err error
}
// explicit interface check
var _ net.Listener = &multiListener{}
var loopbacks = []string{"127.0.0.1", "::1"}
// ListenWithLoopback listens on the given address, as well as on IPv4 and IPv6 loopback addresses.
// If the address is a wildcard, the listener is return unwrapped.
func ListenWithLoopback(ctx context.Context, addr string, port string) (net.Listener, error) {
lc := &net.ListenConfig{
KeepAlive: 3 * time.Minute,
Control: permitReuse,
}
l, err := lc.Listen(ctx, "tcp", net.JoinHostPort(addr, port))
if err != nil {
return nil, err
}
// If we're listening on a wildcard address, we don't need to wrap with the other loopback addresses
switch addr {
case "", "::", "0.0.0.0":
return l, nil
}
ml := &multiListener{
listeners: []net.Listener{l},
closing: make(chan struct{}),
conns: make(chan acceptRes),
}
for _, laddr := range loopbacks {
if laddr == addr {
continue
}
if l, err := lc.Listen(ctx, "tcp", net.JoinHostPort(laddr, port)); err == nil {
ml.listeners = append(ml.listeners, l)
} else {
logrus.Debugf("Failed to listen on %s: %v", net.JoinHostPort(laddr, port), err)
}
}
for i := range ml.listeners {
go ml.accept(ml.listeners[i])
}
return ml, nil
}
// Addr returns the address of the non-loopback address that this multiListener is listening on
func (ml *multiListener) Addr() net.Addr {
return ml.listeners[0].Addr()
}
// Close closes all the listeners
func (ml *multiListener) Close() error {
close(ml.closing)
var errs merr.Errors
for i := range ml.listeners {
err := ml.listeners[i].Close()
if err != nil {
errs = append(errs, err)
}
}
return merr.NewErrors(errs)
}
// Accept returns a Conn/err pair from one of the waiting listeners
func (ml *multiListener) Accept() (net.Conn, error) {
select {
case res, ok := <-ml.conns:
if ok {
return res.conn, res.err
}
return nil, fmt.Errorf("connection channel closed")
case <-ml.closing:
return nil, fmt.Errorf("listener closed")
}
}
// accept runs a loop, accepting connections and trying to send on the result channel
func (ml *multiListener) accept(listener net.Listener) {
for {
conn, err := listener.Accept()
r := acceptRes{
conn: conn,
err: err,
}
select {
case ml.conns <- r:
case <-ml.closing:
if r.err == nil {
r.conn.Close()
}
return
}
}
}

18
pkg/util/net_unix.go Normal file
View File

@ -0,0 +1,18 @@
//go:build !windows
// +build !windows
package util
import (
"syscall"
"golang.org/x/sys/unix"
)
// permitReuse enables port and address sharing on the socket
func permitReuse(network, addr string, conn syscall.RawConn) error {
return conn.Control(func(fd uintptr) {
syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, unix.SO_REUSEPORT, 1)
syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, unix.SO_REUSEADDR, 1)
})
}

11
pkg/util/net_windows.go Normal file
View File

@ -0,0 +1,11 @@
//go:build windows
// +build windows
package util
import "syscall"
// permitReuse is a no-op; port and address reuse is not supported on Windows
func permitReuse(network, addr string, conn syscall.RawConn) error {
return nil
}