diff --git a/pkg/agent/tunnel/tunnel.go b/pkg/agent/tunnel/tunnel.go index f7884591c5..be39e21f19 100644 --- a/pkg/agent/tunnel/tunnel.go +++ b/pkg/agent/tunnel/tunnel.go @@ -8,6 +8,7 @@ import ( "fmt" "net" "net/http" + "reflect" "sync" "time" @@ -80,7 +81,7 @@ func Setup(ctx context.Context, config *config.Node) error { wg := &sync.WaitGroup{} for _, address := range addresses { if _, ok := disconnect[address]; !ok { - disconnect[address] = connect(wg, address, config, transportConfig) + disconnect[address] = connect(ctx, wg, address, config, transportConfig) } } @@ -101,7 +102,10 @@ func Setup(ctx context.Context, config *config.Node) error { select { case ev, ok := <-watch.ResultChan(): if !ok || ev.Type == watchtypes.Error { - logrus.Errorf("Tunnel endpoint watch channel closed: %v", ev) + if ok { + logrus.Errorf("Tunnel endpoint watch channel closed: %v", ev) + } + watch.Stop() continue connect } endpoint, ok := ev.Object.(*v1.Endpoints) @@ -110,7 +114,11 @@ func Setup(ctx context.Context, config *config.Node) error { continue watching } - var addresses = getAddresses(endpoint) + newAddresses := getAddresses(endpoint) + if reflect.DeepEqual(newAddresses, addresses) { + continue watching + } + addresses = newAddresses logrus.Infof("Tunnel endpoint watch event: %v", addresses) validEndpoint := map[string]bool{} @@ -118,7 +126,7 @@ func Setup(ctx context.Context, config *config.Node) error { for _, address := range addresses { validEndpoint[address] = true if _, ok := disconnect[address]; !ok { - disconnect[address] = connect(nil, address, config, transportConfig) + disconnect[address] = connect(ctx, nil, address, config, transportConfig) } } @@ -126,6 +134,7 @@ func Setup(ctx context.Context, config *config.Node) error { if !validEndpoint[address] { cancel() delete(disconnect, address) + logrus.Infof("Stopped tunnel to %s", address) } } } @@ -149,7 +158,7 @@ func Setup(ctx context.Context, config *config.Node) error { return nil } -func connect(waitGroup *sync.WaitGroup, address string, config *config.Node, transportConfig *transport.Config) context.CancelFunc { +func connect(rootCtx context.Context, waitGroup *sync.WaitGroup, address string, config *config.Node, transportConfig *transport.Config) context.CancelFunc { wsURL := fmt.Sprintf("wss://%s/v1-k3s/connect", address) headers := map[string][]string{ "X-K3s-NodeName": {config.AgentConfig.NodeName}, @@ -175,7 +184,7 @@ func connect(waitGroup *sync.WaitGroup, address string, config *config.Node, tra waitGroup.Add(1) } - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(rootCtx) go func() { for { @@ -193,7 +202,6 @@ func connect(waitGroup *sync.WaitGroup, address string, config *config.Node, tra if waitGroup != nil { once.Do(waitGroup.Done) } - logrus.Infof("Stopped tunnel to %s", wsURL) return } } diff --git a/trash.lock b/trash.lock index e62623839e..612a6ea5ff 100755 --- a/trash.lock +++ b/trash.lock @@ -142,7 +142,7 @@ import: - package: github.com/hashicorp/golang-lru version: v0.5.0 - package: github.com/ibuildthecloud/kvsql - version: 79f1f6881e28b90976f070aad6edad8e259057c1 + version: 9f00ccc82235f0433c736306d091abd2939b7449 repo: https://github.com/erikwilson/rancher-kvsql.git - package: github.com/imdario/mergo version: v0.3.5 @@ -230,7 +230,8 @@ import: - package: github.com/rancher/helm-controller version: v0.2.1 - package: github.com/rancher/remotedialer - version: 4a5a661be67697d6369df54ef62d5a30b0385697 + version: 7c71ffa8f5d7a181704d92bb8a33b0c7d07dccaa + repo: https://github.com/erikwilson/rancher-remotedialer.git - package: github.com/rancher/wrangler version: 7737c167e16514a38229bc64c839cee8cd14e6d3 - package: github.com/rancher/wrangler-api diff --git a/vendor.conf b/vendor.conf index 026c040647..e30760ab58 100644 --- a/vendor.conf +++ b/vendor.conf @@ -14,7 +14,7 @@ k8s.io/kubernetes v1.14.4-k3s.1 ht github.com/rancher/wrangler 7737c167e16514a38229bc64c839cee8cd14e6d3 github.com/rancher/wrangler-api v0.1.4 github.com/rancher/dynamiclistener c08b499d17195fbc2c1764b21c322951811629a5 https://github.com/erikwilson/rancher-dynamiclistener.git -github.com/rancher/remotedialer 4a5a661be67697d6369df54ef62d5a30b0385697 +github.com/rancher/remotedialer 7c71ffa8f5d7a181704d92bb8a33b0c7d07dccaa https://github.com/erikwilson/rancher-remotedialer.git github.com/rancher/helm-controller v0.2.1 github.com/matryer/moq ee5226d43009 https://github.com/rancher/moq.git github.com/coreos/flannel 823afe66b2266bf71f5bec24e6e28b26d70cfc7c https://github.com/ibuildthecloud/flannel.git diff --git a/vendor/github.com/rancher/remotedialer/client.go b/vendor/github.com/rancher/remotedialer/client.go index d56635e5f0..2dee3b0b4c 100644 --- a/vendor/github.com/rancher/remotedialer/client.go +++ b/vendor/github.com/rancher/remotedialer/client.go @@ -18,7 +18,7 @@ func ClientConnect(ctx context.Context, wsURL string, headers http.Header, diale } } -func connectToProxy(ctx context.Context, proxyURL string, headers http.Header, auth ConnectAuthorizer, dialer *websocket.Dialer, onConnect func(context.Context) error) error { +func connectToProxy(rootCtx context.Context, proxyURL string, headers http.Header, auth ConnectAuthorizer, dialer *websocket.Dialer, onConnect func(context.Context) error) error { logrus.WithField("url", proxyURL).Info("Connecting to proxy") if dialer == nil { @@ -31,11 +31,11 @@ func connectToProxy(ctx context.Context, proxyURL string, headers http.Header, a } defer ws.Close() - if onConnect != nil { - ctxOnConnect, cancel := context.WithCancel(context.Background()) - defer cancel() + ctx, cancel := context.WithCancel(rootCtx) + defer cancel() - if err := onConnect(ctxOnConnect); err != nil { + if onConnect != nil { + if err := onConnect(ctx); err != nil { return err } } @@ -45,7 +45,7 @@ func connectToProxy(ctx context.Context, proxyURL string, headers http.Header, a result := make(chan error, 1) go func() { - _, err = session.Serve() + _, err = session.Serve(ctx) result <- err }() diff --git a/vendor/github.com/rancher/remotedialer/peer.go b/vendor/github.com/rancher/remotedialer/peer.go index 17de7d7f98..bbaea463e8 100644 --- a/vendor/github.com/rancher/remotedialer/peer.go +++ b/vendor/github.com/rancher/remotedialer/peer.go @@ -106,7 +106,7 @@ outer: } s.sessions.addListener(session) - _, err = session.Serve() + _, err = session.Serve(context.Background()) s.sessions.removeListener(session) session.Close() diff --git a/vendor/github.com/rancher/remotedialer/server.go b/vendor/github.com/rancher/remotedialer/server.go index dc87202d1c..1c4995424e 100644 --- a/vendor/github.com/rancher/remotedialer/server.go +++ b/vendor/github.com/rancher/remotedialer/server.go @@ -1,6 +1,7 @@ package remotedialer import ( + "context" "net/http" "sync" "time" @@ -71,7 +72,7 @@ func (s *Server) ServeHTTP(rw http.ResponseWriter, req *http.Request) { defer s.sessions.remove(session) // Don't need to associate req.Context() to the Session, it will cancel otherwise - code, err := session.Serve() + code, err := session.Serve(context.Background()) if err != nil { // Hijacked so we can't write to the client logrus.Infof("error in remotedialer server [%d]: %v", code, err) diff --git a/vendor/github.com/rancher/remotedialer/session.go b/vendor/github.com/rancher/remotedialer/session.go index 5f440d4dc6..a33a78c412 100644 --- a/vendor/github.com/rancher/remotedialer/session.go +++ b/vendor/github.com/rancher/remotedialer/session.go @@ -63,8 +63,8 @@ func newSession(sessionKey int64, clientKey string, conn *websocket.Conn) *Sessi } } -func (s *Session) startPings() { - ctx, cancel := context.WithCancel(context.Background()) +func (s *Session) startPings(rootCtx context.Context) { + ctx, cancel := context.WithCancel(rootCtx) s.pingCancel = cancel s.pingWait.Add(1) @@ -99,9 +99,9 @@ func (s *Session) stopPings() { s.pingWait.Wait() } -func (s *Session) Serve() (int, error) { +func (s *Session) Serve(ctx context.Context) (int, error) { if s.client { - s.startPings() + s.startPings(ctx) } for {