k3s/pkg/etcd/etcd.go
Vitor Savian e53c189587
Handle nil pointer when runtime core is not ready in etcd
Signed-off-by: Vitor <vitor.savian@suse.com>
2023-11-16 15:58:42 -08:00

1484 lines
44 KiB
Go

package etcd
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io/fs"
"net"
"net/http"
"net/url"
"os"
"path/filepath"
"regexp"
"sort"
"strconv"
"strings"
"time"
"github.com/google/uuid"
"github.com/gorilla/mux"
"github.com/k3s-io/k3s/pkg/clientaccess"
"github.com/k3s-io/k3s/pkg/cluster/managed"
"github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/daemons/control/deps"
"github.com/k3s-io/k3s/pkg/daemons/executor"
"github.com/k3s-io/k3s/pkg/util"
"github.com/k3s-io/k3s/pkg/version"
"github.com/k3s-io/kine/pkg/client"
endpoint2 "github.com/k3s-io/kine/pkg/endpoint"
cp "github.com/otiai10/copy"
"github.com/pkg/errors"
certutil "github.com/rancher/dynamiclistener/cert"
controllerv1 "github.com/rancher/wrangler/pkg/generated/controllers/core/v1"
"github.com/robfig/cron/v3"
"github.com/sirupsen/logrus"
"go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/client/pkg/v3/logutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/etcdutl/v3/snapshot"
"go.uber.org/zap"
"golang.org/x/sync/semaphore"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/wait"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
nodeHelper "k8s.io/component-helpers/node/util"
nodeUtil "k8s.io/kubernetes/pkg/controller/util/node"
)
// Timeouts, intervals, and limits used by the etcd driver.
const (
// testTimeout bounds the context used by ETCD.Test.
testTimeout = time.Second * 30
// manageTickerTime is both the tick interval of manageLearners and the timeout applied to each of its iterations.
manageTickerTime = time.Second * 15
// learnerMaxStallTime is how long a learner may go without raft progress before trackLearnerProgress removes it.
learnerMaxStallTime = time.Minute * 5
// memberRemovalTimeout bounds the context used by ETCD.RemovePeer.
memberRemovalTimeout = time.Minute * 1
// snapshotJitterMax defines the maximum time skew on cron-triggered snapshots. The actual jitter
// will be a random Duration somewhere between 0 and snapshotJitterMax.
snapshotJitterMax = time.Second * 5
// defaultDialTimeout is intentionally short so that connections timeout within the testTimeout defined above
defaultDialTimeout = 2 * time.Second
// other defaults from k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go
defaultKeepAliveTime = 30 * time.Second
defaultKeepAliveTimeout = 10 * time.Second
// maxBackupRetention is the retention count passed to backupDirWithRetention by Register.
maxBackupRetention = 5
)
// Package-level keys, annotation names, sentinel errors, and patterns.
var (
// learnerProgressKey is an etcd key name derived from the program name.
// NOTE(review): presumably where learnerProgress is stored — the accessors are outside this section; confirm.
learnerProgressKey = version.Program + "/etcd/learnerProgress"
// AddressKey will contain the value of api addresses list
AddressKey = version.Program + "/apiaddresses"
// NodeNameAnnotation and NodeAddressAnnotation are annotation keys derived from the program name.
NodeNameAnnotation = "etcd." + version.Program + ".cattle.io/node-name"
NodeAddressAnnotation = "etcd." + version.Program + ".cattle.io/node-address"
// ErrAddressNotSet is a sentinel error indicating that the apiserver addresses have not been set yet.
ErrAddressNotSet = errors.New("apiserver addresses not yet set")
// ErrNotMember is the sentinel that MembershipError.Is compares against.
ErrNotMember = errNotMember()
// invalidKeyChars matches any single character other than '-', '.', '_', an ASCII letter, or a digit.
invalidKeyChars = regexp.MustCompile(`[^-._a-zA-Z0-9]`)
)
// NodeControllerGetter is a function that returns a NodeController.
type NodeControllerGetter func() controllerv1.NodeController
// Compile-time assertion that *ETCD satisfies managed.Driver.
var _ managed.Driver = (*ETCD)(nil)
// ETCD is the managed etcd datastore driver; it is asserted to implement managed.Driver.
type ETCD struct {
// client is the etcd client. It is set by startClient or StartEmbeddedTemporary and
// reset to nil once the context passed to those methods is done.
client *clientv3.Client
// config is the control config supplied through SetControlConfig.
config *config.Control
// name is this member's name, generated or read from disk by setName.
name string
// address is the advertise address resolved in SetControlConfig; it is used to build peer and client URLs.
address string
// cron is created by NewETCD and started by Start unless snapshots are disabled.
cron *cron.Cron
// s3 is used by Reset to download a snapshot when EtcdS3 is set.
// NOTE(review): presumably populated by initS3IfNil, which is outside this section — confirm.
s3 *S3
// cancel cancels the context under which etcd was started; it is set by cluster and StartEmbeddedTemporary.
cancel context.CancelFunc
// NOTE(review): snapshotSem is not used in this section; presumably it serializes snapshot operations — confirm.
snapshotSem *semaphore.Weighted
}
// learnerProgress records the last observed raft progress of a single learner member.
// It is updated by trackLearnerProgress.
type learnerProgress struct {
// ID and Name identify the learner being tracked.
ID uint64 `json:"id,omitempty"`
Name string `json:"name,omitempty"`
// RaftAppliedIndex is the highest applied index seen so far for the learner.
RaftAppliedIndex uint64 `json:"raftAppliedIndex,omitempty"`
// LastProgress is the last time RaftAppliedIndex advanced, or when tracking of this learner started.
LastProgress metav1.Time `json:"lastProgress,omitempty"`
}
// Members contains a slice that holds all
// members of the cluster. It is the JSON body written by the /db/info handler.
type Members struct {
Members []*etcdserverpb.Member `json:"members"`
}
// MembershipError is returned by ETCD.Test when the local node is not found in the
// etcd member list.
type MembershipError struct {
// Self is the "name=peerURL" pair the local node expected to find.
Self string
// Members lists the "name=peerURL" pairs that were actually found.
Members []string
}
// Error implements the error interface. The message lists the members that were
// found, followed by the entry this server expected to find.
func (e *MembershipError) Error() string {
	return fmt.Sprintf("this server is not a member of the etcd cluster. Found %v, expect: %s", e.Members, e.Self)
}
// Is reports whether target is the ErrNotMember sentinel. The receiver's fields are
// not consulted, so errors.Is(err, ErrNotMember) matches every *MembershipError.
func (e *MembershipError) Is(target error) bool {
	return target == ErrNotMember
}
// errNotMember builds the empty MembershipError used as the ErrNotMember sentinel.
func errNotMember() error {
	return new(MembershipError)
}
// NewETCD returns an ETCD whose only populated field is a cron scheduler
// that logs through cronLogger.
func NewETCD() *ETCD {
	scheduler := cron.New(cron.WithLogger(cronLogger))
	return &ETCD{cron: scheduler}
}
// EndpointName returns the constant endpoint name "etcd".
func (e *ETCD) EndpointName() string {
	const endpointName = "etcd"
	return endpointName
}
// SetControlConfig hands the control config to the etcd driver. The config may not be
// fully built when the Driver instance is registered, so it is supplied separately here.
// It may only be called once; a second call returns an error. After storing the config
// it resolves the advertise address and loads (or generates) the member name.
func (e *ETCD) SetControlConfig(config *config.Control) error {
	if e.config != nil {
		return errors.New("control config already set")
	}
	e.config = config

	advertise, err := getAdvertiseAddress(config.PrivateIP)
	if err != nil {
		return err
	}
	e.address = advertise

	return e.setName(false)
}
// Test verifies that the local node is a voting member of the target cluster and that
// the datastore is healthy. It fails if:
//   - the control config or the etcd client has not been set,
//   - the local endpoint reports that it is still a learner,
//   - defragmentation or alarm clearing fails,
//   - the endpoint still reports errors after alarms were cleared, or
//   - no member matches both this node's name and its peer URL, in which case a
//     *MembershipError is returned.
//
// All etcd calls share a single context bounded by testTimeout.
func (e *ETCD) Test(ctx context.Context) error {
	switch {
	case e.config == nil:
		return errors.New("control config not set")
	case e.client == nil:
		return errors.New("etcd datastore is not started")
	}

	ctx, cancel := context.WithTimeout(ctx, testTimeout)
	defer cancel()

	endpoints := getEndpoints(e.config)
	local := endpoints[0]

	status, err := e.client.Status(ctx, local)
	if err != nil {
		return err
	}
	if status.IsLearner {
		return errors.New("this server has not yet been promoted from learner to voting member")
	}

	if err := e.defragment(ctx); err != nil {
		return errors.Wrap(err, "failed to defragment etcd database")
	}
	if err := e.clearAlarms(ctx); err != nil {
		return errors.Wrap(err, "failed to report and disarm etcd alarms")
	}

	// Query status a second time: anything still reported was not fixed by clearing alarms.
	status, err = e.client.Status(ctx, local)
	if err != nil {
		return err
	}
	if len(status.Errors) > 0 {
		return fmt.Errorf("etcd cluster errors: %s", strings.Join(status.Errors, ", "))
	}

	list, err := e.client.MemberList(ctx)
	if err != nil {
		return err
	}

	self := e.peerURL()
	var found []string
	for _, m := range list.Members {
		if m.Name == e.name {
			for _, u := range m.PeerURLs {
				if u == self {
					return nil
				}
			}
		}
		if len(m.PeerURLs) > 0 {
			found = append(found, m.Name+"="+m.PeerURLs[0])
		}
	}
	return &MembershipError{Members: found, Self: e.name + "=" + self}
}
// dbDir returns the etcd data directory, <DataDir>/db/etcd.
func dbDir(config *config.Control) string {
	base := filepath.Join(config.DataDir, "db")
	return filepath.Join(base, "etcd")
}
// walDir returns the write-ahead-log directory, <dbDir>/member/wal.
func walDir(config *config.Control) string {
	memberDir := filepath.Join(dbDir(config), "member")
	return filepath.Join(memberDir, "wal")
}
// sqliteFile returns the path of the sqlite database, <DataDir>/db/state.db.
func sqliteFile(config *config.Control) string {
	base := filepath.Join(config.DataDir, "db")
	return filepath.Join(base, "state.db")
}
// nameFile returns the path of the file holding the persisted member name, <dbDir>/name.
func nameFile(config *config.Control) string {
	dir := dbDir(config)
	return filepath.Join(dir, "name")
}
// clearReset deletes the reset file. A file that is already absent is not an error.
func (e *ETCD) clearReset() error {
	err := os.Remove(e.ResetFile())
	if err == nil || os.IsNotExist(err) {
		return nil
	}
	return err
}
// IsReset reports whether the reset file exists, which indicates that a cluster-reset
// has completed successfully. It returns an error if the control config has not been
// set, or if the file cannot be stat'ed for a reason other than not existing.
func (e *ETCD) IsReset() (bool, error) {
	if e.config == nil {
		return false, errors.New("control config not set")
	}

	_, err := os.Stat(e.ResetFile())
	switch {
	case err == nil:
		return true, nil
	case os.IsNotExist(err):
		return false, nil
	default:
		return false, err
	}
}
// ResetFile returns the path of the reset flag file, <DataDir>/db/reset-flag.
// It panics if the control config has not been set.
func (e *ETCD) ResetFile() string {
	if e.config == nil {
		panic("control config not set")
	}
	base := filepath.Join(e.config.DataDir, "db")
	return filepath.Join(base, "reset-flag")
}
// IsInitialized checks to see if a WAL directory exists. If so, we assume that etcd
// has already been brought up at least once.
//
// It returns (false, nil) when the path does not exist, and an error when the control
// config is not set, when the path cannot be stat'ed, or when the path exists but is
// not a directory.
func (e *ETCD) IsInitialized() (bool, error) {
	if e.config == nil {
		return false, errors.New("control config not set")
	}

	dir := walDir(e.config)
	info, err := os.Stat(dir)
	switch {
	case os.IsNotExist(err):
		return false, nil
	case err != nil:
		return false, errors.Wrap(err, "invalid state for wal directory "+dir)
	case !info.IsDir():
		// Wrapping a nil error yields nil, so this case needs an explicit error;
		// otherwise a regular file at the WAL path would be reported as "not initialized".
		return false, fmt.Errorf("invalid state for wal directory %s: not a directory", dir)
	}
	return true, nil
}
// Reset resets an etcd node to a single node cluster.
//
// A background goroutine waits for the agent to become ready and then polls Test every
// five seconds. Once Test passes it clears AddressKey, runs rebootstrap (if non-nil),
// regenerates the server dependencies and, if the cluster consists solely of this
// member, cancels the etcd context and exits the process. While Test fails it tries to
// update the single remaining member's peer URL to this node's peer URL.
//
// In the foreground, Reset starts the client, optionally restores from the snapshot at
// ClusterResetRestorePath (downloading it from S3 first if EtcdS3 is set), forces a new
// member name, writes the reset file, and starts etcd as a new cluster with reset=true.
func (e *ETCD) Reset(ctx context.Context, rebootstrap func() error) error {
	// Wait for etcd to come up as a new single-node cluster, then exit
	go func() {
		<-e.config.Runtime.AgentReady
		t := time.NewTicker(5 * time.Second)
		defer t.Stop()
		for range t.C {
			if err := e.Test(ctx); err == nil {
				// reset the apiaddresses to nil since we are doing a restoration
				if _, err := e.client.Put(ctx, AddressKey, ""); err != nil {
					logrus.Warnf("failed to reset api addresses key in etcd: %v", err)
					continue
				}

				members, err := e.client.MemberList(ctx)
				if err != nil {
					continue
				}

				if rebootstrap != nil {
					// storageBootstrap() - runtime structure has been written with correct certificate data
					if err := rebootstrap(); err != nil {
						logrus.Fatal(err)
					}
				}

				// call functions to rewrite them from daemons/control/server.go (prepare())
				if err := deps.GenServerDeps(e.config); err != nil {
					logrus.Fatal(err)
				}

				if len(members.Members) == 1 && members.Members[0].Name == e.name {
					// Cancel the etcd server context and allow it time to shutdown cleanly.
					// Ideally we would use a waitgroup and properly sequence shutdown of the various components.
					e.cancel()
					time.Sleep(time.Second * 5)
					logrus.Infof("Managed etcd cluster membership has been reset, restart without --cluster-reset flag now. Backup and delete ${datadir}/server/db on each peer etcd server and rejoin the nodes")
					os.Exit(0)
				}
			} else {
				// make sure that peer ips are updated to the node ip in case the test fails
				members, err := e.client.MemberList(ctx)
				if err != nil {
					logrus.Warnf("failed to list etcd members: %v", err)
					continue
				}
				if len(members.Members) > 1 {
					logrus.Warnf("failed to update peer url: etcd still has more than one member")
					continue
				}
				if _, err := e.client.MemberUpdate(ctx, members.Members[0].ID, []string{e.peerURL()}); err != nil {
					logrus.Warnf("failed to update peer url: %v", err)
					continue
				}
			}
		}
	}()

	if err := e.startClient(ctx); err != nil {
		return err
	}

	// If asked to restore from a snapshot, do so
	if e.config.ClusterResetRestorePath != "" {
		if e.config.EtcdS3 {
			logrus.Infof("Retrieving etcd snapshot %s from S3", e.config.ClusterResetRestorePath)
			if err := e.initS3IfNil(ctx); err != nil {
				return err
			}
			if err := e.s3.Download(ctx); err != nil {
				return err
			}
			logrus.Infof("S3 download complete for %s", e.config.ClusterResetRestorePath)
		}

		info, err := os.Stat(e.config.ClusterResetRestorePath)
		if os.IsNotExist(err) {
			return fmt.Errorf("etcd: snapshot path does not exist: %s", e.config.ClusterResetRestorePath)
		}
		// Any other stat failure (permissions, I/O, ...) leaves info nil; bail out
		// rather than dereferencing it below.
		if err != nil {
			return errors.Wrapf(err, "etcd: failed to stat snapshot path %s", e.config.ClusterResetRestorePath)
		}
		if info.IsDir() {
			return fmt.Errorf("etcd: snapshot path must be a file, not a directory: %s", e.config.ClusterResetRestorePath)
		}
		if err := e.Restore(ctx); err != nil {
			return err
		}
	}

	if err := e.setName(true); err != nil {
		return err
	}

	// touch a file to avoid multiple resets
	if err := os.WriteFile(e.ResetFile(), []byte{}, 0600); err != nil {
		return err
	}

	return e.newCluster(ctx, true)
}
// Start starts the datastore.
//
// It always starts the etcd client, the snapshot cron (unless snapshots are disabled),
// and the learner management goroutine. Then:
//   - if a WAL directory already exists, the data directory's permission bits are
//     forced to 0700 and etcd is started with the options from executor.CurrentETCDOptions;
//   - otherwise, if clientAccessInfo is nil, a new cluster is started;
//   - otherwise a goroutine waits for the agent to become ready and then joins the
//     existing cluster, retrying while other members are still joining. Start returns
//     nil without waiting for the join; a join failure is fatal to the process.
func (e *ETCD) Start(ctx context.Context, clientAccessInfo *clientaccess.Info) error {
	isInitialized, err := e.IsInitialized()
	if err != nil {
		return errors.Wrapf(err, "failed to check for initialized etcd datastore")
	}

	if err := e.startClient(ctx); err != nil {
		return err
	}

	if !e.config.EtcdDisableSnapshots {
		e.setSnapshotFunction(ctx)
		e.cron.Start()
	}

	go e.manageLearners(ctx)

	if isInitialized {
		// check etcd dir permission
		etcdDir := dbDir(e.config)
		info, err := os.Stat(etcdDir)
		if err != nil {
			return err
		}
		// Compare only the permission bits: Mode() of a directory also carries the
		// directory type bit, so comparing the full mode against 0700 never matches.
		if info.Mode().Perm() != 0700 {
			if err := os.Chmod(etcdDir, 0700); err != nil {
				return err
			}
		}
		opt, err := executor.CurrentETCDOptions()
		if err != nil {
			return err
		}
		logrus.Infof("Starting etcd for existing cluster member")
		return e.cluster(ctx, false, opt)
	}

	if clientAccessInfo == nil {
		return e.newCluster(ctx, false)
	}

	go func() {
		for {
			select {
			case <-time.After(30 * time.Second):
				logrus.Infof("Waiting for agent to become ready before joining etcd cluster")
			case <-e.config.Runtime.AgentReady:
				if err := wait.PollImmediateUntilWithContext(ctx, time.Second, func(ctx context.Context) (bool, error) {
					if err := e.join(ctx, clientAccessInfo); err != nil {
						// Retry the join if waiting for another member to be promoted, or waiting for peers to connect after promotion
						if errors.Is(err, rpctypes.ErrTooManyLearners) || errors.Is(err, rpctypes.ErrUnhealthy) {
							logrus.Infof("Waiting for other members to finish joining etcd cluster: %v", err)
							return false, nil
						}
						return false, err
					}
					return true, nil
				}); err != nil {
					logrus.Fatalf("etcd cluster join failed: %v", err)
				}
				return
			case <-ctx.Done():
				return
			}
		}
	}()

	return nil
}
// startClient points the config's datastore settings at the first etcd endpoint and the
// runtime's etcd client certificates, then creates an etcd client for all endpoints.
// It fails if a client already exists. When ctx is done, the client field is cleared
// and the client is closed.
func (e *ETCD) startClient(ctx context.Context) error {
	if e.client != nil {
		return errors.New("etcd datastore already started")
	}

	endpoints := getEndpoints(e.config)
	rt := e.config.Runtime
	e.config.Datastore.Endpoint = endpoints[0]
	e.config.Datastore.BackendTLSConfig.CAFile = rt.ETCDServerCA
	e.config.Datastore.BackendTLSConfig.CertFile = rt.ClientETCDCert
	e.config.Datastore.BackendTLSConfig.KeyFile = rt.ClientETCDKey

	etcdClient, err := getClient(ctx, e.config, endpoints...)
	if err != nil {
		return err
	}
	e.client = etcdClient

	closeOnDone := func() {
		<-ctx.Done()
		current := e.client
		e.client = nil
		current.Close()
	}
	go closeOnDone()

	return nil
}
// join attempts to add a member to an existing cluster.
//
// It fetches the current member list through clientAccessInfo, builds the
// "name=peerURL" cluster string from it, and — unless this node is already listed —
// adds this node as a learner. It then starts etcd with state "existing". If another
// member already uses this node's name, the name file is removed and an error is
// returned. Discovery and the member-add call share a 20 second timeout; etcd itself
// is started with the caller's ctx.
func (e *ETCD) join(ctx context.Context, clientAccessInfo *clientaccess.Info) error {
	clientCtx, cancel := context.WithTimeout(ctx, 20*time.Second)
	defer cancel()

	var (
		cluster []string
		add     = true
	)

	clientURLs, memberList, err := ClientURLs(clientCtx, clientAccessInfo, e.config.PrivateIP)
	if err != nil {
		return err
	}

	client, err := getClient(clientCtx, e.config, clientURLs...)
	if err != nil {
		return err
	}
	defer client.Close()

	for _, member := range memberList.Members {
		for _, peer := range member.PeerURLs {
			u, err := url.Parse(peer)
			if err != nil {
				return err
			}
			// An uninitialized joining member won't have a name; if it has our
			// address it must be us.
			if member.Name == "" && u.Hostname() == e.address {
				member.Name = e.name
			}

			// If we're already in the cluster, don't try to add ourselves.
			if member.Name == e.name && u.Hostname() == e.address {
				add = false
			}
		}

		// Record each member exactly once, and only after every peer URL has been
		// inspected so that a name filled in above is the one that gets recorded.
		if len(member.PeerURLs) > 0 {
			cluster = append(cluster, fmt.Sprintf("%s=%s", member.Name, member.PeerURLs[0]))
		}

		// Try to get the node name from the member name
		memberNodeName := member.Name
		if lastHyphen := strings.LastIndex(member.Name, "-"); lastHyphen > 1 {
			memberNodeName = member.Name[:lastHyphen]
		}

		// Make sure there's not already a member in the cluster with a duplicate node name
		if member.Name != e.name && memberNodeName == e.config.ServerNodeName {
			// make sure to remove the name file if a duplicate node name is used, so that we
			// generate a new member name when our node name is fixed.
			nameFile := nameFile(e.config)
			if err := os.Remove(nameFile); err != nil {
				logrus.Errorf("Failed to remove etcd name file %s: %v", nameFile, err)
			}
			return errors.New("duplicate node name found, please use a unique name for this node")
		}
	}

	if add {
		logrus.Infof("Adding member %s=%s to etcd cluster %v", e.name, e.peerURL(), cluster)
		if _, err = client.MemberAddAsLearner(clientCtx, []string{e.peerURL()}); err != nil {
			return err
		}
		cluster = append(cluster, fmt.Sprintf("%s=%s", e.name, e.peerURL()))
	}

	logrus.Infof("Starting etcd to join cluster with members %v", cluster)
	return e.cluster(ctx, false, executor.InitialOptions{
		Cluster: strings.Join(cluster, ","),
		State:   "existing",
	})
}
// Register wraps handler with the database info routes and registers the cluster
// controller start callbacks. When etcd is enabled on this node it also checks for a
// tombstone file — backing up the data directory if one is found — and reloads the
// member name.
func (e *ETCD) Register(handler http.Handler) (http.Handler, error) {
	rt := e.config.Runtime

	rt.ClusterControllerStarts["etcd-node-metadata"] = func(ctx context.Context) {
		registerMetadataHandlers(ctx, e)
	}

	// The apiserver endpoint controller needs to run on a node with a local apiserver,
	// in order to successfully seed etcd with the endpoint list. The member removal controller
	// also needs to run on a non-etcd node as to avoid disruption if running on the node that
	// is being removed from the cluster.
	if !e.config.DisableAPIServer {
		rt.LeaderElectedClusterControllerStarts[version.Program+"-etcd"] = func(ctx context.Context) {
			registerEndpointsHandlers(ctx, e)
			registerMemberHandlers(ctx, e)
			registerSnapshotHandlers(ctx, e)
		}
	}

	// Tombstone file checking is unnecessary if we're not running etcd.
	if e.config.DisableETCD {
		return e.handler(handler), nil
	}

	tombstone := filepath.Join(dbDir(e.config), "tombstone")
	if _, statErr := os.Stat(tombstone); statErr == nil {
		logrus.Infof("tombstone file has been detected, removing data dir to rejoin the cluster")
		if _, err := backupDirWithRetention(dbDir(e.config), maxBackupRetention); err != nil {
			return nil, err
		}
	}

	if err := e.setName(false); err != nil {
		return nil, err
	}

	return e.handler(handler), nil
}
// setName establishes this cluster member's unique name. If the name file does not
// exist, or force is true, a fresh name of the form "<ServerNodeName>-<8 uuid chars>"
// is generated and persisted to the name file. Otherwise the persisted name is loaded.
func (e *ETCD) setName(force bool) error {
	path := nameFile(e.config)
	stored, err := os.ReadFile(path)

	switch {
	case force || os.IsNotExist(err):
		e.name = e.config.ServerNodeName + "-" + uuid.New().String()[:8]
		if err := os.MkdirAll(filepath.Dir(path), 0700); err != nil {
			return err
		}
		return os.WriteFile(path, []byte(e.name), 0600)
	case err != nil:
		return err
	}

	e.name = string(stored)
	return nil
}
// handler returns a router that serves /db/info itself and hands every other request to next.
func (e *ETCD) handler(next http.Handler) http.Handler {
	router := mux.NewRouter().SkipClean(true)
	router.NotFoundHandler = next
	router.Handle("/db/info", e.infoHandler())
	return router
}
// infoHandler returns etcd cluster information. This is used by new members when joining the cluster.
// If we can't retrieve an actual MemberList from etcd — including when the etcd client has not
// been started yet — we return a canned response with only the local node listed.
// The member list request is bounded by a 2 second timeout.
func (e *ETCD) infoHandler() http.Handler {
	return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
		ctx, cancel := context.WithTimeout(req.Context(), 2*time.Second)
		defer cancel()

		var (
			members *clientv3.MemberListResponse
			err     error = errors.New("etcd datastore is not started")
		)
		// Read the field once: it is reset to nil when the client's context is done,
		// and calling through a nil client would panic.
		if client := e.client; client != nil {
			members, err = client.MemberList(ctx)
		}
		if err != nil {
			logrus.Warnf("Failed to get etcd MemberList for %s: %v", req.RemoteAddr, err)
			members = &clientv3.MemberListResponse{
				Members: []*etcdserverpb.Member{
					{
						Name:       e.name,
						PeerURLs:   []string{e.peerURL()},
						ClientURLs: []string{e.clientURL()},
					},
				},
			}
		}

		rw.Header().Set("Content-Type", "application/json")
		if err := json.NewEncoder(rw).Encode(&Members{Members: members.Members}); err != nil {
			logrus.Warnf("Failed to write etcd MemberList response for %s: %v", req.RemoteAddr, err)
		}
	})
}
// getClient builds a client config via getClientConfig and returns an etcd client created from it.
// Endpoint defaulting is handled by getClientConfig. Callers should close the returned
// client when it is no longer needed so that GRPC client goroutines are not leaked.
func getClient(ctx context.Context, control *config.Control, endpoints ...string) (*clientv3.Client, error) {
	clientConfig, err := getClientConfig(ctx, control, endpoints...)
	if err != nil {
		return nil, err
	}
	etcdClient, err := clientv3.New(*clientConfig)
	return etcdClient, err
}
// getClientConfig builds an etcd client config for the given endpoints, falling back to
// getEndpoints when none are supplied. TLS is configured from the runtime only when the
// first endpoint uses the https scheme; in that case a TLS setup error is returned
// alongside the (otherwise populated) config.
func getClientConfig(ctx context.Context, control *config.Control, endpoints ...string) (*clientv3.Config, error) {
	rt := control.Runtime
	if len(endpoints) == 0 {
		endpoints = getEndpoints(control)
	}

	cfg := &clientv3.Config{
		Endpoints:            endpoints,
		Context:              ctx,
		DialTimeout:          defaultDialTimeout,
		DialKeepAliveTime:    defaultKeepAliveTime,
		DialKeepAliveTimeout: defaultKeepAliveTimeout,
		AutoSyncInterval:     defaultKeepAliveTimeout,
		PermitWithoutStream:  true,
	}

	if !strings.HasPrefix(endpoints[0], "https://") {
		return cfg, nil
	}

	tlsConfig, err := toTLSConfig(rt)
	cfg.TLS = tlsConfig
	return cfg, err
}
// getEndpoints returns the endpoints listed in the runtime's etcd config. If that list
// is empty, it returns a single https endpoint on the loopback address, port 2379.
func getEndpoints(control *config.Control) []string {
	if configured := control.Runtime.EtcdConfig.Endpoints; len(configured) > 0 {
		return configured
	}
	loopback := control.Loopback(true)
	return []string{fmt.Sprintf("https://%s:2379", loopback)}
}
// toTLSConfig builds a client TLS config from the runtime's etcd client certificate,
// key, and server CA. It returns an error if any of the three paths is still empty,
// or if the key pair or CA pool cannot be loaded.
func toTLSConfig(runtime *config.ControlRuntime) (*tls.Config, error) {
	certFile, keyFile, caFile := runtime.ClientETCDCert, runtime.ClientETCDKey, runtime.ETCDServerCA
	if certFile == "" || keyFile == "" || caFile == "" {
		return nil, errors.New("runtime is not ready yet")
	}

	keyPair, err := tls.LoadX509KeyPair(certFile, keyFile)
	if err != nil {
		return nil, err
	}

	caPool, err := certutil.NewPool(caFile)
	if err != nil {
		return nil, err
	}

	tlsConfig := &tls.Config{
		Certificates: []tls.Certificate{keyPair},
		RootCAs:      caPool,
	}
	return tlsConfig, nil
}
// getAdvertiseAddress returns advertiseIP unchanged when it is non-empty. Otherwise it
// returns the address selected by utilnet.ChooseHostInterface, or that call's error.
func getAdvertiseAddress(advertiseIP string) (string, error) {
	if advertiseIP != "" {
		return advertiseIP, nil
	}

	hostIP, err := utilnet.ChooseHostInterface()
	if err != nil {
		return "", err
	}
	return hostIP.String(), nil
}
// newCluster starts etcd as a new cluster whose only initial member is this node, then
// migrates any existing sqlite content into it. The reset flag is passed through to cluster.
func (e *ETCD) newCluster(ctx context.Context, reset bool) error {
	logrus.Infof("Starting etcd for new cluster")

	options := executor.InitialOptions{
		AdvertisePeerURL: e.peerURL(),
		Cluster:          fmt.Sprintf("%s=%s", e.name, e.peerURL()),
		State:            "new",
	}
	if err := e.cluster(ctx, reset, options); err != nil {
		return err
	}

	if err := e.migrateFromSQLite(ctx); err != nil {
		return fmt.Errorf("failed to migrate content from sqlite to etcd: %w", err)
	}
	return nil
}
// migrateFromSQLite copies every key under "/registry/" from the sqlite datastore into
// etcd. It is a no-op when the sqlite file does not exist. After a successful copy the
// sqlite file is renamed with a ".migrated" suffix. The sqlite listener and both clients
// live only for the duration of the call.
func (e *ETCD) migrateFromSQLite(ctx context.Context) error {
	if _, err := os.Stat(sqliteFile(e.config)); err != nil {
		if os.IsNotExist(err) {
			return nil
		}
		return err
	}

	logrus.Infof("Migrating content from sqlite to etcd")
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	if _, err := endpoint2.Listen(ctx, endpoint2.Config{Endpoint: endpoint2.SQLiteBackend}); err != nil {
		return err
	}

	source, err := client.New(endpoint2.ETCDConfig{Endpoints: []string{"unix://kine.sock"}})
	if err != nil {
		return err
	}
	defer source.Close()

	target, err := getClient(ctx, e.config)
	if err != nil {
		return err
	}
	defer target.Close()

	entries, err := source.List(ctx, "/registry/", 0)
	if err != nil {
		return err
	}

	for _, entry := range entries {
		logrus.Infof("Migrating etcd key %s", entry.Key)
		if _, err := target.Put(ctx, string(entry.Key), string(entry.Data)); err != nil {
			return err
		}
	}

	return os.Rename(sqliteFile(e.config), sqliteFile(e.config)+".migrated")
}
// peerURL returns the https URL of the local node's peer port (2380) on its advertise address.
func (e *ETCD) peerURL() string {
	hostPort := net.JoinHostPort(e.address, "2380")
	return "https://" + hostPort
}
// listenPeerURLs returns the comma-separated list of URLs to bind to for peer connections.
// The loopback URL is always included. During cluster reset/restore it is the only one,
// so that peers cannot connect mid-process.
func (e *ETCD) listenPeerURLs(reset bool) string {
	loopbackURL := fmt.Sprintf("https://%s:2380", e.config.Loopback(true))
	if reset {
		return loopbackURL
	}
	return loopbackURL + "," + e.peerURL()
}
// clientURL returns the https URL of the local node's client port (2379) on its advertise address.
func (e *ETCD) clientURL() string {
	hostPort := net.JoinHostPort(e.address, "2379")
	return "https://" + hostPort
}
// advertiseClientURLs returns the client URL advertised for the local node. During
// cluster reset/restore the loopback address is advertised instead of the external one,
// so that apiservers on other nodes do not connect mid-process.
//
// NOTE(review): other helpers in this file interpolate Loopback(true) directly in front
// of ":<port>", while this one passes it through net.JoinHostPort — confirm the two
// agree for IPv6 loopback values.
func (e *ETCD) advertiseClientURLs(reset bool) string {
	if !reset {
		return e.clientURL()
	}
	return fmt.Sprintf("https://%s", net.JoinHostPort(e.config.Loopback(true), "2379"))
}
// listenClientURLs returns the comma-separated list of URLs to bind to for client connections.
// The loopback URL is always included. During cluster reset/restore it is the only one,
// so that apiservers on other nodes cannot connect mid-process.
func (e *ETCD) listenClientURLs(reset bool) string {
	loopbackURL := fmt.Sprintf("https://%s:2379", e.config.Loopback(true))
	if reset {
		return loopbackURL
	}
	return loopbackURL + "," + e.clientURL()
}
// listenMetricsURLs returns the comma-separated list of plain-http URLs to bind to for
// metrics (port 2381). The loopback URL is always included; the advertise address is
// added only when metrics exposure is enabled and this is not a reset.
func (e *ETCD) listenMetricsURLs(reset bool) string {
	loopbackURL := fmt.Sprintf("http://%s:2381", e.config.Loopback(true))
	if reset || !e.config.EtcdExposeMetrics {
		return loopbackURL
	}
	return loopbackURL + "," + fmt.Sprintf("http://%s", net.JoinHostPort(e.address, "2381"))
}
// listenClientHTTPURLs returns the loopback URL (port 2382) to bind to for http client
// connections. It should no longer be used by clients, but it must be set so that the
// listen client URLs are dedicated to GRPC.
// Ref: https://github.com/etcd-io/etcd/issues/15402
func (e *ETCD) listenClientHTTPURLs() string {
	loopback := e.config.Loopback(true)
	return fmt.Sprintf("https://%s:2382", loopback)
}
// cluster asks the executor to start etcd with the given initial options. The context
// handed to the executor is a cancellable child of ctx whose cancel func is stored in
// e.cancel. When reset is true, ForceNewCluster is set and the listen/advertise URL
// helpers are called in reset mode.
func (e *ETCD) cluster(ctx context.Context, reset bool, options executor.InitialOptions) error {
	ctx, e.cancel = context.WithCancel(ctx)

	etcdConfig := executor.ETCDConfig{
		Name:                e.name,
		InitialOptions:      options,
		ForceNewCluster:     reset,
		ListenClientURLs:    e.listenClientURLs(reset),
		ListenMetricsURLs:   e.listenMetricsURLs(reset),
		ListenPeerURLs:      e.listenPeerURLs(reset),
		AdvertiseClientURLs: e.advertiseClientURLs(reset),
		DataDir:             dbDir(e.config),
		ServerTrust: executor.ServerTrust{
			CertFile:       e.config.Runtime.ServerETCDCert,
			KeyFile:        e.config.Runtime.ServerETCDKey,
			ClientCertAuth: true,
			TrustedCAFile:  e.config.Runtime.ETCDServerCA,
		},
		PeerTrust: executor.PeerTrust{
			CertFile:       e.config.Runtime.PeerServerClientETCDCert,
			KeyFile:        e.config.Runtime.PeerServerClientETCDKey,
			ClientCertAuth: true,
			TrustedCAFile:  e.config.Runtime.ETCDPeerCA,
		},
		SnapshotCount:                   10000,
		ElectionTimeout:                 5000,
		HeartbeatInterval:               500,
		Logger:                          "zap",
		LogOutputs:                      []string{"stderr"},
		ExperimentalInitialCorruptCheck: true,
		ListenClientHTTPURLs:            e.listenClientHTTPURLs(),
	}

	return executor.ETCD(ctx, etcdConfig, e.config.ExtraEtcdArgs)
}
// StartEmbeddedTemporary starts an embedded etcd against a copy of the data directory
// placed at "<dbDir>-tmp". The copy is removed when ctx is done. It also creates the
// etcd client, which is closed and cleared when ctx is done, and fails if a client
// already exists. The peer and client-http listen URLs are derived from the first
// endpoint's port (+1 and +3 respectively). The embedded server is started with
// ForceNewCluster set and with "--max-snapshots=0" and "--max-wals=0" appended to the
// configured extra args.
func (e *ETCD) StartEmbeddedTemporary(ctx context.Context) error {
	etcdDataDir := dbDir(e.config)
	tmpDataDir := etcdDataDir + "-tmp"
	// Best-effort removal of a leftover copy; report the failure instead of dropping it.
	if err := os.RemoveAll(tmpDataDir); err != nil {
		logrus.Warnf("Failed to remove etcd temp dir: %v", err)
	}

	go func() {
		<-ctx.Done()
		if err := os.RemoveAll(tmpDataDir); err != nil {
			logrus.Warnf("Failed to remove etcd temp dir: %v", err)
		}
	}()

	if e.client != nil {
		return errors.New("etcd datastore already started")
	}

	client, err := getClient(ctx, e.config)
	if err != nil {
		return err
	}
	e.client = client

	go func() {
		<-ctx.Done()
		client := e.client
		e.client = nil
		client.Close()
	}()

	if err := cp.Copy(etcdDataDir, tmpDataDir, cp.Options{PreserveOwner: true}); err != nil {
		return err
	}

	endpoints := getEndpoints(e.config)
	clientURL := endpoints[0]
	// peer URL is usually 1 more than client
	peerURL, err := addPort(endpoints[0], 1)
	if err != nil {
		return err
	}
	// client http URL is usually 3 more than client, after peer and metrics
	clientHTTPURL, err := addPort(endpoints[0], 3)
	if err != nil {
		return err
	}

	// Cap the slice at its length so that append always allocates a new backing array
	// and can never write into the array shared with the config's ExtraEtcdArgs.
	extraArgs := e.config.ExtraEtcdArgs
	args := append(extraArgs[:len(extraArgs):len(extraArgs)], "--max-snapshots=0", "--max-wals=0")

	embedded := executor.Embedded{}
	ctx, e.cancel = context.WithCancel(ctx)
	return embedded.ETCD(ctx, executor.ETCDConfig{
		InitialOptions:                  executor.InitialOptions{AdvertisePeerURL: peerURL},
		DataDir:                         tmpDataDir,
		ForceNewCluster:                 true,
		AdvertiseClientURLs:             clientURL,
		ListenClientURLs:                clientURL,
		ListenClientHTTPURLs:            clientHTTPURL,
		ListenPeerURLs:                  peerURL,
		Logger:                          "zap",
		HeartbeatInterval:               500,
		ElectionTimeout:                 5000,
		SnapshotCount:                   10000,
		Name:                            e.name,
		LogOutputs:                      []string{"stderr"},
		ExperimentalInitialCorruptCheck: true,
	}, args)
}
// addPort parses address as a URL and returns "<scheme>://<host>:<port+offset>".
// IPv6 hosts are re-bracketed in the result. It returns an error if address cannot be
// parsed or does not carry a numeric port.
func addPort(address string, offset int) (string, error) {
	u, err := url.Parse(address)
	if err != nil {
		return "", err
	}
	port, err := strconv.Atoi(u.Port())
	if err != nil {
		return "", err
	}
	port += offset
	// Hostname() strips the brackets from IPv6 literals; JoinHostPort restores them.
	return fmt.Sprintf("%s://%s", u.Scheme, net.JoinHostPort(u.Hostname(), strconv.Itoa(port))), nil
}
// RemovePeer removes the member whose name equals name and one of whose peer URLs has
// address as its host. If address is this node's own address, removal is refused unless
// allowSelfRemoval is true. A member that etcd reports as already gone is treated as
// success, as is finding no matching member. The operation is bounded by memberRemovalTimeout.
func (e *ETCD) RemovePeer(ctx context.Context, name, address string, allowSelfRemoval bool) error {
	ctx, cancel := context.WithTimeout(ctx, memberRemovalTimeout)
	defer cancel()

	list, err := e.client.MemberList(ctx)
	if err != nil {
		return err
	}

	for _, member := range list.Members {
		if member.Name != name {
			continue
		}
		for _, rawURL := range member.PeerURLs {
			parsed, err := url.Parse(rawURL)
			if err != nil {
				return err
			}
			if parsed.Hostname() != address {
				continue
			}
			if !allowSelfRemoval && e.address == address {
				return errors.New("not removing self from etcd cluster")
			}
			logrus.Infof("Removing name=%s id=%d address=%s from etcd", member.Name, member.ID, address)
			if _, err := e.client.MemberRemove(ctx, member.ID); err != nil && !errors.Is(err, rpctypes.ErrGRPCMemberNotFound) {
				return err
			}
			return nil
		}
	}

	return nil
}
// manageLearners monitors the etcd cluster to ensure that learners are making progress towards
// being promoted to full voting member. The checks only run on the cluster member that is
// the etcd leader.
//
// It waits for the agent to become ready and then runs one management pass every
// manageTickerTime until ctx is done.
func (e *ETCD) manageLearners(ctx context.Context) {
	select {
	case <-e.config.Runtime.AgentReady:
	case <-ctx.Done():
		return
	}

	t := time.NewTicker(manageTickerTime)
	defer t.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			e.manageLearnersOnce(ctx)
		}
	}
}

