mirror of
https://github.com/k3s-io/k3s.git
synced 2024-06-07 19:41:36 +00:00
commit
906dc2fba9
@ -15,7 +15,12 @@ import (
|
||||
"github.com/rancher/k3s/pkg/daemons/config"
|
||||
"github.com/rancher/remotedialer"
|
||||
"github.com/sirupsen/logrus"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/tools/clientcmd"
|
||||
"k8s.io/client-go/transport"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -25,6 +30,27 @@ var (
|
||||
}
|
||||
)
|
||||
|
||||
func getAddresses(endpoint *v1.Endpoints) []string {
|
||||
serverAddresses := []string{}
|
||||
if endpoint == nil {
|
||||
return serverAddresses
|
||||
}
|
||||
for _, subset := range endpoint.Subsets {
|
||||
var port string
|
||||
if len(subset.Ports) > 0 {
|
||||
port = fmt.Sprint(subset.Ports[0].Port)
|
||||
}
|
||||
for _, address := range subset.Addresses {
|
||||
serverAddress := address.IP
|
||||
if port != "" {
|
||||
serverAddress += ":" + port
|
||||
}
|
||||
serverAddresses = append(serverAddresses, serverAddress)
|
||||
}
|
||||
}
|
||||
return serverAddresses
|
||||
}
|
||||
|
||||
func Setup(config *config.Node) error {
|
||||
restConfig, err := clientcmd.BuildConfigFromFlags("", config.AgentConfig.KubeConfigNode)
|
||||
if err != nil {
|
||||
@ -36,7 +62,78 @@ func Setup(config *config.Node) error {
|
||||
return err
|
||||
}
|
||||
|
||||
wsURL := fmt.Sprintf("wss://%s/v1-k3s/connect", config.ServerAddress)
|
||||
client, err := kubernetes.NewForConfig(restConfig)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
addresses := []string{config.ServerAddress}
|
||||
|
||||
endpoint, _ := client.CoreV1().Endpoints("default").Get("kubernetes", metav1.GetOptions{})
|
||||
if endpoint != nil {
|
||||
addresses = getAddresses(endpoint)
|
||||
}
|
||||
|
||||
disconnect := map[string]context.CancelFunc{}
|
||||
|
||||
wg := &sync.WaitGroup{}
|
||||
for _, address := range addresses {
|
||||
if _, ok := disconnect[address]; !ok {
|
||||
disconnect[address] = connect(wg, address, config, transportConfig)
|
||||
}
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
go func() {
|
||||
connect:
|
||||
for {
|
||||
watch, err := client.CoreV1().Endpoints("default").Watch(metav1.ListOptions{
|
||||
FieldSelector: fields.Set{"metadata.name": "kubernetes"}.String(),
|
||||
})
|
||||
if err != nil {
|
||||
logrus.Errorf("Unable to watch for endpoints: %v", err)
|
||||
time.Sleep(5 * time.Second)
|
||||
continue connect
|
||||
}
|
||||
watching:
|
||||
for {
|
||||
select {
|
||||
case ev, ok := <-watch.ResultChan():
|
||||
if !ok {
|
||||
logrus.Error("endpoint watch channel closed")
|
||||
continue connect
|
||||
}
|
||||
endpoint, ok := ev.Object.(*v1.Endpoints)
|
||||
if !ok {
|
||||
logrus.Error("could not case event object to endpoint")
|
||||
continue watching
|
||||
}
|
||||
|
||||
validEndpoint := map[string]bool{}
|
||||
var addresses = getAddresses(endpoint)
|
||||
for _, address := range addresses {
|
||||
validEndpoint[address] = true
|
||||
if _, ok := disconnect[address]; !ok {
|
||||
disconnect[address] = connect(wg, address, config, transportConfig)
|
||||
}
|
||||
}
|
||||
|
||||
for address, cancel := range disconnect {
|
||||
if !validEndpoint[address] {
|
||||
cancel()
|
||||
delete(disconnect, address)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func connect(waitGroup *sync.WaitGroup, address string, config *config.Node, transportConfig *transport.Config) context.CancelFunc {
|
||||
wsURL := fmt.Sprintf("wss://%s/v1-k3s/connect", address)
|
||||
headers := map[string][]string{
|
||||
"X-K3s-NodeName": {config.AgentConfig.NodeName},
|
||||
}
|
||||
@ -57,23 +154,30 @@ func Setup(config *config.Node) error {
|
||||
}
|
||||
|
||||
once := sync.Once{}
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
if waitGroup != nil {
|
||||
waitGroup.Add(1)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
go func() {
|
||||
for {
|
||||
logrus.Infof("Connecting to %s", wsURL)
|
||||
remotedialer.ClientConnect(wsURL, http.Header(headers), ws, func(proto, address string) bool {
|
||||
remotedialer.ClientConnect(ctx, wsURL, http.Header(headers), ws, func(proto, address string) bool {
|
||||
host, port, err := net.SplitHostPort(address)
|
||||
return err == nil && proto == "tcp" && ports[port] && host == "127.0.0.1"
|
||||
}, func(_ context.Context) error {
|
||||
once.Do(wg.Done)
|
||||
if waitGroup != nil {
|
||||
once.Do(waitGroup.Done)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
time.Sleep(5 * time.Second)
|
||||
|
||||
if ctx.Err() != nil {
|
||||
logrus.Infof("Stopping tunnel to %s", wsURL)
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
wg.Wait()
|
||||
return nil
|
||||
return cancel
|
||||
}
|
||||
|
@ -225,12 +225,11 @@ import:
|
||||
- package: github.com/prometheus/procfs
|
||||
version: 65c1f6f8f0fc1e2185eb9863a3bc751496404259
|
||||
- package: github.com/rancher/dynamiclistener
|
||||
version: 03208cf106d553d58d3a73267aa473a45af63120
|
||||
repo: https://github.com/erikwilson/rancher-dynamiclistener.git
|
||||
version: 4716ac2362986f28bede3f3caf5d1ce347da55b0
|
||||
- package: github.com/rancher/helm-controller
|
||||
version: d5f5c830231110722f14d446d3b2038e5cdf1532
|
||||
- package: github.com/rancher/remotedialer
|
||||
version: 20ec38853712bb6d348f0db9ac47d34c954c6b00
|
||||
version: 66218bc42b4fa27c34523c0d19a41a0e2b74983d
|
||||
- package: github.com/rancher/wrangler
|
||||
version: d53925110e19e055d1c21df3b49021833e883f33
|
||||
- package: github.com/rancher/wrangler-api
|
||||
|
@ -10,12 +10,11 @@ package=github.com/opencontainers/runc/libcontainer/specconv
|
||||
package=github.com/opencontainers/runc/contrib/cmd/recvtty
|
||||
|
||||
k8s.io/kubernetes v1.14.3-k3s.2 https://github.com/rancher/k3s.git transitive=true,staging=true
|
||||
github.com/rancher/dynamiclistener 03208cf106d553d58d3a73267aa473a45af63120 https://github.com/erikwilson/rancher-dynamiclistener.git
|
||||
|
||||
github.com/rancher/wrangler d53925110e19e055d1c21df3b49021833e883f33
|
||||
github.com/rancher/wrangler-api efe26ac6a9d720e1bfa5a8cc5f8dce5ad598ce26
|
||||
github.com/rancher/dynamiclistener 077eb13a904f2c62496f31b158135d9743526f82
|
||||
github.com/rancher/remotedialer 20ec38853712bb6d348f0db9ac47d34c954c6b00
|
||||
github.com/rancher/dynamiclistener 4716ac2362986f28bede3f3caf5d1ce347da55b0
|
||||
github.com/rancher/remotedialer 66218bc42b4fa27c34523c0d19a41a0e2b74983d
|
||||
github.com/rancher/helm-controller d5f5c830231110722f14d446d3b2038e5cdf1532
|
||||
github.com/matryer/moq ee5226d43009 https://github.com/rancher/moq.git
|
||||
github.com/coreos/flannel 823afe66b2266bf71f5bec24e6e28b26d70cfc7c https://github.com/ibuildthecloud/flannel.git
|
||||
|
25
vendor/github.com/rancher/remotedialer/client.go
generated
vendored
25
vendor/github.com/rancher/remotedialer/client.go
generated
vendored
@ -11,14 +11,14 @@ import (
|
||||
|
||||
type ConnectAuthorizer func(proto, address string) bool
|
||||
|
||||
func ClientConnect(wsURL string, headers http.Header, dialer *websocket.Dialer, auth ConnectAuthorizer, onConnect func(context.Context) error) {
|
||||
if err := connectToProxy(wsURL, headers, auth, dialer, onConnect); err != nil {
|
||||
logrus.WithError(err).Error("Failed to connect to proxy")
|
||||
func ClientConnect(ctx context.Context, wsURL string, headers http.Header, dialer *websocket.Dialer, auth ConnectAuthorizer, onConnect func(context.Context) error) {
|
||||
if err := connectToProxy(ctx, wsURL, headers, auth, dialer, onConnect); err != nil {
|
||||
logrus.WithError(err).Error("Remotedialer proxy error")
|
||||
time.Sleep(time.Duration(5) * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
func connectToProxy(proxyURL string, headers http.Header, auth ConnectAuthorizer, dialer *websocket.Dialer, onConnect func(context.Context) error) error {
|
||||
func connectToProxy(ctx context.Context, proxyURL string, headers http.Header, auth ConnectAuthorizer, dialer *websocket.Dialer, onConnect func(context.Context) error) error {
|
||||
logrus.WithField("url", proxyURL).Info("Connecting to proxy")
|
||||
|
||||
if dialer == nil {
|
||||
@ -41,7 +41,18 @@ func connectToProxy(proxyURL string, headers http.Header, auth ConnectAuthorizer
|
||||
}
|
||||
|
||||
session := NewClientSession(auth, ws)
|
||||
_, err = session.Serve()
|
||||
session.Close()
|
||||
return err
|
||||
defer session.Close()
|
||||
|
||||
result := make(chan error, 1)
|
||||
go func() {
|
||||
_, err = session.Serve()
|
||||
result <- err
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case err := <-result:
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user