Replace DefaultProxyDialerFn dialer injection with EgressSelector support

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
This commit is contained in:
Brad Davidson 2022-04-04 14:54:50 -07:00 committed by Brad Davidson
parent e763fadbba
commit ce5b9347c9
14 changed files with 431 additions and 50 deletions

1
go.mod
View File

@ -115,6 +115,7 @@ require (
github.com/tchap/go-patricia v2.3.0+incompatible // indirect github.com/tchap/go-patricia v2.3.0+incompatible // indirect
github.com/urfave/cli v1.22.4 github.com/urfave/cli v1.22.4
github.com/vishvananda/netlink v1.1.1-0.20210330154013-f5de75959ad5 github.com/vishvananda/netlink v1.1.1-0.20210330154013-f5de75959ad5
github.com/yl2chen/cidranger v1.0.2
go.etcd.io/etcd/api/v3 v3.5.4 go.etcd.io/etcd/api/v3 v3.5.4
go.etcd.io/etcd/client/pkg/v3 v3.5.4 go.etcd.io/etcd/client/pkg/v3 v3.5.4
go.etcd.io/etcd/client/v3 v3.5.4 go.etcd.io/etcd/client/v3 v3.5.4

2
go.sum
View File

@ -1262,6 +1262,8 @@ github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q
github.com/xlab/treeprint v0.0.0-20181112141820-a009c3971eca h1:1CFlNzQhALwjS9mBAUkycX616GzgsuYUOCHA5+HSlXI= github.com/xlab/treeprint v0.0.0-20181112141820-a009c3971eca h1:1CFlNzQhALwjS9mBAUkycX616GzgsuYUOCHA5+HSlXI=
github.com/xlab/treeprint v0.0.0-20181112141820-a009c3971eca/go.mod h1:ce1O1j6UtZfjr22oyGxGLbauSBp2YVXpARAosm7dHBg= github.com/xlab/treeprint v0.0.0-20181112141820-a009c3971eca/go.mod h1:ce1O1j6UtZfjr22oyGxGLbauSBp2YVXpARAosm7dHBg=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
github.com/yl2chen/cidranger v1.0.2 h1:lbOWZVCG1tCRX4u24kuM1Tb4nHqWkDxwLdoS+SevawU=
github.com/yl2chen/cidranger v1.0.2/go.mod h1:9U1yz7WPYDwf0vpNWFaeRh0bjwz5RVgRy/9UEQfHl0g=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=

View File

@ -319,15 +319,21 @@ func configureNode(ctx context.Context, agentConfig *daemonconfig.Agent, nodes t
updateNode = true updateNode = true
} }
if changed, err := nodeconfig.SetNodeConfigLabels(node); err != nil {
return false, err
} else if changed {
updateNode = true
}
if updateNode { if updateNode {
if _, err := nodes.Update(ctx, node, metav1.UpdateOptions{}); err != nil { if _, err := nodes.Update(ctx, node, metav1.UpdateOptions{}); err != nil {
logrus.Infof("Failed to update node %s: %v", agentConfig.NodeName, err) logrus.Infof("Failed to set annotations and labels on node %s: %v", agentConfig.NodeName, err)
return false, nil return false, nil
} }
logrus.Infof("labels have been set successfully on node: %s", agentConfig.NodeName) logrus.Infof("Annotations and labels have been set successfully on node: %s", agentConfig.NodeName)
return true, nil return true, nil
} }
logrus.Infof("labels have already set on node: %s", agentConfig.NodeName) logrus.Infof("Annotations and labels have already set on node: %s", agentConfig.NodeName)
return true, nil return true, nil
} }

View File

