Add embedded etcd support

This is replaces dqlite with etcd.  The each same UX of dqlite is
followed so there is no change to the CLI args for this.
This commit is contained in:
Darren Shepherd 2020-05-05 15:02:16 -07:00
parent 39571424dd
commit 6b5b69378f
9 changed files with 642 additions and 8 deletions

2
go.mod
View File

@ -110,6 +110,8 @@ require (
github.com/spf13/pflag v1.0.5
github.com/tchap/go-patricia v2.3.0+incompatible // indirect
github.com/urfave/cli v1.22.2
// e694b7bb0875 is v3.4.7
go.etcd.io/etcd v0.5.0-alpha.5.0.20200401174654-e694b7bb0875
golang.org/x/crypto v0.0.0-20200220183623-bac4c82f6975
golang.org/x/net v0.0.0-20191204025024-5ee1b9f4859a
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e

12
pkg/cluster/etcd.go Normal file
View File

@ -0,0 +1,12 @@
// +build !no_etcd
package cluster
import (
"github.com/rancher/k3s/pkg/cluster/managed"
"github.com/rancher/k3s/pkg/etcd"
)
func init() {
managed.RegisterDriver(&etcd.ETCD{})
}

View File

@ -1,6 +1,7 @@
package config
import (
"context"
"crypto/tls"
"fmt"
"net"

View File

@ -19,6 +19,8 @@ import (
"text/template"
"time"
"k8s.io/apimachinery/pkg/util/sets"
"github.com/pkg/errors"
certutil "github.com/rancher/dynamiclistener/cert"
"github.com/rancher/k3s/pkg/clientaccess"
@ -209,7 +211,7 @@ func apiServer(ctx context.Context, cfg *config.Control, runtime *config.Control
args := config.GetArgsList(argsMap, cfg.ExtraAPIArgs)
logrus.Infof("Running kube-apiserver %s", config.ArgString(args))
return executor.APIServer(ctx, args)
return executor.APIServer(ctx, runtime.ETCDReady, args)
}
func defaults(config *config.Control) {
@ -963,6 +965,19 @@ func waitForAPIServerInBackground(ctx context.Context, runtime *config.ControlRu
go func() {
defer close(done)
etcdLoop:
for {
select {
case <-ctx.Done():
return
case <-runtime.ETCDReady:
break etcdLoop
case <-time.After(30 * time.Second):
logrus.Infof("Waiting for etcd server to become available")
}
}
logrus.Infof("Waiting for API server to become available")
for {
select {

View File

@ -45,7 +45,8 @@ func (Embedded) KubeProxy(args []string) error {
return nil
}
func (Embedded) APIServer(ctx context.Context, args []string) (authenticator.Request, http.Handler, error) {
func (Embedded) APIServer(ctx context.Context, etcdReady <-chan struct{}, args []string) (authenticator.Request, http.Handler, error) {
<-etcdReady
command := app.NewAPIServerCommand(ctx.Done())
command.SetArgs(args)

View File

@ -0,0 +1,33 @@
// +build !no_embedded_executor
package executor
import (
"github.com/sirupsen/logrus"
"go.etcd.io/etcd/embed"
)
func (e Embedded) CurrentETCDOptions() (InitialOptions, error) {
return InitialOptions{}, nil
}
func (e Embedded) ETCD(args ETCDConfig) error {
configFile, err := args.ToConfigFile()
if err != nil {
return err
}
cfg, err := embed.ConfigFromFile(configFile)
if err != nil {
return err
}
etcd, err := embed.StartEtcd(cfg)
if err != nil {
return nil
}
go func() {
err := <-etcd.Err()
logrus.Fatalf("etcd exited: %v", err)
}()
return nil
}

View File

@ -2,22 +2,76 @@ package executor
import (
"context"
"io/ioutil"
"net/http"
"os"
"path/filepath"
"sigs.k8s.io/yaml"
"k8s.io/apiserver/pkg/authentication/authenticator"
)
var (
executor Executor
)
type Executor interface {
Kubelet(args []string) error
KubeProxy(args []string) error
APIServer(ctx context.Context, args []string) (authenticator.Request, http.Handler, error)
APIServer(ctx context.Context, etcdReady <-chan struct{}, args []string) (authenticator.Request, http.Handler, error)
Scheduler(apiReady <-chan struct{}, args []string) error
ControllerManager(apiReady <-chan struct{}, args []string) error
CurrentETCDOptions() (InitialOptions, error)
ETCD(args ETCDConfig) error
}
var (
executor Executor
)
type ETCDConfig struct {
InitialOptions `json:",inline"`
Name string `json:"name,omitempty"`
ListenClientURLs string `json:"listen-client-urls,omitempty"`
ListenMetricsURLs string `json:"listen-metrics-urls,omitempty"`
ListenPeerURLs string `json:"listen-peer-urls,omitempty"`
AdvertiseClientURLs string `json:"advertise-client-urls,omitempty"`
DataDir string `json:"data-dir,omitempty"`
SnapshotCount int `json:"snapshot-count,omitempty"`
ServerTrust ServerTrust `json:"client-transport-security"`
PeerTrust PeerTrust `json:"peer-transport-security"`
ForceNewCluster bool `json:"force-new-cluster,omitempty"`
}
type ServerTrust struct {
CertFile string `json:"cert-file"`
KeyFile string `json:"key-file"`
ClientCertAuth bool `json:"client-cert-auth"`
TrustedCAFile string `json:"trusted-ca-file"`
}
type PeerTrust struct {
CertFile string `json:"cert-file"`
KeyFile string `json:"key-file"`
ClientCertAuth bool `json:"client-cert-auth"`
TrustedCAFile string `json:"trusted-ca-file"`
}
type InitialOptions struct {
AdvertisePeerURL string `json:"initial-advertise-peer-urls,omitempty"`
Cluster string `json:"initial-cluster,omitempty"`
State string `json:"initial-cluster-state,omitempty"`
}
func (e ETCDConfig) ToConfigFile() (string, error) {
confFile := filepath.Join(e.DataDir, "config")
bytes, err := yaml.Marshal(&e)
if err != nil {
return "", err
}
if err := os.MkdirAll(e.DataDir, 0700); err != nil {
return "", err
}
return confFile, ioutil.WriteFile(confFile, bytes, 0600)
}
func Set(driver Executor) {
executor = driver
@ -31,8 +85,8 @@ func KubeProxy(args []string) error {
return executor.KubeProxy(args)
}
func APIServer(ctx context.Context, args []string) (authenticator.Request, http.Handler, error) {
return executor.APIServer(ctx, args)
func APIServer(ctx context.Context, etcdReady <-chan struct{}, args []string) (authenticator.Request, http.Handler, error) {
return executor.APIServer(ctx, etcdReady, args)
}
func Scheduler(apiReady <-chan struct{}, args []string) error {
@ -42,3 +96,11 @@ func Scheduler(apiReady <-chan struct{}, args []string) error {
func ControllerManager(apiReady <-chan struct{}, args []string) error {
return executor.ControllerManager(apiReady, args)
}
func CurrentETCDOptions() (InitialOptions, error) {
return executor.CurrentETCDOptions()
}
func ETCD(args ETCDConfig) error {
return executor.ETCD(args)
}

89
pkg/etcd/controller.go Normal file
View File

@ -0,0 +1,89 @@
package etcd
import (
"context"
"os"
"time"
controllerv1 "github.com/rancher/wrangler-api/pkg/generated/controllers/core/v1"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
)
const (
nodeID = "etcd.k3s.cattle.io/node-name"
nodeAddress = "etcd.k3s.cattle.io/node-address"
master = "node-role.kubernetes.io/master"
etcdRole = "node-role.kubernetes.io/etcd"
)
type NodeControllerGetter func() controllerv1.NodeController
func Register(ctx context.Context, etcd *ETCD, nodes controllerv1.NodeController) {
h := &handler{
etcd: etcd,
nodeController: nodes,
ctx: ctx,
}
nodes.OnChange(ctx, "managed-etcd-controller", h.sync)
nodes.OnRemove(ctx, "managed-etcd-controller", h.onRemove)
}
type handler struct {
etcd *ETCD
nodeController controllerv1.NodeController
ctx context.Context
}
func (h *handler) sync(key string, node *v1.Node) (*v1.Node, error) {
if node == nil {
return nil, nil
}
nodeName := os.Getenv("NODE_NAME")
if nodeName == "" {
logrus.Debug("waiting for node to be assigned for etcd controller")
h.nodeController.EnqueueAfter(key, 5*time.Second)
return node, nil
}
if key == nodeName {
return h.handleSelf(node)
}
return node, nil
}
func (h *handler) handleSelf(node *v1.Node) (*v1.Node, error) {
if node.Annotations[nodeID] == h.etcd.name &&
node.Annotations[nodeAddress] == h.etcd.address &&
node.Labels[etcdRole] == "true" &&
node.Labels[master] == "true" {
return node, nil
}
node = node.DeepCopy()
if node.Annotations == nil {
node.Annotations = map[string]string{}
}
node.Annotations[nodeID] = h.etcd.name
node.Annotations[nodeAddress] = h.etcd.address
node.Labels[etcdRole] = "true"
node.Labels[master] = "true"
return h.nodeController.Update(node)
}
func (h *handler) onRemove(key string, node *v1.Node) (*v1.Node, error) {
if _, ok := node.Labels[etcdRole]; !ok {
return node, nil
}
id := node.Annotations[nodeID]
address := node.Annotations[nodeAddress]
if address == "" {
return node, nil
}
return node, h.etcd.removePeer(h.ctx, id, address)
}

419
pkg/etcd/etcd.go Normal file
View File

@ -0,0 +1,419 @@
package etcd
import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"net/url"
"os"
"path/filepath"
"strings"
"time"
"github.com/google/uuid"
"github.com/gorilla/mux"
"github.com/pkg/errors"
certutil "github.com/rancher/dynamiclistener/cert"
"github.com/rancher/k3s/pkg/clientaccess"
"github.com/rancher/k3s/pkg/daemons/config"
"github.com/rancher/k3s/pkg/daemons/executor"
"github.com/sirupsen/logrus"
etcd "go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/etcdserver/etcdserverpb"
utilnet "k8s.io/apimachinery/pkg/util/net"
)
type ETCD struct {
client *etcd.Client
config *config.Control
name string
runtime *config.ControlRuntime
address string
}
type Members struct {
Members []*etcdserverpb.Member `json:"members"`
}
func (e *ETCD) EndpointName() string {
return "etcd"
}
func (e *ETCD) Test(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
members, err := e.client.MemberList(ctx)
if err != nil {
return err
}
var cluster []string
for _, member := range members.Members {
for _, peerURL := range member.PeerURLs {
if peerURL == e.peerURL() && e.name == member.Name {
return nil
}
}
if len(member.PeerURLs) > 0 {
cluster = append(cluster, fmt.Sprintf("%s=%s", member.Name, member.PeerURLs[0]))
}
}
msg := fmt.Sprintf("This server is a not a member of the etcd cluster "+"found %v and expecting to contain %s=%s", cluster, e.name, e.address)
logrus.Error(msg)
return fmt.Errorf(msg)
}
func walDir(config *config.Control) string {
return filepath.Join(dataDir(config), "member", "wal")
}
func dataDir(config *config.Control) string {
return filepath.Join(config.DataDir, "db", "etcd")
}
func nameFile(config *config.Control) string {
return filepath.Join(dataDir(config), "name")
}
func (e *ETCD) IsInitialized(ctx context.Context, config *config.Control) (bool, error) {
if s, err := os.Stat(walDir(config)); err == nil && s.IsDir() {
return true, nil
} else if os.IsNotExist(err) {
return false, nil
} else {
return false, errors.Wrapf(err, "failed to test if etcd is initialized")
}
}
func (e *ETCD) Reset(ctx context.Context) error {
go func() {
for {
time.Sleep(5 * time.Second)
if err := e.Test(ctx); err == nil {
members, err := e.client.MemberList(ctx)
if err != nil {
continue
}
if len(members.Members) == 1 && members.Members[0].Name == e.name {
logrus.Infof("etcd is running, restart without --cluster-reset flag now. Backup and delete ${datadir}/server/db on each peer etcd server and rejoin the nodes")
os.Exit(0)
}
}
}
}()
return e.newCluster(ctx, true)
}
func (e *ETCD) Start(ctx context.Context, clientAccess *clientaccess.Info) error {
existingCluster, err := e.IsInitialized(ctx, e.config)
if err != nil {
return errors.Wrapf(err, "failed to validation")
}
e.config.Runtime.ClusterControllerStart = func(ctx context.Context) error {
Register(ctx, e, e.config.Runtime.Core.Core().V1().Node())
return nil
}
if existingCluster {
opt, err := executor.CurrentETCDOptions()
if err != nil {
return err
}
return e.cluster(ctx, false, opt)
}
if clientAccess == nil {
return e.newCluster(ctx, false)
}
err = e.join(ctx, clientAccess)
return errors.Wrap(err, "joining etcd cluster")
}
func (e *ETCD) join(ctx context.Context, clientAccessInfo *clientaccess.Info) error {
resp, err := clientaccess.Get("/db/info", clientAccessInfo)
if err != nil {
return err
}
var memberList Members
if err := json.Unmarshal(resp, &memberList); err != nil {
return err
}
var clientURLs []string
for _, member := range memberList.Members {
clientURLs = append(clientURLs, member.ClientURLs...)
}
client, err := joinClient(ctx, e.runtime, clientURLs)
if err != nil {
return err
}
ctx, cancel := context.WithTimeout(ctx, 20*time.Second)
defer cancel()
var (
cluster []string
add = true
)
members, err := client.MemberList(ctx)
if err != nil {
logrus.Errorf("failed to get member list from cluster, will assume this member is already added")
members = &etcd.MemberListResponse{
Members: append(memberList.Members, &etcdserverpb.Member{
Name: e.name,
PeerURLs: []string{e.peerURL()},
}),
}
add = false
}
for _, member := range members.Members {
for _, peer := range member.PeerURLs {
u, err := url.Parse(peer)
if err != nil {
return err
}
// An uninitialized member won't have a name
if u.Hostname() == e.address && (member.Name == e.name || member.Name == "") {
add = false
}
if member.Name == "" && u.Hostname() == e.address {
member.Name = e.name
}
if len(member.PeerURLs) > 0 {
cluster = append(cluster, fmt.Sprintf("%s=%s", member.Name, member.PeerURLs[0]))
}
}
}
if add {
logrus.Infof("Adding %s to etcd cluster %v", e.peerURL(), cluster)
if _, err = client.MemberAdd(ctx, []string{e.peerURL()}); err != nil {
return err
}
cluster = append(cluster, fmt.Sprintf("%s=%s", e.name, e.peerURL()))
}
logrus.Infof("Starting etcd for cluster %v", cluster)
return e.cluster(ctx, false, executor.InitialOptions{
Cluster: strings.Join(cluster, ","),
State: "existing",
})
}
func (e *ETCD) Register(ctx context.Context, config *config.Control, l net.Listener, handler http.Handler) (net.Listener, http.Handler, error) {
e.config = config
e.runtime = config.Runtime
client, err := newClient(ctx, e.runtime)
if err != nil {
return nil, nil, err
}
e.client = client
address, err := getAdvertiseAddress(config.AdvertiseIP)
if err != nil {
return nil, nil, err
}
e.address = address
e.config.Datastore.Endpoint = "https://127.0.0.1:2379"
e.config.Datastore.Config.CAFile = e.runtime.ETCDServerCA
e.config.Datastore.Config.CertFile = e.runtime.ClientETCDCert
e.config.Datastore.Config.KeyFile = e.runtime.ClientETCDKey
if err := e.setName(); err != nil {
return nil, nil, err
}
return l, e.handler(handler), err
}
func (e *ETCD) setName() error {
fileName := nameFile(e.config)
data, err := ioutil.ReadFile(fileName)
if os.IsNotExist(err) {
h, err := os.Hostname()
if err != nil {
return err
}
e.name = strings.SplitN(h, ".", 2)[0] + "-" + uuid.New().String()[:8]
if err := os.MkdirAll(filepath.Dir(fileName), 0755); err != nil {
return err
}
return ioutil.WriteFile(fileName, []byte(e.name), 0655)
} else if err != nil {
return err
}
e.name = string(data)
return nil
}
func (e *ETCD) handler(next http.Handler) http.Handler {
mux := mux.NewRouter()
mux.Handle("/db/info", e.infoHandler())
mux.NotFoundHandler = next
return mux
}
func (e *ETCD) infoHandler() http.Handler {
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
ctx, cancel := context.WithTimeout(req.Context(), 2*time.Second)
defer cancel()
members, err := e.client.MemberList(ctx)
if err != nil {
json.NewEncoder(rw).Encode(&Members{
Members: []*etcdserverpb.Member{
{
Name: e.name,
PeerURLs: []string{e.peerURL()},
ClientURLs: []string{e.clientURL()},
},
},
})
return
}
rw.Header().Set("Content-Type", "application/json")
json.NewEncoder(rw).Encode(&Members{
Members: members.Members,
})
})
}
func joinClient(ctx context.Context, runtime *config.ControlRuntime, peers []string) (*etcd.Client, error) {
tlsConfig, err := toTLSConfig(runtime)
if err != nil {
return nil, err
}
cfg := etcd.Config{
Endpoints: peers,
TLS: tlsConfig,
Context: ctx,
}
return etcd.New(cfg)
}
func newClient(ctx context.Context, runtime *config.ControlRuntime) (*etcd.Client, error) {
tlsConfig, err := toTLSConfig(runtime)
if err != nil {
return nil, err
}
cfg := etcd.Config{
Context: ctx,
Endpoints: []string{"https://127.0.0.1:2379"},
TLS: tlsConfig,
}
return etcd.New(cfg)
}
func toTLSConfig(runtime *config.ControlRuntime) (*tls.Config, error) {
clientCert, err := tls.LoadX509KeyPair(runtime.ClientETCDCert, runtime.ClientETCDKey)
if err != nil {
return nil, err
}
pool, err := certutil.NewPool(runtime.ETCDServerCA)
if err != nil {
return nil, err
}
return &tls.Config{
RootCAs: pool,
Certificates: []tls.Certificate{clientCert},
}, nil
}
func getAdvertiseAddress(advertiseIP string) (string, error) {
ip := advertiseIP
if ip == "" {
ipAddr, err := utilnet.ChooseHostInterface()
if err != nil {
return "", err
}
ip = ipAddr.String()
}
return ip, nil
}
func (e *ETCD) newCluster(ctx context.Context, reset bool) error {
return e.cluster(ctx, reset, executor.InitialOptions{
AdvertisePeerURL: fmt.Sprintf("https://%s:2380", e.address),
Cluster: fmt.Sprintf("%s=https://%s:2380", e.name, e.address),
State: "new",
})
}
func (e *ETCD) peerURL() string {
return fmt.Sprintf("https://%s:2380", e.address)
}
func (e *ETCD) clientURL() string {
return fmt.Sprintf("https://%s:2379", e.address)
}
func (e *ETCD) cluster(ctx context.Context, forceNew bool, options executor.InitialOptions) error {
return executor.ETCD(executor.ETCDConfig{
Name: e.name,
InitialOptions: options,
ForceNewCluster: forceNew,
ListenClientURLs: fmt.Sprintf(e.clientURL() + ",https://127.0.0.1:2379"),
ListenMetricsURLs: fmt.Sprintf("http://127.0.0.1:2381"),
ListenPeerURLs: e.peerURL(),
AdvertiseClientURLs: e.clientURL(),
DataDir: dataDir(e.config),
ServerTrust: executor.ServerTrust{
CertFile: e.config.Runtime.ServerETCDCert,
KeyFile: e.config.Runtime.ServerETCDKey,
ClientCertAuth: true,
TrustedCAFile: e.config.Runtime.ETCDServerCA,
},
PeerTrust: executor.PeerTrust{
CertFile: e.config.Runtime.PeerServerClientETCDCert,
KeyFile: e.config.Runtime.PeerServerClientETCDKey,
ClientCertAuth: true,
TrustedCAFile: e.config.Runtime.ETCDPeerCA,
},
})
}
func (e *ETCD) removePeer(ctx context.Context, id, address string) error {
members, err := e.client.MemberList(ctx)
if err != nil {
return err
}
for _, member := range members.Members {
if member.Name != id {
continue
}
for _, peerURL := range member.PeerURLs {
u, err := url.Parse(peerURL)
if err != nil {
return err
}
if u.Hostname() == address {
logrus.Infof("Removing name=%s id=%d address=%s from etcd", member.Name, member.ID, address)
_, err := e.client.MemberRemove(ctx, member.ID)
return err
}
}
}
return nil
}