Port to wrangler

This commit is contained in:
Darren Shepherd 2019-05-09 15:05:51 -07:00
parent 16f7aaab66
commit c0702b0492
26 changed files with 716 additions and 275 deletions

View File

@ -1,5 +1,6 @@
//go:generate go run types/codegen/cleanup/main.go
//go:generate go run types/codegen/main.go
//go:generate go run pkg/codegen/cleanup/main.go
//go:generate /bin/rm -rf pkg/generated
//go:generate go run pkg/codegen/main.go
//go:generate go fmt pkg/deploy/zz_generated_bindata.go
//go:generate go fmt pkg/static/zz_generated_bindata.go
//go:generate go fmt pkg/openapi/zz_generated_bindata.go

View File

@ -21,8 +21,8 @@ import (
"github.com/pkg/errors"
"github.com/rancher/k3s/pkg/cli/cmds"
"github.com/rancher/k3s/pkg/clientaccess"
"github.com/rancher/k3s/pkg/daemons/config"
"github.com/rancher/norman/pkg/clientaccess"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/json"
"k8s.io/apimachinery/pkg/util/net"

View File

@ -6,7 +6,7 @@ import (
"github.com/pkg/errors"
"github.com/rancher/k3s/pkg/daemons/config"
"github.com/rancher/norman/pkg/proxy"
"github.com/rancher/k3s/pkg/proxy"
"github.com/sirupsen/logrus"
)

View File

@ -16,9 +16,9 @@ import (
"github.com/rancher/k3s/pkg/agent/syssetup"
"github.com/rancher/k3s/pkg/agent/tunnel"
"github.com/rancher/k3s/pkg/cli/cmds"
"github.com/rancher/k3s/pkg/clientaccess"
"github.com/rancher/k3s/pkg/daemons/agent"
"github.com/rancher/k3s/pkg/rootless"
"github.com/rancher/norman/pkg/clientaccess"
"github.com/sirupsen/logrus"
)

View File

@ -13,7 +13,7 @@ import (
"github.com/gorilla/websocket"
"github.com/rancher/k3s/pkg/daemons/config"
"github.com/rancher/norman/pkg/remotedialer"
"github.com/rancher/remotedialer"
"github.com/sirupsen/logrus"
"k8s.io/client-go/tools/clientcmd"
)

View File

@ -0,0 +1,21 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by main. DO NOT EDIT.
// +k8s:deepcopy-gen=package
// +groupName=k3s.cattle.io
package v1

View File

@ -7,6 +7,9 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"
)
// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type ListenerConfig struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
@ -14,6 +17,9 @@ type ListenerConfig struct {
Status dynamiclistener.ListenerStatus `json:"status,omitempty"`
}
// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type Addon struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
@ -31,6 +37,9 @@ type AddonStatus struct {
GVKs []schema.GroupVersionKind `json:"gvks,omitempty"`
}
// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type HelmChart struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`

View File

@ -8,10 +8,11 @@ import (
"strings"
"time"
"github.com/rancher/wrangler/pkg/signals"
"github.com/rancher/k3s/pkg/agent"
"github.com/rancher/k3s/pkg/cli/cmds"
"github.com/rancher/k3s/pkg/datadir"
"github.com/rancher/norman/signal"
"github.com/sirupsen/logrus"
"github.com/urfave/cli"
)
@ -67,7 +68,7 @@ func Run(ctx *cli.Context) error {
cfg.DataDir = dataDir
cfg.Labels = append(cfg.Labels, "node-role.kubernetes.io/worker=true")
contextCtx := signal.SigTermCancelContext(context.Background())
contextCtx := signals.SetupSignalHandler(context.Background())
return agent.Run(contextCtx, cfg)
}

View File