@ -5,6 +5,7 @@ import (
"crypto/tls" "crypto/tls"
"fmt" "fmt"
"net" "net"
"os"
"reflect" "reflect"
"sync" "sync"
@ -16,6 +17,7 @@ import (
"github.com/k3s-io/k3s/pkg/version" "github.com/k3s-io/k3s/pkg/version"
"github.com/rancher/remotedialer" "github.com/rancher/remotedialer"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/yl2chen/cidranger"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/fields"
@ -56,6 +58,11 @@ func Setup(ctx context.Context, config *config.Node, proxy proxy.Proxy) error {
return err return err
} }
tunnel := &agentTunnel{
client: client,
cidrs: cidranger.NewPCTrieRanger(),
}
// The loadbalancer is only disabled when there is a local apiserver. Servers without a local // The loadbalancer is only disabled when there is a local apiserver. Servers without a local
// apiserver load-balance to themselves initially, then switch over to an apiserver node as soon // apiserver load-balance to themselves initially, then switch over to an apiserver node as soon
// as we get some addresses from the code below. // as we get some addresses from the code below.
@ -83,7 +90,7 @@ func Setup(ctx context.Context, config *config.Node, proxy proxy.Proxy) error {
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
for _, address := range proxy.SupervisorAddresses() { for _, address := range proxy.SupervisorAddresses() {
if _, ok := disconnect[address]; !ok { if _, ok := disconnect[address]; !ok {
disconnect[address] = connect(ctx, wg, address, tlsConfig) disconnect[address] = tunnel.connect(ctx, wg, address, tlsConfig)
} }
} }
@ -136,7 +143,7 @@ func Setup(ctx context.Context, config *config.Node, proxy proxy.Proxy) error {
for _, address := range proxy.SupervisorAddresses() { for _, address := range proxy.SupervisorAddresses() {
validEndpoint[address] = true validEndpoint[address] = true
if _, ok := disconnect[address]; !ok { if _, ok := disconnect[address]; !ok {
disconnect[address] = connect(ctx, nil, address, tlsConfig) disconnect[address] = tunnel.connect(ctx, nil, address, tlsConfig)
} }
} }
@ -167,7 +174,49 @@ func Setup(ctx context.Context, config *config.Node, proxy proxy.Proxy) error {
return nil return nil
} }
func connect(rootCtx context.Context, waitGroup *sync.WaitGroup, address string, tlsConfig *tls.Config) context.CancelFunc { type agentTunnel struct {
client kubernetes.Interface
cidrs cidranger.Ranger
}
// authorized determines whether or not a dial request is authorized.
// Connections to the local kubelet ports are allowed.
// Connections to other IPs are allowed if they are contained in a CIDR managed by this node.
// All other requests are rejected.
func (a *agentTunnel) authorized(ctx context.Context, proto, address string) bool {
logrus.Debugf("Tunnel authorizer checking dial request for %s", address)
host, port, err := net.SplitHostPort(address)
if err == nil {
if proto == "tcp" && ports[port] && (host == "127.0.0.1" || host == "::1") {
return true
}
if ip := net.ParseIP(host); ip != nil {
// lazy populate the cidrs from the node object
if a.cidrs.Len() == 0 {
logrus.Debugf("Tunnel authorizer getting Pod CIDRs for %s", os.Getenv("NODE_NAME"))
node, err := a.client.CoreV1().Nodes().Get(ctx, os.Getenv("NODE_NAME"), metav1.GetOptions{})
if err != nil {
logrus.Warnf("Tunnel authorizer failed to get Pod CIDRs: %v", err)
return false
}
for _, cidr := range node.Spec.PodCIDRs {
if _, n, err := net.ParseCIDR(cidr); err == nil {
logrus.Infof("Tunnel authorizer added Pod CIDR %s", cidr)
a.cidrs.Insert(cidranger.NewBasicRangerEntry(*n))
}
}
}
if nets, err := a.cidrs.ContainingNetworks(ip); err == nil && len(nets) > 0 {
return true
}
}
}
return false
}
// connect initiates a connection to the remotedialer server. Incoming dial requests from
// the server will be checked by the authorizer function prior to being fulfilled.
func (a *agentTunnel) connect(rootCtx context.Context, waitGroup *sync.WaitGroup, address string, tlsConfig *tls.Config) context.CancelFunc {
wsURL := fmt.Sprintf("wss://%s/v1-"+version.Program+"/connect", address) wsURL := fmt.Sprintf("wss://%s/v1-"+version.Program+"/connect", address)
ws := &websocket.Dialer{ ws := &websocket.Dialer{
TLSClientConfig: tlsConfig, TLSClientConfig: tlsConfig,
@ -183,8 +232,7 @@ func connect(rootCtx context.Context, waitGroup *sync.WaitGroup, address string,
go func() { go func() {
for { for {
remotedialer.ClientConnect(ctx, wsURL, nil, ws, func(proto, address string) bool { remotedialer.ClientConnect(ctx, wsURL, nil, ws, func(proto, address string) bool {
host, port, err := net.SplitHostPort(address) return a.authorized(rootCtx, proto, address)
return err == nil && proto == "tcp" && ports[port] && (host == "127.0.0.1" || host == "::1")
}, func(_ context.Context) error { }, func(_ context.Context) error {
if waitGroup != nil { if waitGroup != nil {
once.Do(waitGroup.Done) once.Do(waitGroup.Done)

View File

@ -50,7 +50,7 @@ func (c *Cluster) newListener(ctx context.Context) (net.Listener, http.Handler,
return nil, nil, err return nil, nil, err
} }
storage := tlsStorage(ctx, c.config.DataDir, c.config.Runtime) storage := tlsStorage(ctx, c.config.DataDir, c.config.Runtime)
return dynamiclistener.NewListener(tcp, storage, cert, key, dynamiclistener.Config{ return wrapHandler(dynamiclistener.NewListener(tcp, storage, cert, key, dynamiclistener.Config{
ExpirationDaysCheck: config.CertificateRenewDays, ExpirationDaysCheck: config.CertificateRenewDays,
Organization: []string{version.Program}, Organization: []string{version.Program},
SANs: append(c.config.SANs, "kubernetes", "kubernetes.default", "kubernetes.default.svc", "kubernetes.default.svc."+c.config.ClusterDomain), SANs: append(c.config.SANs, "kubernetes", "kubernetes.default", "kubernetes.default.svc", "kubernetes.default.svc."+c.config.ClusterDomain),
@ -70,7 +70,7 @@ func (c *Cluster) newListener(ctx context.Context) (net.Listener, http.Handler,
} }
return false return false
}, },
}) }))
} }
// initClusterAndHTTPS sets up the dynamic tls listener, request router, // initClusterAndHTTPS sets up the dynamic tls listener, request router,
@ -131,3 +131,18 @@ func tlsStorage(ctx context.Context, dataDir string, runtime *config.ControlRunt
return runtime.Core return runtime.Core
}, metav1.NamespaceSystem, version.Program+"-serving", cache) }, metav1.NamespaceSystem, version.Program+"-serving", cache)
} }
// wrapHandler wraps the dynamiclistener request handler, adding a User-Agent value to
// CONNECT requests that will prevent DynamicListener from adding the request's Host
// header to the SAN list. CONNECT requests set the Host header to the target of the
// proxy connection, so it is not correct to add this value to the certificate. It would
// be nice if we could do this with with the FilterCN callback, but unfortunately that
// callback does not offer access to the request that triggered the change.
func wrapHandler(listener net.Listener, handler http.Handler, err error) (net.Listener, http.Handler, error) {
return listener, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodConnect {
r.Header.Add("User-Agent", "mozilla")
}
handler.ServeHTTP(w, r)
}), err
}

View File

@ -276,6 +276,8 @@ type ControlRuntime struct {
Tunnel http.Handler Tunnel http.Handler
Authenticator authenticator.Request Authenticator authenticator.Request
EgressSelectorConfig string
ClientAuthProxyCert string ClientAuthProxyCert string
ClientAuthProxyKey string ClientAuthProxyKey string

View File

@ -22,12 +22,12 @@ import (
"github.com/k3s-io/k3s/pkg/daemons/config" "github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/passwd" "github.com/k3s-io/k3s/pkg/passwd"
"github.com/k3s-io/k3s/pkg/token" "github.com/k3s-io/k3s/pkg/token"
"github.com/k3s-io/k3s/pkg/util"
"github.com/k3s-io/k3s/pkg/version" "github.com/k3s-io/k3s/pkg/version"
certutil "github.com/rancher/dynamiclistener/cert" certutil "github.com/rancher/dynamiclistener/cert"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/apis/apiserver"
apiserverconfigv1 "k8s.io/apiserver/pkg/apis/config/v1" apiserverconfigv1 "k8s.io/apiserver/pkg/apis/config/v1"
"k8s.io/apiserver/pkg/authentication/user" "k8s.io/apiserver/pkg/authentication/user"
) )
@ -137,6 +137,8 @@ func CreateRuntimeCertFiles(config *config.Control) {
runtime.ClientKubeletKey = filepath.Join(config.DataDir, "tls", "client-kubelet.key") runtime.ClientKubeletKey = filepath.Join(config.DataDir, "tls", "client-kubelet.key")
runtime.ServingKubeletKey = filepath.Join(config.DataDir, "tls", "serving-kubelet.key") runtime.ServingKubeletKey = filepath.Join(config.DataDir, "tls", "serving-kubelet.key")
runtime.EgressSelectorConfig = filepath.Join(config.DataDir, "etc", "egress-selector-config.yaml")
runtime.ClientAuthProxyCert = filepath.Join(config.DataDir, "tls", "client-auth-proxy.crt") runtime.ClientAuthProxyCert = filepath.Join(config.DataDir, "tls", "client-auth-proxy.crt")
runtime.ClientAuthProxyKey = filepath.Join(config.DataDir, "tls", "client-auth-proxy.key") runtime.ClientAuthProxyKey = filepath.Join(config.DataDir, "tls", "client-auth-proxy.key")
@ -181,6 +183,10 @@ func GenServerDeps(config *config.Control) error {
return err return err
} }
if err := genEgressSelectorConfig(config); err != nil {
return err
}
return readTokens(runtime) return readTokens(runtime)
} }
@ -308,12 +314,7 @@ func genClientCerts(config *config.Control) error {
var certGen bool var certGen bool
IPv6OnlyService, _ := util.IsIPv6OnlyCIDRs(config.ServiceIPRanges) apiEndpoint := fmt.Sprintf("https://%s:%d", config.Loopback(), config.APIServerPort)
ip := "127.0.0.1"
if IPv6OnlyService {
ip = "[::1]"
}
apiEndpoint := fmt.Sprintf("https://%s:%d", ip, config.APIServerPort)
certGen, err = factory("system:admin", []string{user.SystemPrivilegedGroup}, runtime.ClientAdminCert, runtime.ClientAdminKey) certGen, err = factory("system:admin", []string{user.SystemPrivilegedGroup}, runtime.ClientAdminCert, runtime.ClientAdminKey)
if err != nil { if err != nil {
@ -720,3 +721,42 @@ func genEncryptionConfigAndState(controlConfig *config.Control) error {
ann := "start-" + hex.EncodeToString(encryptionConfigHash[:]) ann := "start-" + hex.EncodeToString(encryptionConfigHash[:])
return ioutil.WriteFile(controlConfig.Runtime.EncryptionHash, []byte(ann), 0600) return ioutil.WriteFile(controlConfig.Runtime.EncryptionHash, []byte(ann), 0600)
} }
func genEgressSelectorConfig(controlConfig *config.Control) error {
connection := apiserver.Connection{
ProxyProtocol: apiserver.ProtocolHTTPConnect,
Transport: &apiserver.Transport{
TCP: &apiserver.TCPTransport{
URL: fmt.Sprintf("https://%s:%d", controlConfig.Loopback(), controlConfig.SupervisorPort),
TLSConfig: &apiserver.TLSConfig{
CABundle: controlConfig.Runtime.ServerCA,
ClientKey: controlConfig.Runtime.ClientKubeAPIKey,
ClientCert: controlConfig.Runtime.ClientKubeAPICert,
},
},
},
}
egressConfig := apiserver.EgressSelectorConfiguration{
TypeMeta: metav1.TypeMeta{
Kind: "EgressSelectorConfiguration",
APIVersion: "apiserver.k8s.io/v1beta1",
},
EgressSelections: []apiserver.EgressSelection{
{
Name: "cluster",
Connection: connection,
},
{
Name: "controlplane",
Connection: connection,
},
},
}
b, err := json.Marshal(egressConfig)
if err != nil {
return err
}
return ioutil.WriteFile(controlConfig.Runtime.EgressSelectorConfig, b, 0600)
}

View File

@ -0,0 +1,57 @@
package proxy
import (
"io"
"net"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
type proxy struct {
lconn, rconn io.ReadWriteCloser
done bool
errc chan error
}
func Proxy(lconn, rconn net.Conn) error {
p := &proxy{
lconn: lconn,
rconn: rconn,
errc: make(chan error),
}
defer p.rconn.Close()
defer p.lconn.Close()
go p.pipe(p.lconn, p.rconn)
go p.pipe(p.rconn, p.lconn)
return <-p.errc
}
func (p *proxy) err(err error) {
if p.done {
return
}
if !errors.Is(err, io.EOF) {
logrus.Warnf("Proxy error: %v", err)
}
p.done = true
p.errc <- err
}
func (p *proxy) pipe(src, dst io.ReadWriter) {
buff := make([]byte, 1<<15)
for {
n, err := src.Read(buff)
if err != nil {
p.err(errors.Wrap(err, "read failed"))
return
}
_, err = dst.Write(buff[:n])
if err != nil {
p.err(errors.Wrap(err, "write failed"))
return
}
}
}

View File

@ -46,7 +46,12 @@ func Server(ctx context.Context, cfg *config.Control) error {
return errors.Wrap(err, "preparing server") return errors.Wrap(err, "preparing server")
} }
cfg.Runtime.Tunnel = setupTunnel() tunnel, err := setupTunnel(ctx, cfg)
if err != nil {
return errors.Wrap(err, "setup tunnel server")
}
cfg.Runtime.Tunnel = tunnel
proxyutil.DisableProxyHostnameCheck = true proxyutil.DisableProxyHostnameCheck = true
authArgs := []string{ authArgs := []string{
@ -178,6 +183,8 @@ func apiServer(ctx context.Context, cfg *config.Control) error {
} else { } else {
argsMap["bind-address"] = cfg.APIServerBindAddress argsMap["bind-address"] = cfg.APIServerBindAddress
} }
argsMap["enable-aggregator-routing"] = "true"
argsMap["egress-selector-config-file"] = runtime.EgressSelectorConfig
argsMap["tls-cert-file"] = runtime.ServingKubeAPICert argsMap["tls-cert-file"] = runtime.ServingKubeAPICert
argsMap["tls-private-key-file"] = runtime.ServingKubeAPIKey argsMap["tls-private-key-file"] = runtime.ServingKubeAPIKey
argsMap["service-account-key-file"] = runtime.ServiceKey argsMap["service-account-key-file"] = runtime.ServiceKey
@ -253,6 +260,7 @@ func prepare(ctx context.Context, config *config.Control) error {
return err return err
} }
os.MkdirAll(filepath.Join(config.DataDir, "etc"), 0700)
os.MkdirAll(filepath.Join(config.DataDir, "tls"), 0700) os.MkdirAll(filepath.Join(config.DataDir, "tls"), 0700)
os.MkdirAll(filepath.Join(config.DataDir, "cred"), 0700) os.MkdirAll(filepath.Join(config.DataDir, "cred"), 0700)

View File

@ -2,45 +2,38 @@ package control
import ( import (
"context" "context"
"fmt"
"net" "net"
"net/http" "net/http"
"strings" "strings"
"time" "time"
"github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/daemons/control/proxy"
"github.com/k3s-io/k3s/pkg/nodeconfig"
"github.com/k3s-io/k3s/pkg/version"
"github.com/rancher/remotedialer" "github.com/rancher/remotedialer"
"github.com/rancher/wrangler/pkg/kv"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
utilnet "k8s.io/apimachinery/pkg/util/net" "github.com/yl2chen/cidranger"
v1 "k8s.io/api/core/v1"
"k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/kubernetes/cmd/kube-apiserver/app" "k8s.io/client-go/kubernetes"
) )
func loggingErrorWriter(rw http.ResponseWriter, req *http.Request, code int, err error) { func loggingErrorWriter(rw http.ResponseWriter, req *http.Request, code int, err error) {
logrus.Debugf("remoteDialer error: %d %v", code, err) logrus.Debugf("Tunnel server error: %d %v", code, err)
rw.WriteHeader(code) rw.WriteHeader(code)
rw.Write([]byte(err.Error())) rw.Write([]byte(err.Error()))
} }
func setupTunnel() http.Handler { func setupTunnel(ctx context.Context, cfg *config.Control) (http.Handler, error) {
tunnelServer := remotedialer.New(authorizer, loggingErrorWriter) tunnel := &TunnelServer{
setupProxyDialer(tunnelServer) cidrs: cidranger.NewPCTrieRanger(),
return tunnelServer config: cfg,
} server: remotedialer.New(authorizer, loggingErrorWriter),
}
func setupProxyDialer(tunnelServer *remotedialer.Server) { go tunnel.watchNodes(ctx)
app.DefaultProxyDialerFn = utilnet.DialFunc(func(ctx context.Context, network, address string) (net.Conn, error) { return tunnel, nil
_, port, _ := net.SplitHostPort(address)
addr := "127.0.0.1"
if port != "" {
addr += ":" + port
}
nodeName, _ := kv.Split(address, ":")
if tunnelServer.HasSession(nodeName) {
return tunnelServer.Dial(nodeName, 15*time.Second, "tcp", addr)
}
var d net.Dialer
return d.DialContext(ctx, network, address)
})
} }
func authorizer(req *http.Request) (clientKey string, authed bool, err error) { func authorizer(req *http.Request) (clientKey string, authed bool, err error) {
@ -55,3 +48,185 @@ func authorizer(req *http.Request) (clientKey string, authed bool, err error) {
return "", false, nil return "", false, nil
} }
// explicit interface check
var _ http.Handler = &TunnelServer{}
type TunnelServer struct {
cidrs cidranger.Ranger
client kubernetes.Interface
config *config.Control
server *remotedialer.Server
}
// explicit interface check
var _ cidranger.RangerEntry = &nodeAddress{}
var _ cidranger.RangerEntry = &nodeCIDR{}
type nodeAddress struct {
cidr net.IPNet
node string
}
func (n *nodeAddress) Network() net.IPNet {
return n.cidr
}
type nodeCIDR struct {
cidr net.IPNet
clusterEgress bool
node string
}
func (n *nodeCIDR) Network() net.IPNet {
return n.cidr
}
// ServeHTTP handles either CONNECT requests, or websocket requests to the remotedialer server
func (t *TunnelServer) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
logrus.Debugf("Tunnel server handing %s %s request for %s from %s", req.Proto, req.Method, req.URL, req.RemoteAddr)
if req.Method == http.MethodConnect {
t.serveConnect(resp, req)
} else {
t.server.ServeHTTP(resp, req)
}
}
// watchNodes waits for the runtime core to become available,
// and registers an OnChange handler to observe PodCIDR changes.
func (t *TunnelServer) watchNodes(ctx context.Context) {
for {
if t.config.Runtime.Core != nil {
t.config.Runtime.Core.Core().V1().Node().OnChange(ctx, version.Program+"-tunnel-server", t.onChangeNode)
return
}
logrus.Infof("Tunnel server waiting for runtime core to become available")
time.Sleep(5 * time.Second)
}
}
// onChangeNode updates the node address/CIDR mappings by observing changes to nodes.
func (t *TunnelServer) onChangeNode(nodeName string, node *v1.Node) (*v1.Node, error) {
if node != nil {
logrus.Debugf("Tunnel server updating node %s", nodeName)
_, clusterEgress := node.Labels[nodeconfig.ClusterEgressLabel]
// Add all node IP addresses
for _, addr := range node.Status.Addresses {
if addr.Type == v1.NodeInternalIP || addr.Type == v1.NodeExternalIP {
address := addr.Address
if strings.Contains(address, ":") {
address += "/128"
} else {
address += "/32"
}
if _, n, err := net.ParseCIDR(address); err == nil {
if node.DeletionTimestamp != nil {
t.cidrs.Remove(*n)
} else {
t.cidrs.Insert(&nodeAddress{cidr: *n, node: nodeName})
}
}
}
}
// Add all node PodCIDRs
for _, cidr := range node.Spec.PodCIDRs {
if _, n, err := net.ParseCIDR(cidr); err == nil {
if node.DeletionTimestamp != nil {
t.cidrs.Remove(*n)
} else {
t.cidrs.Insert(&nodeCIDR{cidr: *n, clusterEgress: clusterEgress, node: nodeName})
}
}
}
}
return node, nil
}
// serveConnect attempts to handle the HTTP CONNECT request by dialing
// a connection, either locally or via the remotedialer tunnel.
func (t *TunnelServer) serveConnect(resp http.ResponseWriter, req *http.Request) {
bconn, err := t.dialBackend(req.Host)
if err != nil {
http.Error(resp, fmt.Sprintf("no tunnels available: %v", err), http.StatusInternalServerError)
return
}
hijacker, ok := resp.(http.Hijacker)
if !ok {
http.Error(resp, "hijacking not supported", http.StatusInternalServerError)
return
}
resp.WriteHeader(http.StatusOK)
rconn, _, err := hijacker.Hijack()
if err != nil {
http.Error(resp, err.Error(), http.StatusInternalServerError)
return
}
proxy.Proxy(rconn, bconn)
}
// dialBackend determines where to route the connection request to, and returns
// a dialed connection if possible. Note that in the case of a remotedialer
// tunnel connection, the agent may return an error if the agent's authorizer
// denies the connection, or if there is some other error in actually dialing
// the requested endpoint.
func (t *TunnelServer) dialBackend(addr string) (net.Conn, error) {
host, port, err := net.SplitHostPort(addr)
if err != nil {
return nil, err
}
loopback := t.config.Loopback()
var node string
var toKubelet, useTunnel bool
if ip := net.ParseIP(host); ip != nil {
// Destination is an IP address, check to see if the target is a CIDR or node address.
// We can only use the tunnel for egress to pods if the agent supports it.
if nets, err := t.cidrs.ContainingNetworks(ip); err == nil && len(nets) > 0 {
switch n := nets[0].(type) {
case *nodeAddress:
node = n.node
toKubelet = true
useTunnel = true
case *nodeCIDR:
node = n.node
useTunnel = n.clusterEgress
default:
logrus.Debugf("Tunnel server CIDR lookup returned unknown type %T for address %s", nets[0], ip)
}
}
} else {
// Destination is a kubelet by name, it is safe to use the tunnel.
node = host
toKubelet = true
useTunnel = true
}
// Always dial kubelets via the loopback address.
if toKubelet {
addr = fmt.Sprintf("%s:%s", loopback, port)
}
// If connecting to something hosted by the local node, don't tunnel
if node == t.config.ServerNodeName {
useTunnel = false
}
if t.server.HasSession(node) {
if useTunnel {
// Have a session and it is safe to use for this destination, do so.
logrus.Debugf("Tunnel server dialing %s via session to %s", addr, node)
return t.server.Dial(node, 15*time.Second, "tcp", addr)
}
// Have a session but the agent doesn't support tunneling to this destination or
// the destination is local; fall back to direct connection.
logrus.Debugf("Tunnel server dialing %s directly", addr)
return net.Dial("tcp", addr)
}
// don't provide a proxy connection for anything else
logrus.Debugf("Tunnel server rejecting connection to %s", addr)
return nil, fmt.Errorf("no sessions available for host %s", host)
}

View File

@ -570,7 +570,7 @@ func (e *ETCD) setName(force bool) error {
// handler wraps the handler with routes for database info // handler wraps the handler with routes for database info
func (e *ETCD) handler(next http.Handler) http.Handler { func (e *ETCD) handler(next http.Handler) http.Handler {
mux := mux.NewRouter() mux := mux.NewRouter().SkipClean(true)
mux.Handle("/db/info", e.infoHandler()) mux.Handle("/db/info", e.infoHandler())
mux.NotFoundHandler = next mux.NotFoundHandler = next
return mux return mux

View File

@ -18,6 +18,7 @@ var (
NodeArgsAnnotation = version.Program + ".io/node-args" NodeArgsAnnotation = version.Program + ".io/node-args"
NodeEnvAnnotation = version.Program + ".io/node-env" NodeEnvAnnotation = version.Program + ".io/node-env"
NodeConfigHashAnnotation = version.Program + ".io/node-config-hash" NodeConfigHashAnnotation = version.Program + ".io/node-config-hash"
ClusterEgressLabel = "egress." + version.Program + ".io/cluster"
) )
const ( const (
@ -68,6 +69,10 @@ func getNodeEnv() (string, error) {
return string(k3sEnvJSON), nil return string(k3sEnvJSON), nil
} }
// SetNodeConfigAnnotations stores a redacted version of the k3s cli args and
// environment variables as annotations on the node object. It also stores a
// hash of the combined args + variables. These are used by other components
// to determine if the node configuration has been changed.
func SetNodeConfigAnnotations(node *corev1.Node) (bool, error) { func SetNodeConfigAnnotations(node *corev1.Node) (bool, error) {
nodeArgs, err := getNodeArgs() nodeArgs, err := getNodeArgs()
if err != nil { if err != nil {
@ -97,6 +102,21 @@ func SetNodeConfigAnnotations(node *corev1.Node) (bool, error) {
return true, nil return true, nil
} }
// SetNodeConfigLabels adds labels for functionality flags
// that may not be present on down-level or up-level nodes.
// These labels are used by other components to determine whether
// or not a node supports particular functionality.
func SetNodeConfigLabels(node *corev1.Node) (bool, error) {
if node.Labels == nil {
node.Labels = make(map[string]string)
}
if _, ok := node.Labels[ClusterEgressLabel]; !ok {
node.Labels[ClusterEgressLabel] = "true"
return true, nil
}
return false, nil
}
func isSecret(key string) bool { func isSecret(key string) bool {
secretData := []string{ secretData := []string{
version.ProgramUpper + "_TOKEN", version.ProgramUpper + "_TOKEN",

View File

@ -44,7 +44,7 @@ func router(ctx context.Context, config *Config, cfg *cmds.Server) http.Handler
nodeAuth := passwordBootstrap(ctx, config) nodeAuth := passwordBootstrap(ctx, config)
prefix := "/v1-" + version.Program prefix := "/v1-" + version.Program
authed := mux.NewRouter() authed := mux.NewRouter().SkipClean(true)
authed.Use(authMiddleware(serverConfig, version.Program+":agent")) authed.Use(authMiddleware(serverConfig, version.Program+":agent"))
authed.Path(prefix + "/serving-kubelet.crt").Handler(servingKubeletCert(serverConfig, serverConfig.Runtime.ServingKubeletKey, nodeAuth)) authed.Path(prefix + "/serving-kubelet.crt").Handler(servingKubeletCert(serverConfig, serverConfig.Runtime.ServingKubeletKey, nodeAuth))
authed.Path(prefix + "/client-kubelet.crt").Handler(clientKubeletCert(serverConfig, serverConfig.Runtime.ClientKubeletKey, nodeAuth)) authed.Path(prefix + "/client-kubelet.crt").Handler(clientKubeletCert(serverConfig, serverConfig.Runtime.ClientKubeletKey, nodeAuth))
@ -62,22 +62,28 @@ func router(ctx context.Context, config *Config, cfg *cmds.Server) http.Handler
authed.NotFoundHandler = apiserver(serverConfig.Runtime) authed.NotFoundHandler = apiserver(serverConfig.Runtime)
} }
nodeAuthed := mux.NewRouter() nodeAuthed := mux.NewRouter().SkipClean(true)
nodeAuthed.NotFoundHandler = authed
nodeAuthed.Use(authMiddleware(serverConfig, user.NodesGroup)) nodeAuthed.Use(authMiddleware(serverConfig, user.NodesGroup))
nodeAuthed.Path(prefix + "/connect").Handler(serverConfig.Runtime.Tunnel) nodeAuthed.Path(prefix + "/connect").Handler(serverConfig.Runtime.Tunnel)
nodeAuthed.NotFoundHandler = authed
serverAuthed := mux.NewRouter() serverAuthed := mux.NewRouter().SkipClean(true)
serverAuthed.Use(authMiddleware(serverConfig, version.Program+":server"))
serverAuthed.NotFoundHandler = nodeAuthed serverAuthed.NotFoundHandler = nodeAuthed
serverAuthed.Use(authMiddleware(serverConfig, version.Program+":server"))
serverAuthed.Path(prefix + "/encrypt/status").Handler(encryptionStatusHandler(serverConfig)) serverAuthed.Path(prefix + "/encrypt/status").Handler(encryptionStatusHandler(serverConfig))
serverAuthed.Path(prefix + "/encrypt/config").Handler(encryptionConfigHandler(ctx, serverConfig)) serverAuthed.Path(prefix + "/encrypt/config").Handler(encryptionConfigHandler(ctx, serverConfig))
serverAuthed.Path("/db/info").Handler(nodeAuthed) serverAuthed.Path("/db/info").Handler(nodeAuthed)
serverAuthed.Path(prefix + "/server-bootstrap").Handler(bootstrapHandler(serverConfig.Runtime)) serverAuthed.Path(prefix + "/server-bootstrap").Handler(bootstrapHandler(serverConfig.Runtime))
systemAuthed := mux.NewRouter().SkipClean(true)
systemAuthed.NotFoundHandler = serverAuthed
systemAuthed.MethodNotAllowedHandler = serverAuthed
systemAuthed.Use(authMiddleware(serverConfig, user.SystemPrivilegedGroup))
systemAuthed.Methods(http.MethodConnect).Handler(serverConfig.Runtime.Tunnel)
staticDir := filepath.Join(serverConfig.DataDir, "static") staticDir := filepath.Join(serverConfig.DataDir, "static")
router := mux.NewRouter() router := mux.NewRouter().SkipClean(true)
router.NotFoundHandler = serverAuthed router.NotFoundHandler = systemAuthed
router.PathPrefix(staticURL).Handler(serveStatic(staticURL, staticDir)) router.PathPrefix(staticURL).Handler(serveStatic(staticURL, staticDir))
router.Path("/cacerts").Handler(cacerts(serverConfig.Runtime.ServerCA)) router.Path("/cacerts").Handler(cacerts(serverConfig.Runtime.ServerCA))
router.Path("/ping").Handler(ping()) router.Path("/ping").Handler(ping())

View File

@ -48,6 +48,7 @@ func GenerateRuntime(cnf *config.Control) error {
return err return err
} }
os.MkdirAll(filepath.Join(cnf.DataDir, "etc"), 0700)
os.MkdirAll(filepath.Join(cnf.DataDir, "tls"), 0700) os.MkdirAll(filepath.Join(cnf.DataDir, "tls"), 0700)
os.MkdirAll(filepath.Join(cnf.DataDir, "cred"), 0700) os.MkdirAll(filepath.Join(cnf.DataDir, "cred"), 0700)