Add support for configuring the EgressSelector mode

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
This commit is contained in:
Brad Davidson 2022-05-17 12:25:43 -07:00 committed by Brad Davidson
parent aa9065749c
commit 9d7230496d
7 changed files with 175 additions and 62 deletions

View File

@ -441,6 +441,7 @@ func get(ctx context.Context, envInfo *cmds.Agent, proxy proxy.Proxy) (*config.N
ContainerRuntimeEndpoint: envInfo.ContainerRuntimeEndpoint,
FlannelBackend: controlConfig.FlannelBackend,
FlannelIPv6Masq: controlConfig.FlannelIPv6Masq,
EgressSelectorMode: controlConfig.EgressSelectorMode,
ServerHTTPSPort: controlConfig.HTTPSPort,
Token: info.String(),
}

View File

@ -12,9 +12,10 @@ import (
"github.com/gorilla/websocket"
agentconfig "github.com/k3s-io/k3s/pkg/agent/config"
"github.com/k3s-io/k3s/pkg/agent/proxy"
"github.com/k3s-io/k3s/pkg/daemons/config"
daemonconfig "github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/util"
"github.com/k3s-io/k3s/pkg/version"
"github.com/pkg/errors"
"github.com/rancher/remotedialer"
"github.com/sirupsen/logrus"
"github.com/yl2chen/cidranger"
@ -37,7 +38,7 @@ var (
}
)
func Setup(ctx context.Context, config *config.Node, proxy proxy.Proxy) error {
func Setup(ctx context.Context, config *daemonconfig.Node, proxy proxy.Proxy) error {
restConfig, err := clientcmd.BuildConfigFromFlags("", config.AgentConfig.KubeConfigK3sController)
if err != nil {
return err
@ -61,6 +62,14 @@ func Setup(ctx context.Context, config *config.Node, proxy proxy.Proxy) error {
tunnel := &agentTunnel{
client: client,
cidrs: cidranger.NewPCTrieRanger(),
mode: config.EgressSelectorMode,
}
if tunnel.mode == daemonconfig.EgressSelectorModeCluster {
for _, cidr := range config.AgentConfig.ClusterCIDRs {
logrus.Infof("Tunnel authorizer added Cluster CIDR %s", cidr)
tunnel.cidrs.Insert(cidranger.NewBasicRangerEntry(*cidr))
}
}
// The loadbalancer is only disabled when there is a local apiserver. Servers without a local
@ -175,8 +184,10 @@ func Setup(ctx context.Context, config *config.Node, proxy proxy.Proxy) error {
}
type agentTunnel struct {
sync.Mutex
client kubernetes.Interface
cidrs cidranger.Ranger
mode string
}
// authorized determines whether or not a dial request is authorized.
@ -191,19 +202,10 @@ func (a *agentTunnel) authorized(ctx context.Context, proto, address string) boo
return true
}
if ip := net.ParseIP(host); ip != nil {
// lazy populate the cidrs from the node object
if a.cidrs.Len() == 0 {
logrus.Debugf("Tunnel authorizer getting Pod CIDRs for %s", os.Getenv("NODE_NAME"))
node, err := a.client.CoreV1().Nodes().Get(ctx, os.Getenv("NODE_NAME"), metav1.GetOptions{})
if err != nil {
logrus.Warnf("Tunnel authorizer failed to get Pod CIDRs: %v", err)
return false
}
for _, cidr := range node.Spec.PodCIDRs {
if _, n, err := net.ParseCIDR(cidr); err == nil {
logrus.Infof("Tunnel authorizer added Pod CIDR %s", cidr)
a.cidrs.Insert(cidranger.NewBasicRangerEntry(*n))
}
// lazy populate pod cidrs from the node object
if a.mode == daemonconfig.EgressSelectorModePod && a.cidrs.Len() == 0 {
if err := a.updatePodCIDRs(ctx); err != nil {
logrus.Warnf("Tunnel authorizer failed to update CIDRs: %v", err)
}
}
if nets, err := a.cidrs.ContainingNetworks(ip); err == nil && len(nets) > 0 {
@ -214,6 +216,28 @@ func (a *agentTunnel) authorized(ctx context.Context, proto, address string) boo
return false
}
func (a *agentTunnel) updatePodCIDRs(ctx context.Context) error {
a.Lock()
defer a.Unlock()
// Return early if another goroutine updated the CIDRs while we were waiting to lock
if a.cidrs.Len() != 0 {
return nil
}
nodeName := os.Getenv("NODE_NAME")
logrus.Debugf("Tunnel authorizer getting Pod CIDRs for %s", nodeName)
node, err := a.client.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
if err != nil {
return errors.Wrap(err, "failed to get Pod CIDRs")
}
for _, cidr := range node.Spec.PodCIDRs {
if _, n, err := net.ParseCIDR(cidr); err == nil {
logrus.Infof("Tunnel authorizer added Pod CIDR %s", cidr)
a.cidrs.Insert(cidranger.NewBasicRangerEntry(*n))
}
}
return nil
}
// connect initiates a connection to the remotedialer server. Incoming dial requests from
// the server will be checked by the authorizer function prior to being fulfilled.
func (a *agentTunnel) connect(rootCtx context.Context, waitGroup *sync.WaitGroup, address string, tlsConfig *tls.Config) context.CancelFunc {

View File

@ -63,6 +63,7 @@ type Server struct {
ServerURL string
FlannelBackend string
FlannelIPv6Masq bool
EgressSelectorMode string
DefaultLocalStoragePath string
DisableCCM bool
DisableNPC bool
@ -213,6 +214,12 @@ var ServerFlags = []cli.Flag{
Usage: "(networking) Enable IPv6 masquerading for pod",
Destination: &ServerConfig.FlannelIPv6Masq,
},
cli.StringFlag{
Name: "egress-selector-mode",
Usage: "(networking) One of 'agent', cluster', 'pod', 'disabled'",
Destination: &ServerConfig.EgressSelectorMode,
Value: "pod",
},
ServerToken,
cli.StringFlag{
Name: "token-file",

View File

@ -134,6 +134,7 @@ func run(app *cli.Context, cfg *cmds.Server, leaderControllers server.CustomCont
serverConfig.ControlConfig.AdvertisePort = cfg.AdvertisePort
serverConfig.ControlConfig.FlannelBackend = cfg.FlannelBackend
serverConfig.ControlConfig.FlannelIPv6Masq = cfg.FlannelIPv6Masq
serverConfig.ControlConfig.EgressSelectorMode = cfg.EgressSelectorMode
serverConfig.ControlConfig.ExtraCloudControllerArgs = cfg.ExtraCloudControllerArgs
serverConfig.ControlConfig.DisableCCM = cfg.DisableCCM
serverConfig.ControlConfig.DisableNPC = cfg.DisableNPC
@ -528,6 +529,13 @@ func validateNetworkConfiguration(serverConfig server.Config) error {
return errors.New("dual-stack cluster-dns is not supported")
}
switch serverConfig.ControlConfig.EgressSelectorMode {
case config.EgressSelectorModeAgent, config.EgressSelectorModeCluster,
config.EgressSelectorModeDisabled, config.EgressSelectorModePod:
default:
return fmt.Errorf("invalid egress-selector-mode %s", serverConfig.ControlConfig.EgressSelectorMode)
}
return nil
}

View File

@ -25,6 +25,10 @@ const (
FlannelBackendIPSEC = "ipsec"
FlannelBackendWireguard = "wireguard"
FlannelBackendWireguardNative = "wireguard-native"
EgressSelectorModeAgent = "agent"
EgressSelectorModeCluster = "cluster"
EgressSelectorModeDisabled = "disabled"
EgressSelectorModePod = "pod"
CertificateRenewDays = 90
)
@ -37,6 +41,7 @@ type Node struct {
FlannelConfOverride bool
FlannelIface *net.Interface
FlannelIPv6Masq bool
EgressSelectorMode string
Containerd Containerd
Images string
AgentConfig Agent
@ -122,6 +127,7 @@ type CriticalControlArgs struct {
DisableServiceLB bool
FlannelBackend string
FlannelIPv6Masq bool
EgressSelectorMode string
NoCoreDNS bool
ServiceIPRange *net.IPNet
ServiceIPRanges []*net.IPNet

View File

@ -723,7 +723,11 @@ func genEncryptionConfigAndState(controlConfig *config.Control) error {
}
func genEgressSelectorConfig(controlConfig *config.Control) error {
connection := apiserver.Connection{
direct := apiserver.Connection{
ProxyProtocol: apiserver.ProtocolDirect,
}
proxy := apiserver.Connection{
ProxyProtocol: apiserver.ProtocolHTTPConnect,
Transport: &apiserver.Transport{
TCP: &apiserver.TCPTransport{
@ -737,6 +741,17 @@ func genEgressSelectorConfig(controlConfig *config.Control) error {
},
}
clusterConn := direct
controlConn := direct
switch controlConfig.EgressSelectorMode {
case config.EgressSelectorModeAgent:
controlConn = proxy
case config.EgressSelectorModeCluster, config.EgressSelectorModePod:
clusterConn = proxy
controlConn = proxy
}
egressConfig := apiserver.EgressSelectorConfiguration{
TypeMeta: metav1.TypeMeta{
Kind: "EgressSelectorConfiguration",
@ -745,11 +760,11 @@ func genEgressSelectorConfig(controlConfig *config.Control) error {
EgressSelections: []apiserver.EgressSelection{
{
Name: "cluster",
Connection: connection,
Connection: clusterConn,
},
{
Name: "controlplane",
Connection: connection,
Connection: controlConn,
},
},
}

View File

@ -6,6 +6,7 @@ import (
"net"
"net/http"
"strings"
"sync"
"time"
"github.com/k3s-io/k3s/pkg/daemons/config"
@ -31,8 +32,9 @@ func setupTunnel(ctx context.Context, cfg *config.Control) (http.Handler, error)
cidrs: cidranger.NewPCTrieRanger(),
config: cfg,
server: remotedialer.New(authorizer, loggingErrorWriter),
egress: map[string]bool{},
}
go tunnel.watchNodes(ctx)
go tunnel.watch(ctx)
return tunnel, nil
}
@ -53,32 +55,24 @@ func authorizer(req *http.Request) (clientKey string, authed bool, err error) {
var _ http.Handler = &TunnelServer{}
type TunnelServer struct {
sync.Mutex
cidrs cidranger.Ranger
client kubernetes.Interface
config *config.Control
server *remotedialer.Server
egress map[string]bool
}
// explicit interface check
var _ cidranger.RangerEntry = &nodeAddress{}
var _ cidranger.RangerEntry = &nodeCIDR{}
var _ cidranger.RangerEntry = &tunnelEntry{}
type nodeAddress struct {
cidr net.IPNet
node string
type tunnelEntry struct {
cidr net.IPNet
node string
kubelet bool
}
func (n *nodeAddress) Network() net.IPNet {
return n.cidr
}
type nodeCIDR struct {
cidr net.IPNet
clusterEgress bool
node string
}
func (n *nodeCIDR) Network() net.IPNet {
func (n *tunnelEntry) Network() net.IPNet {
return n.cidr
}
@ -92,24 +86,39 @@ func (t *TunnelServer) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
}
}
// watchNodes waits for the runtime core to become available,
// and registers an OnChange handler to observe PodCIDR changes.
func (t *TunnelServer) watchNodes(ctx context.Context) {
// watch waits for the runtime core to become available,
// and registers OnChange handlers to observe changes to Nodes (and Endpoints if necessary).
func (t *TunnelServer) watch(ctx context.Context) {
logrus.Infof("Tunnel server egress proxy mode: %s", t.config.EgressSelectorMode)
if t.config.EgressSelectorMode == config.EgressSelectorModeDisabled {
return
}
for {
if t.config.Runtime.Core != nil {
t.config.Runtime.Core.Core().V1().Node().OnChange(ctx, version.Program+"-tunnel-server", t.onChangeNode)
if t.config.EgressSelectorMode == config.EgressSelectorModeCluster {
// Cluster mode watches Endpoints to find what Node is hosting an Endpoint address, as the CNI
// may be using its own IPAM that does not repsect the Node's PodCIDR.
t.config.Runtime.Core.Core().V1().Endpoints().OnChange(ctx, version.Program+"-tunnel-server", t.onChangeEndpoints)
}
return
}
logrus.Infof("Tunnel server waiting for runtime core to become available")
logrus.Infof("Tunnel server egress proxy waiting for runtime core to become available")
time.Sleep(5 * time.Second)
}
}
// onChangeNode updates the node address/CIDR mappings by observing changes to nodes.
// Node addresses are updated in Agent, Cluster, and Pod mode.
// Pod CIDRs are updated only in Pod mode
func (t *TunnelServer) onChangeNode(nodeName string, node *v1.Node) (*v1.Node, error) {
if node != nil {
logrus.Debugf("Tunnel server updating node %s", nodeName)
_, clusterEgress := node.Labels[nodeconfig.ClusterEgressLabel]
t.Lock()
defer t.Unlock()
logrus.Debugf("Tunnel server egress proxy updating node %s", nodeName)
_, t.egress[nodeName] = node.Labels[nodeconfig.ClusterEgressLabel]
// Add all node IP addresses
for _, addr := range node.Status.Addresses {
if addr.Type == v1.NodeInternalIP || addr.Type == v1.NodeExternalIP {
@ -123,18 +132,20 @@ func (t *TunnelServer) onChangeNode(nodeName string, node *v1.Node) (*v1.Node, e
if node.DeletionTimestamp != nil {
t.cidrs.Remove(*n)
} else {
t.cidrs.Insert(&nodeAddress{cidr: *n, node: nodeName})
t.cidrs.Insert(&tunnelEntry{cidr: *n, node: nodeName, kubelet: true})
}
}
}
}
// Add all node PodCIDRs
for _, cidr := range node.Spec.PodCIDRs {
if _, n, err := net.ParseCIDR(cidr); err == nil {
if node.DeletionTimestamp != nil {
t.cidrs.Remove(*n)
} else {
t.cidrs.Insert(&nodeCIDR{cidr: *n, clusterEgress: clusterEgress, node: nodeName})
// Add all Node PodCIDRs, if in pod mode
if t.config.EgressSelectorMode == config.EgressSelectorModePod {
for _, cidr := range node.Spec.PodCIDRs {
if _, n, err := net.ParseCIDR(cidr); err == nil {
if node.DeletionTimestamp != nil {
t.cidrs.Remove(*n)
} else {
t.cidrs.Insert(&tunnelEntry{cidr: *n, node: nodeName})
}
}
}
}
@ -142,6 +153,47 @@ func (t *TunnelServer) onChangeNode(nodeName string, node *v1.Node) (*v1.Node, e
return node, nil
}
// onChangeEndpoits updates the pod address mappings by observing changes to endpoints.
// Only Pod endpoints with a defined NodeName are used, and only in Cluster mode.
func (t *TunnelServer) onChangeEndpoints(endpointsName string, endpoints *v1.Endpoints) (*v1.Endpoints, error) {
if endpoints != nil {
t.Lock()
defer t.Unlock()
logrus.Debugf("Tunnel server egress proxy updating endpoints %s", endpointsName)
// Add all Pod endpoints
for _, subset := range endpoints.Subsets {
for _, addr := range subset.Addresses {
if addr.NodeName != nil && addr.TargetRef != nil && addr.TargetRef.Kind == "Pod" {
nodeName := *addr.NodeName
address := addr.IP
if strings.Contains(address, ":") {
address += "/128"
} else {
address += "/32"
}
if _, n, err := net.ParseCIDR(address); err == nil {
t.cidrs.Insert(&tunnelEntry{cidr: *n, node: nodeName})
}
}
}
for _, addr := range subset.NotReadyAddresses {
if addr.TargetRef != nil && addr.TargetRef.Kind == "Pod" {
address := addr.IP
if strings.Contains(address, ":") {
address += "/128"
} else {
address += "/32"
}
if _, n, err := net.ParseCIDR(address); err == nil {
t.cidrs.Remove(*n)
}
}
}
}
}
return endpoints, nil
}
// serveConnect attempts to handle the HTTP CONNECT request by dialing
// a connection, either locally or via the remotedialer tunnel.
func (t *TunnelServer) serveConnect(resp http.ResponseWriter, req *http.Request) {
@ -182,19 +234,19 @@ func (t *TunnelServer) dialBackend(addr string) (net.Conn, error) {
var node string
var toKubelet, useTunnel bool
if ip := net.ParseIP(host); ip != nil {
// Destination is an IP address, check to see if the target is a CIDR or node address.
// Destination is an IP address, check to see if the target is a kubelet or pod address.
// We can only use the tunnel for egress to pods if the agent supports it.
if nets, err := t.cidrs.ContainingNetworks(ip); err == nil && len(nets) > 0 {
switch n := nets[0].(type) {
case *nodeAddress:
if n, ok := nets[0].(*tunnelEntry); ok {
node = n.node
toKubelet = true
useTunnel = true
case *nodeCIDR:
node = n.node
useTunnel = n.clusterEgress
default:
logrus.Debugf("Tunnel server CIDR lookup returned unknown type %T for address %s", nets[0], ip)
if n.kubelet {
toKubelet = true
useTunnel = true
} else {
useTunnel = t.egress[node]
}
} else {
logrus.Debugf("Tunnel server egress proxy CIDR lookup returned unknown type for address %s", ip)
}
}
} else {
@ -217,16 +269,16 @@ func (t *TunnelServer) dialBackend(addr string) (net.Conn, error) {
if t.server.HasSession(node) {
if useTunnel {
// Have a session and it is safe to use for this destination, do so.
logrus.Debugf("Tunnel server dialing %s via session to %s", addr, node)
logrus.Debugf("Tunnel server egress proxy dialing %s via session to %s", addr, node)
return t.server.Dial(node, 15*time.Second, "tcp", addr)
}
// Have a session but the agent doesn't support tunneling to this destination or
// the destination is local; fall back to direct connection.
logrus.Debugf("Tunnel server dialing %s directly", addr)
logrus.Debugf("Tunnel server egress proxy dialing %s directly", addr)
return net.Dial("tcp", addr)
}
// don't provide a proxy connection for anything else
logrus.Debugf("Tunnel server rejecting connection to %s", addr)
logrus.Debugf("Tunnel server egress proxy rejecting connection to %s", addr)
return nil, fmt.Errorf("no sessions available for host %s", host)
}