Move etcd snapshot management CLI to request/response

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
This commit is contained in:
Brad Davidson 2024-03-28 00:48:13 +00:00 committed by Brad Davidson
parent 0792461885
commit fe465cc832
19 changed files with 639 additions and 330 deletions

2
go.mod
View File

@ -151,6 +151,7 @@ require (
k8s.io/api v0.29.3
k8s.io/apimachinery v0.29.3
k8s.io/apiserver v0.29.3
k8s.io/cli-runtime v0.22.2
k8s.io/client-go v11.0.1-0.20190409021438-1a26190bd76a+incompatible
k8s.io/cloud-provider v0.29.3
k8s.io/cluster-bootstrap v0.0.0
@ -485,7 +486,6 @@ require (
gopkg.in/warnings.v0 v0.1.2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/apiextensions-apiserver v0.29.3 // indirect
k8s.io/cli-runtime v0.22.2 // indirect
k8s.io/code-generator v0.29.3 // indirect
k8s.io/controller-manager v0.25.4 // indirect
k8s.io/csi-translation-lib v0.0.0 // indirect

View File

@ -21,6 +21,14 @@ var EtcdSnapshotFlags = []cli.Flag{
Destination: &AgentConfig.NodeName,
},
DataDirFlag,
ServerToken,
&cli.StringFlag{
Name: "server, s",
Usage: "(cluster) Server to connect to",
EnvVar: version.ProgramUpper + "_URL",
Value: "https://127.0.0.1:6443",
Destination: &ServerConfig.ServerURL,
},
&cli.StringFlag{
Name: "dir,etcd-snapshot-dir",
Usage: "(db) Directory to save etcd on-demand snapshot. (default: ${data-dir}/db/snapshots)",

View File

@ -1,143 +1,84 @@
package etcdsnapshot
import (
"context"
"bytes"
"encoding/json"
"fmt"
"net"
"os"
"path/filepath"
"slices"
"sort"
"strings"
"text/tabwriter"
"time"
"github.com/erikdubbelboer/gspt"
k3s "github.com/k3s-io/k3s/pkg/apis/k3s.cattle.io/v1"
"github.com/k3s-io/k3s/pkg/cli/cmds"
daemonconfig "github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/clientaccess"
"github.com/k3s-io/k3s/pkg/cluster/managed"
"github.com/k3s-io/k3s/pkg/etcd"
"github.com/k3s-io/k3s/pkg/server"
"github.com/k3s-io/k3s/pkg/util"
util2 "github.com/k3s-io/k3s/pkg/util"
"github.com/pkg/errors"
"github.com/rancher/wrangler/pkg/signals"
"github.com/sirupsen/logrus"
"github.com/urfave/cli"
"gopkg.in/yaml.v2"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/cli-runtime/pkg/printers"
)
type etcdCommand struct {
etcd *etcd.ETCD
ctx context.Context
}
// commandSetup setups up common things needed
// for each etcd command.
func commandSetup(app *cli.Context, cfg *cmds.Server, config *server.Config) (*etcdCommand, error) {
ctx := signals.SetupSignalContext()
gspt.SetProcTitle(os.Args[0])
func commandSetup(app *cli.Context, cfg *cmds.Server) (*etcd.SnapshotRequest, *clientaccess.Info, error) {
// hide process arguments from ps output, since they may contain
// database credentials or other secrets.
gspt.SetProcTitle(os.Args[0] + " etcd-snapshot")
nodeName := app.String("node-name")
if nodeName == "" {
h, err := os.Hostname()
if err != nil {
return nil, err
}
nodeName = h
sr := &etcd.SnapshotRequest{}
// Operation and name are set by the command handler.
// Compression, dir, and retention take the server defaults if not overridden on the CLI.
if app.IsSet("etcd-snapshot-compress") {
sr.Compress = &cfg.EtcdSnapshotCompress
}
if app.IsSet("etcd-snapshot-dir") {
sr.Dir = &cfg.EtcdSnapshotDir
}
if app.IsSet("etcd-snapshot-retention") {
sr.Retention = &cfg.EtcdSnapshotRetention
}
os.Setenv("NODE_NAME", nodeName)
if cfg.EtcdS3 {
sr.S3 = &etcd.SnapshotRequestS3{}
sr.S3.AccessKey = cfg.EtcdS3AccessKey
sr.S3.Bucket = cfg.EtcdS3BucketName
sr.S3.Endpoint = cfg.EtcdS3Endpoint
sr.S3.EndpointCA = cfg.EtcdS3EndpointCA
sr.S3.Folder = cfg.EtcdS3Folder
sr.S3.Insecure = cfg.EtcdS3Insecure
sr.S3.Region = cfg.EtcdS3Region
sr.S3.SecretKey = cfg.EtcdS3SecretKey
sr.S3.SkipSSLVerify = cfg.EtcdS3SkipSSLVerify
sr.S3.Timeout = metav1.Duration{Duration: cfg.EtcdS3Timeout}
}
dataDir, err := server.ResolveDataDir(cfg.DataDir)
if err != nil {
return nil, err
return nil, nil, err
}
config.DisableAgent = true
config.ControlConfig.DataDir = dataDir
config.ControlConfig.BindAddress = cfg.BindAddress
config.ControlConfig.EtcdSnapshotName = cfg.EtcdSnapshotName
config.ControlConfig.EtcdSnapshotDir = cfg.EtcdSnapshotDir
config.ControlConfig.EtcdSnapshotCompress = cfg.EtcdSnapshotCompress
config.ControlConfig.EtcdListFormat = strings.ToLower(cfg.EtcdListFormat)
config.ControlConfig.EtcdS3 = cfg.EtcdS3
config.ControlConfig.EtcdS3Endpoint = cfg.EtcdS3Endpoint
config.ControlConfig.EtcdS3EndpointCA = cfg.EtcdS3EndpointCA
config.ControlConfig.EtcdS3SkipSSLVerify = cfg.EtcdS3SkipSSLVerify
config.ControlConfig.EtcdS3AccessKey = cfg.EtcdS3AccessKey
config.ControlConfig.EtcdS3SecretKey = cfg.EtcdS3SecretKey
config.ControlConfig.EtcdS3BucketName = cfg.EtcdS3BucketName
config.ControlConfig.EtcdS3Region = cfg.EtcdS3Region
config.ControlConfig.EtcdS3Folder = cfg.EtcdS3Folder
config.ControlConfig.EtcdS3Insecure = cfg.EtcdS3Insecure
config.ControlConfig.EtcdS3Timeout = cfg.EtcdS3Timeout
config.ControlConfig.Runtime = daemonconfig.NewRuntime(nil)
config.ControlConfig.Runtime.ETCDServerCA = filepath.Join(dataDir, "tls", "etcd", "server-ca.crt")
config.ControlConfig.Runtime.ClientETCDCert = filepath.Join(dataDir, "tls", "etcd", "client.crt")
config.ControlConfig.Runtime.ClientETCDKey = filepath.Join(dataDir, "tls", "etcd", "client.key")
config.ControlConfig.Runtime.KubeConfigAdmin = filepath.Join(dataDir, "cred", "admin.kubeconfig")
// We need to go through defaulting of cluster addresses to ensure that the etcd config for the standalone
// command uses the same endpoint selection logic as it does when starting up the full server. Specifically,
// we need to set an IPv6 service CIDR on IPv6-only or IPv6-first nodes, as the etcd default endpoints check
// the service CIDR primary addresss family to determine what loopback address to use.
nodeName, nodeIPs, err := util.GetHostnameAndIPs(cmds.AgentConfig.NodeName, cmds.AgentConfig.NodeIP)
if err != nil {
return nil, err
}
config.ControlConfig.ServerNodeName = nodeName
// configure ClusterIPRanges. Use default 10.42.0.0/16 or fd00:42::/56 if user did not set it
_, defaultClusterCIDR, defaultServiceCIDR, _ := util.GetDefaultAddresses(nodeIPs[0])
if len(cfg.ClusterCIDR) == 0 {
cfg.ClusterCIDR.Set(defaultClusterCIDR)
}
for _, cidr := range util.SplitStringSlice(cfg.ClusterCIDR) {
_, parsed, err := net.ParseCIDR(cidr)
if cfg.Token == "" {
fp := filepath.Join(dataDir, "token")
tokenByte, err := os.ReadFile(fp)
if err != nil {
return nil, errors.Wrapf(err, "invalid cluster-cidr %s", cidr)
return nil, nil, err
}
config.ControlConfig.ClusterIPRanges = append(config.ControlConfig.ClusterIPRanges, parsed)
cfg.Token = string(bytes.TrimRight(tokenByte, "\n"))
}
info, err := clientaccess.ParseAndValidateToken(cmds.ServerConfig.ServerURL, cfg.Token, clientaccess.WithUser("server"))
return sr, info, err
}
// set ClusterIPRange to the first address (first defined IPFamily is preferred)
config.ControlConfig.ClusterIPRange = config.ControlConfig.ClusterIPRanges[0]
// configure ServiceIPRanges. Use default 10.43.0.0/16 or fd00:43::/112 if user did not set it
if len(cfg.ServiceCIDR) == 0 {
cfg.ServiceCIDR.Set(defaultServiceCIDR)
}
for _, cidr := range util.SplitStringSlice(cfg.ServiceCIDR) {
_, parsed, err := net.ParseCIDR(cidr)
if err != nil {
return nil, errors.Wrapf(err, "invalid service-cidr %s", cidr)
}
config.ControlConfig.ServiceIPRanges = append(config.ControlConfig.ServiceIPRanges, parsed)
}
// set ServiceIPRange to the first address (first defined IPFamily is preferred)
config.ControlConfig.ServiceIPRange = config.ControlConfig.ServiceIPRanges[0]
e := etcd.NewETCD()
if err := e.SetControlConfig(&config.ControlConfig); err != nil {
return nil, err
}
initialized, err := e.IsInitialized()
if err != nil {
return nil, err
}
if !initialized {
return nil, fmt.Errorf("etcd database not found in %s", config.ControlConfig.DataDir)
}
sc, err := server.NewContext(ctx, config, false)
if err != nil {
return nil, err
}
config.ControlConfig.Runtime.K3s = sc.K3s
config.ControlConfig.Runtime.Core = sc.Core
return &etcdCommand{etcd: e, ctx: ctx}, nil
func wrapServerError(err error) error {
return errors.Wrap(err, "see server log for details")
}
// Save triggers an on-demand etcd snapshot operation
@ -149,20 +90,40 @@ func Save(app *cli.Context) error {
}
func save(app *cli.Context, cfg *cmds.Server) error {
var serverConfig server.Config
if len(app.Args()) > 0 {
return util2.ErrCommandNoArgs
}
ec, err := commandSetup(app, cfg, &serverConfig)
// Save always sets retention to 0 to disable automatic pruning.
// Prune can be run manually after save, if desired.
app.Set("etcd-snapshot-retention", "0")
sr, info, err := commandSetup(app, cfg)
if err != nil {
return err
}
serverConfig.ControlConfig.EtcdSnapshotRetention = 0 // disable retention check
sr.Operation = etcd.SnapshotOperationSave
sr.Name = []string{cfg.EtcdSnapshotName}
return ec.etcd.Snapshot(ec.ctx)
b, err := json.Marshal(sr)
if err != nil {
return err
}
r, err := info.Post("/db/snapshot", b)
if err != nil {
return wrapServerError(err)
}
resp := &managed.SnapshotResult{}
if err := json.Unmarshal(r, resp); err != nil {
return err
}
for _, name := range resp.Created {
logrus.Infof("Snapshot %s saved.", name)
}
return nil
}
func Delete(app *cli.Context) error {
@ -173,19 +134,42 @@ func Delete(app *cli.Context) error {
}
func delete(app *cli.Context, cfg *cmds.Server) error {
var serverConfig server.Config
ec, err := commandSetup(app, cfg, &serverConfig)
if err != nil {
return err
}
snapshots := app.Args()
if len(snapshots) == 0 {
return errors.New("no snapshots given for removal")
}
return ec.etcd.DeleteSnapshots(ec.ctx, app.Args())
sr, info, err := commandSetup(app, cfg)
if err != nil {
return err
}
sr.Operation = etcd.SnapshotOperationDelete
sr.Name = snapshots
b, err := json.Marshal(sr)
if err != nil {
return err
}
r, err := info.Post("/db/snapshot", b)
if err != nil {
return wrapServerError(err)
}
resp := &managed.SnapshotResult{}
if err := json.Unmarshal(r, resp); err != nil {
return err
}
for _, name := range resp.Deleted {
logrus.Infof("Snapshot %s deleted.", name)
}
for _, name := range snapshots {
if !slices.Contains(resp.Deleted, name) {
logrus.Warnf("Snapshot %s not found.", name)
}
}
return nil
}
func List(app *cli.Context) error {
@ -207,30 +191,48 @@ func validEtcdListFormat(format string) bool {
}
func list(app *cli.Context, cfg *cmds.Server) error {
var serverConfig server.Config
ec, err := commandSetup(app, cfg, &serverConfig)
if err != nil {
return err
}
sf, err := ec.etcd.ListSnapshots(ec.ctx)
if err != nil {
return err
}
if cfg.EtcdListFormat != "" && !validEtcdListFormat(cfg.EtcdListFormat) {
return errors.New("invalid output format: " + cfg.EtcdListFormat)
}
sr, info, err := commandSetup(app, cfg)
if err != nil {
return err
}
sr.Operation = etcd.SnapshotOperationList
b, err := json.Marshal(sr)
if err != nil {
return err
}
r, err := info.Post("/db/snapshot", b)
if err != nil {
return wrapServerError(err)
}
sf := &k3s.ETCDSnapshotFileList{}
if err := json.Unmarshal(r, sf); err != nil {
return err
}
sort.Slice(sf.Items, func(i, j int) bool {
if sf.Items[i].Status.CreationTime.Equal(sf.Items[j].Status.CreationTime) {
return sf.Items[i].Spec.SnapshotName < sf.Items[j].Spec.SnapshotName
}
return sf.Items[i].Status.CreationTime.Before(sf.Items[j].Status.CreationTime)
})
switch cfg.EtcdListFormat {
case "json":
if err := json.NewEncoder(os.Stdout).Encode(sf); err != nil {
json := printers.JSONPrinter{}
if err := json.PrintObj(sf, os.Stdout); err != nil {
return err
}
return nil
case "yaml":
if err := yaml.NewEncoder(os.Stdout).Encode(sf); err != nil {
yaml := printers.YAMLPrinter{}
if err := yaml.PrintObj(sf, os.Stdout); err != nil {
return err
}
return nil
@ -238,23 +240,9 @@ func list(app *cli.Context, cfg *cmds.Server) error {
w := tabwriter.NewWriter(os.Stdout, 0, 0, 1, ' ', 0)
defer w.Flush()
// Sort snapshots by creation time and key
sfKeys := make([]string, 0, len(sf))
for k := range sf {
sfKeys = append(sfKeys, k)
}
sort.Slice(sfKeys, func(i, j int) bool {
iKey := sfKeys[i]
jKey := sfKeys[j]
if sf[iKey].CreatedAt.Equal(sf[jKey].CreatedAt) {
return iKey < jKey
}
return sf[iKey].CreatedAt.Before(sf[jKey].CreatedAt)
})
fmt.Fprint(w, "Name\tLocation\tSize\tCreated\n")
for _, k := range sfKeys {
fmt.Fprintf(w, "%s\t%s\t%d\t%s\n", sf[k].Name, sf[k].Location, sf[k].Size, sf[k].CreatedAt.Format(time.RFC3339))
for _, esf := range sf.Items {
fmt.Fprintf(w, "%s\t%s\t%d\t%s\n", esf.Spec.SnapshotName, esf.Spec.Location, esf.Status.Size.Value(), esf.Status.CreationTime.Format(time.RFC3339))
}
}
@ -269,14 +257,30 @@ func Prune(app *cli.Context) error {
}
func prune(app *cli.Context, cfg *cmds.Server) error {
var serverConfig server.Config
ec, err := commandSetup(app, cfg, &serverConfig)
sr, info, err := commandSetup(app, cfg)
if err != nil {
return err
}
serverConfig.ControlConfig.EtcdSnapshotRetention = cfg.EtcdSnapshotRetention
sr.Operation = etcd.SnapshotOperationPrune
sr.Name = []string{cfg.EtcdSnapshotName}
return ec.etcd.PruneSnapshots(ec.ctx)
b, err := json.Marshal(sr)
if err != nil {
return err
}
r, err := info.Post("/db/snapshot", b)
if err != nil {
return wrapServerError(err)
}
resp := &managed.SnapshotResult{}
if err := json.Unmarshal(r, resp); err != nil {
return err
}
for _, name := range resp.Deleted {
logrus.Infof("Snapshot %s deleted.", name)
}
return nil
}

View File

@ -66,7 +66,6 @@ func Enable(app *cli.Context) error {
}
func Disable(app *cli.Context) error {
if err := cmds.InitLogging(); err != nil {
return err
}

View File

@ -6,6 +6,7 @@ import (
"crypto/tls"
"crypto/x509"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net/http"
@ -18,6 +19,9 @@ import (
"github.com/pkg/errors"
certutil "github.com/rancher/dynamiclistener/cert"
"github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/net"
)
const (
@ -291,7 +295,6 @@ func (i *Info) Get(path string, option ...ClientOption) ([]byte, error) {
// Put makes a request to a subpath of info's BaseURL
func (i *Info) Put(path string, body []byte, option ...ClientOption) error {
u, err := url.Parse(i.BaseURL)
if err != nil {
return err
@ -305,6 +308,21 @@ func (i *Info) Put(path string, body []byte, option ...ClientOption) error {
return put(p.String(), body, GetHTTPClient(i.CACerts, i.CertFile, i.KeyFile, option...), i.Username, i.Password, i.Token())
}
// Post makes a request to a subpath of info's BaseURL
func (i *Info) Post(path string, body []byte, option ...ClientOption) ([]byte, error) {
u, err := url.Parse(i.BaseURL)
if err != nil {
return nil, err
}
p, err := url.Parse(path)
if err != nil {
return nil, err
}
p.Scheme = u.Scheme
p.Host = u.Host
return post(p.String(), body, GetHTTPClient(i.CACerts, i.CertFile, i.KeyFile, option...), i.Username, i.Password, i.Token())
}
// setServer sets the BaseURL and CACerts fields of the Info by connecting to the server
// and storing the CA bundle.
func (i *Info) setServer(server string) error {
@ -400,13 +418,8 @@ func get(u string, client *http.Client, username, password, token string) ([]byt
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode > 299 {
return nil, fmt.Errorf("%s: %s", u, resp.Status)
}
return io.ReadAll(resp.Body)
return readBody(resp)
}
// put makes a request to a url using a provided client and credentials,
@ -427,14 +440,59 @@ func put(u string, body []byte, client *http.Client, username, password, token s
if err != nil {
return err
}
defer resp.Body.Close()
respBody, _ := io.ReadAll(resp.Body)
if resp.StatusCode < 200 || resp.StatusCode > 299 {
return fmt.Errorf("%s: %s %s", u, resp.Status, string(respBody))
_, err = readBody(resp)
return err
}
// post makes a request to a url using a provided client and credentials,
// returning the response body and error.
func post(u string, body []byte, client *http.Client, username, password, token string) ([]byte, error) {
req, err := http.NewRequest(http.MethodPost, u, bytes.NewBuffer(body))
if err != nil {
return nil, err
}
return nil
if token != "" {
req.Header.Add("Authorization", "Bearer "+token)
} else if username != "" {
req.SetBasicAuth(username, password)
}
resp, err := client.Do(req)
if err != nil {
return nil, err
}
return readBody(resp)
}
// readBody attempts to get the body from the response. If the response status
// code is not in the 2XX range, an error is returned. An attempt is made to
// decode the error body as a metav1.Status and return a StatusError, if
// possible.
func readBody(resp *http.Response) ([]byte, error) {
defer resp.Body.Close()
b, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
warnings, _ := net.ParseWarningHeaders(resp.Header["Warning"])
for _, warning := range warnings {
if warning.Code == 299 && len(warning.Text) != 0 {
logrus.Warnf(warning.Text)
}
}
if resp.StatusCode < 200 || resp.StatusCode > 299 {
status := metav1.Status{}
if err := json.Unmarshal(b, &status); err == nil && status.Kind == "Status" {
return nil, &apierrors.StatusError{ErrStatus: status}
}
return nil, fmt.Errorf("%s: %s", resp.Request.URL, resp.Status)
}
return b, nil
}
// FormatToken takes a username:password string or join token, and a path to a certificate bundle, and

View File

@ -23,7 +23,7 @@ type Driver interface {
Test(ctx context.Context) error
Restore(ctx context.Context) error
EndpointName() string
Snapshot(ctx context.Context) error
Snapshot(ctx context.Context) (*SnapshotResult, error)
ReconcileSnapshotData(ctx context.Context) error
GetMembersClientURLs(ctx context.Context) ([]string, error)
RemoveSelf(ctx context.Context) error
@ -40,3 +40,10 @@ func Registered() []Driver {
func Default() Driver {
return drivers[0]
}
// SnapshotResult is returned by the Snapshot function,
// and lists the names of created and deleted snapshots.
type SnapshotResult struct {
Created []string `json:"created,omitempty"`
Deleted []string `json:"deleted,omitempty"`
}

View File

@ -48,7 +48,7 @@ func Test_UnitMustParse(t *testing.T) {
name: "Etcd-snapshot with config with known and unknown flags",
args: []string{"k3s", "etcd-snapshot", "save"},
config: "./testdata/defaultdata.yaml",
want: []string{"k3s", "etcd-snapshot", "save", "--etcd-s3=true", "--etcd-s3-bucket=my-backup"},
want: []string{"k3s", "etcd-snapshot", "save", "--token=12345", "--etcd-s3=true", "--etcd-s3-bucket=my-backup"},
},
{
name: "Agent with known flags",

View File

@ -25,6 +25,7 @@ import (
"github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/daemons/control/deps"
"github.com/k3s-io/k3s/pkg/daemons/executor"
"github.com/k3s-io/k3s/pkg/server/auth"
"github.com/k3s-io/k3s/pkg/util"
"github.com/k3s-io/k3s/pkg/version"
"github.com/k3s-io/kine/pkg/client"
@ -664,7 +665,9 @@ func (e *ETCD) setName(force bool) error {
// handler wraps the handler with routes for database info
func (e *ETCD) handler(next http.Handler) http.Handler {
mux := mux.NewRouter().SkipClean(true)
mux.Use(auth.Middleware(e.config, version.Program+":server"))
mux.Handle("/db/info", e.infoHandler())
mux.Handle("/db/snapshot", e.snapshotHandler())
mux.NotFoundHandler = next
return mux
}
@ -673,6 +676,11 @@ func (e *ETCD) handler(next http.Handler) http.Handler {
// If we can't retrieve an actual MemberList from etcd, we return a canned response with only the local node listed.
func (e *ETCD) infoHandler() http.Handler {
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
if req.Method != http.MethodGet {
util.SendError(fmt.Errorf("method not allowed"), rw, req, http.StatusMethodNotAllowed)
return
}
ctx, cancel := context.WithTimeout(req.Context(), 2*time.Second)
defer cancel()

View File

@ -93,7 +93,7 @@ func NewS3(ctx context.Context, config *config.Control) (*S3, error) {
exists, err := c.BucketExists(ctx, config.EtcdS3BucketName)
if err != nil {
return nil, err
return nil, errors.Wrapf(err, "failed to test for existence of bucket %s", config.EtcdS3BucketName)
}
if !exists {
return nil, fmt.Errorf("bucket %s does not exist", config.EtcdS3BucketName)
@ -280,9 +280,10 @@ func (s *S3) snapshotPrefix() string {
}
// snapshotRetention prunes snapshots in the configured S3 compatible backend for this specific node.
func (s *S3) snapshotRetention(ctx context.Context) error {
// Returns a list of pruned snapshot names.
func (s *S3) snapshotRetention(ctx context.Context) ([]string, error) {
if s.config.EtcdSnapshotRetention < 1 {
return nil
return nil, nil
}
logrus.Infof("Applying snapshot retention=%d to snapshots stored in s3://%s/%s", s.config.EtcdSnapshotRetention, s.config.EtcdS3BucketName, s.snapshotPrefix())
@ -297,7 +298,7 @@ func (s *S3) snapshotRetention(ctx context.Context) error {
}
for info := range s.client.ListObjects(toCtx, s.config.EtcdS3BucketName, opts) {
if info.Err != nil {
return info.Err
return nil, info.Err
}
// skip metadata
@ -309,7 +310,7 @@ func (s *S3) snapshotRetention(ctx context.Context) error {
}
if len(snapshotFiles) <= s.config.EtcdSnapshotRetention {
return nil
return nil, nil
}
// sort newest-first so we can prune entries past the retention count
@ -317,21 +318,16 @@ func (s *S3) snapshotRetention(ctx context.Context) error {
return snapshotFiles[j].LastModified.Before(snapshotFiles[i].LastModified)
})
deleted := []string{}
for _, df := range snapshotFiles[s.config.EtcdSnapshotRetention:] {
logrus.Infof("Removing S3 snapshot: s3://%s/%s", s.config.EtcdS3BucketName, df.Key)
if err := s.client.RemoveObject(toCtx, s.config.EtcdS3BucketName, df.Key, minio.RemoveObjectOptions{}); err != nil {
return err
}
metadataKey := path.Join(path.Dir(df.Key), metadataDir, path.Base(df.Key))
if err := s.client.RemoveObject(toCtx, s.config.EtcdS3BucketName, metadataKey, minio.RemoveObjectOptions{}); err != nil {
if isNotExist(err) {
return nil
}
return err
if err := s.deleteSnapshot(ctx, df.Key); err != nil {
return deleted, err
}
deleted = append(deleted, df.Key)
}
return nil
return deleted, nil
}
func (s *S3) deleteSnapshot(ctx context.Context, key string) error {

View File

@ -19,7 +19,8 @@ import (
"strings"
"time"
apisv1 "github.com/k3s-io/k3s/pkg/apis/k3s.cattle.io/v1"
k3s "github.com/k3s-io/k3s/pkg/apis/k3s.cattle.io/v1"
"github.com/k3s-io/k3s/pkg/cluster/managed"
"github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/util"
"github.com/k3s-io/k3s/pkg/version"
@ -27,10 +28,7 @@ import (
"github.com/pkg/errors"
"github.com/robfig/cron/v3"
"github.com/sirupsen/logrus"
"go.etcd.io/etcd/client/pkg/v3/logutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/etcdutl/v3/snapshot"
"go.uber.org/zap"
"golang.org/x/sync/semaphore"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
@ -204,14 +202,17 @@ func (e *ETCD) decompressSnapshot(snapshotDir, snapshotFile string) (string, err
}
// Snapshot attempts to save a new snapshot to the configured directory, and then clean up any old and failed
// snapshots in excess of the retention limits. This method is used in the internal cron snapshot
// system as well as used to do on-demand snapshots.
func (e *ETCD) Snapshot(ctx context.Context) error {
// snapshots in excess of the retention limits. Note that one snapshot request may result in creation and pruning
// of multiple snapshots, if S3 is enabled.
// Note that the prune step is generally disabled when snapshotting from the CLI, as there is a separate
// subcommand for prune that can be run manually if the user wants to remove old snapshots.
// Returns metadata about the new and pruned snapshots.
func (e *ETCD) Snapshot(ctx context.Context) (*managed.SnapshotResult, error) {
if err := e.preSnapshotSetup(ctx); err != nil {
return err
return nil, err
}
if !e.snapshotSem.TryAcquire(maxConcurrentSnapshots) {
return fmt.Errorf("%d snapshots already in progress", maxConcurrentSnapshots)
return nil, fmt.Errorf("%d snapshots already in progress", maxConcurrentSnapshots)
}
defer e.snapshotSem.Release(maxConcurrentSnapshots)
@ -230,61 +231,40 @@ func (e *ETCD) Snapshot(ctx context.Context) error {
}
endpoints := getEndpoints(e.config)
var client *clientv3.Client
var err error
// Use the internal client if possible, or create a new one
// if run from the CLI.
if e.client != nil {
client = e.client
} else {
client, err = getClient(ctx, e.config, endpoints...)
if err != nil {
return err
}
defer client.Close()
}
status, err := client.Status(ctx, endpoints[0])
status, err := e.client.Status(ctx, endpoints[0])
if err != nil {
return errors.Wrap(err, "failed to check etcd status for snapshot")
return nil, errors.Wrap(err, "failed to check etcd status for snapshot")
}
if status.IsLearner {
logrus.Warnf("Unable to take snapshot: not supported for learner")
return nil
return nil, nil
}
snapshotDir, err := snapshotDir(e.config, true)
if err != nil {
return errors.Wrap(err, "failed to get etcd-snapshot-dir")
return nil, errors.Wrap(err, "failed to get etcd-snapshot-dir")
}
cfg, err := getClientConfig(ctx, e.config)
if err != nil {
return errors.Wrap(err, "failed to get config for etcd snapshot")
return nil, errors.Wrap(err, "failed to get config for etcd snapshot")
}
tokenHash, err := util.GetTokenHash(e.config)
if err != nil {
return errors.Wrap(err, "failed to get server token hash for etcd snapshot")
return nil, errors.Wrap(err, "failed to get server token hash for etcd snapshot")
}
nodeName := os.Getenv("NODE_NAME")
now := time.Now().Round(time.Second)
snapshotName := fmt.Sprintf("%s-%s-%d", e.config.EtcdSnapshotName, nodeName, now.Unix())
snapshotPath := filepath.Join(snapshotDir, snapshotName)
logrus.Infof("Saving etcd snapshot to %s", snapshotPath)
var sf *snapshotFile
lg, err := logutil.CreateDefaultZapLogger(zap.InfoLevel)
if err != nil {
return err
}
if err := snapshot.NewV3(lg).Save(ctx, *cfg, snapshotPath); err != nil {
if err := snapshot.NewV3(e.client.GetLogger()).Save(ctx, *cfg, snapshotPath); err != nil {
sf = &snapshotFile{
Name: snapshotName,
Location: "",
@ -299,19 +279,23 @@ func (e *ETCD) Snapshot(ctx context.Context) error {
}
logrus.Errorf("Failed to take etcd snapshot: %v", err)
if err := e.addSnapshotData(*sf); err != nil {
return errors.Wrap(err, "failed to sync ETCDSnapshotFile")
return nil, errors.Wrap(err, "failed to sync ETCDSnapshotFile")
}
}
res := &managed.SnapshotResult{}
// If the snapshot attempt was successful, sf will be nil as we did not set it to store the error message.
if sf == nil {
if e.config.EtcdSnapshotCompress {
zipPath, err := e.compressSnapshot(snapshotDir, snapshotName, snapshotPath, now)
if err != nil {
return errors.Wrap(err, "failed to compress snapshot")
// ensure that the unncompressed snapshot is cleaned up even if compression fails
if err := os.Remove(snapshotPath); err != nil && !os.IsNotExist(err) {
logrus.Warnf("Failed to remove uncompress snapshot file: %v", err)
}
if err := os.Remove(snapshotPath); err != nil {
return errors.Wrap(err, "failed to remove uncompressed snapshot")
if err != nil {
return nil, errors.Wrap(err, "failed to compress snapshot")
}
snapshotPath = zipPath
logrus.Info("Compressed snapshot: " + snapshotPath)
@ -319,8 +303,9 @@ func (e *ETCD) Snapshot(ctx context.Context) error {
f, err := os.Stat(snapshotPath)
if err != nil {
return errors.Wrap(err, "unable to retrieve snapshot information from local snapshot")
return nil, errors.Wrap(err, "unable to retrieve snapshot information from local snapshot")
}
sf = &snapshotFile{
Name: f.Name(),
Location: "file://" + snapshotPath,
@ -334,24 +319,31 @@ func (e *ETCD) Snapshot(ctx context.Context) error {
metadataSource: extraMetadata,
tokenHash: tokenHash,
}
res.Created = append(res.Created, sf.Name)
// Failing to save snapshot metadata is not fatal, the snapshot can still be used without it.
if err := saveSnapshotMetadata(snapshotPath, extraMetadata); err != nil {
return errors.Wrap(err, "failed to save local snapshot metadata")
logrus.Warnf("Failed to save local snapshot metadata: %v", err)
}
// If this fails, just log an error - the snapshot file will remain on disk
// and will be recorded next time the snapshot list is reconciled.
if err := e.addSnapshotData(*sf); err != nil {
return errors.Wrap(err, "failed to sync ETCDSnapshotFile")
logrus.Warnf("Failed to sync ETCDSnapshotFile: %v", err)
}
if err := snapshotRetention(e.config.EtcdSnapshotRetention, e.config.EtcdSnapshotName, snapshotDir); err != nil {
return errors.Wrap(err, "failed to apply local snapshot retention policy")
// Snapshot retention may prune some files before returning an error. Failing to prune is not fatal.
deleted, err := snapshotRetention(e.config.EtcdSnapshotRetention, e.config.EtcdSnapshotName, snapshotDir)
if err != nil {
logrus.Warnf("Failed to apply local snapshot retention policy: %v", err)
}
res.Deleted = append(res.Deleted, deleted...)
if e.config.EtcdS3 {
if err := e.initS3IfNil(ctx); err != nil {
logrus.Warnf("Unable to initialize S3 client: %v", err)
sf = &snapshotFile{
Name: filepath.Base(snapshotPath),
Name: f.Name(),
NodeName: "s3",
CreatedAt: &metav1.Time{
Time: now,
@ -378,22 +370,28 @@ func (e *ETCD) Snapshot(ctx context.Context) error {
if err != nil {
logrus.Errorf("Error received during snapshot upload to S3: %s", err)
} else {
res.Created = append(res.Created, sf.Name)
logrus.Infof("S3 upload complete for %s", snapshotName)
}
// Attempt to apply retention even if the upload failed; failure may be due to bucket
// being full or some other condition that retention policy would resolve.
if err := e.s3.snapshotRetention(ctx); err != nil {
logrus.Errorf("Failed to apply s3 snapshot retention policy: %v", err)
// Snapshot retention may prune some files before returning an error. Failing to prune is not fatal.
deleted, err := e.s3.snapshotRetention(ctx)
res.Deleted = append(res.Deleted, deleted...)
if err != nil {
logrus.Warnf("Failed to apply s3 snapshot retention policy: %v", err)
}
}
// sf is either s3 snapshot metadata, or s3 failure record
// sf is either s3 snapshot metadata, or s3 init/upload failure record.
// If this fails, just log an error - the snapshot file will remain on s3
// and will be recorded next time the snapshot list is reconciled.
if err := e.addSnapshotData(*sf); err != nil {
return errors.Wrap(err, "failed to sync ETCDSnapshotFile")
logrus.Warnf("Failed to sync ETCDSnapshotFile: %v", err)
}
}
}
return e.ReconcileSnapshotData(ctx)
return res, e.ReconcileSnapshotData(ctx)
}
type s3Config struct {
@ -492,7 +490,7 @@ func (e *ETCD) listLocalSnapshots() (map[string]snapshotFile, error) {
// initS3IfNil initializes the S3 client
// if it hasn't yet been initialized.
func (e *ETCD) initS3IfNil(ctx context.Context) error {
if e.s3 == nil {
if e.config.EtcdS3 && e.s3 == nil {
s3, err := NewS3(ctx, e.config)
if err != nil {
return err
@ -503,15 +501,20 @@ func (e *ETCD) initS3IfNil(ctx context.Context) error {
return nil
}
// PruneSnapshots performs a retention run with the given
// retention duration and removes expired snapshots.
func (e *ETCD) PruneSnapshots(ctx context.Context) error {
// PruneSnapshots deleted old snapshots in excess of the configured retention count.
// Returns a list of deleted snapshots. Note that snapshots may be deleted
// with a non-nil error return.
func (e *ETCD) PruneSnapshots(ctx context.Context) (*managed.SnapshotResult, error) {
snapshotDir, err := snapshotDir(e.config, false)
if err != nil {
return errors.Wrap(err, "failed to get etcd-snapshot-dir")
return nil, errors.Wrap(err, "failed to get etcd-snapshot-dir")
}
if err := snapshotRetention(e.config.EtcdSnapshotRetention, e.config.EtcdSnapshotName, snapshotDir); err != nil {
res := &managed.SnapshotResult{}
// Note that snapshotRetention functions may return a list of deleted files, as well as
// an error, if some snapshots are deleted before the error is encountered.
res.Deleted, err = snapshotRetention(e.config.EtcdSnapshotRetention, e.config.EtcdSnapshotName, snapshotDir)
if err != nil {
logrus.Errorf("Error applying snapshot retention policy: %v", err)
}
@ -519,18 +522,24 @@ func (e *ETCD) PruneSnapshots(ctx context.Context) error {
if err := e.initS3IfNil(ctx); err != nil {
logrus.Warnf("Unable to initialize S3 client: %v", err)
} else {
if err := e.s3.snapshotRetention(ctx); err != nil {
deleted, err := e.s3.snapshotRetention(ctx)
if err != nil {
logrus.Errorf("Error applying S3 snapshot retention policy: %v", err)
}
res.Deleted = append(res.Deleted, deleted...)
}
}
return e.ReconcileSnapshotData(ctx)
return res, e.ReconcileSnapshotData(ctx)
}
// ListSnapshots is an exported wrapper method that wraps an
// unexported method of the same name.
func (e *ETCD) ListSnapshots(ctx context.Context) (map[string]snapshotFile, error) {
snapshotFiles := map[string]snapshotFile{}
// ListSnapshots returns a list of snapshots. Local snapshots are always listed,
// s3 snapshots are listed if s3 is enabled.
// Snapshots are listed locally, not listed from the apiserver, so results
// are guaranteed to be in sync with what is on disk.
func (e *ETCD) ListSnapshots(ctx context.Context) (*k3s.ETCDSnapshotFileList, error) {
snapshotFiles := &k3s.ETCDSnapshotFileList{
TypeMeta: metav1.TypeMeta{APIVersion: "v1", Kind: "List"},
}
if e.config.EtcdS3 {
if err := e.initS3IfNil(ctx); err != nil {
logrus.Warnf("Unable to initialize S3 client: %v", err)
@ -540,7 +549,11 @@ func (e *ETCD) ListSnapshots(ctx context.Context) (map[string]snapshotFile, erro
if err != nil {
return nil, err
}
snapshotFiles = sfs
for k, sf := range sfs {
esf := k3s.NewETCDSnapshotFile("", k, k3s.ETCDSnapshotFile{})
sf.toETCDSnapshotFile(esf)
snapshotFiles.Items = append(snapshotFiles.Items, *esf)
}
}
sfs, err := e.listLocalSnapshots()
@ -548,25 +561,30 @@ func (e *ETCD) ListSnapshots(ctx context.Context) (map[string]snapshotFile, erro
return nil, err
}
for k, sf := range sfs {
snapshotFiles[k] = sf
esf := k3s.NewETCDSnapshotFile("", k, k3s.ETCDSnapshotFile{})
sf.toETCDSnapshotFile(esf)
snapshotFiles.Items = append(snapshotFiles.Items, *esf)
}
return snapshotFiles, err
return snapshotFiles, nil
}
// DeleteSnapshots removes the given snapshots from local storage and S3.
func (e *ETCD) DeleteSnapshots(ctx context.Context, snapshots []string) error {
// Returns a list of deleted snapshots. Note that snapshots may be deleted
// with a non-nil error return.
func (e *ETCD) DeleteSnapshots(ctx context.Context, snapshots []string) (*managed.SnapshotResult, error) {
snapshotDir, err := snapshotDir(e.config, false)
if err != nil {
return errors.Wrap(err, "failed to get etcd-snapshot-dir")
return nil, errors.Wrap(err, "failed to get etcd-snapshot-dir")
}
if e.config.EtcdS3 {
if err := e.initS3IfNil(ctx); err != nil {
logrus.Warnf("Unable to initialize S3 client: %v", err)
return err
return nil, err
}
}
res := &managed.SnapshotResult{}
for _, s := range snapshots {
if err := e.deleteSnapshot(filepath.Join(snapshotDir, s)); err != nil {
if isNotExist(err) {
@ -575,6 +593,7 @@ func (e *ETCD) DeleteSnapshots(ctx context.Context, snapshots []string) error {
logrus.Errorf("Failed to delete local snapshot %s: %v", s, err)
}
} else {
res.Deleted = append(res.Deleted, s)
logrus.Infof("Snapshot %s deleted locally", s)
}
@ -586,12 +605,13 @@ func (e *ETCD) DeleteSnapshots(ctx context.Context, snapshots []string) error {
logrus.Errorf("Failed to delete S3 snapshot %s: %v", s, err)
}
} else {
res.Deleted = append(res.Deleted, s)
logrus.Infof("Snapshot %s deleted from S3", s)
}
}
}
return e.ReconcileSnapshotData(ctx)
return res, e.ReconcileSnapshotData(ctx)
}
func (e *ETCD) deleteSnapshot(snapshotPath string) error {
@ -631,7 +651,7 @@ func (e *ETCD) addSnapshotData(sf snapshotFile) error {
snapshots := e.config.Runtime.K3s.K3s().V1().ETCDSnapshotFile()
esfName := generateSnapshotName(sf)
var esf *apisv1.ETCDSnapshotFile
var esf *k3s.ETCDSnapshotFile
return retry.OnError(snapshotDataBackoff, func(err error) bool {
return apierrors.IsConflict(err) || apierrors.IsAlreadyExists(err)
}, func() (err error) {
@ -641,7 +661,7 @@ func (e *ETCD) addSnapshotData(sf snapshotFile) error {
if !apierrors.IsNotFound(err) {
return err
}
esf = &apisv1.ETCDSnapshotFile{
esf = &k3s.ETCDSnapshotFile{
ObjectMeta: metav1.ObjectMeta{
Name: esfName,
},
@ -654,7 +674,7 @@ func (e *ETCD) addSnapshotData(sf snapshotFile) error {
// create or update as necessary
if esf.CreationTimestamp.IsZero() {
var created *apisv1.ETCDSnapshotFile
var created *k3s.ETCDSnapshotFile
created, err = snapshots.Create(esf)
if err == nil {
// Only emit an event for the snapshot when creating the resource
@ -707,7 +727,7 @@ func generateSnapshotName(sf snapshotFile) string {
// generateETCDSnapshotFileConfigMapKey generates a key that the corresponding
// snapshotFile would be stored under in the legacy configmap
func generateETCDSnapshotFileConfigMapKey(esf apisv1.ETCDSnapshotFile) string {
func generateETCDSnapshotFileConfigMapKey(esf k3s.ETCDSnapshotFile) string {
name := invalidKeyChars.ReplaceAllString(esf.Spec.SnapshotName, "_")
if esf.Spec.S3 != nil {
return "s3-" + name
@ -715,7 +735,7 @@ func generateETCDSnapshotFileConfigMapKey(esf apisv1.ETCDSnapshotFile) string {
return "local-" + name
}
func (e *ETCD) emitEvent(esf *apisv1.ETCDSnapshotFile) {
func (e *ETCD) emitEvent(esf *k3s.ETCDSnapshotFile) {
switch {
case e.config.Runtime.Event == nil:
case !esf.DeletionTimestamp.IsZero():
@ -923,17 +943,17 @@ func (e *ETCD) setSnapshotFunction(ctx context.Context) {
// having all the nodes take a snapshot at the exact same time can lead to excessive retry thrashing
// when updating the snapshot list configmap.
time.Sleep(time.Duration(rand.Float64() * float64(snapshotJitterMax)))
if err := e.Snapshot(ctx); err != nil {
if _, err := e.Snapshot(ctx); err != nil {
logrus.Errorf("Failed to take scheduled snapshot: %v", err)
}
})))
}
// snapshotRetention iterates through the snapshots and removes the oldest
// leaving the desired number of snapshots.
func snapshotRetention(retention int, snapshotPrefix string, snapshotDir string) error {
// leaving the desired number of snapshots. Returns a list of pruned snapshot names.
func snapshotRetention(retention int, snapshotPrefix string, snapshotDir string) ([]string, error) {
if retention < 1 {
return nil
return nil, nil
}
logrus.Infof("Applying snapshot retention=%d to local snapshots with prefix %s in %s", retention, snapshotPrefix, snapshotDir)
@ -953,10 +973,10 @@ func snapshotRetention(retention int, snapshotPrefix string, snapshotDir string)
}
return nil
}); err != nil {
return err
return nil, err
}
if len(snapshotFiles) <= retention {
return nil
return nil, nil
}
// sort newest-first so we can prune entries past the retention count
@ -964,19 +984,21 @@ func snapshotRetention(retention int, snapshotPrefix string, snapshotDir string)
return snapshotFiles[j].CreatedAt.Before(snapshotFiles[i].CreatedAt)
})
deleted := []string{}
for _, df := range snapshotFiles[retention:] {
snapshotPath := filepath.Join(snapshotDir, df.Name)
metadataPath := filepath.Join(snapshotDir, "..", metadataDir, df.Name)
logrus.Infof("Removing local snapshot %s", snapshotPath)
if err := os.Remove(snapshotPath); err != nil {
return err
return deleted, err
}
if err := os.Remove(metadataPath); err != nil && !os.IsNotExist(err) {
return err
return deleted, err
}
deleted = append(deleted, df.Name)
}
return nil
return deleted, nil
}
func isNotExist(err error) bool {
@ -1007,7 +1029,7 @@ func saveSnapshotMetadata(snapshotPath string, extraMetadata *v1.ConfigMap) erro
return os.WriteFile(metadataPath, m, 0700)
}
func (sf *snapshotFile) fromETCDSnapshotFile(esf *apisv1.ETCDSnapshotFile) {
func (sf *snapshotFile) fromETCDSnapshotFile(esf *k3s.ETCDSnapshotFile) {
if esf == nil {
panic("cannot convert from nil ETCDSnapshotFile")
}
@ -1067,7 +1089,7 @@ func (sf *snapshotFile) fromETCDSnapshotFile(esf *apisv1.ETCDSnapshotFile) {
}
}
func (sf *snapshotFile) toETCDSnapshotFile(esf *apisv1.ETCDSnapshotFile) {
func (sf *snapshotFile) toETCDSnapshotFile(esf *k3s.ETCDSnapshotFile) {
if esf == nil {
panic("cannot convert to nil ETCDSnapshotFile")
}
@ -1092,7 +1114,7 @@ func (sf *snapshotFile) toETCDSnapshotFile(esf *apisv1.ETCDSnapshotFile) {
} else {
message = string(b)
}
esf.Status.Error = &apisv1.ETCDSnapshotError{
esf.Status.Error = &k3s.ETCDSnapshotError{
Time: sf.CreatedAt,
Message: &message,
}
@ -1127,7 +1149,7 @@ func (sf *snapshotFile) toETCDSnapshotFile(esf *apisv1.ETCDSnapshotFile) {
esf.ObjectMeta.Labels[labelStorageNode] = esf.Spec.NodeName
} else {
esf.ObjectMeta.Labels[labelStorageNode] = "s3"
esf.Spec.S3 = &apisv1.ETCDSnapshotS3{
esf.Spec.S3 = &k3s.ETCDSnapshotS3{
Endpoint: sf.S3.Endpoint,
EndpointCA: sf.S3.EndpointCA,
SkipSSLVerify: sf.S3.SkipSSLVerify,

View File

@ -0,0 +1,221 @@
package etcd
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
k3s "github.com/k3s-io/k3s/pkg/apis/k3s.cattle.io/v1"
"github.com/k3s-io/k3s/pkg/cluster/managed"
"github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/util"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
type SnapshotOperation string
const (
SnapshotOperationSave SnapshotOperation = "save"
SnapshotOperationList SnapshotOperation = "list"
SnapshotOperationPrune SnapshotOperation = "prune"
SnapshotOperationDelete SnapshotOperation = "delete"
)
type SnapshotRequestS3 struct {
s3Config
Timeout metav1.Duration `json:"timeout"`
AccessKey string `json:"accessKey"`
SecretKey string `json:"secretKey"`
}
type SnapshotRequest struct {
Operation SnapshotOperation `json:"operation"`
Name []string `json:"name,omitempty"`
Dir *string `json:"dir,omitempty"`
Compress *bool `json:"compress,omitempty"`
Retention *int `json:"retention,omitempty"`
S3 *SnapshotRequestS3 `json:"s3,omitempty"`
ctx context.Context
}
func (sr *SnapshotRequest) context() context.Context {
if sr.ctx != nil {
return sr.ctx
}
return context.Background()
}
// snapshotHandler handles snapshot save/list/prune requests from the CLI.
func (e *ETCD) snapshotHandler() http.Handler {
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
sr, err := getSnapshotRequest(req)
if err != nil {
util.SendErrorWithID(err, "etcd-snapshot", rw, req, http.StatusInternalServerError)
}
switch sr.Operation {
case SnapshotOperationList:
err = e.withRequest(sr).handleList(rw, req)
case SnapshotOperationSave:
err = e.withRequest(sr).handleSave(rw, req)
case SnapshotOperationPrune:
err = e.withRequest(sr).handlePrune(rw, req)
case SnapshotOperationDelete:
err = e.withRequest(sr).handleDelete(rw, req, sr.Name)
default:
err = e.handleInvalid(rw, req)
}
if err != nil {
logrus.Warnf("Error in etcd-snapshot handler: %v", err)
}
})
}
func (e *ETCD) handleList(rw http.ResponseWriter, req *http.Request) error {
if err := e.initS3IfNil(req.Context()); err != nil {
util.SendError(err, rw, req, http.StatusBadRequest)
return nil
}
sf, err := e.ListSnapshots(req.Context())
if sf == nil {
util.SendErrorWithID(err, "etcd-snapshot", rw, req, http.StatusInternalServerError)
return nil
}
sendSnapshotList(rw, req, sf)
return err
}
func (e *ETCD) handleSave(rw http.ResponseWriter, req *http.Request) error {
if err := e.initS3IfNil(req.Context()); err != nil {
util.SendError(err, rw, req, http.StatusBadRequest)
return nil
}
sr, err := e.Snapshot(req.Context())
if sr == nil {
util.SendErrorWithID(err, "etcd-snapshot", rw, req, http.StatusInternalServerError)
return nil
}
sendSnapshotResponse(rw, req, sr)
return err
}
func (e *ETCD) handlePrune(rw http.ResponseWriter, req *http.Request) error {
if err := e.initS3IfNil(req.Context()); err != nil {
util.SendError(err, rw, req, http.StatusBadRequest)
return nil
}
sr, err := e.PruneSnapshots(req.Context())
if sr == nil {
util.SendError(err, rw, req, http.StatusInternalServerError)
return nil
}
sendSnapshotResponse(rw, req, sr)
return err
}
func (e *ETCD) handleDelete(rw http.ResponseWriter, req *http.Request, snapshots []string) error {
if err := e.initS3IfNil(req.Context()); err != nil {
util.SendError(err, rw, req, http.StatusBadRequest)
return nil
}
sr, err := e.DeleteSnapshots(req.Context(), snapshots)
if sr == nil {
util.SendError(err, rw, req, http.StatusInternalServerError)
return nil
}
sendSnapshotResponse(rw, req, sr)
return err
}
func (e *ETCD) handleInvalid(rw http.ResponseWriter, req *http.Request) error {
util.SendErrorWithID(fmt.Errorf("invalid snapshot operation"), "etcd-snapshot", rw, req, http.StatusBadRequest)
return nil
}
// withRequest returns a modified ETCD struct that is overriden
// with etcd snapshot config from the snapshot request.
func (e *ETCD) withRequest(sr *SnapshotRequest) *ETCD {
re := &ETCD{
client: e.client,
config: &config.Control{
CriticalControlArgs: e.config.CriticalControlArgs,
Runtime: e.config.Runtime,
DataDir: e.config.DataDir,
Datastore: e.config.Datastore,
EtcdSnapshotCompress: e.config.EtcdSnapshotCompress,
EtcdSnapshotName: e.config.EtcdSnapshotName,
EtcdSnapshotRetention: e.config.EtcdSnapshotRetention,
},
name: e.name,
address: e.address,
cron: e.cron,
cancel: e.cancel,
snapshotSem: e.snapshotSem,
}
if len(sr.Name) > 0 {
re.config.EtcdSnapshotName = sr.Name[0]
}
if sr.Compress != nil {
re.config.EtcdSnapshotCompress = *sr.Compress
}
if sr.Dir != nil {
re.config.EtcdSnapshotDir = *sr.Dir
}
if sr.Retention != nil {
re.config.EtcdSnapshotRetention = *sr.Retention
}
if sr.S3 != nil {
re.config.EtcdS3 = true
re.config.EtcdS3BucketName = sr.S3.Bucket
re.config.EtcdS3AccessKey = sr.S3.AccessKey
re.config.EtcdS3SecretKey = sr.S3.SecretKey
re.config.EtcdS3Endpoint = sr.S3.Endpoint
re.config.EtcdS3EndpointCA = sr.S3.EndpointCA
re.config.EtcdS3SkipSSLVerify = sr.S3.SkipSSLVerify
re.config.EtcdS3Insecure = sr.S3.Insecure
re.config.EtcdS3Region = sr.S3.Region
re.config.EtcdS3Timeout = sr.S3.Timeout.Duration
}
return re
}
// getSnapshotRequest unmarshalls the snapshot operation request from a client.
func getSnapshotRequest(req *http.Request) (*SnapshotRequest, error) {
if req.Method != http.MethodPost {
return nil, http.ErrNotSupported
}
sr := &SnapshotRequest{}
b, err := io.ReadAll(req.Body)
if err != nil {
return nil, err
}
if err := json.Unmarshal(b, &sr); err != nil {
return nil, err
}
sr.ctx = req.Context()
return sr, nil
}
func sendSnapshotResponse(rw http.ResponseWriter, req *http.Request, sr *managed.SnapshotResult) {
b, err := json.Marshal(sr)
if err != nil {
util.SendErrorWithID(err, "etcd-snapshot", rw, req, http.StatusInternalServerError)
return
}
rw.Header().Set("Content-Type", "application/json")
rw.Write(b)
}
func sendSnapshotList(rw http.ResponseWriter, req *http.Request, sf *k3s.ETCDSnapshotFileList) {
b, err := json.Marshal(sf)
if err != nil {
util.SendErrorWithID(err, "etcd-snapshot", rw, req, http.StatusInternalServerError)
return
}
rw.Header().Set("Content-Type", "application/json")
rw.Write(b)
}

View File

@ -1,4 +1,4 @@
package server
package auth
import (
"net/http"
@ -51,7 +51,7 @@ func doAuth(roles []string, serverConfig *config.Control, next http.Handler, rw
next.ServeHTTP(rw, req)
}
func authMiddleware(serverConfig *config.Control, roles ...string) mux.MiddlewareFunc {
func Middleware(serverConfig *config.Control, roles ...string) mux.MiddlewareFunc {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
doAuth(roles, serverConfig, next, rw, req)

View File

@ -19,6 +19,7 @@ import (
"github.com/k3s-io/k3s/pkg/cluster"
"github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/daemons/control/deps"
"github.com/k3s-io/k3s/pkg/util"
"github.com/k3s-io/k3s/pkg/version"
"github.com/pkg/errors"
certutil "github.com/rancher/dynamiclistener/cert"
@ -35,7 +36,7 @@ func caCertReplaceHandler(server *config.Control) http.HandlerFunc {
}
force, _ := strconv.ParseBool(req.FormValue("force"))
if err := caCertReplace(server, req.Body, force); err != nil {
genErrorMessage(resp, http.StatusInternalServerError, err, "certificate")
util.SendErrorWithID(err, "certificate", resp, req, http.StatusInternalServerError)
return
}
logrus.Infof("certificate: Cluster Certificate Authority data has been updated, %s must be restarted.", version.Program)

View File

@ -20,6 +20,7 @@ import (
"github.com/k3s-io/k3s/pkg/cli/cmds"
"github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/nodepassword"
"github.com/k3s-io/k3s/pkg/server/auth"
"github.com/k3s-io/k3s/pkg/util"
"github.com/k3s-io/k3s/pkg/version"
"github.com/pkg/errors"
@ -51,7 +52,7 @@ func router(ctx context.Context, config *Config, cfg *cmds.Server) http.Handler
prefix := "/v1-" + version.Program
authed := mux.NewRouter().SkipClean(true)
authed.Use(authMiddleware(serverConfig, version.Program+":agent", user.NodesGroup, bootstrapapi.BootstrapDefaultGroup))
authed.Use(auth.Middleware(serverConfig, version.Program+":agent", user.NodesGroup, bootstrapapi.BootstrapDefaultGroup))
authed.Path(prefix + "/serving-kubelet.crt").Handler(servingKubeletCert(serverConfig, serverConfig.Runtime.ServingKubeletKey, nodeAuth))
authed.Path(prefix + "/client-kubelet.crt").Handler(clientKubeletCert(serverConfig, serverConfig.Runtime.ClientKubeletKey, nodeAuth))
authed.Path(prefix + "/client-kube-proxy.crt").Handler(fileHandler(serverConfig.Runtime.ClientKubeProxyCert, serverConfig.Runtime.ClientKubeProxyKey))
@ -70,23 +71,22 @@ func router(ctx context.Context, config *Config, cfg *cmds.Server) http.Handler
nodeAuthed := mux.NewRouter().SkipClean(true)
nodeAuthed.NotFoundHandler = authed
nodeAuthed.Use(authMiddleware(serverConfig, user.NodesGroup))
nodeAuthed.Use(auth.Middleware(serverConfig, user.NodesGroup))
nodeAuthed.Path(prefix + "/connect").Handler(serverConfig.Runtime.Tunnel)
serverAuthed := mux.NewRouter().SkipClean(true)
serverAuthed.NotFoundHandler = nodeAuthed
serverAuthed.Use(authMiddleware(serverConfig, version.Program+":server"))
serverAuthed.Use(auth.Middleware(serverConfig, version.Program+":server"))
serverAuthed.Path(prefix + "/encrypt/status").Handler(encryptionStatusHandler(serverConfig))
serverAuthed.Path(prefix + "/encrypt/config").Handler(encryptionConfigHandler(ctx, serverConfig))
serverAuthed.Path(prefix + "/cert/cacerts").Handler(caCertReplaceHandler(serverConfig))
serverAuthed.Path("/db/info").Handler(nodeAuthed)
serverAuthed.Path(prefix + "/server-bootstrap").Handler(bootstrapHandler(serverConfig.Runtime))
serverAuthed.Path(prefix + "/token").Handler(tokenRequestHandler(ctx, serverConfig))
systemAuthed := mux.NewRouter().SkipClean(true)
systemAuthed.NotFoundHandler = serverAuthed
systemAuthed.MethodNotAllowedHandler = serverAuthed
systemAuthed.Use(authMiddleware(serverConfig, user.SystemPrivilegedGroup))
systemAuthed.Use(auth.Middleware(serverConfig, user.SystemPrivilegedGroup))
systemAuthed.Methods(http.MethodConnect).Handler(serverConfig.Runtime.Tunnel)
staticDir := filepath.Join(serverConfig.DataDir, "static")

View File

@ -7,7 +7,6 @@ import (
"encoding/json"
"fmt"
"io"
"math/big"
"net/http"
"os"
"strings"
@ -45,12 +44,12 @@ type EncryptionRequest struct {
Skip bool `json:"skip"`
}
func getEncryptionRequest(req *http.Request) (EncryptionRequest, error) {
func getEncryptionRequest(req *http.Request) (*EncryptionRequest, error) {
b, err := io.ReadAll(req.Body)
if err != nil {
return EncryptionRequest{}, err
return nil, err
}
result := EncryptionRequest{}
result := &EncryptionRequest{}
err = json.Unmarshal(b, &result)
return result, err
}
@ -63,14 +62,15 @@ func encryptionStatusHandler(server *config.Control) http.Handler {
}
status, err := encryptionStatus(server)
if err != nil {
genErrorMessage(resp, http.StatusInternalServerError, err, "secrets-encrypt")
util.SendErrorWithID(err, "secret-encrypt", resp, req, http.StatusInternalServerError)
return
}
b, err := json.Marshal(status)
if err != nil {
genErrorMessage(resp, http.StatusInternalServerError, err, "secrets-encrypt")
util.SendErrorWithID(err, "secret-encrypt", resp, req, http.StatusInternalServerError)
return
}
resp.Header().Set("Content-Type", "application/json")
resp.Write(b)
})
}
@ -192,7 +192,7 @@ func encryptionConfigHandler(ctx context.Context, server *config.Control) http.H
}
if err != nil {
genErrorMessage(resp, http.StatusBadRequest, err, "secrets-encrypt")
util.SendErrorWithID(err, "secret-encrypt", resp, req, http.StatusBadRequest)
return
}
// If a user kills the k3s server immediately after this call, we run into issues where the files
@ -464,18 +464,3 @@ func verifyEncryptionHashAnnotation(runtime *config.ControlRuntime, core core.In
return nil
}
// genErrorMessage sends and logs a random error ID so that logs can be correlated
// between the REST API (which does not provide any detailed error output, to avoid
// information disclosure) and the server logs.
func genErrorMessage(resp http.ResponseWriter, statusCode int, passedErr error, component string) {
errID, err := rand.Int(rand.Reader, big.NewInt(99999))
if err != nil {
resp.WriteHeader(http.StatusInternalServerError)
resp.Write([]byte(err.Error()))
return
}
logrus.Warnf("%s error ID %05d: %s", component, errID, passedErr.Error())
resp.WriteHeader(statusCode)
resp.Write([]byte(fmt.Sprintf("%s error ID %05d", component, errID)))
}

View File

@ -45,7 +45,7 @@ func tokenRequestHandler(ctx context.Context, server *config.Control) http.Handl
return
}
if err = tokenRotate(ctx, server, *sTokenReq.NewToken); err != nil {
genErrorMessage(resp, http.StatusInternalServerError, err, "token")
util.SendErrorWithID(err, "token", resp, req, http.StatusInternalServerError)
return
}
resp.WriteHeader(http.StatusOK)

View File

@ -1,6 +1,9 @@
package util
import (
"crypto/rand"
"fmt"
"math/big"
"net/http"
"github.com/k3s-io/k3s/pkg/generated/clientset/versioned/scheme"
@ -14,6 +17,15 @@ import (
var ErrNotReady = errors.New("apiserver not ready")
// SendErrorWithID sends and logs a random error ID so that logs can be correlated
// between the REST API (which does not provide any detailed error output, to avoid
// information disclosure) and the server logs.
func SendErrorWithID(err error, component string, resp http.ResponseWriter, req *http.Request, status ...int) {
errID, _ := rand.Int(rand.Reader, big.NewInt(99999))
logrus.Errorf("%s error ID %05d: %v", component, errID, err)
SendError(fmt.Errorf("%s error ID %05d", component, errID), resp, req, status...)
}
// SendError sends a properly formatted error response
func SendError(err error, resp http.ResponseWriter, req *http.Request, status ...int) {
var code int

View File

@ -89,21 +89,18 @@ var _ = Describe("Verify Create", Ordered, func() {
It("save s3 snapshot", func() {
res, err := e2e.RunCmdOnNode("k3s etcd-snapshot save", serverNodeNames[0])
Expect(err).NotTo(HaveOccurred())
Expect(res).To(ContainSubstring("S3 bucket test exists"))
Expect(res).To(ContainSubstring("Uploading snapshot"))
Expect(res).To(ContainSubstring("S3 upload complete for"))
Expect(res).To(ContainSubstring("Snapshot on-demand-server-0"))
})
It("lists saved s3 snapshot", func() {
res, err := e2e.RunCmdOnNode("k3s etcd-snapshot list", serverNodeNames[0])
Expect(err).NotTo(HaveOccurred())
Expect(res).To(ContainSubstring("on-demand-server-0"))
Expect(res).To(ContainSubstring("file:///var/lib/rancher/k3s/server/db/snapshots/on-demand-server-0"))
})
It("save 3 more s3 snapshots", func() {
for _, i := range []string{"1", "2", "3"} {
res, err := e2e.RunCmdOnNode("k3s etcd-snapshot save --name special-"+i, serverNodeNames[0])
Expect(err).NotTo(HaveOccurred())
Expect(res).To(ContainSubstring("Uploading snapshot"))
Expect(res).To(ContainSubstring("S3 upload complete for special-" + i))
Expect(res).To(ContainSubstring("Snapshot special-" + i + "-server-0"))
}
})
It("lists saved s3 snapshot", func() {
@ -117,18 +114,12 @@ var _ = Describe("Verify Create", Ordered, func() {
It("delete first on-demand s3 snapshot", func() {
_, err := e2e.RunCmdOnNode("sudo k3s etcd-snapshot ls >> ./snapshotname.txt", serverNodeNames[0])
Expect(err).NotTo(HaveOccurred())
snapshotName, err := e2e.RunCmdOnNode("grep -Eo 'on-demand-server-0-([0-9]+)' ./snapshotname.txt |head -1", serverNodeNames[0])
snapshotName, err := e2e.RunCmdOnNode("grep -Eo 'on-demand-server-0-([0-9]+)' ./snapshotname.txt | head -1", serverNodeNames[0])
Expect(err).NotTo(HaveOccurred())
res, err := e2e.RunCmdOnNode("sudo k3s etcd-snapshot delete "+snapshotName, serverNodeNames[0])
Expect(err).NotTo(HaveOccurred())
Expect(res).To(ContainSubstring("Reconciling ETCDSnapshotFile resources"))
Expect(res).To(ContainSubstring("Snapshot " + strings.TrimSpace(snapshotName) + " deleted from S3"))
Expect(res).To(ContainSubstring("Reconciliation of ETCDSnapshotFile resources complete"))
Expect(res).To(ContainSubstring("Snapshot " + strings.TrimSpace(snapshotName) + " deleted"))
})
// TODO, there is currently a bug that prevents pruning on s3 snapshots that are not prefixed with "on-demand"
// https://github.com/rancher/rke2/issues/3714
// Once fixed, ensure that the snapshots list are actually reduced to 2
It("prunes s3 snapshots", func() {
_, err := e2e.RunCmdOnNode("k3s etcd-snapshot save", serverNodeNames[0])
Expect(err).NotTo(HaveOccurred())
@ -136,11 +127,8 @@ var _ = Describe("Verify Create", Ordered, func() {
Expect(err).NotTo(HaveOccurred())
res, err := e2e.RunCmdOnNode("k3s etcd-snapshot prune --snapshot-retention 2", serverNodeNames[0])
Expect(err).NotTo(HaveOccurred())
Expect(res).To(ContainSubstring("Reconciliation of ETCDSnapshotFile resources complete"))
_, err = e2e.RunCmdOnNode("k3s etcd-snapshot ls|grep 'on-demand'|wc -l>count", serverNodeNames[0])
Expect(err).NotTo(HaveOccurred())
res, err = e2e.RunCmdOnNode("grep '^[4]$' ./count", serverNodeNames[0])
// There should now be 4 on-demand snapshots - 2 local, and 2 on s3
res, err = e2e.RunCmdOnNode("k3s etcd-snapshot ls 2>/dev/null | grep on-demand | wc -l", serverNodeNames[0])
Expect(err).NotTo(HaveOccurred())
Expect(strings.TrimSpace(res)).To(Equal("4"))
})

View File

@ -58,13 +58,13 @@ var _ = Describe("etcd snapshots", Ordered, func() {
Expect(err).ToNot(HaveOccurred())
snapshotName := reg.FindString(lsResult)
Expect(testutil.K3sCmd("etcd-snapshot", "delete", snapshotName)).
To(ContainSubstring("Snapshot " + snapshotName + " deleted locally"))
To(ContainSubstring("Snapshot " + snapshotName + " deleted"))
})
})
When("saving a custom name", func() {
It("saves an etcd snapshot with a custom name", func() {
Expect(testutil.K3sCmd("etcd-snapshot", "save --name ALIVEBEEF")).
To(ContainSubstring("Saving etcd snapshot to /var/lib/rancher/k3s/server/db/snapshots/ALIVEBEEF"))
To(ContainSubstring("Snapshot ALIVEBEEF-"))
})
It("deletes that snapshot", func() {
lsResult, err := testutil.K3sCmd("etcd-snapshot", "ls")
@ -73,7 +73,7 @@ var _ = Describe("etcd snapshots", Ordered, func() {
Expect(err).ToNot(HaveOccurred())
snapshotName := reg.FindString(lsResult)
Expect(testutil.K3sCmd("etcd-snapshot", "delete", snapshotName)).
To(ContainSubstring("Snapshot " + snapshotName + " deleted locally"))
To(ContainSubstring("Snapshot " + snapshotName + " deleted"))
})
})
When("using etcd snapshot prune", func() {
@ -98,7 +98,7 @@ var _ = Describe("etcd snapshots", Ordered, func() {
})
It("prunes snapshots down to 2", func() {
Expect(testutil.K3sCmd("etcd-snapshot", "prune --snapshot-retention 2 --name PRUNE_TEST")).
To(ContainSubstring("Removing local snapshot"))
To(ContainSubstring(" deleted."))
lsResult, err := testutil.K3sCmd("etcd-snapshot", "ls")
Expect(err).ToNot(HaveOccurred())
reg, err := regexp.Compile(`(?m):///var/lib/rancher/k3s/server/db/snapshots/PRUNE_TEST`)
@ -113,7 +113,7 @@ var _ = Describe("etcd snapshots", Ordered, func() {
Expect(err).ToNot(HaveOccurred())
for _, snapshotName := range reg.FindAllString(lsResult, -1) {
Expect(testutil.K3sCmd("etcd-snapshot", "delete", snapshotName)).
To(ContainSubstring("Snapshot " + snapshotName + " deleted locally"))
To(ContainSubstring("Snapshot " + snapshotName + " deleted"))
}
})
})