diff --git a/go.mod b/go.mod index 03e1bd5f8f..4e09f26f43 100644 --- a/go.mod +++ b/go.mod @@ -115,6 +115,7 @@ require ( github.com/tchap/go-patricia v2.3.0+incompatible // indirect github.com/urfave/cli v1.22.4 github.com/vishvananda/netlink v1.1.1-0.20210330154013-f5de75959ad5 + github.com/yl2chen/cidranger v1.0.2 go.etcd.io/etcd/api/v3 v3.5.4 go.etcd.io/etcd/client/pkg/v3 v3.5.4 go.etcd.io/etcd/client/v3 v3.5.4 diff --git a/go.sum b/go.sum index 374b4423d3..0fb68cd624 100644 --- a/go.sum +++ b/go.sum @@ -1262,6 +1262,8 @@ github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q github.com/xlab/treeprint v0.0.0-20181112141820-a009c3971eca h1:1CFlNzQhALwjS9mBAUkycX616GzgsuYUOCHA5+HSlXI= github.com/xlab/treeprint v0.0.0-20181112141820-a009c3971eca/go.mod h1:ce1O1j6UtZfjr22oyGxGLbauSBp2YVXpARAosm7dHBg= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= +github.com/yl2chen/cidranger v1.0.2 h1:lbOWZVCG1tCRX4u24kuM1Tb4nHqWkDxwLdoS+SevawU= +github.com/yl2chen/cidranger v1.0.2/go.mod h1:9U1yz7WPYDwf0vpNWFaeRh0bjwz5RVgRy/9UEQfHl0g= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= diff --git a/pkg/agent/run.go b/pkg/agent/run.go index f26da55ffa..8abfb24581 100644 --- a/pkg/agent/run.go +++ b/pkg/agent/run.go @@ -319,15 +319,21 @@ func configureNode(ctx context.Context, agentConfig *daemonconfig.Agent, nodes t updateNode = true } + if changed, err := nodeconfig.SetNodeConfigLabels(node); err != nil { + return false, err + } else if changed { + updateNode = true + } + if updateNode { if _, err := nodes.Update(ctx, node, metav1.UpdateOptions{}); err != nil { - logrus.Infof("Failed to update node %s: %v", agentConfig.NodeName, err) + logrus.Infof("Failed to set annotations and labels on node %s: %v", agentConfig.NodeName, err) return false, nil } - logrus.Infof("labels have been set successfully on node: %s", agentConfig.NodeName) + logrus.Infof("Annotations and labels have been set successfully on node: %s", agentConfig.NodeName) return true, nil } - logrus.Infof("labels have already set on node: %s", agentConfig.NodeName) + logrus.Infof("Annotations and labels have already set on node: %s", agentConfig.NodeName) return true, nil } diff --git a/pkg/agent/tunnel/tunnel.go b/pkg/agent/tunnel/tunnel.go index a6189f8910..c8a7e76345 100644 --- a/pkg/agent/tunnel/tunnel.go +++ b/pkg/agent/tunnel/tunnel.go @@ -5,6 +5,7 @@ import ( "crypto/tls" "fmt" "net" + "os" "reflect" "sync" @@ -16,6 +17,7 @@ import ( "github.com/k3s-io/k3s/pkg/version" "github.com/rancher/remotedialer" "github.com/sirupsen/logrus" + "github.com/yl2chen/cidranger" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" @@ -56,6 +58,11 @@ func Setup(ctx context.Context, config *config.Node, proxy proxy.Proxy) error { return err } + tunnel := &agentTunnel{ + client: client, + cidrs: cidranger.NewPCTrieRanger(), + } + // The loadbalancer is only disabled when there is a local apiserver. Servers without a local // apiserver load-balance to themselves initially, then switch over to an apiserver node as soon // as we get some addresses from the code below. @@ -83,7 +90,7 @@ func Setup(ctx context.Context, config *config.Node, proxy proxy.Proxy) error { wg := &sync.WaitGroup{} for _, address := range proxy.SupervisorAddresses() { if _, ok := disconnect[address]; !ok { - disconnect[address] = connect(ctx, wg, address, tlsConfig) + disconnect[address] = tunnel.connect(ctx, wg, address, tlsConfig) } } @@ -136,7 +143,7 @@ func Setup(ctx context.Context, config *config.Node, proxy proxy.Proxy) error { for _, address := range proxy.SupervisorAddresses() { validEndpoint[address] = true if _, ok := disconnect[address]; !ok { - disconnect[address] = connect(ctx, nil, address, tlsConfig) + disconnect[address] = tunnel.connect(ctx, nil, address, tlsConfig) } } @@ -167,7 +174,49 @@ func Setup(ctx context.Context, config *config.Node, proxy proxy.Proxy) error { return nil } -func connect(rootCtx context.Context, waitGroup *sync.WaitGroup, address string, tlsConfig *tls.Config) context.CancelFunc { +type agentTunnel struct { + client kubernetes.Interface + cidrs cidranger.Ranger +} + +// authorized determines whether or not a dial request is authorized. +// Connections to the local kubelet ports are allowed. +// Connections to other IPs are allowed if they are contained in a CIDR managed by this node. +// All other requests are rejected. +func (a *agentTunnel) authorized(ctx context.Context, proto, address string) bool { + logrus.Debugf("Tunnel authorizer checking dial request for %s", address) + host, port, err := net.SplitHostPort(address) + if err == nil { + if proto == "tcp" && ports[port] && (host == "127.0.0.1" || host == "::1") { + return true + } + if ip := net.ParseIP(host); ip != nil { + // lazy populate the cidrs from the node object + if a.cidrs.Len() == 0 { + logrus.Debugf("Tunnel authorizer getting Pod CIDRs for %s", os.Getenv("NODE_NAME")) + node, err := a.client.CoreV1().Nodes().Get(ctx, os.Getenv("NODE_NAME"), metav1.GetOptions{}) + if err != nil { + logrus.Warnf("Tunnel authorizer failed to get Pod CIDRs: %v", err) + return false + } + for _, cidr := range node.Spec.PodCIDRs { + if _, n, err := net.ParseCIDR(cidr); err == nil { + logrus.Infof("Tunnel authorizer added Pod CIDR %s", cidr) + a.cidrs.Insert(cidranger.NewBasicRangerEntry(*n)) + } + } + } + if nets, err := a.cidrs.ContainingNetworks(ip); err == nil && len(nets) > 0 { + return true + } + } + } + return false +} + +// connect initiates a connection to the remotedialer server. Incoming dial requests from +// the server will be checked by the authorizer function prior to being fulfilled. +func (a *agentTunnel) connect(rootCtx context.Context, waitGroup *sync.WaitGroup, address string, tlsConfig *tls.Config) context.CancelFunc { wsURL := fmt.Sprintf("wss://%s/v1-"+version.Program+"/connect", address) ws := &websocket.Dialer{ TLSClientConfig: tlsConfig, @@ -183,8 +232,7 @@ func connect(rootCtx context.Context, waitGroup *sync.WaitGroup, address string, go func() { for { remotedialer.ClientConnect(ctx, wsURL, nil, ws, func(proto, address string) bool { - host, port, err := net.SplitHostPort(address) - return err == nil && proto == "tcp" && ports[port] && (host == "127.0.0.1" || host == "::1") + return a.authorized(rootCtx, proto, address) }, func(_ context.Context) error { if waitGroup != nil { once.Do(waitGroup.Done) diff --git a/pkg/cluster/https.go b/pkg/cluster/https.go index 976db7adf2..8c307faa65 100644 --- a/pkg/cluster/https.go +++ b/pkg/cluster/https.go @@ -50,7 +50,7 @@ func (c *Cluster) newListener(ctx context.Context) (net.Listener, http.Handler, return nil, nil, err } storage := tlsStorage(ctx, c.config.DataDir, c.config.Runtime) - return dynamiclistener.NewListener(tcp, storage, cert, key, dynamiclistener.Config{ + return wrapHandler(dynamiclistener.NewListener(tcp, storage, cert, key, dynamiclistener.Config{ ExpirationDaysCheck: config.CertificateRenewDays, Organization: []string{version.Program}, SANs: append(c.config.SANs, "kubernetes", "kubernetes.default", "kubernetes.default.svc", "kubernetes.default.svc."+c.config.ClusterDomain), @@ -70,7 +70,7 @@ func (c *Cluster) newListener(ctx context.Context) (net.Listener, http.Handler, } return false }, - }) + })) } // initClusterAndHTTPS sets up the dynamic tls listener, request router, @@ -131,3 +131,18 @@ func tlsStorage(ctx context.Context, dataDir string, runtime *config.ControlRunt return runtime.Core }, metav1.NamespaceSystem, version.Program+"-serving", cache) } + +// wrapHandler wraps the dynamiclistener request handler, adding a User-Agent value to +// CONNECT requests that will prevent DynamicListener from adding the request's Host +// header to the SAN list. CONNECT requests set the Host header to the target of the +// proxy connection, so it is not correct to add this value to the certificate. It would +// be nice if we could do this with with the FilterCN callback, but unfortunately that +// callback does not offer access to the request that triggered the change. +func wrapHandler(listener net.Listener, handler http.Handler, err error) (net.Listener, http.Handler, error) { + return listener, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method == http.MethodConnect { + r.Header.Add("User-Agent", "mozilla") + } + handler.ServeHTTP(w, r) + }), err +} diff --git a/pkg/daemons/config/types.go b/pkg/daemons/config/types.go index e38d956c20..b1d4917094 100644 --- a/pkg/daemons/config/types.go +++ b/pkg/daemons/config/types.go @@ -276,6 +276,8 @@ type ControlRuntime struct { Tunnel http.Handler Authenticator authenticator.Request + EgressSelectorConfig string + ClientAuthProxyCert string ClientAuthProxyKey string diff --git a/pkg/daemons/control/deps/deps.go b/pkg/daemons/control/deps/deps.go index e48004af03..a9dba1602f 100644 --- a/pkg/daemons/control/deps/deps.go +++ b/pkg/daemons/control/deps/deps.go @@ -22,12 +22,12 @@ import ( "github.com/k3s-io/k3s/pkg/daemons/config" "github.com/k3s-io/k3s/pkg/passwd" "github.com/k3s-io/k3s/pkg/token" - "github.com/k3s-io/k3s/pkg/util" "github.com/k3s-io/k3s/pkg/version" certutil "github.com/rancher/dynamiclistener/cert" "github.com/sirupsen/logrus" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apiserver/pkg/apis/apiserver" apiserverconfigv1 "k8s.io/apiserver/pkg/apis/config/v1" "k8s.io/apiserver/pkg/authentication/user" ) @@ -137,6 +137,8 @@ func CreateRuntimeCertFiles(config *config.Control) { runtime.ClientKubeletKey = filepath.Join(config.DataDir, "tls", "client-kubelet.key") runtime.ServingKubeletKey = filepath.Join(config.DataDir, "tls", "serving-kubelet.key") + runtime.EgressSelectorConfig = filepath.Join(config.DataDir, "etc", "egress-selector-config.yaml") + runtime.ClientAuthProxyCert = filepath.Join(config.DataDir, "tls", "client-auth-proxy.crt") runtime.ClientAuthProxyKey = filepath.Join(config.DataDir, "tls", "client-auth-proxy.key") @@ -181,6 +183,10 @@ func GenServerDeps(config *config.Control) error { return err } + if err := genEgressSelectorConfig(config); err != nil { + return err + } + return readTokens(runtime) } @@ -308,12 +314,7 @@ func genClientCerts(config *config.Control) error { var certGen bool - IPv6OnlyService, _ := util.IsIPv6OnlyCIDRs(config.ServiceIPRanges) - ip := "127.0.0.1" - if IPv6OnlyService { - ip = "[::1]" - } - apiEndpoint := fmt.Sprintf("https://%s:%d", ip, config.APIServerPort) + apiEndpoint := fmt.Sprintf("https://%s:%d", config.Loopback(), config.APIServerPort) certGen, err = factory("system:admin", []string{user.SystemPrivilegedGroup}, runtime.ClientAdminCert, runtime.ClientAdminKey) if err != nil { @@ -720,3 +721,42 @@ func genEncryptionConfigAndState(controlConfig *config.Control) error { ann := "start-" + hex.EncodeToString(encryptionConfigHash[:]) return ioutil.WriteFile(controlConfig.Runtime.EncryptionHash, []byte(ann), 0600) } + +func genEgressSelectorConfig(controlConfig *config.Control) error { + connection := apiserver.Connection{ + ProxyProtocol: apiserver.ProtocolHTTPConnect, + Transport: &apiserver.Transport{ + TCP: &apiserver.TCPTransport{ + URL: fmt.Sprintf("https://%s:%d", controlConfig.Loopback(), controlConfig.SupervisorPort), + TLSConfig: &apiserver.TLSConfig{ + CABundle: controlConfig.Runtime.ServerCA, + ClientKey: controlConfig.Runtime.ClientKubeAPIKey, + ClientCert: controlConfig.Runtime.ClientKubeAPICert, + }, + }, + }, + } + + egressConfig := apiserver.EgressSelectorConfiguration{ + TypeMeta: metav1.TypeMeta{ + Kind: "EgressSelectorConfiguration", + APIVersion: "apiserver.k8s.io/v1beta1", + }, + EgressSelections: []apiserver.EgressSelection{ + { + Name: "cluster", + Connection: connection, + }, + { + Name: "controlplane", + Connection: connection, + }, + }, + } + + b, err := json.Marshal(egressConfig) + if err != nil { + return err + } + return ioutil.WriteFile(controlConfig.Runtime.EgressSelectorConfig, b, 0600) +} diff --git a/pkg/daemons/control/proxy/proxy.go b/pkg/daemons/control/proxy/proxy.go new file mode 100644 index 0000000000..426d3e81db --- /dev/null +++ b/pkg/daemons/control/proxy/proxy.go @@ -0,0 +1,57 @@ +package proxy + +import ( + "io" + "net" + + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +type proxy struct { + lconn, rconn io.ReadWriteCloser + done bool + errc chan error +} + +func Proxy(lconn, rconn net.Conn) error { + p := &proxy{ + lconn: lconn, + rconn: rconn, + errc: make(chan error), + } + + defer p.rconn.Close() + defer p.lconn.Close() + go p.pipe(p.lconn, p.rconn) + go p.pipe(p.rconn, p.lconn) + return <-p.errc +} + +func (p *proxy) err(err error) { + if p.done { + return + } + if !errors.Is(err, io.EOF) { + logrus.Warnf("Proxy error: %v", err) + } + p.done = true + p.errc <- err +} + +func (p *proxy) pipe(src, dst io.ReadWriter) { + buff := make([]byte, 1<<15) + for { + n, err := src.Read(buff) + if err != nil { + p.err(errors.Wrap(err, "read failed")) + return + } + _, err = dst.Write(buff[:n]) + if err != nil { + p.err(errors.Wrap(err, "write failed")) + return + } + } + +} diff --git a/pkg/daemons/control/server.go b/pkg/daemons/control/server.go index 64abf8c95f..fdef10a4c6 100644 --- a/pkg/daemons/control/server.go +++ b/pkg/daemons/control/server.go @@ -46,7 +46,12 @@ func Server(ctx context.Context, cfg *config.Control) error { return errors.Wrap(err, "preparing server") } - cfg.Runtime.Tunnel = setupTunnel() + tunnel, err := setupTunnel(ctx, cfg) + if err != nil { + return errors.Wrap(err, "setup tunnel server") + } + cfg.Runtime.Tunnel = tunnel + proxyutil.DisableProxyHostnameCheck = true authArgs := []string{ @@ -178,6 +183,8 @@ func apiServer(ctx context.Context, cfg *config.Control) error { } else { argsMap["bind-address"] = cfg.APIServerBindAddress } + argsMap["enable-aggregator-routing"] = "true" + argsMap["egress-selector-config-file"] = runtime.EgressSelectorConfig argsMap["tls-cert-file"] = runtime.ServingKubeAPICert argsMap["tls-private-key-file"] = runtime.ServingKubeAPIKey argsMap["service-account-key-file"] = runtime.ServiceKey @@ -253,6 +260,7 @@ func prepare(ctx context.Context, config *config.Control) error { return err } + os.MkdirAll(filepath.Join(config.DataDir, "etc"), 0700) os.MkdirAll(filepath.Join(config.DataDir, "tls"), 0700) os.MkdirAll(filepath.Join(config.DataDir, "cred"), 0700) diff --git a/pkg/daemons/control/tunnel.go b/pkg/daemons/control/tunnel.go index a82d5c1b4b..6456ee54b8 100644 --- a/pkg/daemons/control/tunnel.go +++ b/pkg/daemons/control/tunnel.go @@ -2,45 +2,38 @@ package control import ( "context" + "fmt" "net" "net/http" "strings" "time" + "github.com/k3s-io/k3s/pkg/daemons/config" + "github.com/k3s-io/k3s/pkg/daemons/control/proxy" + "github.com/k3s-io/k3s/pkg/nodeconfig" + "github.com/k3s-io/k3s/pkg/version" "github.com/rancher/remotedialer" - "github.com/rancher/wrangler/pkg/kv" "github.com/sirupsen/logrus" - utilnet "k8s.io/apimachinery/pkg/util/net" + "github.com/yl2chen/cidranger" + v1 "k8s.io/api/core/v1" "k8s.io/apiserver/pkg/endpoints/request" - "k8s.io/kubernetes/cmd/kube-apiserver/app" + "k8s.io/client-go/kubernetes" ) func loggingErrorWriter(rw http.ResponseWriter, req *http.Request, code int, err error) { - logrus.Debugf("remoteDialer error: %d %v", code, err) + logrus.Debugf("Tunnel server error: %d %v", code, err) rw.WriteHeader(code) rw.Write([]byte(err.Error())) } -func setupTunnel() http.Handler { - tunnelServer := remotedialer.New(authorizer, loggingErrorWriter) - setupProxyDialer(tunnelServer) - return tunnelServer -} - -func setupProxyDialer(tunnelServer *remotedialer.Server) { - app.DefaultProxyDialerFn = utilnet.DialFunc(func(ctx context.Context, network, address string) (net.Conn, error) { - _, port, _ := net.SplitHostPort(address) - addr := "127.0.0.1" - if port != "" { - addr += ":" + port - } - nodeName, _ := kv.Split(address, ":") - if tunnelServer.HasSession(nodeName) { - return tunnelServer.Dial(nodeName, 15*time.Second, "tcp", addr) - } - var d net.Dialer - return d.DialContext(ctx, network, address) - }) +func setupTunnel(ctx context.Context, cfg *config.Control) (http.Handler, error) { + tunnel := &TunnelServer{ + cidrs: cidranger.NewPCTrieRanger(), + config: cfg, + server: remotedialer.New(authorizer, loggingErrorWriter), + } + go tunnel.watchNodes(ctx) + return tunnel, nil } func authorizer(req *http.Request) (clientKey string, authed bool, err error) { @@ -55,3 +48,185 @@ func authorizer(req *http.Request) (clientKey string, authed bool, err error) { return "", false, nil } + +// explicit interface check +var _ http.Handler = &TunnelServer{} + +type TunnelServer struct { + cidrs cidranger.Ranger + client kubernetes.Interface + config *config.Control + server *remotedialer.Server +} + +// explicit interface check +var _ cidranger.RangerEntry = &nodeAddress{} +var _ cidranger.RangerEntry = &nodeCIDR{} + +type nodeAddress struct { + cidr net.IPNet + node string +} + +func (n *nodeAddress) Network() net.IPNet { + return n.cidr +} + +type nodeCIDR struct { + cidr net.IPNet + clusterEgress bool + node string +} + +func (n *nodeCIDR) Network() net.IPNet { + return n.cidr +} + +// ServeHTTP handles either CONNECT requests, or websocket requests to the remotedialer server +func (t *TunnelServer) ServeHTTP(resp http.ResponseWriter, req *http.Request) { + logrus.Debugf("Tunnel server handing %s %s request for %s from %s", req.Proto, req.Method, req.URL, req.RemoteAddr) + if req.Method == http.MethodConnect { + t.serveConnect(resp, req) + } else { + t.server.ServeHTTP(resp, req) + } +} + +// watchNodes waits for the runtime core to become available, +// and registers an OnChange handler to observe PodCIDR changes. +func (t *TunnelServer) watchNodes(ctx context.Context) { + for { + if t.config.Runtime.Core != nil { + t.config.Runtime.Core.Core().V1().Node().OnChange(ctx, version.Program+"-tunnel-server", t.onChangeNode) + return + } + logrus.Infof("Tunnel server waiting for runtime core to become available") + time.Sleep(5 * time.Second) + } +} + +// onChangeNode updates the node address/CIDR mappings by observing changes to nodes. +func (t *TunnelServer) onChangeNode(nodeName string, node *v1.Node) (*v1.Node, error) { + if node != nil { + logrus.Debugf("Tunnel server updating node %s", nodeName) + _, clusterEgress := node.Labels[nodeconfig.ClusterEgressLabel] + // Add all node IP addresses + for _, addr := range node.Status.Addresses { + if addr.Type == v1.NodeInternalIP || addr.Type == v1.NodeExternalIP { + address := addr.Address + if strings.Contains(address, ":") { + address += "/128" + } else { + address += "/32" + } + if _, n, err := net.ParseCIDR(address); err == nil { + if node.DeletionTimestamp != nil { + t.cidrs.Remove(*n) + } else { + t.cidrs.Insert(&nodeAddress{cidr: *n, node: nodeName}) + } + } + } + } + // Add all node PodCIDRs + for _, cidr := range node.Spec.PodCIDRs { + if _, n, err := net.ParseCIDR(cidr); err == nil { + if node.DeletionTimestamp != nil { + t.cidrs.Remove(*n) + } else { + t.cidrs.Insert(&nodeCIDR{cidr: *n, clusterEgress: clusterEgress, node: nodeName}) + } + } + } + } + return node, nil +} + +// serveConnect attempts to handle the HTTP CONNECT request by dialing +// a connection, either locally or via the remotedialer tunnel. +func (t *TunnelServer) serveConnect(resp http.ResponseWriter, req *http.Request) { + bconn, err := t.dialBackend(req.Host) + if err != nil { + http.Error(resp, fmt.Sprintf("no tunnels available: %v", err), http.StatusInternalServerError) + return + } + + hijacker, ok := resp.(http.Hijacker) + if !ok { + http.Error(resp, "hijacking not supported", http.StatusInternalServerError) + return + } + resp.WriteHeader(http.StatusOK) + + rconn, _, err := hijacker.Hijack() + if err != nil { + http.Error(resp, err.Error(), http.StatusInternalServerError) + return + } + + proxy.Proxy(rconn, bconn) +} + +// dialBackend determines where to route the connection request to, and returns +// a dialed connection if possible. Note that in the case of a remotedialer +// tunnel connection, the agent may return an error if the agent's authorizer +// denies the connection, or if there is some other error in actually dialing +// the requested endpoint. +func (t *TunnelServer) dialBackend(addr string) (net.Conn, error) { + host, port, err := net.SplitHostPort(addr) + if err != nil { + return nil, err + } + loopback := t.config.Loopback() + + var node string + var toKubelet, useTunnel bool + if ip := net.ParseIP(host); ip != nil { + // Destination is an IP address, check to see if the target is a CIDR or node address. + // We can only use the tunnel for egress to pods if the agent supports it. + if nets, err := t.cidrs.ContainingNetworks(ip); err == nil && len(nets) > 0 { + switch n := nets[0].(type) { + case *nodeAddress: + node = n.node + toKubelet = true + useTunnel = true + case *nodeCIDR: + node = n.node + useTunnel = n.clusterEgress + default: + logrus.Debugf("Tunnel server CIDR lookup returned unknown type %T for address %s", nets[0], ip) + } + } + } else { + // Destination is a kubelet by name, it is safe to use the tunnel. + node = host + toKubelet = true + useTunnel = true + } + + // Always dial kubelets via the loopback address. + if toKubelet { + addr = fmt.Sprintf("%s:%s", loopback, port) + } + + // If connecting to something hosted by the local node, don't tunnel + if node == t.config.ServerNodeName { + useTunnel = false + } + + if t.server.HasSession(node) { + if useTunnel { + // Have a session and it is safe to use for this destination, do so. + logrus.Debugf("Tunnel server dialing %s via session to %s", addr, node) + return t.server.Dial(node, 15*time.Second, "tcp", addr) + } + // Have a session but the agent doesn't support tunneling to this destination or + // the destination is local; fall back to direct connection. + logrus.Debugf("Tunnel server dialing %s directly", addr) + return net.Dial("tcp", addr) + } + + // don't provide a proxy connection for anything else + logrus.Debugf("Tunnel server rejecting connection to %s", addr) + return nil, fmt.Errorf("no sessions available for host %s", host) +} diff --git a/pkg/etcd/etcd.go b/pkg/etcd/etcd.go index 32986336cf..2b86a21b91 100644 --- a/pkg/etcd/etcd.go +++ b/pkg/etcd/etcd.go @@ -570,7 +570,7 @@ func (e *ETCD) setName(force bool) error { // handler wraps the handler with routes for database info func (e *ETCD) handler(next http.Handler) http.Handler { - mux := mux.NewRouter() + mux := mux.NewRouter().SkipClean(true) mux.Handle("/db/info", e.infoHandler()) mux.NotFoundHandler = next return mux diff --git a/pkg/nodeconfig/nodeconfig.go b/pkg/nodeconfig/nodeconfig.go index 42dff107da..ee6dc46ac7 100644 --- a/pkg/nodeconfig/nodeconfig.go +++ b/pkg/nodeconfig/nodeconfig.go @@ -18,6 +18,7 @@ var ( NodeArgsAnnotation = version.Program + ".io/node-args" NodeEnvAnnotation = version.Program + ".io/node-env" NodeConfigHashAnnotation = version.Program + ".io/node-config-hash" + ClusterEgressLabel = "egress." + version.Program + ".io/cluster" ) const ( @@ -68,6 +69,10 @@ func getNodeEnv() (string, error) { return string(k3sEnvJSON), nil } +// SetNodeConfigAnnotations stores a redacted version of the k3s cli args and +// environment variables as annotations on the node object. It also stores a +// hash of the combined args + variables. These are used by other components +// to determine if the node configuration has been changed. func SetNodeConfigAnnotations(node *corev1.Node) (bool, error) { nodeArgs, err := getNodeArgs() if err != nil { @@ -97,6 +102,21 @@ func SetNodeConfigAnnotations(node *corev1.Node) (bool, error) { return true, nil } +// SetNodeConfigLabels adds labels for functionality flags +// that may not be present on down-level or up-level nodes. +// These labels are used by other components to determine whether +// or not a node supports particular functionality. +func SetNodeConfigLabels(node *corev1.Node) (bool, error) { + if node.Labels == nil { + node.Labels = make(map[string]string) + } + if _, ok := node.Labels[ClusterEgressLabel]; !ok { + node.Labels[ClusterEgressLabel] = "true" + return true, nil + } + return false, nil +} + func isSecret(key string) bool { secretData := []string{ version.ProgramUpper + "_TOKEN", diff --git a/pkg/server/router.go b/pkg/server/router.go index 71334ad234..26c4342519 100644 --- a/pkg/server/router.go +++ b/pkg/server/router.go @@ -44,7 +44,7 @@ func router(ctx context.Context, config *Config, cfg *cmds.Server) http.Handler nodeAuth := passwordBootstrap(ctx, config) prefix := "/v1-" + version.Program - authed := mux.NewRouter() + authed := mux.NewRouter().SkipClean(true) authed.Use(authMiddleware(serverConfig, version.Program+":agent")) authed.Path(prefix + "/serving-kubelet.crt").Handler(servingKubeletCert(serverConfig, serverConfig.Runtime.ServingKubeletKey, nodeAuth)) authed.Path(prefix + "/client-kubelet.crt").Handler(clientKubeletCert(serverConfig, serverConfig.Runtime.ClientKubeletKey, nodeAuth)) @@ -62,22 +62,28 @@ func router(ctx context.Context, config *Config, cfg *cmds.Server) http.Handler authed.NotFoundHandler = apiserver(serverConfig.Runtime) } - nodeAuthed := mux.NewRouter() + nodeAuthed := mux.NewRouter().SkipClean(true) + nodeAuthed.NotFoundHandler = authed nodeAuthed.Use(authMiddleware(serverConfig, user.NodesGroup)) nodeAuthed.Path(prefix + "/connect").Handler(serverConfig.Runtime.Tunnel) - nodeAuthed.NotFoundHandler = authed - serverAuthed := mux.NewRouter() - serverAuthed.Use(authMiddleware(serverConfig, version.Program+":server")) + serverAuthed := mux.NewRouter().SkipClean(true) serverAuthed.NotFoundHandler = nodeAuthed + serverAuthed.Use(authMiddleware(serverConfig, version.Program+":server")) serverAuthed.Path(prefix + "/encrypt/status").Handler(encryptionStatusHandler(serverConfig)) serverAuthed.Path(prefix + "/encrypt/config").Handler(encryptionConfigHandler(ctx, serverConfig)) serverAuthed.Path("/db/info").Handler(nodeAuthed) serverAuthed.Path(prefix + "/server-bootstrap").Handler(bootstrapHandler(serverConfig.Runtime)) + systemAuthed := mux.NewRouter().SkipClean(true) + systemAuthed.NotFoundHandler = serverAuthed + systemAuthed.MethodNotAllowedHandler = serverAuthed + systemAuthed.Use(authMiddleware(serverConfig, user.SystemPrivilegedGroup)) + systemAuthed.Methods(http.MethodConnect).Handler(serverConfig.Runtime.Tunnel) + staticDir := filepath.Join(serverConfig.DataDir, "static") - router := mux.NewRouter() - router.NotFoundHandler = serverAuthed + router := mux.NewRouter().SkipClean(true) + router.NotFoundHandler = systemAuthed router.PathPrefix(staticURL).Handler(serveStatic(staticURL, staticDir)) router.Path("/cacerts").Handler(cacerts(serverConfig.Runtime.ServerCA)) router.Path("/ping").Handler(ping()) diff --git a/tests/unit.go b/tests/unit.go index 9df0acd039..6e1ba5e5a2 100644 --- a/tests/unit.go +++ b/tests/unit.go @@ -48,6 +48,7 @@ func GenerateRuntime(cnf *config.Control) error { return err } + os.MkdirAll(filepath.Join(cnf.DataDir, "etc"), 0700) os.MkdirAll(filepath.Join(cnf.DataDir, "tls"), 0700) os.MkdirAll(filepath.Join(cnf.DataDir, "cred"), 0700)