@ -19,7 +19,7 @@ import (
"github.com/rancher/k3s/pkg/datadir"
"github.com/rancher/k3s/pkg/rootless"
"github.com/rancher/k3s/pkg/server"
"github.com/rancher/norman/signal"
"github.com/rancher/wrangler/pkg/signals"
"github.com/sirupsen/logrus"
"github.com/urfave/cli"
"k8s.io/apimachinery/pkg/util/net"
@ -145,7 +145,7 @@ func run(app *cli.Context, cfg *cmds.Server) error {
notifySocket := os.Getenv("NOTIFY_SOCKET")
os.Unsetenv("NOTIFY_SOCKET")
ctx := signal.SigTermCancelContext(context.Background())
ctx := signals.SetupSignalHandler(context.Background())
certs, err := server.StartServer(ctx, &serverConfig)
if err != nil {
return err

View File

@ -0,0 +1,277 @@
package clientaccess
import (
"crypto/sha256"
"crypto/tls"
"crypto/x509"
"encoding/hex"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"os"
"strings"
"github.com/pkg/errors"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
)
var (
insecureClient = &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
},
}
)
type OverrideURLCallback func(config []byte) (*url.URL, error)
type clientToken struct {
caHash string
username string
password string
}
func AgentAccessInfoToTempKubeConfig(tempDir, server, token string) (string, error) {
f, err := ioutil.TempFile(tempDir, "tmp-")
if err != nil {
return "", err
}
if err := f.Close(); err != nil {
return "", err
}
err = accessInfoToKubeConfig(f.Name(), server, token)
if err != nil {
os.Remove(f.Name())
}
return f.Name(), err
}
func AgentAccessInfoToKubeConfig(destFile, server, token string) error {
return accessInfoToKubeConfig(destFile, server, token)
}
type Info struct {
URL string `json:"url,omitempty"`
CACerts []byte `json:"cacerts,omitempty"`
username string
password string
Token string `json:"token,omitempty"`
}
func (i *Info) WriteKubeConfig(destFile string) error {
return clientcmd.WriteToFile(*i.KubeConfig(), destFile)
}
func (i *Info) KubeConfig() *clientcmdapi.Config {
config := clientcmdapi.NewConfig()
cluster := clientcmdapi.NewCluster()
cluster.CertificateAuthorityData = i.CACerts
cluster.Server = i.URL
authInfo := clientcmdapi.NewAuthInfo()
if i.password != "" {
authInfo.Username = i.username
authInfo.Password = i.password
} else if i.Token != "" {
if username, pass, ok := ParseUsernamePassword(i.Token); ok {
authInfo.Username = username
authInfo.Password = pass
} else {
authInfo.Token = i.Token
}
}
context := clientcmdapi.NewContext()
context.AuthInfo = "default"
context.Cluster = "default"
config.Clusters["default"] = cluster
config.AuthInfos["default"] = authInfo
config.Contexts["default"] = context
config.CurrentContext = "default"
return config
}
func ParseAndValidateToken(server, token string) (*Info, error) {
url, err := url.Parse(server)
if err != nil {
return nil, errors.Wrapf(err, "Invalid url, failed to parse %s", server)
}
if url.Scheme != "https" {
return nil, fmt.Errorf("only https:// URLs are supported, invalid scheme: %s", server)
}
for strings.HasSuffix(url.Path, "/") {
url.Path = url.Path[:len(url.Path)-1]
}
parsedToken, err := parseToken(token)
if err != nil {
return nil, err
}
cacerts, err := GetCACerts(*url)
if err != nil {
return nil, err
}
if len(cacerts) > 0 && len(parsedToken.caHash) > 0 {
if ok, hash, newHash := validateCACerts(cacerts, parsedToken.caHash); !ok {
return nil, fmt.Errorf("token does not match the server %s != %s", hash, newHash)
}
}
if err := validateToken(*url, cacerts, parsedToken.username, parsedToken.password); err != nil {
return nil, err
}
return &Info{
URL: url.String(),
CACerts: cacerts,
username: parsedToken.username,
password: parsedToken.password,
Token: token,
}, nil
}
func accessInfoToKubeConfig(destFile, server, token string) error {
info, err := ParseAndValidateToken(server, token)
if err != nil {
return err
}
return info.WriteKubeConfig(destFile)
}
func validateToken(u url.URL, cacerts []byte, username, password string) error {
u.Path = "/apis"
_, err := get(u.String(), GetHTTPClient(cacerts), username, password)
if err != nil {
return errors.Wrap(err, "token is not valid")
}
return nil
}
func validateCACerts(cacerts []byte, hash string) (bool, string, string) {
if len(cacerts) == 0 && hash == "" {
return true, "", ""
}
digest := sha256.Sum256([]byte(cacerts))
newHash := hex.EncodeToString(digest[:])
return hash == newHash, hash, newHash
}
func ParseUsernamePassword(token string) (string, string, bool) {
parsed, err := parseToken(token)
if err != nil {
return "", "", false
}
return parsed.username, parsed.password, true
}
func parseToken(token string) (clientToken, error) {
var result clientToken
if !strings.HasPrefix(token, "K10") {
return result, fmt.Errorf("token is not a valid token format")
}
token = token[3:]
parts := strings.SplitN(token, "::", 2)
token = parts[0]
if len(parts) > 1 {
result.caHash = parts[0]
token = parts[1]
}
parts = strings.SplitN(token, ":", 2)
if len(parts) != 2 {
return result, fmt.Errorf("token credentials are the wrong format")
}
result.username = parts[0]
result.password = parts[1]
return result, nil
}
func GetHTTPClient(cacerts []byte) *http.Client {
if len(cacerts) == 0 {
return http.DefaultClient
}
pool := x509.NewCertPool()
pool.AppendCertsFromPEM(cacerts)
return &http.Client{
Transport: &http.Transport{
DisableKeepAlives: true,
TLSClientConfig: &tls.Config{
RootCAs: pool,
},
},
}
}
func Get(path string, info *Info) ([]byte, error) {
u, err := url.Parse(info.URL)
if err != nil {
return nil, err
}
u.Path = path
return get(u.String(), GetHTTPClient(info.CACerts), info.username, info.password)
}
func GetCACerts(u url.URL) ([]byte, error) {
u.Path = "/cacerts"
url := u.String()
_, err := get(url, http.DefaultClient, "", "")
if err == nil {
return nil, nil
}
cacerts, err := get(url, insecureClient, "", "")
if err != nil {
return nil, errors.Wrapf(err, "failed to get CA certs at %s", url)
}
_, err = get(url, GetHTTPClient(cacerts), "", "")
if err != nil {
return nil, errors.Wrapf(err, "server %s is not trusted", url)
}
return cacerts, nil
}
func get(u string, client *http.Client, username, password string) ([]byte, error) {
req, err := http.NewRequest(http.MethodGet, u, nil)
if err != nil {
return nil, err
}
if username != "" {
req.SetBasicAuth(username, password)
}
resp, err := client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("%s: %s", u, resp.Status)
}
return ioutil.ReadAll(resp.Body)
}

View File

