2021-03-03 18:14:12 +00:00
package etcd
import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/base64"
"encoding/pem"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"sort"
"strings"
2021-11-29 18:30:04 +00:00
"time"
2021-03-03 18:14:12 +00:00
2022-03-02 23:47:27 +00:00
"github.com/k3s-io/k3s/pkg/daemons/config"
2021-03-03 18:14:12 +00:00
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
2021-11-29 18:30:04 +00:00
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2021-03-03 18:14:12 +00:00
)
2021-06-30 20:29:03 +00:00
// S3 maintains state for S3 functionality.
type S3 struct {
2021-03-03 18:14:12 +00:00
config * config . Control
client * minio . Client
}
// newS3 creates a new value of type s3 pointer with a
// copy of the config.Control pointer and initializes
// a new Minio client.
2021-06-30 20:29:03 +00:00
func NewS3 ( ctx context . Context , config * config . Control ) ( * S3 , error ) {
2021-11-29 18:30:04 +00:00
if config . EtcdS3BucketName == "" {
return nil , errors . New ( "s3 bucket name was not set" )
}
2021-03-03 18:14:12 +00:00
tr := http . DefaultTransport
2021-09-28 17:13:50 +00:00
switch {
case config . EtcdS3EndpointCA != "" :
2021-03-03 18:14:12 +00:00
trCA , err := setTransportCA ( tr , config . EtcdS3EndpointCA , config . EtcdS3SkipSSLVerify )
if err != nil {
return nil , err
}
tr = trCA
2021-09-28 17:13:50 +00:00
case config . EtcdS3 && config . EtcdS3SkipSSLVerify :
tr . ( * http . Transport ) . TLSClientConfig = & tls . Config {
InsecureSkipVerify : config . EtcdS3SkipSSLVerify ,
}
2021-03-03 18:14:12 +00:00
}
var creds * credentials . Credentials
if len ( config . EtcdS3AccessKey ) == 0 && len ( config . EtcdS3SecretKey ) == 0 {
creds = credentials . NewIAM ( "" ) // for running on ec2 instance
} else {
creds = credentials . NewStaticV4 ( config . EtcdS3AccessKey , config . EtcdS3SecretKey , "" )
}
opt := minio . Options {
Creds : creds ,
2021-09-05 15:56:15 +00:00
Secure : ! config . EtcdS3Insecure ,
2021-03-03 18:14:12 +00:00
Region : config . EtcdS3Region ,
Transport : tr ,
BucketLookup : bucketLookupType ( config . EtcdS3Endpoint ) ,
}
c , err := minio . New ( config . EtcdS3Endpoint , & opt )
if err != nil {
return nil , err
}
logrus . Infof ( "Checking if S3 bucket %s exists" , config . EtcdS3BucketName )
2021-05-07 23:10:04 +00:00
2021-10-15 17:24:14 +00:00
ctx , cancel := context . WithTimeout ( ctx , config . EtcdS3Timeout )
2021-05-07 23:10:04 +00:00
defer cancel ( )
2021-03-03 18:14:12 +00:00
exists , err := c . BucketExists ( ctx , config . EtcdS3BucketName )
if err != nil {
return nil , err
}
if ! exists {
return nil , fmt . Errorf ( "bucket: %s does not exist" , config . EtcdS3BucketName )
}
logrus . Infof ( "S3 bucket %s exists" , config . EtcdS3BucketName )
2021-06-30 20:29:03 +00:00
return & S3 {
2021-03-03 18:14:12 +00:00
config : config ,
client : c ,
} , nil
}
// upload uploads the given snapshot to the configured S3
// compatible backend.
2022-01-14 17:31:22 +00:00
func ( s * S3 ) upload ( ctx context . Context , snapshot , extraMetadata string , now time . Time ) ( * snapshotFile , error ) {
2021-11-29 18:30:04 +00:00
logrus . Infof ( "Uploading snapshot %s to S3" , snapshot )
2021-03-03 18:14:12 +00:00
basename := filepath . Base ( snapshot )
var snapshotFileName string
2022-01-14 17:31:22 +00:00
var sf snapshotFile
2021-03-03 18:14:12 +00:00
if s . config . EtcdS3Folder != "" {
snapshotFileName = filepath . Join ( s . config . EtcdS3Folder , basename )
} else {
snapshotFileName = basename
}
2021-10-15 17:24:14 +00:00
toCtx , cancel := context . WithTimeout ( ctx , s . config . EtcdS3Timeout )
2021-05-07 23:10:04 +00:00
defer cancel ( )
2022-07-09 01:27:05 +00:00
opts := minio . PutObjectOptions { NumThreads : 2 }
if strings . HasSuffix ( snapshot , compressedExtension ) {
opts . ContentType = "application/zip"
} else {
opts . ContentType = "application/octet-stream"
2021-03-03 18:14:12 +00:00
}
2021-11-29 18:30:04 +00:00
uploadInfo , err := s . client . FPutObject ( toCtx , s . config . EtcdS3BucketName , snapshotFileName , snapshot , opts )
if err != nil {
2022-01-14 17:31:22 +00:00
sf = snapshotFile {
2021-11-29 18:30:04 +00:00
Name : filepath . Base ( uploadInfo . Key ) ,
Metadata : extraMetadata ,
NodeName : "s3" ,
CreatedAt : & metav1 . Time {
Time : now ,
} ,
Message : base64 . StdEncoding . EncodeToString ( [ ] byte ( err . Error ( ) ) ) ,
Size : 0 ,
2022-01-14 17:31:22 +00:00
Status : failedSnapshotStatus ,
2021-11-29 18:30:04 +00:00
S3 : & s3Config {
Endpoint : s . config . EtcdS3Endpoint ,
EndpointCA : s . config . EtcdS3EndpointCA ,
SkipSSLVerify : s . config . EtcdS3SkipSSLVerify ,
Bucket : s . config . EtcdS3BucketName ,
Region : s . config . EtcdS3Region ,
Folder : s . config . EtcdS3Folder ,
Insecure : s . config . EtcdS3Insecure ,
} ,
}
logrus . Errorf ( "Error received during snapshot upload to S3: %s" , err )
} else {
ca , err := time . Parse ( time . RFC3339 , uploadInfo . LastModified . Format ( time . RFC3339 ) )
if err != nil {
return nil , err
}
2021-03-03 18:14:12 +00:00
2022-01-14 17:31:22 +00:00
sf = snapshotFile {
2021-11-29 18:30:04 +00:00
Name : filepath . Base ( uploadInfo . Key ) ,
Metadata : extraMetadata ,
NodeName : "s3" ,
CreatedAt : & metav1 . Time {
Time : ca ,
} ,
Size : uploadInfo . Size ,
2022-01-14 17:31:22 +00:00
Status : successfulSnapshotStatus ,
2021-11-29 18:30:04 +00:00
S3 : & s3Config {
Endpoint : s . config . EtcdS3Endpoint ,
EndpointCA : s . config . EtcdS3EndpointCA ,
SkipSSLVerify : s . config . EtcdS3SkipSSLVerify ,
Bucket : s . config . EtcdS3BucketName ,
Region : s . config . EtcdS3Region ,
Folder : s . config . EtcdS3Folder ,
Insecure : s . config . EtcdS3Insecure ,
} ,
}
}
2022-01-14 17:31:22 +00:00
return & sf , nil
2021-03-03 18:14:12 +00:00
}
// download downloads the given snapshot from the configured S3
// compatible backend.
2021-06-30 20:29:03 +00:00
func ( s * S3 ) Download ( ctx context . Context ) error {
2021-03-03 18:14:12 +00:00
var remotePath string
if s . config . EtcdS3Folder != "" {
remotePath = filepath . Join ( s . config . EtcdS3Folder , s . config . ClusterResetRestorePath )
} else {
remotePath = s . config . ClusterResetRestorePath
}
logrus . Debugf ( "retrieving snapshot: %s" , remotePath )
2021-10-15 17:24:14 +00:00
toCtx , cancel := context . WithTimeout ( ctx , s . config . EtcdS3Timeout )
2021-05-07 23:10:04 +00:00
defer cancel ( )
r , err := s . client . GetObject ( toCtx , s . config . EtcdS3BucketName , remotePath , minio . GetObjectOptions { } )
2021-03-03 18:14:12 +00:00
if err != nil {
return nil
}
defer r . Close ( )
2021-08-09 16:04:18 +00:00
snapshotDir , err := snapshotDir ( s . config , true )
2021-03-03 18:14:12 +00:00
if err != nil {
return errors . Wrap ( err , "failed to get the snapshot dir" )
}
fullSnapshotPath := filepath . Join ( snapshotDir , s . config . ClusterResetRestorePath )
sf , err := os . Create ( fullSnapshotPath )
if err != nil {
return err
}
defer sf . Close ( )
stat , err := r . Stat ( )
if err != nil {
return err
}
if _ , err := io . CopyN ( sf , r , stat . Size ) ; err != nil {
return err
}
s . config . ClusterResetRestorePath = fullSnapshotPath
return os . Chmod ( fullSnapshotPath , 0600 )
}
2021-05-01 01:26:39 +00:00
// snapshotPrefix returns the prefix used in the
// naming of the snapshots.
2021-06-30 20:29:03 +00:00
func ( s * S3 ) snapshotPrefix ( ) string {
2023-08-03 13:54:40 +00:00
fullSnapshotPrefix := s . config . EtcdSnapshotName
2021-03-03 18:14:12 +00:00
var prefix string
if s . config . EtcdS3Folder != "" {
2021-05-01 01:26:39 +00:00
prefix = filepath . Join ( s . config . EtcdS3Folder , fullSnapshotPrefix )
2021-03-03 18:14:12 +00:00
} else {
2021-05-01 01:26:39 +00:00
prefix = fullSnapshotPrefix
2021-03-03 18:14:12 +00:00
}
2021-05-01 01:26:39 +00:00
return prefix
}
2021-11-29 18:30:04 +00:00
// snapshotRetention prunes snapshots in the configured S3 compatible backend for this specific node.
2021-06-30 20:29:03 +00:00
func ( s * S3 ) snapshotRetention ( ctx context . Context ) error {
2021-11-29 18:30:04 +00:00
if s . config . EtcdSnapshotRetention < 1 {
return nil
}
logrus . Infof ( "Applying snapshot retention policy to snapshots stored in S3: retention: %d, snapshotPrefix: %s" , s . config . EtcdSnapshotRetention , s . snapshotPrefix ( ) )
2021-05-01 01:26:39 +00:00
var snapshotFiles [ ] minio . ObjectInfo
2021-03-03 18:14:12 +00:00
2021-10-15 17:24:14 +00:00
toCtx , cancel := context . WithTimeout ( ctx , s . config . EtcdS3Timeout )
2021-05-07 23:10:04 +00:00
defer cancel ( )
2021-03-03 18:14:12 +00:00
loo := minio . ListObjectsOptions {
Recursive : true ,
2021-05-01 01:26:39 +00:00
Prefix : s . snapshotPrefix ( ) ,
2021-03-03 18:14:12 +00:00
}
2021-05-07 23:10:04 +00:00
for info := range s . client . ListObjects ( toCtx , s . config . EtcdS3BucketName , loo ) {
2021-03-03 18:14:12 +00:00
if info . Err != nil {
return info . Err
}
snapshotFiles = append ( snapshotFiles , info )
}
if len ( snapshotFiles ) <= s . config . EtcdSnapshotRetention {
return nil
}
2023-08-14 21:48:59 +00:00
sort . Slice ( snapshotFiles , func ( firstSnapshot , secondSnapshot int ) bool {
// it takes the key from the snapshot file ex: etcd-snapshot-example-{date}, makes the split using "-" to find the date, takes the date and sort by date
firstSnapshotName , secondSnapshotName := strings . Split ( snapshotFiles [ firstSnapshot ] . Key , "-" ) , strings . Split ( snapshotFiles [ secondSnapshot ] . Key , "-" )
firstSnapshotDate , secondSnapshotDate := firstSnapshotName [ len ( firstSnapshotName ) - 1 ] , secondSnapshotName [ len ( secondSnapshotName ) - 1 ]
return firstSnapshotDate < secondSnapshotDate
2021-03-03 18:14:12 +00:00
} )
delCount := len ( snapshotFiles ) - s . config . EtcdSnapshotRetention
for _ , df := range snapshotFiles [ : delCount ] {
2021-11-29 18:30:04 +00:00
logrus . Infof ( "Removing S3 snapshot: %s" , df . Key )
2021-03-03 18:14:12 +00:00
if err := s . client . RemoveObject ( ctx , s . config . EtcdS3BucketName , df . Key , minio . RemoveObjectOptions { } ) ; err != nil {
return err
}
}
return nil
}
// readS3EndpointCA returns the CA material described by endpointCA. The value
// is first treated as standard base64; when it does not decode, it is treated
// as a file path and the contents of that file are returned instead.
func readS3EndpointCA(endpointCA string) ([]byte, error) {
	if decoded, err := base64.StdEncoding.DecodeString(endpointCA); err == nil {
		return decoded, nil
	}
	return os.ReadFile(endpointCA)
}
// setTransportCA configures tr to trust the CA described by endpointCA, which
// is either base64-encoded PEM or a path to a PEM file, and returns the
// configured transport.
//
// tr must be an *http.Transport. Its TLSClientConfig is replaced in place
// with one that uses the CA as the only root and carries insecureSkipVerify.
// On any error tr is returned unmodified together with the error.
func setTransportCA(tr http.RoundTripper, endpointCA string, insecureSkipVerify bool) (http.RoundTripper, error) {
	ca, err := readS3EndpointCA(endpointCA)
	if err != nil {
		return tr, err
	}
	if !isValidCertificate(ca) {
		return tr, errors.New("endpoint-ca is not a valid x509 certificate")
	}

	// Report an unsupported RoundTripper as an error instead of panicking
	// on the type assertion.
	transport, ok := tr.(*http.Transport)
	if !ok {
		return tr, errors.New("endpoint-ca requires an *http.Transport")
	}

	certPool := x509.NewCertPool()
	// AppendCertsFromPEM reports whether any certificate was added. An empty
	// pool would make every TLS handshake fail later with a less helpful
	// error.
	if !certPool.AppendCertsFromPEM(ca) {
		return tr, errors.New("endpoint-ca is not a valid x509 certificate")
	}

	transport.TLSClientConfig = &tls.Config{
		RootCAs:            certPool,
		InsecureSkipVerify: insecureSkipVerify,
	}

	return tr, nil
}
// isValidCertificate reports whether the first PEM block found in c
// parses as x509 certificate data. Input without any PEM block is
// rejected.
func isValidCertificate(c []byte) bool {
	block, _ := pem.Decode(c)
	if block == nil {
		return false
	}
	_, err := x509.ParseCertificates(block.Bytes)
	return err == nil
}
// bucketLookupType selects the Minio bucket lookup style for the given
// endpoint: DNS-style lookup when the endpoint contains "aliyun", and
// automatic detection otherwise.
func bucketLookupType(endpoint string) minio.BucketLookupType {
	lookup := minio.BucketLookupAuto
	if strings.Contains(endpoint, "aliyun") { // backwards compt with RKE1
		lookup = minio.BucketLookupDNS
	}
	return lookup
}