// manageLearnersOnce performs a single learner management pass, bounded by
// manageTickerTime. Running each pass in its own call releases the timeout context
// as soon as the pass ends, instead of deferring every cancel until the loop exits.
func (e *ETCD) manageLearnersOnce(ctx context.Context) {
	ctx, cancel := context.WithTimeout(ctx, manageTickerTime)
	defer cancel()

	// Check to see if the local node is the leader. Only the leader should do learner management.
	if e.client == nil {
		logrus.Debug("Etcd client was nil")
		return
	}

	endpoints := getEndpoints(e.config)
	if status, err := e.client.Status(ctx, endpoints[0]); err != nil {
		logrus.Errorf("Failed to check local etcd status for learner management: %v", err)
		return
	} else if status.Header.MemberId != status.Leader {
		return
	}

	progress, err := e.getLearnerProgress(ctx)
	if err != nil {
		logrus.Errorf("Failed to get recorded learner progress from etcd: %v", err)
		return
	}

	members, err := e.client.MemberList(ctx)
	if err != nil {
		logrus.Errorf("Failed to get etcd members for learner management: %v", err)
		return
	}

	client, err := util.GetClientSet(e.config.Runtime.KubeConfigSupervisor)
	if err != nil {
		logrus.Errorf("Failed to get k8s client for patch node status condition: %v", err)
		return
	}

	// A failure here is only logged; nodes is then empty and no conditions are updated.
	nodes, err := e.getETCDNodes()
	if err != nil {
		logrus.Warnf("Failed to list nodes with etcd role: %v", err)
	}

	for _, member := range members.Members {
		if member.IsLearner {
			if err := e.trackLearnerProgress(ctx, progress, member); err != nil {
				logrus.Errorf("Failed to track learner progress towards promotion: %v", err)
			}

			for _, node := range nodes {
				if strings.HasPrefix(member.Name, node.Name+"-") {
					_, _, err := e.setEtcdStatusCondition(node, client.CoreV1(), member.Name, false)
					if err != nil {
						logrus.Errorf("Unable to set etcd status condition %s: %v", member.Name, err)
					}
				}
			}

			// Stop at the first learner; members after it are left for a later pass.
			break
		}

		for _, node := range nodes {
			if strings.HasPrefix(member.Name, node.Name+"-") {
				_, _, err := e.setEtcdStatusCondition(node, client.CoreV1(), member.Name, true)
				if err != nil {
					logrus.Errorf("Unable to set etcd status condition %s: %v", member.Name, err)
				}
			}
		}
	}
}
// getETCDNodes lists, from the node controller's cache, the nodes whose etcd role label
// is set to "true". It returns an error if the runtime core is not available yet.
func (e *ETCD) getETCDNodes() ([]*v1.Node, error) {
	core := e.config.Runtime.Core
	if core == nil {
		return nil, errors.New("runtime core not ready")
	}

	nodeController := core.Core().V1().Node()
	roleLabels := labels.Set{util.ETCDRoleLabelKey: "true"}
	return nodeController.Cache().List(roleLabels.AsSelector())
}
// trackLearnerProgress attempts to promote a learner. If it cannot be promoted, progress through the raft index is tracked.
// If the learner does not make any progress in a reasonable amount of time, it is evicted from the cluster.
//
// progress is updated in place and, unless the learner was promoted or removed, stored
// via setLearnerProgress. An error is returned only if removing a stalled learner or
// storing the progress fails.
func (e *ETCD) trackLearnerProgress(ctx context.Context, progress *learnerProgress, member *etcdserverpb.Member) error {
	// Try to promote it. If it can be promoted, no further tracking is necessary
	if _, err := e.client.MemberPromote(ctx, member.ID); err != nil {
		logrus.Debugf("Unable to promote learner %s: %v", member.Name, err)
	} else {
		logrus.Infof("Promoted learner %s", member.Name)
		return nil
	}

	now := time.Now()

	// If this is the first time we've tracked this member's progress, reset stats
	if progress.Name != member.Name || progress.ID != member.ID {
		progress.ID = member.ID
		progress.Name = member.Name
		progress.RaftAppliedIndex = 0
		progress.LastProgress.Time = now
	}

	// Update progress by retrieving status from the member's first reachable client URL
	for _, ep := range member.ClientURLs {
		// Release each per-endpoint timeout right after the call rather than deferring
		// it to function return.
		statusCtx, cancel := context.WithTimeout(ctx, defaultDialTimeout)
		status, err := e.client.Status(statusCtx, ep)
		cancel()
		if err != nil {
			logrus.Debugf("Failed to get etcd status from learner %s at %s: %v", member.Name, ep, err)
			continue
		}

		if progress.RaftAppliedIndex < status.RaftAppliedIndex {
			logrus.Debugf("Learner %s has progressed from RaftAppliedIndex %d to %d", progress.Name, progress.RaftAppliedIndex, status.RaftAppliedIndex)
			progress.RaftAppliedIndex = status.RaftAppliedIndex
			progress.LastProgress.Time = now
		}
		break
	}

	// Warn if the learner hasn't made any progress
	if !progress.LastProgress.Time.Equal(now) {
		logrus.Warnf("Learner %s stalled at RaftAppliedIndex=%d for %s", progress.Name, progress.RaftAppliedIndex, now.Sub(progress.LastProgress.Time).String())
	}

	// See if it's time to evict yet
	if now.Sub(progress.LastProgress.Time) > learnerMaxStallTime {
		if _, err := e.client.MemberRemove(ctx, member.ID); err != nil {
			return err
		}
		logrus.Warnf("Removed learner %s from etcd cluster", member.Name)
		return nil
	}

	return e.setLearnerProgress(ctx, progress)
}
// setEtcdStatusCondition patches the "EtcdIsVoter" condition on the given node's status.
//
// When promoted is true the condition is set to True/MemberNotLearner, otherwise to
// False/MemberIsLearner. If the node already carries the condition with the same status, only
// its LastHeartbeatTime is refreshed; if the status differs, the condition is replaced and its
// LastTransitionTime is updated; if the condition is absent, it is appended.
//
// The patch is computed against a deep copy of node, so the node passed in (which may be shared
// with a cache) is never modified. The return values are those of nodeHelper.PatchNodeStatus.
func (e *ETCD) setEtcdStatusCondition(node *v1.Node, client corev1.CoreV1Interface, memberName string, promoted bool) (*v1.Node, []byte, error) {
	etcdStatusType := v1.NodeConditionType("EtcdIsVoter")
	now := metav1.Now()

	newCondition := v1.NodeCondition{
		Type:              etcdStatusType,
		Status:            v1.ConditionFalse,
		Reason:            "MemberIsLearner",
		Message:           "Node has not been promoted to voting member of the etcd cluster",
		LastHeartbeatTime: now,
	}
	if promoted {
		newCondition.Status = v1.ConditionTrue
		newCondition.Reason = "MemberNotLearner"
		newCondition.Message = "Node is a voting member of the etcd cluster"
	}

	// A plain struct copy would share the Conditions backing array with node, so edits to the
	// copy would also change the original and the resulting patch would be empty.
	updatedNode := node.DeepCopy()

	idx, current := nodeUtil.GetNodeCondition(&updatedNode.Status, etcdStatusType)
	switch {
	case idx < 0:
		logrus.Infof("Adding etcd member %s status condition", memberName)
		newCondition.LastTransitionTime = now
		updatedNode.Status.Conditions = append(updatedNode.Status.Conditions, newCondition)
	case current.Status == newCondition.Status:
		logrus.Debugf("Member %s is not changing etcd status condition", memberName)
		updatedNode.Status.Conditions[idx].LastHeartbeatTime = now
	default:
		logrus.Debugf("Member %s is changing etcd condition", memberName)
		newCondition.LastTransitionTime = now
		// Write through the slice index; rebinding a local pointer would leave the node untouched.
		updatedNode.Status.Conditions[idx] = newCondition
	}

	return nodeHelper.PatchNodeStatus(client, types.NodeName(node.Name), node, updatedNode)
}
// getLearnerProgress reads the learnerProgress record stored in etcd under learnerProgressKey.
// When the response reports no matching key, a zero-valued record is returned.
func (e *ETCD) getLearnerProgress(ctx context.Context) (*learnerProgress, error) {
	resp, err := e.client.Get(ctx, learnerProgressKey)
	if err != nil {
		return nil, err
	}

	var stored learnerProgress
	if resp.Count >= 1 {
		decoder := json.NewDecoder(bytes.NewBuffer(resp.Kvs[0].Value))
		if err := decoder.Decode(&stored); err != nil {
			return nil, err
		}
	}
	return &stored, nil
}
// setLearnerProgress JSON-encodes the given learnerProgress record and writes it to etcd
// under learnerProgressKey.
func (e *ETCD) setLearnerProgress(ctx context.Context, status *learnerProgress) error {
	var encoded bytes.Buffer
	if err := json.NewEncoder(&encoded).Encode(status); err != nil {
		return err
	}

	if _, err := e.client.Put(ctx, learnerProgressKey, encoded.String()); err != nil {
		return err
	}
	return nil
}
// clearAlarms lists the alarms reported through the etcd client. Every alarm found is logged
// as a warning, and if there was at least one, all alarms are disarmed. The whole operation
// is bounded by testTimeout. An error is returned if the client has not been created.
func (e *ETCD) clearAlarms(ctx context.Context) error {
	ctx, cancel := context.WithTimeout(ctx, testTimeout)
	defer cancel()

	if e.client == nil {
		return errors.New("etcd client was nil")
	}

	resp, err := e.client.AlarmList(ctx)
	if err != nil {
		return fmt.Errorf("etcd alarm list failed: %v", err)
	}

	// Nothing raised, nothing to disarm
	if len(resp.Alarms) == 0 {
		return nil
	}

	for _, active := range resp.Alarms {
		logrus.Warnf("Alarm on etcd member %d: %s", active.MemberID, active.Alarm)
	}

	// An empty AlarmMember is passed as the disarm target
	if _, err := e.client.AlarmDisarm(ctx, &clientv3.AlarmMember{}); err != nil {
		return fmt.Errorf("etcd alarm disarm failed: %v", err)
	}
	logrus.Infof("Alarms disarmed on etcd server")
	return nil
}
// defragment requests a defragmentation of the first endpoint returned by getEndpoints for
// this node's config, bounded by testTimeout. An error is returned if the client has not
// been created.
func (e *ETCD) defragment(ctx context.Context) error {
	ctx, cancel := context.WithTimeout(ctx, testTimeout)
	defer cancel()

	if e.client == nil {
		return errors.New("etcd client was nil")
	}

	logrus.Infof("Defragmenting etcd database")
	target := getEndpoints(e.config)[0]
	if _, err := e.client.Defragment(ctx, target); err != nil {
		return err
	}
	return nil
}
// ClientURLs returns a list of all non-learner etcd cluster member client access URLs,
// together with the decoded member list. The list is retrieved from the "/db/info" endpoint
// of the remote server that is being joined. Members that have a client URL whose hostname
// equals the advertise address derived from selfIP are left out of the URL list.
func ClientURLs(ctx context.Context, clientAccessInfo *clientaccess.Info, selfIP string) ([]string, Members, error) {
	var memberList Members

	resp, err := clientAccessInfo.Get("/db/info")
	if err != nil {
		return nil, memberList, err
	}
	if err := json.Unmarshal(resp, &memberList); err != nil {
		return nil, memberList, err
	}

	ip, err := getAdvertiseAddress(selfIP)
	if err != nil {
		return nil, memberList, err
	}

	var clientURLs []string
	for _, member := range memberList.Members {
		// excluding learner member from the client list
		if member.IsLearner {
			continue
		}

		// excluding any member reachable at our own address; unparseable URLs are ignored
		isSelf := false
		for _, raw := range member.ClientURLs {
			if parsed, err := url.Parse(raw); err == nil && parsed.Hostname() == ip {
				isSelf = true
				break
			}
		}
		if !isSelf {
			clientURLs = append(clientURLs, member.ClientURLs...)
		}
	}
	return clientURLs, memberList, nil
}
// Restore performs a restore of the etcd datastore from the snapshot found at
// e.config.ClusterResetRestorePath. A snapshot whose name ends in compressedExtension is
// decompressed into the snapshot directory first. The current data directory is moved aside
// to a timestamped "-old-" path before the snapshot is restored as a single-member cluster
// made up of this node.
func (e *ETCD) Restore(ctx context.Context) error {
	// destination for the existing etcd data dir
	oldDataDir := dbDir(e.config) + "-old-" + strconv.Itoa(int(time.Now().Unix()))

	snapshotPath := e.config.ClusterResetRestorePath
	if snapshotPath == "" {
		return errors.New("no etcd restore path was specified")
	}
	// make sure snapshot exists before restoration
	if _, err := os.Stat(snapshotPath); err != nil {
		return err
	}

	// restore directly from the given path unless it needs decompressing first
	restorePath := snapshotPath
	if strings.HasSuffix(snapshotPath, compressedExtension) {
		targetDir, err := snapshotDir(e.config, true)
		if err != nil {
			return errors.Wrap(err, "failed to get the snapshot dir")
		}
		decompressed, err := e.decompressSnapshot(targetDir, snapshotPath)
		if err != nil {
			return err
		}
		restorePath = decompressed
	}

	// move the data directory to a temp path
	if err := os.Rename(dbDir(e.config), oldDataDir); err != nil {
		return err
	}
	logrus.Infof("Pre-restore etcd database moved to %s", oldDataDir)

	lg, err := logutil.CreateDefaultZapLogger(zap.InfoLevel)
	if err != nil {
		return err
	}

	restoreConfig := snapshot.RestoreConfig{
		SnapshotPath:   restorePath,
		Name:           e.name,
		OutputDataDir:  dbDir(e.config),
		OutputWALDir:   walDir(e.config),
		PeerURLs:       []string{e.peerURL()},
		InitialCluster: e.name + "=" + e.peerURL(),
	}
	return snapshot.NewV3(lg).Restore(restoreConfig)
}
// backupDirWithRetention moves dir to a sibling directory named "<dir>-backup-<unix timestamp>"
// and returns the new path.
//
// Before the move, sibling directories whose name starts with "<base of dir>-backup" are ordered
// newest-first by modification time and everything past the first maxBackupRetention of them is
// removed. The backup created by this call is not part of that count, so up to
// maxBackupRetention+1 backups exist afterwards.
//
// If dir cannot be stat'ed there is nothing to back up: an empty path and a nil error are returned.
func backupDirWithRetention(dir string, maxBackupRetention int) (string, error) {
	backupDir := dir + "-backup-" + strconv.Itoa(int(time.Now().Unix()))
	if _, err := os.Stat(dir); err != nil {
		// Best-effort by design: a missing or inaccessible dir is not treated as a failure.
		return "", nil
	}

	parent := filepath.Dir(dir)
	entries, err := os.ReadDir(parent)
	if err != nil {
		return "", err
	}

	// Only stat the entries that are actually backup directories, so an unrelated sibling that
	// disappears or cannot be stat'ed does not abort the backup.
	prefix := filepath.Base(dir) + "-backup"
	backups := make([]fs.FileInfo, 0, len(entries))
	for _, entry := range entries {
		if !entry.IsDir() || !strings.HasPrefix(entry.Name(), prefix) {
			continue
		}
		info, err := entry.Info()
		if err != nil {
			return "", err
		}
		backups = append(backups, info)
	}

	// Newest first, so that the entries past the retention limit are the oldest ones.
	sort.Slice(backups, func(i, j int) bool {
		return backups[i].ModTime().After(backups[j].ModTime())
	})

	for i, backup := range backups {
		if i < maxBackupRetention {
			continue
		}
		if err := os.RemoveAll(filepath.Join(parent, backup.Name())); err != nil {
			return "", err
		}
	}

	// move the directory to the backup path
	if err := os.Rename(dir, backupDir); err != nil {
		return "", err
	}
	return backupDir, nil
}
// GetAPIServerURLsFromETCD reads the value stored under AddressKey in etcd and decodes it
// from JSON into a list of apiserver endpoints. ErrAddressNotSet is returned when the key
// is absent or its value is empty.
func GetAPIServerURLsFromETCD(ctx context.Context, cfg *config.Control) ([]string, error) {
	etcdClient, err := getClient(ctx, cfg)
	if err != nil {
		return nil, err
	}
	defer etcdClient.Close()

	resp, err := etcdClient.KV.Get(ctx, AddressKey)
	if err != nil {
		return nil, err
	}

	if resp.Count == 0 {
		return nil, ErrAddressNotSet
	}
	raw := resp.Kvs[0].Value
	if len(raw) == 0 {
		return nil, ErrAddressNotSet
	}

	var addresses []string
	if err := json.Unmarshal(raw, &addresses); err != nil {
		return nil, fmt.Errorf("failed to unmarshal apiserver addresses from etcd: %v", err)
	}
	return addresses, nil
}
// GetMembersClientURLs returns the endpoint list reported by the etcd client's Endpoints
// method. No member list request is made; ctx is unused and the returned error is always nil.
func (e *ETCD) GetMembersClientURLs(ctx context.Context) ([]string, error) {
	endpoints := e.client.Endpoints()
	return endpoints, nil
}
// GetMembersNames fetches the etcd member list, bounded by testTimeout, and returns the
// name of every member in the order the list reports them.
func (e *ETCD) GetMembersNames(ctx context.Context) ([]string, error) {
	listCtx, cancel := context.WithTimeout(ctx, testTimeout)
	defer cancel()

	resp, err := e.client.MemberList(listCtx)
	if err != nil {
		return nil, err
	}

	var names []string
	for i := range resp.Members {
		names = append(names, resp.Members[i].Name)
	}
	return names, nil
}
// RemoveSelf will remove the member if it exists in the cluster. This should
// only be called on a node that may have previously run etcd, but will not
// currently run etcd, to ensure that it is not a member of the cluster.
// This is also called by tests to do cleanup between runs.
//
// A client is started first if none exists yet. After the peer has been removed, the data
// directory is renamed to a timestamped "-old-" path.
func (e *ETCD) RemoveSelf(ctx context.Context) error {
	if e.client == nil {
		if err := e.startClient(ctx); err != nil {
			return err
		}
	}

	if err := e.RemovePeer(ctx, e.name, e.address, true); err != nil {
		return err
	}

	// move the data dir out of the way to avoid issues when re-enabling etcd
	dataDir := dbDir(e.config)
	stamp := strconv.Itoa(int(time.Now().Unix()))
	return os.Rename(dataDir, dataDir+"-old-"+stamp)
}