k3s/pkg/etcd/s3.go
Brad Davidson 7464007037 Store extra metadata and cluster ID for snapshots
Write the extra metadata both locally and to S3. These files are placed such that they will not be used by older versions of K3s that do not make use of them.

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
2023-10-12 15:04:45 -07:00

449 lines
14 KiB
Go

package etcd
import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/base64"
"encoding/pem"
"fmt"
"io/ioutil"
"net/http"
"net/textproto"
"os"
"path"
"path/filepath"
"runtime"
"sort"
"strconv"
"strings"
"time"
"github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/version"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
var (
clusterIDKey = textproto.CanonicalMIMEHeaderKey(version.Program + "-cluster-id")
nodeNameKey = textproto.CanonicalMIMEHeaderKey(version.Program + "-node-name")
)
// S3 maintains state for S3 functionality.
type S3 struct {
config *config.Control
client *minio.Client
clusterID string
nodeName string
}
// newS3 creates a new value of type s3 pointer with a
// copy of the config.Control pointer and initializes
// a new Minio client.
func NewS3(ctx context.Context, config *config.Control) (*S3, error) {
if config.EtcdS3BucketName == "" {
return nil, errors.New("s3 bucket name was not set")
}
tr := http.DefaultTransport
switch {
case config.EtcdS3EndpointCA != "":
trCA, err := setTransportCA(tr, config.EtcdS3EndpointCA, config.EtcdS3SkipSSLVerify)
if err != nil {
return nil, err
}
tr = trCA
case config.EtcdS3 && config.EtcdS3SkipSSLVerify:
tr.(*http.Transport).TLSClientConfig = &tls.Config{
InsecureSkipVerify: config.EtcdS3SkipSSLVerify,
}
}
var creds *credentials.Credentials
if len(config.EtcdS3AccessKey) == 0 && len(config.EtcdS3SecretKey) == 0 {
creds = credentials.NewIAM("") // for running on ec2 instance
} else {
creds = credentials.NewStaticV4(config.EtcdS3AccessKey, config.EtcdS3SecretKey, "")
}
opt := minio.Options{
Creds: creds,
Secure: !config.EtcdS3Insecure,
Region: config.EtcdS3Region,
Transport: tr,
BucketLookup: bucketLookupType(config.EtcdS3Endpoint),
}
c, err := minio.New(config.EtcdS3Endpoint, &opt)
if err != nil {
return nil, err
}
logrus.Infof("Checking if S3 bucket %s exists", config.EtcdS3BucketName)
ctx, cancel := context.WithTimeout(ctx, config.EtcdS3Timeout)
defer cancel()
exists, err := c.BucketExists(ctx, config.EtcdS3BucketName)
if err != nil {
return nil, err
}
if !exists {
return nil, fmt.Errorf("bucket %s does not exist", config.EtcdS3BucketName)
}
logrus.Infof("S3 bucket %s exists", config.EtcdS3BucketName)
for config.Runtime.Core == nil {
runtime.Gosched()
}
// cluster id hack: see https://groups.google.com/forum/#!msg/kubernetes-sig-architecture/mVGobfD4TpY/nkdbkX1iBwAJ
var clusterID string
if ns, err := config.Runtime.Core.Core().V1().Namespace().Get(metav1.NamespaceSystem, metav1.GetOptions{}); err != nil {
logrus.Warnf("Failed to set cluster ID: %v", err)
} else {
clusterID = string(ns.UID)
}
return &S3{
config: config,
client: c,
clusterID: clusterID,
nodeName: os.Getenv("NODE_NAME"),
}, nil
}
// upload uploads the given snapshot to the configured S3
// compatible backend.
func (s *S3) upload(ctx context.Context, snapshot string, extraMetadata *v1.ConfigMap, now time.Time) (*snapshotFile, error) {
logrus.Infof("Uploading snapshot to s3://%s/%s", s.config.EtcdS3BucketName, snapshot)
basename := filepath.Base(snapshot)
metadata := filepath.Join(filepath.Dir(snapshot), "..", metadataDir, basename)
snapshotKey := path.Join(s.config.EtcdS3Folder, basename)
metadataKey := path.Join(s.config.EtcdS3Folder, metadataDir, basename)
sf := &snapshotFile{
Name: basename,
Location: fmt.Sprintf("s3://%s/%s", s.config.EtcdS3BucketName, snapshotKey),
NodeName: "s3",
CreatedAt: &metav1.Time{
Time: now,
},
S3: &s3Config{
Endpoint: s.config.EtcdS3Endpoint,
EndpointCA: s.config.EtcdS3EndpointCA,
SkipSSLVerify: s.config.EtcdS3SkipSSLVerify,
Bucket: s.config.EtcdS3BucketName,
Region: s.config.EtcdS3Region,
Folder: s.config.EtcdS3Folder,
Insecure: s.config.EtcdS3Insecure,
},
Compressed: strings.HasSuffix(snapshot, compressedExtension),
metadataSource: extraMetadata,
}
uploadInfo, err := s.uploadSnapshot(ctx, snapshotKey, snapshot)
if err != nil {
sf.Status = failedSnapshotStatus
sf.Message = base64.StdEncoding.EncodeToString([]byte(err.Error()))
} else {
sf.Status = successfulSnapshotStatus
sf.Size = uploadInfo.Size
}
if _, err := s.uploadSnapshotMetadata(ctx, metadataKey, metadata); err != nil {
logrus.Warnf("Failed to upload snapshot metadata to S3: %v", err)
} else {
logrus.Infof("Uploaded snapshot metadata s3://%s/%s", s.config.EtcdS3BucketName, metadata)
}
return sf, err
}
// uploadSnapshot uploads the snapshot file to S3 using the minio API.
func (s *S3) uploadSnapshot(ctx context.Context, key, path string) (info minio.UploadInfo, err error) {
opts := minio.PutObjectOptions{
NumThreads: 2,
UserMetadata: map[string]string{
clusterIDKey: s.clusterID,
nodeNameKey: s.nodeName,
},
}
if strings.HasSuffix(key, compressedExtension) {
opts.ContentType = "application/zip"
} else {
opts.ContentType = "application/octet-stream"
}
ctx, cancel := context.WithTimeout(ctx, s.config.EtcdS3Timeout)
defer cancel()
return s.client.FPutObject(ctx, s.config.EtcdS3BucketName, key, path, opts)
}
// uploadSnapshotMetadata marshals and uploads the snapshot metadata to S3 using the minio API.
// The upload is silently skipped if no extra metadata is provided.
func (s *S3) uploadSnapshotMetadata(ctx context.Context, key, path string) (info minio.UploadInfo, err error) {
if _, err := os.Stat(path); err != nil {
if os.IsNotExist(err) {
return minio.UploadInfo{}, nil
}
return minio.UploadInfo{}, err
}
opts := minio.PutObjectOptions{
NumThreads: 2,
ContentType: "application/json",
UserMetadata: map[string]string{
clusterIDKey: s.clusterID,
nodeNameKey: s.nodeName,
},
}
ctx, cancel := context.WithTimeout(ctx, s.config.EtcdS3Timeout)
defer cancel()
return s.client.FPutObject(ctx, s.config.EtcdS3BucketName, key, path, opts)
}
// download downloads the given snapshot from the configured S3
// compatible backend.
func (s *S3) Download(ctx context.Context) error {
snapshotKey := path.Join(s.config.EtcdS3Folder, s.config.ClusterResetRestorePath)
metadataKey := path.Join(s.config.EtcdS3Folder, metadataDir, s.config.ClusterResetRestorePath)
snapshotDir, err := snapshotDir(s.config, true)
if err != nil {
return errors.Wrap(err, "failed to get the snapshot dir")
}
snapshotFile := filepath.Join(snapshotDir, s.config.ClusterResetRestorePath)
metadataFile := filepath.Join(snapshotDir, "..", metadataDir, s.config.ClusterResetRestorePath)
logrus.Debugf("Downloading snapshot from s3://%s/%s", s.config.EtcdS3BucketName, snapshotKey)
if err := s.downloadSnapshot(ctx, snapshotKey, snapshotFile); err != nil {
return err
}
if err := s.downloadSnapshotMetadata(ctx, metadataKey, metadataFile); err != nil {
return err
}
s.config.ClusterResetRestorePath = snapshotFile
return nil
}
// downloadSnapshot downloads the snapshot file from S3 using the minio API.
func (s *S3) downloadSnapshot(ctx context.Context, key, file string) error {
ctx, cancel := context.WithTimeout(ctx, s.config.EtcdS3Timeout)
defer cancel()
defer os.Chmod(file, 0600)
return s.client.FGetObject(ctx, s.config.EtcdS3BucketName, key, file, minio.GetObjectOptions{})
}
// downloadSnapshotMetadata downloads the snapshot metadata file from S3 using the minio API.
// No error is returned if the metadata file does not exist, as it is optional.
func (s *S3) downloadSnapshotMetadata(ctx context.Context, key, file string) error {
logrus.Debugf("Downloading snapshot metadata from s3://%s/%s", s.config.EtcdS3BucketName, key)
ctx, cancel := context.WithTimeout(ctx, s.config.EtcdS3Timeout)
defer cancel()
defer os.Chmod(file, 0600)
err := s.client.FGetObject(ctx, s.config.EtcdS3BucketName, key, file, minio.GetObjectOptions{})
if resp := minio.ToErrorResponse(err); resp.StatusCode == http.StatusNotFound {
return nil
}
return err
}
// snapshotPrefix returns the prefix used in the
// naming of the snapshots.
func (s *S3) snapshotPrefix() string {
return path.Join(s.config.EtcdS3Folder, s.config.EtcdSnapshotName)
}
// snapshotRetention prunes snapshots in the configured S3 compatible backend for this specific node.
func (s *S3) snapshotRetention(ctx context.Context) error {
if s.config.EtcdSnapshotRetention < 1 {
return nil
}
logrus.Infof("Applying snapshot retention=%d to snapshots stored in s3://%s/%s", s.config.EtcdSnapshotRetention, s.config.EtcdS3BucketName, s.snapshotPrefix())
var snapshotFiles []minio.ObjectInfo
toCtx, cancel := context.WithTimeout(ctx, s.config.EtcdS3Timeout)
defer cancel()
opts := minio.ListObjectsOptions{
Prefix: s.snapshotPrefix(),
Recursive: true,
}
for info := range s.client.ListObjects(toCtx, s.config.EtcdS3BucketName, opts) {
if info.Err != nil {
return info.Err
}
// skip metadata
if path.Base(path.Dir(info.Key)) == metadataDir {
continue
}
snapshotFiles = append(snapshotFiles, info)
}
if len(snapshotFiles) <= s.config.EtcdSnapshotRetention {
return nil
}
// sort newest-first so we can prune entries past the retention count
sort.Slice(snapshotFiles, func(i, j int) bool {
return snapshotFiles[j].LastModified.Before(snapshotFiles[i].LastModified)
})
for _, df := range snapshotFiles[s.config.EtcdSnapshotRetention:] {
logrus.Infof("Removing S3 snapshot: s3://%s/%s", s.config.EtcdS3BucketName, df.Key)
if err := s.client.RemoveObject(ctx, s.config.EtcdS3BucketName, df.Key, minio.RemoveObjectOptions{}); err != nil {
return err
}
metadataKey := path.Join(path.Dir(df.Key), metadataDir, path.Base(df.Key))
if err := s.client.RemoveObject(ctx, s.config.EtcdS3BucketName, metadataKey, minio.RemoveObjectOptions{}); err != nil {
if resp := minio.ToErrorResponse(err); resp.StatusCode == http.StatusNotFound {
return nil
}
return err
}
}
return nil
}
// listSnapshots provides a list of currently stored
// snapshots in S3 along with their relevant
// metadata.
func (s *S3) listSnapshots(ctx context.Context) (map[string]snapshotFile, error) {
snapshots := map[string]snapshotFile{}
metadatas := []string{}
ctx, cancel := context.WithCancel(ctx)
defer cancel()
opts := minio.ListObjectsOptions{
Prefix: s.config.EtcdS3Folder,
Recursive: true,
}
objects := s.client.ListObjects(ctx, s.config.EtcdS3BucketName, opts)
for obj := range objects {
if obj.Err != nil {
return nil, obj.Err
}
if obj.Size == 0 {
continue
}
if o, err := s.client.StatObject(ctx, s.config.EtcdS3BucketName, obj.Key, minio.StatObjectOptions{}); err != nil {
logrus.Warnf("Failed to get object metadata: %v", err)
} else {
obj = o
}
filename := path.Base(obj.Key)
if path.Base(path.Dir(obj.Key)) == metadataDir {
metadatas = append(metadatas, obj.Key)
continue
}
basename, compressed := strings.CutSuffix(filename, compressedExtension)
ts, err := strconv.ParseInt(basename[strings.LastIndexByte(basename, '-')+1:], 10, 64)
if err != nil {
ts = obj.LastModified.Unix()
}
sf := snapshotFile{
Name: filename,
Location: fmt.Sprintf("s3://%s/%s", s.config.EtcdS3BucketName, obj.Key),
NodeName: "s3",
CreatedAt: &metav1.Time{
Time: time.Unix(ts, 0),
},
Size: obj.Size,
S3: &s3Config{
Endpoint: s.config.EtcdS3Endpoint,
EndpointCA: s.config.EtcdS3EndpointCA,
SkipSSLVerify: s.config.EtcdS3SkipSSLVerify,
Bucket: s.config.EtcdS3BucketName,
Region: s.config.EtcdS3Region,
Folder: s.config.EtcdS3Folder,
Insecure: s.config.EtcdS3Insecure,
},
Status: successfulSnapshotStatus,
Compressed: compressed,
}
sfKey := generateSnapshotConfigMapKey(sf)
snapshots[sfKey] = sf
}
for _, metadataKey := range metadatas {
filename := path.Base(metadataKey)
sfKey := generateSnapshotConfigMapKey(snapshotFile{Name: filename, NodeName: "s3"})
if sf, ok := snapshots[sfKey]; ok {
logrus.Debugf("Loading snapshot metadata from s3://%s/%s", s.config.EtcdS3BucketName, metadataKey)
if obj, err := s.client.GetObject(ctx, s.config.EtcdS3BucketName, metadataKey, minio.GetObjectOptions{}); err != nil {
logrus.Warnf("Failed to get snapshot metadata: %v", err)
} else {
if m, err := ioutil.ReadAll(obj); err != nil {
logrus.Warnf("Failed to read snapshot metadata: %v", err)
} else {
sf.Metadata = base64.StdEncoding.EncodeToString(m)
snapshots[sfKey] = sf
}
}
}
}
return snapshots, nil
}
func readS3EndpointCA(endpointCA string) ([]byte, error) {
ca, err := base64.StdEncoding.DecodeString(endpointCA)
if err != nil {
return os.ReadFile(endpointCA)
}
return ca, nil
}
func setTransportCA(tr http.RoundTripper, endpointCA string, insecureSkipVerify bool) (http.RoundTripper, error) {
ca, err := readS3EndpointCA(endpointCA)
if err != nil {
return tr, err
}
if !isValidCertificate(ca) {
return tr, errors.New("endpoint-ca is not a valid x509 certificate")
}
certPool := x509.NewCertPool()
certPool.AppendCertsFromPEM(ca)
tr.(*http.Transport).TLSClientConfig = &tls.Config{
RootCAs: certPool,
InsecureSkipVerify: insecureSkipVerify,
}
return tr, nil
}
// isValidCertificate checks to see if the given
// byte slice is a valid x509 certificate.
func isValidCertificate(c []byte) bool {
p, _ := pem.Decode(c)
if p == nil {
return false
}
if _, err := x509.ParseCertificates(p.Bytes); err != nil {
return false
}
return true
}
func bucketLookupType(endpoint string) minio.BucketLookupType {
if strings.Contains(endpoint, "aliyun") { // backwards compt with RKE1
return minio.BucketLookupDNS
}
return minio.BucketLookupAuto
}