k3s/pkg/agent/tunnel/tunnel.go
Brad Davidson 5ab6d21a7d
Increase agent's apiserver ready timeout (#4454)
Since we now start the server's agent sooner and in the background, we
may need to wait longer than 30 seconds for the apiserver to become
ready in downstream projects such as RKE2.

Since this essentially just serves as an analogue for the server's
apiReady channel, there's little danger in setting it to something
relatively high.

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
2021-11-11 14:01:49 -07:00

package tunnel

import (
	"context"
	"crypto/tls"
	"fmt"
	"net"
	"reflect"
	"sync"
	"time"

	"github.com/gorilla/websocket"
	"github.com/rancher/k3s/pkg/agent/proxy"
	"github.com/rancher/k3s/pkg/daemons/config"
	"github.com/rancher/k3s/pkg/util"
	"github.com/rancher/k3s/pkg/version"
	"github.com/rancher/remotedialer"
	"github.com/sirupsen/logrus"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	watchtypes "k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
)
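
// ports lists the local destination ports that the remote end is allowed to
// dial back to through the tunnel: 10250 is the kubelet API, and 10010 is the
// port used for CRI streaming (exec/attach/logs) by the embedded runtime.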
var (
	ports = map[string]bool{
		"10250": true,
		"10010": true,
	}
)
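
// Setup builds the clients and TLS configuration the agent needs, opens
// websocket tunnels to the known supervisor addresses, and then watches the
// kubernetes endpoint so that tunnels are added and removed as servers join
// or leave the cluster.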
func Setup(ctx context.Context, config *config.Node, proxy proxy.Proxy) error {
	restConfig, err := clientcmd.BuildConfigFromFlags("", config.AgentConfig.KubeConfigK3sController)
	if err != nil {
		return err
	}

	client, err := kubernetes.NewForConfig(restConfig)
	if err != nil {
		return err
	}
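
	// The kubelet kubeconfig supplies the client certificate and CA used to
	// authenticate the websocket tunnel connections to the supervisors.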
	nodeRestConfig, err := clientcmd.BuildConfigFromFlags("", config.AgentConfig.KubeConfigKubelet)
	if err != nil {
		return err
	}

	tlsConfig, err := rest.TLSConfigFor(nodeRestConfig)
	if err != nil {
		return err
	}

	// Do an immediate fill of proxy addresses from the server endpoint list, before going into the
	// watch loop. This will fail on the first server, as the apiserver won't be started yet - but
	// that's fine because the local server is already seeded into the proxy address list.
	endpoint, _ := client.CoreV1().Endpoints("default").Get(ctx, "kubernetes", metav1.GetOptions{})
	if endpoint != nil {
		addresses := util.GetAddresses(endpoint)
		if len(addresses) > 0 {
			proxy.Update(addresses)
		}
	}

	// Attempt to connect to supervisors, storing their cancellation function for later when we
	// need to disconnect.
	disconnect := map[string]context.CancelFunc{}
	wg := &sync.WaitGroup{}
	for _, address := range proxy.SupervisorAddresses() {
		if _, ok := disconnect[address]; !ok {
			disconnect[address] = connect(ctx, wg, address, tlsConfig)
		}
	}

	// Once the apiserver is up, go into a watch loop, adding and removing tunnels as endpoints come
	// and go from the cluster. We go into a faster but noisier connect loop if the watch fails
	// following a successful connection.
	go func() {
		if err := util.WaitForAPIServerReady(ctx, client, util.DefaultAPIServerReadyTimeout); err != nil {
			logrus.Warnf("Tunnel endpoint watch failed to wait for apiserver ready: %v", err)
		}
	connect:
		for {
			time.Sleep(5 * time.Second)
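			// Watch only the default/kubernetes endpoint. ResourceVersion "0" lets the
			// apiserver serve the initial watch state from its cache.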
			watch, err := client.CoreV1().Endpoints("default").Watch(ctx, metav1.ListOptions{
				FieldSelector:   fields.Set{"metadata.name": "kubernetes"}.String(),
				ResourceVersion: "0",
			})
			if err != nil {
				logrus.Warnf("Unable to watch for tunnel endpoints: %v", err)
				continue connect
			}
		watching:
			for {
				ev, ok := <-watch.ResultChan()
				if !ok || ev.Type == watchtypes.Error {
					if ok {
						logrus.Errorf("Tunnel endpoint watch channel closed: %v", ev)
					}
					watch.Stop()
					continue connect
				}
				endpoint, ok := ev.Object.(*v1.Endpoints)
				if !ok {
					logrus.Errorf("Tunnel could not convert event object to endpoint: %v", ev)
					continue watching
				}

				newAddresses := util.GetAddresses(endpoint)
				if reflect.DeepEqual(newAddresses, proxy.SupervisorAddresses()) {
					continue watching
				}
				proxy.Update(newAddresses)
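
				// Bring the set of tunnels in line with the new supervisor list: open
				// tunnels for any new addresses, then tear down tunnels whose address
				// is no longer present.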
				validEndpoint := map[string]bool{}
				for _, address := range proxy.SupervisorAddresses() {
					validEndpoint[address] = true
					if _, ok := disconnect[address]; !ok {
						disconnect[address] = connect(ctx, nil, address, tlsConfig)
					}
				}

				for address, cancel := range disconnect {
					if !validEndpoint[address] {
						cancel()
						delete(disconnect, address)
						logrus.Infof("Stopped tunnel to %s", address)
					}
				}
			}
		}
	}()
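
	// Block until every initial supervisor connection has connected at least
	// once, or until the context is canceled.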
	wait := make(chan int, 1)
	go func() {
		wg.Wait()
		wait <- 0
	}()

	select {
	case <-ctx.Done():
		logrus.Error("Tunnel context canceled while waiting for connection")
		return ctx.Err()
	case <-wait:
	}

	return nil
}
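
// connect starts a remotedialer websocket tunnel to the supervisor at the given
// address and keeps redialing until the returned cancel function is called or
// the root context is canceled. If waitGroup is non-nil, it is marked done after
// the first successful connection (or once the context is canceled).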
func connect(rootCtx context.Context, waitGroup *sync.WaitGroup, address string, tlsConfig *tls.Config) context.CancelFunc {
	wsURL := fmt.Sprintf("wss://%s/v1-"+version.Program+"/connect", address)
	ws := &websocket.Dialer{
		TLSClientConfig: tlsConfig,
	}

	once := sync.Once{}
	if waitGroup != nil {
		waitGroup.Add(1)
	}

	ctx, cancel := context.WithCancel(rootCtx)
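
	// remotedialer.ClientConnect blocks for the life of a single tunnel session;
	// loop to redial whenever the session drops, until the context is canceled.
	// The authorizer callback restricts what the remote end may dial back to:
	// only TCP connections to 127.0.0.1 on the allowed ports.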
	go func() {
		for {
			remotedialer.ClientConnect(ctx, wsURL, nil, ws, func(proto, address string) bool {
				host, port, err := net.SplitHostPort(address)
				return err == nil && proto == "tcp" && ports[port] && host == "127.0.0.1"
			}, func(_ context.Context) error {
				if waitGroup != nil {
					once.Do(waitGroup.Done)
				}
				return nil
			})

			if ctx.Err() != nil {
				if waitGroup != nil {
					once.Do(waitGroup.Done)
				}
				return
			}
		}
	}()

	return cancel
}