Mirror of https://github.com/k3s-io/k3s.git

etcd snapshot functionality enhancements (#4453)
Signed-off-by: Chris Kim <oats87g@gmail.com>
parent 0c1f816f24
commit ae4a1a144a
@@ -25,6 +25,11 @@ var EtcdSnapshotFlags = []cli.Flag{
Usage: "(data) Folder to hold state default /var/lib/rancher/" + version.Program + " or ${HOME}/.rancher/" + version.Program + " if not root",
Destination: &ServerConfig.DataDir,
},
&cli.StringFlag{
Name: "dir,etcd-snapshot-dir",
Usage: "(db) Directory to save etcd on-demand snapshot. (default: ${data-dir}/db/snapshots)",
Destination: &ServerConfig.EtcdSnapshotDir,
},
&cli.StringFlag{
Name: "name",
Usage: "(db) Set the base name of the etcd on-demand snapshot (appended with UNIX timestamp).",
@@ -101,11 +106,7 @@ func NewEtcdSnapshotCommand(action func(*cli.Context) error, subcommands []cli.C
SkipArgReorder: true,
Action: action,
Subcommands: subcommands,
Flags: append(EtcdSnapshotFlags, &cli.StringFlag{
Name: "dir,etcd-snapshot-dir",
Usage: "(db) Directory to save etcd on-demand snapshot. (default: ${data-dir}/db/snapshots)",
Destination: &ServerConfig.EtcdSnapshotDir,
}),
Flags: EtcdSnapshotFlags,
}
}

@@ -130,7 +131,7 @@ func NewEtcdSnapshotSubcommands(delete, list, prune, save func(ctx *cli.Context)
},
{
Name: "prune",
Usage: "Remove snapshots that exceed the configured retention count",
Usage: "Remove snapshots that match the name prefix that exceed the configured retention count",
SkipFlagParsing: false,
SkipArgReorder: true,
Action: prune,
@@ -147,11 +148,7 @@ func NewEtcdSnapshotSubcommands(delete, list, prune, save func(ctx *cli.Context)
SkipFlagParsing: false,
SkipArgReorder: true,
Action: save,
Flags: append(EtcdSnapshotFlags, &cli.StringFlag{
Name: "dir",
Usage: "(db) Directory to save etcd on-demand snapshot. (default: ${data-dir}/db/snapshots)",
Destination: &ServerConfig.EtcdSnapshotDir,
}),
Flags: EtcdSnapshotFlags,
},
}
}

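For reference, a hedged sketch of the invocations these flag and subcommand definitions support (subcommand and flag spellings are taken from the hunks above and the integration tests later in this diff; the snapshot name and retention value are illustrative):

k3s etcd-snapshot save --name on-demand
k3s etcd-snapshot ls
k3s etcd-snapshot prune --snapshot-retention 2 --name on-demand
k3s etcd-snapshot delete <snapshot-name>
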
@@ -173,11 +173,19 @@ func list(app *cli.Context, cfg *cmds.Server) error {
w := tabwriter.NewWriter(os.Stdout, 0, 0, 1, ' ', 0)
defer w.Flush()

for _, s := range sf {
if cfg.EtcdS3 {
fmt.Fprintf(w, "%s\t%d\t%s\n", s.Name, s.Size, s.CreatedAt.Format(time.RFC3339))
} else {
fmt.Fprintf(w, "%s\t%s\t%d\t%s\n", s.Name, s.Location, s.Size, s.CreatedAt.Format(time.RFC3339))
if cfg.EtcdS3 {
fmt.Fprint(w, "Name\tSize\tCreated\n")
for _, s := range sf {
if s.NodeName == "s3" {
fmt.Fprintf(w, "%s\t%d\t%s\n", s.Name, s.Size, s.CreatedAt.Format(time.RFC3339))
}
}
} else {
fmt.Fprint(w, "Name\tLocation\tSize\tCreated\n")
for _, s := range sf {
if s.NodeName != "s3" {
fmt.Fprintf(w, "%s\t%s\t%d\t%s\n", s.Name, s.Location, s.Size, s.CreatedAt.Format(time.RFC3339))
}
}
}

@@ -201,10 +209,17 @@ func prune(app *cli.Context, cfg *cmds.Server) error {

serverConfig.ControlConfig.DataDir = dataDir
serverConfig.ControlConfig.EtcdSnapshotRetention = cfg.EtcdSnapshotRetention
serverConfig.ControlConfig.Runtime.KubeConfigAdmin = filepath.Join(dataDir, "cred", "admin.kubeconfig")

ctx := signals.SetupSignalContext()
e := etcd.NewETCD()
e.SetControlConfig(&serverConfig.ControlConfig)

sc, err := server.NewContext(ctx, serverConfig.ControlConfig.Runtime.KubeConfigAdmin)
if err != nil {
return err
}
serverConfig.ControlConfig.Runtime.Core = sc.Core

return e.PruneSnapshots(ctx)
}

@@ -104,7 +104,7 @@ func (c *Cluster) Start(ctx context.Context) (<-chan struct{}, error) {
}

if !c.config.EtcdDisableSnapshots {
if err := c.managedDB.StoreSnapshotData(ctx); err != nil {
if err := c.managedDB.ReconcileSnapshotData(ctx); err != nil {
logrus.Errorf("Failed to record snapshots for cluster: %v", err)
}
}

@@ -22,7 +22,7 @@ type Driver interface {
Restore(ctx context.Context) error
EndpointName() string
Snapshot(ctx context.Context, config *config.Control) error
StoreSnapshotData(ctx context.Context) error
ReconcileSnapshotData(ctx context.Context) error
GetMembersClientURLs(ctx context.Context) ([]string, error)
RemoveSelf(ctx context.Context) error
}

pkg/etcd/etcd.go
@@ -4,6 +4,7 @@ import (
"bytes"
"context"
"crypto/tls"
"encoding/base64"
"encoding/json"
"fmt"
"io/ioutil"
@@ -68,7 +69,8 @@ var (
// AddressKey will contain the value of api addresses list
AddressKey = version.Program + "/apiaddresses"

snapshotConfigMapName = version.Program + "-etcd-snapshots"
snapshotExtraMetadataConfigMapName = version.Program + "-etcd-snapshot-extra-metadata"
snapshotConfigMapName = version.Program + "-etcd-snapshots"

NodeNameAnnotation = "etcd." + version.Program + ".cattle.io/node-name"
NodeAddressAnnotation = "etcd." + version.Program + ".cattle.io/node-address"
@@ -921,7 +923,7 @@ func (e *ETCD) preSnapshotSetup(ctx context.Context, config *config.Control) err
return nil
}

// Snapshot attempts to save a new snapshot to the configured directory, and then clean up any old
// Snapshot attempts to save a new snapshot to the configured directory, and then clean up any old and failed
// snapshots in excess of the retention limits. This method is used in the internal cron snapshot
// system as well as used to do on-demand snapshots.
func (e *ETCD) Snapshot(ctx context.Context, config *config.Control) error {
@@ -929,13 +931,29 @@ func (e *ETCD) Snapshot(ctx context.Context, config *config.Control) error {
return err
}

logrus.Debugf("Attempting to retrieve extra metadata from %s ConfigMap", snapshotExtraMetadataConfigMapName)
var extraMetadata string
if snapshotExtraMetadataConfigMap, err := e.config.Runtime.Core.Core().V1().ConfigMap().Get(metav1.NamespaceSystem, snapshotExtraMetadataConfigMapName, metav1.GetOptions{}); err != nil {
logrus.Debugf("Error encountered attempting to retrieve extra metadata from %s ConfigMap, error: %v", snapshotExtraMetadataConfigMapName, err)
extraMetadata = ""
} else {
if m, err := json.Marshal(snapshotExtraMetadataConfigMap.Data); err != nil {
logrus.Debugf("Error attempting to marshal extra metadata contained in %s ConfigMap, error: %v", snapshotExtraMetadataConfigMapName, err)
extraMetadata = ""
} else {
logrus.Debugf("Setting extra metadata from %s ConfigMap", snapshotExtraMetadataConfigMapName)
logrus.Tracef("Marshalled extra metadata in %s ConfigMap was: %s", snapshotExtraMetadataConfigMapName, string(m))
extraMetadata = base64.StdEncoding.EncodeToString(m)
}
}

status, err := e.client.Status(ctx, endpoint)
if err != nil {
return errors.Wrap(err, "failed to check etcd status for snapshot")
}

if status.IsLearner {
logrus.Warnf("Skipping snapshot: not supported for learner")
logrus.Warnf("Unable to take snapshot: not supported for learner")
return nil
}

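The block above reads optional user-supplied metadata from the "<program>-etcd-snapshot-extra-metadata" ConfigMap in kube-system, marshals its Data map to JSON, and base64-encodes the result into the snapshot record. A minimal, hypothetical client-go sketch for populating that ConfigMap (not part of this commit; the kubeconfig path and Data contents are assumptions):

package main

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Assumed kubeconfig location on a k3s server node.
	cfg, err := clientcmd.BuildConfigFromFlags("", "/etc/rancher/k3s/k3s.yaml")
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)
	cm := &corev1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{
			// version.Program + "-etcd-snapshot-extra-metadata" resolves to this name on k3s.
			Name:      "k3s-etcd-snapshot-extra-metadata",
			Namespace: metav1.NamespaceSystem,
		},
		// Arbitrary example data; Snapshot() JSON-marshals this map and base64-encodes it.
		Data: map[string]string{"provisioner": "example"},
	}
	if _, err := client.CoreV1().ConfigMaps(metav1.NamespaceSystem).Create(context.Background(), cm, metav1.CreateOptions{}); err != nil {
		panic(err)
	}
}
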
@@ -950,40 +968,104 @@ func (e *ETCD) Snapshot(ctx context.Context, config *config.Control) error {
}

nodeName := os.Getenv("NODE_NAME")
snapshotName := fmt.Sprintf("%s-%s-%d", e.config.EtcdSnapshotName, nodeName, time.Now().Unix())
now := time.Now()
snapshotName := fmt.Sprintf("%s-%s-%d", e.config.EtcdSnapshotName, nodeName, now.Unix())
snapshotPath := filepath.Join(snapshotDir, snapshotName)

logrus.Infof("Saving etcd snapshot to %s", snapshotPath)

var sf *SnapshotFile

if err := snapshot.NewV3(nil).Save(ctx, *cfg, snapshotPath); err != nil {
return errors.Wrap(err, "failed to save snapshot")
sf = &SnapshotFile{
Name: snapshotName,
Location: "",
Metadata: extraMetadata,
NodeName: nodeName,
CreatedAt: &metav1.Time{
Time: now,
},
Status: FailedSnapshotStatus,
Message: base64.StdEncoding.EncodeToString([]byte(err.Error())),
Size: 0,
}
logrus.Errorf("Failed to take etcd snapshot: %v", err)
if err := e.addSnapshotData(*sf); err != nil {
return errors.Wrap(err, "failed to save local snapshot failure data to configmap")
}
}

if e.config.EtcdS3 {
logrus.Infof("Saving etcd snapshot %s to S3", snapshotName)
if err := e.initS3IfNil(ctx); err != nil {
return err
// If the snapshot attempt was successful, sf will be nil as we did not set it.
if sf == nil {
f, err := os.Stat(snapshotPath)
if err != nil {
return errors.Wrap(err, "unable to retrieve snapshot information from local snapshot")
}
if err := e.s3.upload(ctx, snapshotPath); err != nil {
return err
sf = &SnapshotFile{
Name: f.Name(),
Metadata: extraMetadata,
Location: "file://" + snapshotPath,
NodeName: nodeName,
CreatedAt: &metav1.Time{
Time: f.ModTime(),
},
Status: SuccessfulSnapshotStatus,
Size: f.Size(),
}
logrus.Infof("S3 upload complete for %s", snapshotName)

if e.config.EtcdSnapshotRetention >= 1 {
if err := e.s3.snapshotRetention(ctx); err != nil {
return errors.Wrap(err, "failed to apply s3 snapshot retention")
if err := e.addSnapshotData(*sf); err != nil {
return errors.Wrap(err, "failed to save local snapshot data to configmap")
}

if err := snapshotRetention(e.config.EtcdSnapshotRetention, e.config.EtcdSnapshotName, snapshotDir); err != nil {
return errors.Wrap(err, "failed to apply local snapshot retention policy")
}

if e.config.EtcdS3 {
logrus.Infof("Saving etcd snapshot %s to S3", snapshotName)
// Set sf to nil so that we can attempt to now upload the snapshot to S3 if needed
sf = nil
if err := e.initS3IfNil(ctx); err != nil {
logrus.Warnf("Unable to initialize S3 client: %v", err)
sf = &SnapshotFile{
Name: filepath.Base(snapshotPath),
Metadata: extraMetadata,
NodeName: "s3",
CreatedAt: &metav1.Time{
Time: now,
},
Message: base64.StdEncoding.EncodeToString([]byte(err.Error())),
Size: 0,
Status: FailedSnapshotStatus,
S3: &s3Config{
Endpoint: e.config.EtcdS3Endpoint,
EndpointCA: e.config.EtcdS3EndpointCA,
SkipSSLVerify: e.config.EtcdS3SkipSSLVerify,
Bucket: e.config.EtcdS3BucketName,
Region: e.config.EtcdS3Region,
Folder: e.config.EtcdS3Folder,
Insecure: e.config.EtcdS3Insecure,
},
}
}
// sf should be nil if we were able to successfully initialize the S3 client.
if sf == nil {
sf, err = e.s3.upload(ctx, snapshotPath, extraMetadata, now)
if err != nil {
return err
}
logrus.Infof("S3 upload complete for %s", snapshotName)
if err := e.s3.snapshotRetention(ctx); err != nil {
return errors.Wrap(err, "failed to apply s3 snapshot retention policy")
}
}
if err := e.addSnapshotData(*sf); err != nil {
return errors.Wrap(err, "failed to save snapshot data to configmap")
}
}
}

// check if we need to perform a retention check
if e.config.EtcdSnapshotRetention >= 1 {
if err := snapshotRetention(e.config.EtcdSnapshotRetention, e.config.EtcdSnapshotName, snapshotDir); err != nil {
return errors.Wrap(err, "failed to apply snapshot retention")
}
}

return e.StoreSnapshotData(ctx)
return e.ReconcileSnapshotData(ctx)
}

type s3Config struct {
@@ -996,24 +1078,67 @@ type s3Config struct {
Insecure bool `json:"insecure,omitempty"`
}

type SnapshotStatus string

const SuccessfulSnapshotStatus SnapshotStatus = "successful"
const FailedSnapshotStatus SnapshotStatus = "failed"

// SnapshotFile represents a single snapshot and it's
// metadata.
type SnapshotFile struct {
Name string `json:"name"`
// Location contains the full path of the snapshot. For
// local paths, the location will be prefixed with "file://".
Location string `json:"location,omitempty"`
NodeName string `json:"nodeName,omitempty"`
CreatedAt *metav1.Time `json:"createdAt,omitempty"`
Size int64 `json:"size,omitempty"`
S3 *s3Config `json:"s3Config,omitempty"`
Location string `json:"location,omitempty"`
Metadata string `json:"metadata,omitempty"`
Message string `json:"message,omitempty"`
NodeName string `json:"nodeName,omitempty"`
CreatedAt *metav1.Time `json:"createdAt,omitempty"`
Size int64 `json:"size,omitempty"`
Status SnapshotStatus `json:"status,omitempty"`
S3 *s3Config `json:"s3Config,omitempty"`
}

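To make the stored shape concrete: each SnapshotFile is JSON-marshalled and written as one entry of the k3s-etcd-snapshots ConfigMap, keyed by generateSnapshotConfigMapKey (defined further down) as "local-<name>" or "s3-<name>". A self-contained sketch using a trimmed mirror of the struct (the real CreatedAt field is *metav1.Time, which also serializes as RFC 3339; all sample values are illustrative):

package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// snapshotFile mirrors the JSON tags of the exported SnapshotFile type above.
type snapshotFile struct {
	Name      string    `json:"name"`
	Location  string    `json:"location,omitempty"`
	Metadata  string    `json:"metadata,omitempty"`
	Message   string    `json:"message,omitempty"`
	NodeName  string    `json:"nodeName,omitempty"`
	CreatedAt time.Time `json:"createdAt,omitempty"`
	Size      int64     `json:"size,omitempty"`
	Status    string    `json:"status,omitempty"`
}

func main() {
	sf := snapshotFile{
		Name:      "on-demand-server-1-1636151015",
		Location:  "file:///var/lib/rancher/k3s/server/db/snapshots/on-demand-server-1-1636151015",
		NodeName:  "server-1",
		CreatedAt: time.Now(),
		Size:      4919328,
		Status:    "successful",
	}
	b, err := json.Marshal(sf)
	if err != nil {
		panic(err)
	}
	// Key format matches generateSnapshotConfigMapKey for a local snapshot.
	fmt.Printf("local-%s: %s\n", sf.Name, b)
}
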
// listSnapshots provides a list of the currently stored
// snapshots on disk or in S3 along with their relevant
// listLocalSnapshots provides a list of the currently stored
// snapshots on disk along with their relevant
// metadata.
func (e *ETCD) listSnapshots(ctx context.Context, snapshotDir string) ([]SnapshotFile, error) {
var snapshots []SnapshotFile
func (e *ETCD) listLocalSnapshots() (map[string]SnapshotFile, error) {
snapshots := make(map[string]SnapshotFile)
snapshotDir, err := snapshotDir(e.config, true)
if err != nil {
return snapshots, errors.Wrap(err, "failed to get the snapshot dir")
}

files, err := ioutil.ReadDir(snapshotDir)
if err != nil {
return nil, err
}

nodeName := os.Getenv("NODE_NAME")

for _, f := range files {
sf := SnapshotFile{
Name: f.Name(),
Location: "file://" + filepath.Join(snapshotDir, f.Name()),
NodeName: nodeName,
CreatedAt: &metav1.Time{
Time: f.ModTime(),
},
Size: f.Size(),
Status: SuccessfulSnapshotStatus,
}
sfKey := generateSnapshotConfigMapKey(sf)
snapshots[sfKey] = sf
}

return snapshots, nil
}

// listS3Snapshots provides a list of currently stored
// snapshots in S3 along with their relevant
// metadata.
func (e *ETCD) listS3Snapshots(ctx context.Context) (map[string]SnapshotFile, error) {
snapshots := make(map[string]SnapshotFile)

if e.config.EtcdS3 {
ctx, cancel := context.WithCancel(ctx)
@@ -1046,7 +1171,7 @@ func (e *ETCD) listSnapshots(ctx context.Context, snapshotDir string) ([]Snapsho
return nil, err
}

snapshots = append(snapshots, SnapshotFile{
sf := SnapshotFile{
Name: filepath.Base(obj.Key),
NodeName: "s3",
CreatedAt: &metav1.Time{
@@ -1062,31 +1187,12 @@ func (e *ETCD) listSnapshots(ctx context.Context, snapshotDir string) ([]Snapsho
Folder: e.config.EtcdS3Folder,
Insecure: e.config.EtcdS3Insecure,
},
})
Status: SuccessfulSnapshotStatus,
}
sfKey := generateSnapshotConfigMapKey(sf)
snapshots[sfKey] = sf
}

return snapshots, nil
}

files, err := ioutil.ReadDir(snapshotDir)
if err != nil {
return nil, err
}

nodeName := os.Getenv("NODE_NAME")

for _, f := range files {
snapshots = append(snapshots, SnapshotFile{
Name: f.Name(),
Location: "file://" + filepath.Join(snapshotDir, f.Name()),
NodeName: nodeName,
CreatedAt: &metav1.Time{
Time: f.ModTime(),
},
Size: f.Size(),
})
}

return snapshots, nil
}

@@ -1104,34 +1210,37 @@ func (e *ETCD) initS3IfNil(ctx context.Context) error {
return nil
}

// PruneSnapshots perfrorms a retention run with the given
// PruneSnapshots performs a retention run with the given
// retention duration and removes expired snapshots.
func (e *ETCD) PruneSnapshots(ctx context.Context) error {
snapshotDir, err := snapshotDir(e.config, false)
if err != nil {
return errors.Wrap(err, "failed to get the snapshot dir")
}

if e.config.EtcdS3 {
if e.initS3IfNil(ctx); err != nil {
return err
}

return e.s3.snapshotRetention(ctx)
if err := snapshotRetention(e.config.EtcdSnapshotRetention, e.config.EtcdSnapshotName, snapshotDir); err != nil {
logrus.Errorf("Error applying snapshot retention policy: %v", err)
}

return snapshotRetention(e.config.EtcdSnapshotRetention, e.config.EtcdSnapshotName, snapshotDir)
if e.config.EtcdS3 {
if err := e.initS3IfNil(ctx); err != nil {
logrus.Warnf("Unable to initialize S3 client during prune: %v", err)
} else {
if err := e.s3.snapshotRetention(ctx); err != nil {
logrus.Errorf("Error applying S3 snapshot retention policy: %v", err)
}
}
}

return e.ReconcileSnapshotData(ctx)
}

// ListSnapshots is an exported wrapper method that wraps an
// unexported method of the same name.
func (e *ETCD) ListSnapshots(ctx context.Context) ([]SnapshotFile, error) {
snapshotDir, err := snapshotDir(e.config, false)
if err != nil {
return nil, errors.Wrap(err, "failed to get the snapshot dir")
func (e *ETCD) ListSnapshots(ctx context.Context) (map[string]SnapshotFile, error) {
if e.config.EtcdS3 {
return e.listS3Snapshots(ctx)
}

return e.listSnapshots(ctx, snapshotDir)
return e.listLocalSnapshots()
}

// deleteSnapshots removes the given snapshots from
@@ -1183,7 +1292,7 @@ func (e *ETCD) DeleteSnapshots(ctx context.Context, snapshots []string) error {
select {
case <-ctx.Done():
logrus.Errorf("Unable to delete snapshot: %v", ctx.Err())
return e.StoreSnapshotData(ctx)
return e.ReconcileSnapshotData(ctx)
case <-time.After(time.Millisecond * 100):
continue
case err, ok := <-e.s3.client.RemoveObjects(ctx, e.config.EtcdS3BucketName, objectsCh, minio.RemoveObjectsOptions{}):
@@ -1191,7 +1300,7 @@ func (e *ETCD) DeleteSnapshots(ctx context.Context, snapshots []string) error {
logrus.Errorf("Unable to delete snapshot: %v", err.Err)
}
if !ok {
return e.StoreSnapshotData(ctx)
return e.ReconcileSnapshotData(ctx)
}
}
}
@@ -1214,59 +1323,33 @@ func (e *ETCD) DeleteSnapshots(ctx context.Context, snapshots []string) error {
logrus.Debug("Removed snapshot ", s)
}

return e.StoreSnapshotData(ctx)
return e.ReconcileSnapshotData(ctx)
}

// updateSnapshotData populates the given map with the contents of the given slice.
func updateSnapshotData(data map[string]string, snapshotFiles []SnapshotFile) error {
for _, v := range snapshotFiles {
b, err := json.Marshal(v)
if err != nil {
return err
}
data[v.Name] = string(b)
}

return nil
}

// StoreSnapshotData stores the given snapshot data in the "snapshots" ConfigMap.
func (e *ETCD) StoreSnapshotData(ctx context.Context) error {
logrus.Infof("Saving current etcd snapshot set to %s ConfigMap", snapshotConfigMapName)

snapshotDir, err := snapshotDir(e.config, true)
if err != nil {
return errors.Wrap(err, "failed to get the snapshot dir")
}

// AddSnapshotData adds the given snapshot file information to the snapshot configmap, using the existing extra metadata
// available at the time.
func (e *ETCD) addSnapshotData(sf SnapshotFile) error {
return retry.OnError(retry.DefaultBackoff, func(err error) bool {
return apierrors.IsConflict(err) || apierrors.IsAlreadyExists(err)
}, func() error {
// make sure the core.Factory is initialize. There can
// make sure the core.Factory is initialized. There can
// be a race between this core code startup.
for e.config.Runtime.Core == nil {
runtime.Gosched()
}

snapshotConfigMap, getErr := e.config.Runtime.Core.Core().V1().ConfigMap().Get(metav1.NamespaceSystem, snapshotConfigMapName, metav1.GetOptions{})

snapshotFiles, err := e.listSnapshots(ctx, snapshotDir)
marshalledSnapshotFile, err := json.Marshal(sf)
if err != nil {
return err
}

data := make(map[string]string, len(snapshotFiles))
if err := updateSnapshotData(data, snapshotFiles); err != nil {
return err
}

if apierrors.IsNotFound(getErr) {
cm := v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: snapshotConfigMapName,
Namespace: metav1.NamespaceSystem,
},
Data: data,
Data: map[string]string{sf.Name: string(marshalledSnapshotFile)},
}
_, err := e.config.Runtime.Core.Core().V1().ConfigMap().Create(&cm)
return err
@@ -1276,24 +1359,179 @@ func (e *ETCD) StoreSnapshotData(ctx context.Context) error {
snapshotConfigMap.Data = make(map[string]string)
}

sfKey := generateSnapshotConfigMapKey(sf)
snapshotConfigMap.Data[sfKey] = string(marshalledSnapshotFile)

_, err = e.config.Runtime.Core.Core().V1().ConfigMap().Update(snapshotConfigMap)
return err
})
}

func generateSnapshotConfigMapKey(sf SnapshotFile) string {
var sfKey string
if sf.NodeName == "s3" {
sfKey = "s3-" + sf.Name
} else {
sfKey = "local-" + sf.Name
}
return sfKey
}

// ReconcileSnapshotData reconciles snapshot data in the snapshot ConfigMap.
// It will reconcile snapshot data from disk locally always, and if S3 is enabled, will attempt to list S3 snapshots
// and reconcile snapshots from S3. Notably,
func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
logrus.Infof("Reconciling etcd snapshot data in %s ConfigMap", snapshotConfigMapName)
defer logrus.Infof("Reconciliation of snapshot data in %s ConfigMap complete", snapshotConfigMapName)
return retry.OnError(retry.DefaultBackoff, func(err error) bool {
return apierrors.IsConflict(err) || apierrors.IsAlreadyExists(err)
}, func() error {
// make sure the core.Factory is initialize. There can
// be a race between this core code startup.
for e.config.Runtime.Core == nil {
runtime.Gosched()
}

logrus.Debug("core.Factory is initialized")

snapshotConfigMap, getErr := e.config.Runtime.Core.Core().V1().ConfigMap().Get(metav1.NamespaceSystem, snapshotConfigMapName, metav1.GetOptions{})
if apierrors.IsNotFound(getErr) {
// Can't reconcile what doesn't exist.
return errors.New("No snapshot configmap found")
}

logrus.Debugf("Attempting to reconcile etcd snapshot data for configmap generation %d", snapshotConfigMap.Generation)

// if the snapshot config map data is nil, no need to reconcile.
if snapshotConfigMap.Data == nil {
return nil
}

snapshotFiles, err := e.listLocalSnapshots()
if err != nil {
return err
}

// s3ListSuccessful is set to true if we are successful at listing snapshots from S3 to eliminate accidental
// clobbering of S3 snapshots in the configmap due to misconfigured S3 credentials/details
s3ListSuccessful := false

if e.config.EtcdS3 {
if s3Snapshots, err := e.listS3Snapshots(ctx); err != nil {
logrus.Errorf("error retrieving S3 snapshots for reconciliation: %v", err)
} else {
for k, v := range s3Snapshots {
snapshotFiles[k] = v
}
s3ListSuccessful = true
}
}

nodeName := os.Getenv("NODE_NAME")

// remove entries for this node only
// deletedSnapshots is a map[string]string where key is the configmap key and the value is the marshalled snapshot file
// it will be populated below with snapshots that are either from S3 or on the local node. Notably, deletedSnapshots will
// not contain snapshots that are in the "failed" status
deletedSnapshots := make(map[string]string)
// failedSnapshots is a slice of unmarshaled snapshot files sourced from the configmap
// These are stored unmarshaled so we can sort based on name.
var failedSnapshots []SnapshotFile
var failedS3Snapshots []SnapshotFile

// remove entries for this node and s3 (if S3 is enabled) only
for k, v := range snapshotConfigMap.Data {
var sf SnapshotFile
if err := json.Unmarshal([]byte(v), &sf); err != nil {
return err
}
if sf.NodeName == nodeName || sf.NodeName == "s3" {
if (sf.NodeName == nodeName || (sf.NodeName == "s3" && s3ListSuccessful)) && sf.Status != FailedSnapshotStatus {
// Only delete the snapshot if the snapshot was not failed
// sf.Status != FailedSnapshotStatus is intentional, as it is possible we are reconciling snapshots stored from older versions that did not set status
deletedSnapshots[generateSnapshotConfigMapKey(sf)] = v // store a copy of the snapshot
delete(snapshotConfigMap.Data, k)
} else if sf.Status == FailedSnapshotStatus && sf.NodeName == nodeName && e.config.EtcdSnapshotRetention >= 1 {
// Handle locally failed snapshots.
failedSnapshots = append(failedSnapshots, sf)
delete(snapshotConfigMap.Data, k)
} else if sf.Status == FailedSnapshotStatus && e.config.EtcdS3 && sf.NodeName == "s3" && strings.HasPrefix(sf.Name, e.config.EtcdSnapshotName+"-"+nodeName) && e.config.EtcdSnapshotRetention >= 1 {
// If we're operating against S3, we can clean up failed S3 snapshots that failed on this node.
failedS3Snapshots = append(failedS3Snapshots, sf)
delete(snapshotConfigMap.Data, k)
}
}

// save this node's entries to the ConfigMap
for k, v := range data {
snapshotConfigMap.Data[k] = v
// Apply the failed snapshot retention policy to locally failed snapshots
if len(failedSnapshots) > 0 && e.config.EtcdSnapshotRetention >= 1 {
sort.Slice(failedSnapshots, func(i, j int) bool {
return failedSnapshots[i].Name > failedSnapshots[j].Name
})

var keepCount int
if e.config.EtcdSnapshotRetention >= len(failedSnapshots) {
keepCount = len(failedSnapshots)
} else {
keepCount = e.config.EtcdSnapshotRetention
}
for _, dfs := range failedSnapshots[:keepCount] {
sfKey := generateSnapshotConfigMapKey(dfs)
marshalledSnapshot, err := json.Marshal(dfs)
if err != nil {
logrus.Errorf("unable to marshal snapshot to store in configmap %v", err)
} else {
snapshotConfigMap.Data[sfKey] = string(marshalledSnapshot)
}
}
}

// Apply the failed snapshot retention policy to the S3 snapshots
if len(failedS3Snapshots) > 0 && e.config.EtcdSnapshotRetention >= 1 {
sort.Slice(failedS3Snapshots, func(i, j int) bool {
return failedS3Snapshots[i].Name > failedS3Snapshots[j].Name
})

var keepCount int
if e.config.EtcdSnapshotRetention >= len(failedS3Snapshots) {
keepCount = len(failedS3Snapshots)
} else {
keepCount = e.config.EtcdSnapshotRetention
}
for _, dfs := range failedS3Snapshots[:keepCount] {
sfKey := generateSnapshotConfigMapKey(dfs)
marshalledSnapshot, err := json.Marshal(dfs)
if err != nil {
logrus.Errorf("unable to marshal snapshot to store in configmap %v", err)
} else {
snapshotConfigMap.Data[sfKey] = string(marshalledSnapshot)
}
}
}

// save the local entries to the ConfigMap if they are still on disk or in S3.
for _, snapshot := range snapshotFiles {
var sf SnapshotFile
sfKey := generateSnapshotConfigMapKey(snapshot)
if v, ok := deletedSnapshots[sfKey]; ok {
// use the snapshot file we have from the existing configmap, and unmarshal it so we can manipulate it
if err := json.Unmarshal([]byte(v), &sf); err != nil {
logrus.Errorf("error unmarshaling snapshot file: %v", err)
// use the snapshot with info we sourced from disk/S3 (will be missing metadata, but something is better than nothing)
sf = snapshot
}
} else {
sf = snapshot
}

sf.Status = SuccessfulSnapshotStatus // if the snapshot is on disk or in S3, it was successful.

marshalledSnapshot, err := json.Marshal(sf)
if err != nil {
logrus.Warnf("unable to marshal snapshot metadata %s to store in configmap, received error: %v", sf.Name, err)
} else {
snapshotConfigMap.Data[sfKey] = string(marshalledSnapshot)
}
}

logrus.Debugf("Updating snapshot ConfigMap (%s) with %d entries", snapshotConfigMapName, len(snapshotConfigMap.Data))
_, err = e.config.Runtime.Core.Core().V1().ConfigMap().Update(snapshotConfigMap)
return err
})
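The failed-snapshot trimming above depends on snapshot names ending in a UNIX timestamp, so a descending lexical sort by name puts the newest first and slicing to the retention count drops the oldest. A standalone sketch of that pattern (not taken from the commit; the names are illustrative):

package main

import (
	"fmt"
	"sort"
)

// keepNewest sorts names newest-first (they share a prefix and end in a UNIX
// timestamp, so lexical order tracks creation order) and keeps at most retention entries.
func keepNewest(names []string, retention int) (keep, drop []string) {
	sort.Slice(names, func(i, j int) bool { return names[i] > names[j] })
	if retention >= len(names) {
		return names, nil
	}
	return names[:retention], names[retention:]
}

func main() {
	names := []string{
		"on-demand-server-1-1636151015",
		"on-demand-server-1-1636151020",
		"on-demand-server-1-1636151025",
	}
	keep, drop := keepNewest(names, 2)
	fmt.Println("keep:", keep)
	fmt.Println("drop:", drop)
}
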
@@ -1339,7 +1577,12 @@ func (e *ETCD) Restore(ctx context.Context) error {
// snapshotRetention iterates through the snapshots and removes the oldest
// leaving the desired number of snapshots.
func snapshotRetention(retention int, snapshotPrefix string, snapshotDir string) error {
if retention < 1 {
return nil
}

nodeName := os.Getenv("NODE_NAME")
logrus.Infof("Applying local snapshot retention policy: retention: %d, snapshotPrefix: %s, directory: %s", retention, snapshotPrefix+"-"+nodeName, snapshotDir)

var snapshotFiles []os.FileInfo
if err := filepath.Walk(snapshotDir, func(path string, info os.FileInfo, err error) error {
@@ -1362,7 +1605,9 @@ func snapshotRetention(retention int, snapshotPrefix string, snapshotDir string)

delCount := len(snapshotFiles) - retention
for _, df := range snapshotFiles[:delCount] {
if err := os.Remove(filepath.Join(snapshotDir, df.Name())); err != nil {
snapshotPath := filepath.Join(snapshotDir, df.Name())
logrus.Infof("Removing local snapshot %s", snapshotPath)
if err := os.Remove(snapshotPath); err != nil {
return err
}
}

@@ -36,7 +36,7 @@ var _ = Describe("etcd snapshots", func() {
})
It("saves an etcd snapshot", func() {
Expect(testutil.K3sCmd("etcd-snapshot", "save")).
To(ContainSubstring("Saving current etcd snapshot set to k3s-etcd-snapshots"))
To(ContainSubstring("saved"))
})
It("list snapshots", func() {
Expect(testutil.K3sCmd("etcd-snapshot", "ls")).
@@ -70,13 +70,13 @@ var _ = Describe("etcd snapshots", func() {
When("using etcd snapshot prune", func() {
It("saves 3 different snapshots", func() {
Expect(testutil.K3sCmd("etcd-snapshot", "save", "-name", "PRUNE_TEST")).
To(ContainSubstring("Saving current etcd snapshot set to k3s-etcd-snapshots"))
To(ContainSubstring("saved"))
time.Sleep(1 * time.Second)
Expect(testutil.K3sCmd("etcd-snapshot", "save", "-name", "PRUNE_TEST")).
To(ContainSubstring("Saving current etcd snapshot set to k3s-etcd-snapshots"))
To(ContainSubstring("saved"))
time.Sleep(1 * time.Second)
Expect(testutil.K3sCmd("etcd-snapshot", "save", "-name", "PRUNE_TEST")).
To(ContainSubstring("Saving current etcd snapshot set to k3s-etcd-snapshots"))
To(ContainSubstring("saved"))
time.Sleep(1 * time.Second)
})
It("lists all 3 snapshots", func() {
@@ -89,7 +89,7 @@ var _ = Describe("etcd snapshots", func() {
})
It("prunes snapshots down to 2", func() {
Expect(testutil.K3sCmd("etcd-snapshot", "prune", "--snapshot-retention", "2", "--name", "PRUNE_TEST")).
To(BeEmpty())
To(ContainSubstring("Removing local snapshot"))
lsResult, err := testutil.K3sCmd("etcd-snapshot", "ls")
Expect(err).ToNot(HaveOccurred())
reg, err := regexp.Compile(`:///var/lib/rancher/k3s/server/db/snapshots/PRUNE_TEST`)

@@ -14,12 +14,14 @@ import (
"path/filepath"
"sort"
"strings"
"time"

"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/pkg/errors"
"github.com/rancher/k3s/pkg/daemons/config"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// S3 maintains state for S3 functionality.
@@ -32,6 +34,9 @@ type S3 struct {
// copy of the config.Control pointer and initializes
// a new Minio client.
func NewS3(ctx context.Context, config *config.Control) (*S3, error) {
if config.EtcdS3BucketName == "" {
return nil, errors.New("s3 bucket name was not set")
}
tr := http.DefaultTransport

switch {
@@ -88,9 +93,11 @@ func NewS3(ctx context.Context, config *config.Control) (*S3, error) {

// upload uploads the given snapshot to the configured S3
// compatible backend.
func (s *S3) upload(ctx context.Context, snapshot string) error {
func (s *S3) upload(ctx context.Context, snapshot, extraMetadata string, now time.Time) (*SnapshotFile, error) {
logrus.Infof("Uploading snapshot %s to S3", snapshot)
basename := filepath.Base(snapshot)
var snapshotFileName string
var snapshotFile SnapshotFile
if s.config.EtcdS3Folder != "" {
snapshotFileName = filepath.Join(s.config.EtcdS3Folder, basename)
} else {
@@ -103,11 +110,56 @@ func (s *S3) upload(ctx context.Context, snapshot string) error {
ContentType: "application/zip",
NumThreads: 2,
}
if _, err := s.client.FPutObject(toCtx, s.config.EtcdS3BucketName, snapshotFileName, snapshot, opts); err != nil {
logrus.Errorf("Error received in attempt to upload snapshot to S3: %s", err)
}
uploadInfo, err := s.client.FPutObject(toCtx, s.config.EtcdS3BucketName, snapshotFileName, snapshot, opts)
if err != nil {
snapshotFile = SnapshotFile{
Name: filepath.Base(uploadInfo.Key),
Metadata: extraMetadata,
NodeName: "s3",
CreatedAt: &metav1.Time{
Time: now,
},
Message: base64.StdEncoding.EncodeToString([]byte(err.Error())),
Size: 0,
Status: FailedSnapshotStatus,
S3: &s3Config{
Endpoint: s.config.EtcdS3Endpoint,
EndpointCA: s.config.EtcdS3EndpointCA,
SkipSSLVerify: s.config.EtcdS3SkipSSLVerify,
Bucket: s.config.EtcdS3BucketName,
Region: s.config.EtcdS3Region,
Folder: s.config.EtcdS3Folder,
Insecure: s.config.EtcdS3Insecure,
},
}
logrus.Errorf("Error received during snapshot upload to S3: %s", err)
} else {
ca, err := time.Parse(time.RFC3339, uploadInfo.LastModified.Format(time.RFC3339))
if err != nil {
return nil, err
}

return nil
snapshotFile = SnapshotFile{
Name: filepath.Base(uploadInfo.Key),
Metadata: extraMetadata,
NodeName: "s3",
CreatedAt: &metav1.Time{
Time: ca,
},
Size: uploadInfo.Size,
Status: SuccessfulSnapshotStatus,
S3: &s3Config{
Endpoint: s.config.EtcdS3Endpoint,
EndpointCA: s.config.EtcdS3EndpointCA,
SkipSSLVerify: s.config.EtcdS3SkipSSLVerify,
Bucket: s.config.EtcdS3BucketName,
Region: s.config.EtcdS3Region,
Folder: s.config.EtcdS3Folder,
Insecure: s.config.EtcdS3Insecure,
},
}
}
return &snapshotFile, nil
}

// download downloads the given snapshot from the configured S3
@@ -170,9 +222,13 @@ func (s *S3) snapshotPrefix() string {
return prefix
}

// snapshotRetention deletes the given snapshot from the configured S3
// compatible backend.
// snapshotRetention prunes snapshots in the configured S3 compatible backend for this specific node.
func (s *S3) snapshotRetention(ctx context.Context) error {
if s.config.EtcdSnapshotRetention < 1 {
return nil
}
logrus.Infof("Applying snapshot retention policy to snapshots stored in S3: retention: %d, snapshotPrefix: %s", s.config.EtcdSnapshotRetention, s.snapshotPrefix())

var snapshotFiles []minio.ObjectInfo

toCtx, cancel := context.WithTimeout(ctx, s.config.EtcdS3Timeout)
@@ -199,7 +255,7 @@ func (s *S3) snapshotRetention(ctx context.Context) error {

delCount := len(snapshotFiles) - s.config.EtcdSnapshotRetention
for _, df := range snapshotFiles[:delCount] {
logrus.Debugf("Removing snapshot: %s", df.Key)
logrus.Infof("Removing S3 snapshot: %s", df.Key)
if err := s.client.RemoveObject(ctx, s.config.EtcdS3BucketName, df.Key, minio.RemoveObjectOptions{}); err != nil {
return err
}

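The retention loop above iterates over a slice of minio.ObjectInfo gathered earlier in the function (the listing step is elided from this hunk). A hypothetical sketch of such a listing with the minio-go v7 API already imported in this file; the endpoint, credentials, bucket, and prefix are assumptions:

package main

import (
	"context"
	"fmt"

	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
)

func main() {
	// Assumed S3-compatible endpoint and static credentials.
	client, err := minio.New("s3.example.com", &minio.Options{
		Creds:  credentials.NewStaticV4("ACCESS_KEY", "SECRET_KEY", ""),
		Secure: true,
	})
	if err != nil {
		panic(err)
	}
	// Prefix is an assumption modeled on snapshotPrefix() above (snapshot name plus node name).
	opts := minio.ListObjectsOptions{Prefix: "on-demand-server-1", Recursive: true}
	for obj := range client.ListObjects(context.Background(), "k3s-snapshots", opts) {
		if obj.Err != nil {
			panic(obj.Err)
		}
		fmt.Println(obj.Key, obj.Size, obj.LastModified)
	}
}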