diff --git a/pkg/agent/tunnel/tunnel.go b/pkg/agent/tunnel/tunnel.go index 68f11e7702..badc519ab9 100644 --- a/pkg/agent/tunnel/tunnel.go +++ b/pkg/agent/tunnel/tunnel.go @@ -18,6 +18,7 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" + watchtypes "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/transport" @@ -68,12 +69,10 @@ func Setup(config *config.Node) error { } addresses := []string{config.ServerAddress} - endpointResourceVersion := "" endpoint, _ := client.CoreV1().Endpoints("default").Get("kubernetes", metav1.GetOptions{}) if endpoint != nil { addresses = getAddresses(endpoint) - endpointResourceVersion = endpoint.ResourceVersion } disconnect := map[string]context.CancelFunc{} @@ -89,29 +88,28 @@ func Setup(config *config.Node) error { go func() { connect: for { + time.Sleep(5 * time.Second) watch, err := client.CoreV1().Endpoints("default").Watch(metav1.ListOptions{ FieldSelector: fields.Set{"metadata.name": "kubernetes"}.String(), - ResourceVersion: endpointResourceVersion, + ResourceVersion: "0", }) if err != nil { logrus.Errorf("Unable to watch for tunnel endpoints: %v", err) - time.Sleep(5 * time.Second) continue connect } watching: for { select { case ev, ok := <-watch.ResultChan(): - if !ok { - logrus.Error("Tunnel endpoint watch channel closed") + if !ok || ev.Type == watchtypes.Error { + logrus.Errorf("Tunnel endpoint watch channel closed: %v", ev) continue connect } endpoint, ok := ev.Object.(*v1.Endpoints) if !ok { - logrus.Error("Tunnel could not case event object to endpoint") + logrus.Errorf("Tunnel could not case event object to endpoint: %v", ev) continue watching } - endpointResourceVersion = endpoint.ResourceVersion var addresses = getAddresses(endpoint) logrus.Infof("Tunnel endpoint watch event: %v", addresses)