@ -1,12 +1,12 @@
package main
import (
"github.com/rancher/norman/generator/cleanup"
"github.com/rancher/wrangler/pkg/cleanup"
"github.com/sirupsen/logrus"
)
func main() {
if err := cleanup.Cleanup("./types"); err != nil {
if err := cleanup.Cleanup("./pkg/apis"); err != nil {
logrus.Fatal(err)
}
}

View File

@ -1,9 +1,12 @@
package main
import (
"os"
bindata "github.com/jteeuwen/go-bindata"
v1 "github.com/rancher/k3s/types/apis/k3s.cattle.io/v1"
"github.com/rancher/norman/generator"
v1 "github.com/rancher/k3s/pkg/apis/k3s.cattle.io/v1"
controllergen "github.com/rancher/wrangler/pkg/controller-gen"
"github.com/rancher/wrangler/pkg/controller-gen/args"
"github.com/sirupsen/logrus"
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
@ -16,6 +19,7 @@ var (
)
func main() {
os.Unsetenv("GOPATH")
bc := &bindata.Config{
Input: []bindata.InputConfig{
{
@ -82,38 +86,56 @@ func main() {
logrus.Fatal(err)
}
if err := generator.DefaultGenerate(v1.Schemas, basePackage, false, nil); err != nil {
logrus.Fatal(err)
}
if err := generator.ControllersForForeignTypes(basePackage, corev1.SchemeGroupVersion, []interface{}{
corev1.ServiceAccount{},
corev1.Endpoints{},
corev1.Service{},
corev1.Pod{},
corev1.ConfigMap{},
}, []interface{}{
corev1.Node{},
}); err != nil {
logrus.Fatal(err)
}
if err := generator.ControllersForForeignTypes(basePackage, appsv1.SchemeGroupVersion, []interface{}{
appsv1.DaemonSet{},
appsv1.Deployment{},
}, nil); err != nil {
logrus.Fatal(err)
}
if err := generator.ControllersForForeignTypes(basePackage, batchv1.SchemeGroupVersion, []interface{}{
batchv1.Job{},
}, nil); err != nil {
logrus.Fatal(err)
}
if err := generator.ControllersForForeignTypes(basePackage, rbacv1.SchemeGroupVersion, nil, []interface{}{
rbacv1.ClusterRoleBinding{},
}); err != nil {
logrus.Fatal(err)
}
controllergen.Run(args.Options{
OutputPackage: "github.com/rancher/k3s/pkg/generated",
Boilerplate: "scripts/boilerplate.go.txt",
Groups: map[string]args.Group{
"k3s.cattle.io": {
Types: []interface{}{
v1.ListenerConfig{},
v1.Addon{},
v1.HelmChart{},
},
GenerateTypes: true,
},
"": {
Types: []interface{}{
corev1.ServiceAccount{},
corev1.Endpoints{},
corev1.Service{},
corev1.Pod{},
corev1.ConfigMap{},
corev1.Node{},
},
InformersPackage: "k8s.io/client-go/informers",
ClientSetPackage: "k8s.io/client-go/kubernetes",
ListersPackage: "k8s.io/client-go/listers",
},
"apps": {
Types: []interface{}{
appsv1.Deployment{},
appsv1.DaemonSet{},
},
InformersPackage: "k8s.io/client-go/informers",
ClientSetPackage: "k8s.io/client-go/kubernetes",
ListersPackage: "k8s.io/client-go/listers",
},
"batch": {
Types: []interface{}{
batchv1.Job{},
},
InformersPackage: "k8s.io/client-go/informers",
ClientSetPackage: "k8s.io/client-go/kubernetes",
ListersPackage: "k8s.io/client-go/listers",
},
"rbac": {
Types: []interface{}{
rbacv1.ClusterRoleBinding{},
},
InformersPackage: "k8s.io/client-go/informers",
ClientSetPackage: "k8s.io/client-go/kubernetes",
ListersPackage: "k8s.io/client-go/listers",
},
},
})
}

View File

@ -23,8 +23,8 @@ import (
"strings"
"time"
certutil "github.com/rancher/dynamiclistener/cert"
"github.com/rancher/k3s/pkg/daemons/config"
certutil "github.com/rancher/norman/pkg/cert"
"github.com/sirupsen/logrus"
"k8s.io/apiserver/pkg/authentication/authenticator"
"k8s.io/kubernetes/cmd/kube-apiserver/app"

View File

@ -6,8 +6,8 @@ import (
"net/http"
"time"
"github.com/rancher/norman/pkg/kv"
"github.com/rancher/norman/pkg/remotedialer"
"github.com/rancher/remotedialer"
"github.com/rancher/wrangler/pkg/kv"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/kubernetes/cmd/kube-apiserver/app"

View File

@ -4,7 +4,7 @@ import (
"os"
"github.com/pkg/errors"
"github.com/rancher/norman/pkg/resolvehome"
"github.com/rancher/wrangler/pkg/resolvehome"
)
const (

View File

@ -14,20 +14,16 @@ import (
"time"
errors2 "github.com/pkg/errors"
v1 "github.com/rancher/k3s/types/apis/k3s.cattle.io/v1"
"github.com/rancher/norman"
"github.com/rancher/norman/objectclient"
"github.com/rancher/norman/pkg/objectset"
"github.com/rancher/norman/types"
v12 "github.com/rancher/k3s/pkg/apis/k3s.cattle.io/v1"
v1 "github.com/rancher/k3s/pkg/generated/controllers/k3s.cattle.io/v1"
"github.com/rancher/wrangler/pkg/apply"
"github.com/rancher/wrangler/pkg/merr"
"github.com/rancher/wrangler/pkg/objectset"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
yamlDecoder "k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/client-go/discovery"
"k8s.io/client-go/rest"
)
const (
@ -35,21 +31,15 @@ const (
startKey = "_start_"
)
func WatchFiles(ctx context.Context, bases ...string) error {
server := norman.GetServer(ctx)
addons := v1.ClientsFrom(ctx).Addon
func WatchFiles(ctx context.Context, addons v1.AddonController, bases ...string) error {
w := &watcher{
addonCache: addons.Cache(),
addons: addons,
bases: bases,
restConfig: *server.Runtime.LocalConfig,
discovery: server.K8sClient.Discovery(),
clients: map[schema.GroupVersionKind]*objectclient.ObjectClient{},
}
addons.Enqueue("", startKey)
addons.Interface().AddHandler(ctx, "addon-start", func(key string, _ *v1.Addon) (runtime.Object, error) {
addons.OnChange(ctx, "addon-start", func(key string, _ *v12.Addon) (*v12.Addon, error) {
if key == startKey {
go w.start(ctx)
}
@ -60,13 +50,10 @@ func WatchFiles(ctx context.Context, bases ...string) error {
}
type watcher struct {
addonCache v1.AddonClientCache
apply apply.Apply
addonCache v1.AddonCache
addons v1.AddonClient
bases []string
restConfig rest.Config
discovery discovery.DiscoveryInterface
clients map[schema.GroupVersionKind]*objectclient.ObjectClient
namespaced map[schema.GroupVersionKind]bool
}
func (w *watcher) start(ctx context.Context) {
@ -93,7 +80,7 @@ func (w *watcher) listFiles(force bool) error {
}
}
return types.NewErrors(errs...)
return merr.NewErrors(errs...)
}
func (w *watcher) listFilesIn(base string, force bool) error {
@ -122,7 +109,7 @@ func (w *watcher) listFilesIn(base string, force bool) error {
}
}
return types.NewErrors(errs...)
return merr.NewErrors(errs...)
}
func (w *watcher) deploy(path string, compareChecksum bool) error {
@ -148,24 +135,14 @@ func (w *watcher) deploy(path string, compareChecksum bool) error {
return err
}
clients, err := w.apply(addon, objectSet)
if err != nil {
if err := w.apply.WithOwner(&addon).Apply(objectSet); err != nil {
return err
}
if w.clients == nil {
w.clients = map[schema.GroupVersionKind]*objectclient.ObjectClient{}
}
addon.Spec.Source = path
addon.Spec.Checksum = checksum
addon.Status.GVKs = nil
for gvk, client := range clients {
addon.Status.GVKs = append(addon.Status.GVKs, gvk)
w.clients[gvk] = client
}
if addon.UID == "" {
_, err := w.addons.Create(&addon)
return err
@ -175,55 +152,16 @@ func (w *watcher) deploy(path string, compareChecksum bool) error {
return err
}
func (w *watcher) addon(name string) (v1.Addon, error) {
func (w *watcher) addon(name string) (v12.Addon, error) {
addon, err := w.addonCache.Get(ns, name)
if errors.IsNotFound(err) {
addon = v1.NewAddon(ns, name, v1.Addon{})
addon = v12.NewAddon(ns, name, v12.Addon{})
} else if err != nil {
return v1.Addon{}, err
return v12.Addon{}, err
}
return *addon, nil
}
func (w *watcher) apply(addon v1.Addon, set *objectset.ObjectSet) (map[schema.GroupVersionKind]*objectclient.ObjectClient, error) {
var (
err error
)
op := objectset.NewProcessor(addon.Name)
op.AllowDiscovery(w.discovery, w.restConfig)
ds := op.NewDesiredSet(nil, set)
for _, gvk := range addon.Status.GVKs {
var (
namespaced bool
)
client, ok := w.clients[gvk]
if ok {
namespaced = w.namespaced[gvk]
} else {
client, namespaced, err = objectset.NewDiscoveredClient(gvk, w.restConfig, w.discovery)
if err != nil {
return nil, err
}
if w.namespaced == nil {
w.namespaced = map[schema.GroupVersionKind]bool{}
}
w.namespaced[gvk] = namespaced
}
ds.AddDiscoveredClient(gvk, client, namespaced)
}
if err := ds.Apply(); err != nil {
return nil, err
}
return ds.DiscoveredClients(), nil
}
func objectSet(content []byte) (*objectset.ObjectSet, error) {
objs, err := yamlToObjects(bytes.NewBuffer(content))
if err != nil {

View File

@ -5,38 +5,39 @@ import (
"strings"
"github.com/pkg/errors"
coreclient "github.com/rancher/k3s/types/apis/core/v1"
coreclient "github.com/rancher/k3s/pkg/generated/controllers/core/v1"
"github.com/sirupsen/logrus"
core "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
)
func Register(ctx context.Context) error {
clients := coreclient.ClientsFrom(ctx)
func Register(ctx context.Context, configMap coreclient.ConfigMapController, nodes coreclient.NodeController) error {
h := &handler{
configCache: clients.ConfigMap.Cache(),
configClient: clients.ConfigMap,
configCache: configMap.Cache(),
configClient: configMap,
}
clients.Node.OnChange(ctx, "node", h.onChange)
clients.Node.OnRemove(ctx, "node", h.onRemove)
nodes.OnChange(ctx, "node", h.onChange)
nodes.OnRemove(ctx, "node", h.onRemove)
return nil
}
type handler struct {
configCache coreclient.ConfigMapClientCache
configCache coreclient.ConfigMapCache
configClient coreclient.ConfigMapClient
}
func (h *handler) onChange(node *core.Node) (runtime.Object, error) {
func (h *handler) onChange(key string, node *core.Node) (*core.Node, error) {
if node == nil {
return nil, nil
}
return h.updateHosts(node, false)
}
func (h *handler) onRemove(node *core.Node) (runtime.Object, error) {
func (h *handler) onRemove(key string, node *core.Node) (*core.Node, error) {
return h.updateHosts(node, true)
}
func (h *handler) updateHosts(node *core.Node, removed bool) (runtime.Object, error) {
func (h *handler) updateHosts(node *core.Node, removed bool) (*core.Node, error) {
var (
newHosts string
nodeAddress string

65
pkg/proxy/proxy_server.go Normal file
View File

@ -0,0 +1,65 @@
package proxy
import (
"crypto/tls"
"crypto/x509"
"net/http"
"net/url"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/proxy"
"k8s.io/client-go/rest"
)
var (
er = &errorResponder{}
)
type errorResponder struct {
}
func (e *errorResponder) Error(w http.ResponseWriter, req *http.Request, err error) {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
}
type SimpleProxy struct {
url *url.URL
transport http.RoundTripper
overrideHostHeader bool
}
func NewSimpleProxy(host string, caData []byte, overrideHostHeader bool) (*SimpleProxy, error) {
hostURL, _, err := rest.DefaultServerURL(host, "", schema.GroupVersion{}, true)
if err != nil {
return nil, err
}
ht := &http.Transport{}
if len(caData) > 0 {
certPool := x509.NewCertPool()
certPool.AppendCertsFromPEM(caData)
ht.TLSClientConfig = &tls.Config{
RootCAs: certPool,
}
}
return &SimpleProxy{
url: hostURL,
transport: ht,
overrideHostHeader: overrideHostHeader,
}, nil
}
func (s *SimpleProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
u := *s.url
u.Path = req.URL.Path
u.RawQuery = req.URL.RawQuery
req.URL.Scheme = "https"
req.URL.Host = req.Host
if s.overrideHostHeader {
req.Host = u.Host
}
httpProxy := proxy.NewUpgradeAwareHandler(&u, s.transport, false, false, er)
httpProxy.ServeHTTP(rw, req)
}

View File

@ -4,21 +4,20 @@ import (
"context"
"time"
coreClients "github.com/rancher/k3s/pkg/generated/controllers/core/v1"
"github.com/rancher/k3s/pkg/rootless"
coreClients "github.com/rancher/k3s/types/apis/core/v1"
"github.com/rootless-containers/rootlesskit/pkg/api/client"
"github.com/rootless-containers/rootlesskit/pkg/port"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
)
var (
all = "_all_"
)
func Register(ctx context.Context, httpsPort int) error {
func Register(ctx context.Context, serviceController coreClients.ServiceController, httpsPort int) error {
var (
err error
rootlessClient client.Client
@ -28,7 +27,6 @@ func Register(ctx context.Context, httpsPort int) error {
return nil
}
coreClients := coreClients.ClientsFrom(ctx)
for i := 0; i < 30; i++ {
rootlessClient, err = client.New(rootless.Sock)
if err == nil {
@ -44,26 +42,26 @@ func Register(ctx context.Context, httpsPort int) error {
h := &handler{
rootlessClient: rootlessClient,
serviceClient: coreClients.Service,
serviceCache: coreClients.Service.Cache(),
serviceClient: serviceController,
serviceCache: serviceController.Cache(),
httpsPort: httpsPort,
ctx: ctx,
}
coreClients.Service.Interface().Controller().AddHandler(ctx, "rootlessports", h.serviceChanged)
coreClients.Service.Enqueue("", all)
serviceController.OnChange(ctx, "rootlessports", h.serviceChanged)
serviceController.Enqueue("", all)
return nil
}
type handler struct {
rootlessClient client.Client
serviceClient coreClients.ServiceClient
serviceCache coreClients.ServiceClientCache
serviceClient coreClients.ServiceController
serviceCache coreClients.ServiceCache
httpsPort int
ctx context.Context
}
func (h *handler) serviceChanged(key string, svc *v1.Service) (runtime.Object, error) {
func (h *handler) serviceChanged(key string, svc *v1.Service) (*v1.Service, error) {
if key != all {
h.serviceClient.Enqueue("", all)
return svc, nil

61
pkg/server/context.go Normal file
View File

@ -0,0 +1,61 @@
package server
import (
"context"
"github.com/rancher/k3s/pkg/generated/controllers/apps"
"github.com/rancher/k3s/pkg/generated/controllers/core"
"github.com/rancher/k3s/pkg/generated/controllers/k3s.cattle.io"
"github.com/rancher/wrangler/pkg/apply"
"github.com/rancher/wrangler/pkg/crd"
"github.com/rancher/wrangler/pkg/start"
"k8s.io/client-go/rest"
"k8s.io/kubernetes/staging/src/k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/staging/src/k8s.io/client-go/tools/clientcmd"
)
type Context struct {
K3s *k3s.Factory
Apps *apps.Factory
Core *core.Factory
K8s kubernetes.Interface
Apply apply.Apply
}
func (c *Context) Start(ctx context.Context) error {
return start.All(ctx, 5, c.K3s, c.Apps, c.Core)
}
func newContext(ctx context.Context, cfg string) (*Context, error) {
restConfig, err := clientcmd.BuildConfigFromFlags("", cfg)
if err != nil {
return nil, err
}
if err := crds(ctx, restConfig); err != nil {
return nil, err
}
k8s := kubernetes.NewForConfigOrDie(restConfig)
return &Context{
K3s: k3s.NewFactoryFromConfigOrDie(restConfig),
K8s: k8s,
Apps: apps.NewFactoryFromConfigOrDie(restConfig),
Core: core.NewFactoryFromConfigOrDie(restConfig),
Apply: apply.New(k8s, apply.NewClientFactory(restConfig)),
}, nil
}
func crds(ctx context.Context, config *rest.Config) error {
factory, err := crd.NewFactoryFromClient(config)
if err != nil {
return err
}
factory.BatchCreateCRDs(ctx, crd.NamespacedTypes(
"ListenerConfig.k3s.cattle.io/v1",
"Addon.k3s.cattle.io/v1",
"HelmChart.k3s.cattle.io/v1")...)
return factory.BatchWait()
}

View File

@ -15,9 +15,9 @@ import (
"strings"
"github.com/gorilla/mux"
certutil "github.com/rancher/dynamiclistener/cert"
"github.com/rancher/k3s/pkg/daemons/config"
"github.com/rancher/k3s/pkg/openapi"
certutil "github.com/rancher/norman/pkg/cert"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/json"
"k8s.io/kubernetes/pkg/master"

View File

@ -14,26 +14,19 @@ import (
"time"
"github.com/pkg/errors"
"github.com/rancher/dynamiclistener"
"github.com/rancher/k3s/pkg/clientaccess"
"github.com/rancher/k3s/pkg/daemons/config"
"github.com/rancher/k3s/pkg/daemons/control"
"github.com/rancher/k3s/pkg/datadir"
"github.com/rancher/k3s/pkg/deploy"
"github.com/rancher/k3s/pkg/helm"
"github.com/rancher/k3s/pkg/node"
"github.com/rancher/k3s/pkg/rootlessports"
"github.com/rancher/k3s/pkg/servicelb"
"github.com/rancher/k3s/pkg/static"
"github.com/rancher/k3s/pkg/tls"
appsv1 "github.com/rancher/k3s/types/apis/apps/v1"
batchv1 "github.com/rancher/k3s/types/apis/batch/v1"
corev1 "github.com/rancher/k3s/types/apis/core/v1"
v1 "github.com/rancher/k3s/types/apis/k3s.cattle.io/v1"
rbacv1 "github.com/rancher/k3s/types/apis/rbac.authorization.k8s.io/v1"
"github.com/rancher/norman"
"github.com/rancher/norman/pkg/clientaccess"
"github.com/rancher/norman/pkg/dynamiclistener"
"github.com/rancher/norman/pkg/resolvehome"
"github.com/rancher/norman/types"
"github.com/rancher/wrangler/pkg/leader"
"github.com/rancher/wrangler/pkg/resolvehome"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/net"
)
@ -44,7 +37,6 @@ func resolveDataDir(dataDir string) (string, error) {
}
func StartServer(ctx context.Context, config *Config) (string, error) {
if err := setupDataDirAndChdir(&config.ControlConfig); err != nil {
return "", err
}
@ -57,7 +49,7 @@ func StartServer(ctx context.Context, config *Config) (string, error) {
return "", errors.Wrap(err, "starting kubernetes")
}
certs, err := startNorman(ctx, config)
certs, err := startWrangler(ctx, config)
if err != nil {
return "", errors.Wrap(err, "starting tls server")
}
@ -76,7 +68,7 @@ func StartServer(ctx context.Context, config *Config) (string, error) {
return certs, nil
}
func startNorman(ctx context.Context, config *Config) (string, error) {
func startWrangler(ctx context.Context, config *Config) (string, error) {
var (
err error
tlsServer dynamiclistener.ServerInterface
@ -91,76 +83,90 @@ func startNorman(ctx context.Context, config *Config) (string, error) {
return tlsServer.CACert()
})
normanConfig := &norman.Config{
Name: "k3s",
KubeConfig: controlConfig.Runtime.KubeConfigSystem,
Clients: []norman.ClientFactory{
v1.Factory,
appsv1.Factory,
corev1.Factory,
batchv1.Factory,
rbacv1.Factory,
},
Schemas: []*types.Schemas{
v1.Schemas,
},
CRDs: map[*types.APIVersion][]string{
&v1.APIVersion: {
v1.ListenerConfigGroupVersionKind.Kind,
v1.AddonGroupVersionKind.Kind,
v1.HelmChartGroupVersionKind.Kind,
},
},
IgnoredKubeConfigEnv: true,
GlobalSetup: func(ctx context.Context) (context.Context, error) {
tlsServer, err = tls.NewServer(ctx, v1.ClientsFrom(ctx).ListenerConfig, *tlsConfig)
return ctx, err
},
DisableLeaderElection: true,
MasterControllers: []norman.ControllerRegister{
node.Register,
helm.Register,
func(ctx context.Context) error {
return servicelb.Register(ctx, norman.GetServer(ctx).K8sClient, !config.DisableServiceLB,
config.Rootless)
},
func(ctx context.Context) error {
dataDir := filepath.Join(controlConfig.DataDir, "static")
return static.Stage(dataDir)
},
func(ctx context.Context) error {
dataDir := filepath.Join(controlConfig.DataDir, "manifests")
templateVars := map[string]string{"%{CLUSTER_DNS}%": controlConfig.ClusterDNS.String(), "%{CLUSTER_DOMAIN}%": controlConfig.ClusterDomain}
if err := deploy.Stage(dataDir, templateVars, controlConfig.Skips); err != nil {
return err
}
if err := deploy.WatchFiles(ctx, dataDir); err != nil {
return err
}
return nil
},
func(ctx context.Context) error {
if !config.DisableServiceLB && config.Rootless {
return rootlessports.Register(ctx, config.TLSConfig.HTTPSPort)
}
return nil
},
},
}
if _, _, err = normanConfig.Build(ctx, nil); err != nil {
sc, err := newContext(ctx, controlConfig.Runtime.KubeConfigSystem)
if err != nil {
return "", err
}
for {
certs, err := tlsServer.CACert()
if err := stageFiles(ctx, sc, controlConfig); err != nil {
return "", err
}
tlsServer, err = tls.NewServer(ctx, sc.K3s.K3s().V1().ListenerConfig(), *tlsConfig)
if err != nil {
return "", err
}
if err := sc.Start(ctx); err != nil {
return "", err
}
certs := ""
for certs == "" {
certs, err = tlsServer.CACert()
if err != nil {
logrus.Infof("waiting to generate CA certs")
time.Sleep(time.Second)
continue
}
return certs, nil
}
go leader.RunOrDie(ctx, "", "k3s", sc.K8s, func(ctx context.Context) {
if err := masterControllers(ctx, sc, config); err != nil {
panic(err)
}
if err := sc.Start(ctx); err != nil {
panic(err)
}
})
return certs, nil
}
func masterControllers(ctx context.Context, sc *Context, config *Config) error {
if err := node.Register(ctx, sc.Core.Core().V1().ConfigMap(), sc.Core.Core().V1().Node()); err != nil {
return err
}
//helm.Register
if err := servicelb.Register(ctx,
sc.K8s,
sc.Apply,
sc.Apps.Apps().V1().DaemonSet(),
sc.Apps.Apps().V1().Deployment(),
sc.Core.Core().V1().Node(),
sc.Core.Core().V1().Pod(),
sc.Core.Core().V1().Service(),
sc.Core.Core().V1().Endpoints(),
!config.DisableServiceLB, config.Rootless); err != nil {
return err
}
if !config.DisableServiceLB && config.Rootless {
return rootlessports.Register(ctx, sc.Core.Core().V1().Service(), config.TLSConfig.HTTPSPort)
}
return nil
}
func stageFiles(ctx context.Context, sc *Context, controlConfig *config.Control) error {
dataDir := filepath.Join(controlConfig.DataDir, "static")
if err := static.Stage(dataDir); err != nil {
return err
}
dataDir = filepath.Join(controlConfig.DataDir, "manifests")
templateVars := map[string]string{
"%{CLUSTER_DNS}%": controlConfig.ClusterDNS.String(),
"%{CLUSTER_DOMAIN}%": controlConfig.ClusterDomain,
}
if err := deploy.Stage(dataDir, templateVars, controlConfig.Skips); err != nil {
return err
}
return deploy.WatchFiles(ctx, sc.K3s.K3s().V1().Addon(), dataDir)
}
func HomeKubeConfig(write, rootless bool) (string, error) {
@ -198,7 +204,6 @@ func printTokens(certs, advertiseIP string, tlsConfig *dynamiclistener.UserConfi
if len(nodeFile) > 0 {
printToken(tlsConfig.HTTPSPort, advertiseIP, "To join node to cluster:", "agent")
}
}
func writeKubeConfig(certs string, tlsConfig *dynamiclistener.UserConfig, config *Config) {

View File

@ -1,8 +1,8 @@
package server
import (
"github.com/rancher/dynamiclistener"
"github.com/rancher/k3s/pkg/daemons/config"
"github.com/rancher/norman/pkg/dynamiclistener"
)
type Config struct {

View File

@ -6,12 +6,15 @@ import (
"sort"
"strconv"
appclient "github.com/rancher/k3s/types/apis/apps/v1"
coreclient "github.com/rancher/k3s/types/apis/core/v1"
"github.com/rancher/norman/condition"
"github.com/rancher/norman/pkg/changeset"
"github.com/rancher/norman/pkg/objectset"
"github.com/rancher/norman/types/slice"
"github.com/rancher/wrangler/pkg/slice"
"github.com/rancher/wrangler/pkg/relatedresource"
appclient "github.com/rancher/k3s/pkg/generated/controllers/apps/v1"
coreclient "github.com/rancher/k3s/pkg/generated/controllers/core/v1"
"github.com/rancher/wrangler/pkg/apply"
"github.com/rancher/wrangler/pkg/condition"
"github.com/rancher/wrangler/pkg/objectset"
"github.com/sirupsen/logrus"
apps "k8s.io/api/apps/v1"
core "k8s.io/api/core/v1"
@ -37,31 +40,37 @@ var (
trueVal = true
)
func Register(ctx context.Context, kubernetes kubernetes.Interface, enabled, rootless bool) error {
clients := coreclient.ClientsFrom(ctx)
appClients := appclient.ClientsFrom(ctx)
func Register(ctx context.Context,
kubernetes kubernetes.Interface,
apply apply.Apply,
daemonSetController appclient.DaemonSetController,
deployments appclient.DeploymentController,
nodes coreclient.NodeController,
pods coreclient.PodController,
services coreclient.ServiceController,
endpoints coreclient.EndpointsController,
enabled, rootless bool) error {
h := &handler{
rootless: rootless,
enabled: enabled,
nodeCache: clients.Node.Cache(),
podCache: clients.Pod.Cache(),
deploymentCache: appClients.Deployment.Cache(),
processor: objectset.NewProcessor("svccontroller").
Client(appClients.DaemonSet),
serviceCache: clients.Service.Cache(),
nodeCache: nodes.Cache(),
podCache: pods.Cache(),
deploymentCache: deployments.Cache(),
processor: apply.WithSetID("svccontroller").
WithCacheTypes(daemonSetController),
serviceCache: services.Cache(),
services: kubernetes.CoreV1(),
daemonsets: kubernetes.AppsV1(),
deployments: kubernetes.AppsV1(),
}
clients.Service.OnChange(ctx, "svccontroller", h.onChangeService)
clients.Node.OnChange(ctx, "svccontroller", h.onChangeNode)
changeset.Watch(ctx, "svccontroller-watcher",
services.OnChange(ctx, "svccontroller", h.onChangeService)
nodes.OnChange(ctx, "svccontroller", h.onChangeNode)
relatedresource.Watch(ctx, "svccontroller-watcher",
h.onResourceChange,
clients.Service,
clients.Pod,
clients.Endpoints)
services,
pods,
endpoints)
return nil
}
@ -69,19 +78,19 @@ func Register(ctx context.Context, kubernetes kubernetes.Interface, enabled, roo
type handler struct {
rootless bool
enabled bool
nodeCache coreclient.NodeClientCache
podCache coreclient.PodClientCache
deploymentCache appclient.DeploymentClientCache
processor *objectset.Processor
serviceCache coreclient.ServiceClientCache
nodeCache coreclient.NodeCache
podCache coreclient.PodCache
deploymentCache appclient.DeploymentCache
processor apply.Apply
serviceCache coreclient.ServiceCache
services coregetter.ServicesGetter
daemonsets v1getter.DaemonSetsGetter
deployments v1getter.DeploymentsGetter
}
func (h *handler) onResourceChange(name, namespace string, obj runtime.Object) ([]changeset.Key, error) {
func (h *handler) onResourceChange(name, namespace string, obj runtime.Object) ([]relatedresource.Key, error) {
if ep, ok := obj.(*core.Endpoints); ok {
return []changeset.Key{
return []relatedresource.Key{
{
Name: ep.Name,
Namespace: ep.Namespace,
@ -103,7 +112,7 @@ func (h *handler) onResourceChange(name, namespace string, obj runtime.Object) (
return nil, nil
}
return []changeset.Key{
return []relatedresource.Key{
{
Name: serviceName,
Namespace: pod.Namespace,
@ -111,7 +120,11 @@ func (h *handler) onResourceChange(name, namespace string, obj runtime.Object) (
}, nil
}
func (h *handler) onChangeService(svc *core.Service) (runtime.Object, error) {
func (h *handler) onChangeService(key string, svc *core.Service) (*core.Service, error) {
if svc == nil {
return nil, nil
}
if svc.Spec.Type != core.ServiceTypeLoadBalancer || svc.Spec.ClusterIP == "" ||
svc.Spec.ClusterIP == "None" {
return svc, nil
@ -126,7 +139,10 @@ func (h *handler) onChangeService(svc *core.Service) (runtime.Object, error) {
return nil, err
}
func (h *handler) onChangeNode(node *core.Node) (runtime.Object, error) {
func (h *handler) onChangeNode(key string, node *core.Node) (*core.Node, error) {
if node == nil {
return nil, nil
}
if _, ok := node.Labels[daemonsetNodeLabel]; !ok {
return node, nil
}
@ -195,7 +211,7 @@ func (h *handler) podIPs(pods []*core.Pod) ([]string, error) {
continue
}
node, err := h.nodeCache.Get("", pod.Spec.NodeName)
node, err := h.nodeCache.Get(pod.Spec.NodeName)
if errors.IsNotFound(err) {
continue
} else if err != nil {
@ -227,7 +243,7 @@ func (h *handler) deployPod(svc *core.Service) error {
}
objs := objectset.NewObjectSet()
if !h.enabled {
return h.processor.NewDesiredSet(svc, objs).Apply()
return h.processor.WithOwner(svc).Apply(objs)
}
ds, err := h.newDaemonSet(svc)
@ -237,7 +253,7 @@ func (h *handler) deployPod(svc *core.Service) error {
if ds != nil {
objs.Add(ds)
}
return h.processor.NewDesiredSet(svc, objs).Apply()
return h.processor.WithOwner(svc).Apply(objs)
}
func (h *handler) newDaemonSet(svc *core.Service) (*apps.DaemonSet, error) {
@ -335,7 +351,7 @@ func (h *handler) newDaemonSet(svc *core.Service) (*apps.DaemonSet, error) {
if err != nil {
return nil, err
}
nodesWithLabel, err := h.nodeCache.List("", selector)
nodesWithLabel, err := h.nodeCache.List(selector)
if err != nil {
return nil, err
}

View File

@ -3,11 +3,11 @@ package tls
import (
"context"
v1 "github.com/rancher/k3s/types/apis/k3s.cattle.io/v1"
"github.com/rancher/norman/pkg/dynamiclistener"
"github.com/rancher/dynamiclistener"
v1 "github.com/rancher/k3s/pkg/apis/k3s.cattle.io/v1"
k3sclient "github.com/rancher/k3s/pkg/generated/controllers/k3s.cattle.io/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
)
const (
@ -15,14 +15,21 @@ const (
name = "tls-config"
)
func NewServer(ctx context.Context, listenerClient v1.ListenerConfigClient, config dynamiclistener.UserConfig) (dynamiclistener.ServerInterface, error) {
func NewServer(ctx context.Context, listenerConfigs k3sclient.ListenerConfigController, config dynamiclistener.UserConfig) (dynamiclistener.ServerInterface, error) {
storage := &listenerConfigStorage{
client: listenerClient,
cache: listenerClient.Cache(),
client: listenerConfigs,
cache: listenerConfigs.Cache(),
}
server, err := dynamiclistener.NewServer(storage, config)
listenerClient.OnChange(ctx, "listen-config", func(obj *v1.ListenerConfig) (runtime.Object, error) {
if err != nil {
return nil, err
}
listenerConfigs.OnChange(ctx, "listen-config", func(key string, obj *v1.ListenerConfig) (*v1.ListenerConfig, error) {
if obj == nil {
return nil, nil
}
return obj, server.Update(fromStorage(obj))
})
@ -30,8 +37,8 @@ func NewServer(ctx context.Context, listenerClient v1.ListenerConfigClient, conf
}
type listenerConfigStorage struct {
cache v1.ListenerConfigClientCache
client v1.ListenerConfigClient
cache k3sclient.ListenerConfigCache
client k3sclient.ListenerConfigClient
}
func (l *listenerConfigStorage) Set(config *dynamiclistener.ListenerStatus) (*dynamiclistener.ListenerStatus, error) {
@ -65,6 +72,9 @@ func (l *listenerConfigStorage) Get() (*dynamiclistener.ListenerStatus, error) {
if errors.IsNotFound(err) {
obj, err = l.client.Get(ns, name, metav1.GetOptions{})
}
if errors.IsNotFound(err) {
return &dynamiclistener.ListenerStatus{}, nil
}
return fromStorage(obj), err
}

View File

@ -0,0 +1,16 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/