Migrate sqlite data to etcd when initializing the cluster

Signed-off-by: Darren Shepherd <darren@rancher.com>
This commit is contained in:
Darren Shepherd 2021-04-26 09:47:53 -07:00 committed by Brad Davidson
parent bfb3d9b19d
commit 741ba95b04

View File

@ -19,6 +19,8 @@ import (
"github.com/google/uuid" "github.com/google/uuid"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/k3s-io/kine/pkg/client"
endpoint2 "github.com/k3s-io/kine/pkg/endpoint"
"github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7"
"github.com/pkg/errors" "github.com/pkg/errors"
certutil "github.com/rancher/dynamiclistener/cert" certutil "github.com/rancher/dynamiclistener/cert"
@ -149,6 +151,10 @@ func walDir(config *config.Control) string {
return filepath.Join(etcdDBDir(config), "member", "wal") return filepath.Join(etcdDBDir(config), "member", "wal")
} }
func sqliteFile(config *config.Control) string {
return filepath.Join(config.DataDir, "db", "state.db")
}
// nameFile returns the path to etcdDBDir/name. // nameFile returns the path to etcdDBDir/name.
func nameFile(config *config.Control) string { func nameFile(config *config.Control) string {
return filepath.Join(etcdDBDir(config), "name") return filepath.Join(etcdDBDir(config), "name")
@ -504,11 +510,68 @@ func GetAdvertiseAddress(advertiseIP string) (string, error) {
// newCluster returns options to set up etcd for a new cluster // newCluster returns options to set up etcd for a new cluster
func (e *ETCD) newCluster(ctx context.Context, reset bool) error { func (e *ETCD) newCluster(ctx context.Context, reset bool) error {
return e.cluster(ctx, reset, executor.InitialOptions{ err := e.cluster(ctx, reset, executor.InitialOptions{
AdvertisePeerURL: fmt.Sprintf("https://%s:2380", e.address), AdvertisePeerURL: fmt.Sprintf("https://%s:2380", e.address),
Cluster: fmt.Sprintf("%s=https://%s:2380", e.name, e.address), Cluster: fmt.Sprintf("%s=https://%s:2380", e.name, e.address),
State: "new", State: "new",
}) })
if err != nil {
return err
}
if err := e.migrateFromSQLite(ctx); err != nil {
return fmt.Errorf("failed to migrate content from sqlite to etcd: %w", err)
}
return nil
}
func (e *ETCD) migrateFromSQLite(ctx context.Context) error {
_, err := os.Stat(sqliteFile(e.config))
if os.IsNotExist(err) {
return nil
} else if err != nil {
return err
}
logrus.Infof("Migrating content from sqlite to etcd")
ctx, cancel := context.WithCancel(ctx)
defer cancel()
_, err = endpoint2.Listen(ctx, endpoint2.Config{
Endpoint: endpoint2.SQLiteBackend,
})
if err != nil {
return err
}
sqliteClient, err := client.New(endpoint2.ETCDConfig{
Endpoints: []string{"unix://kine.sock"},
})
if err != nil {
return err
}
defer sqliteClient.Close()
etcdClient, err := GetClient(ctx, e.runtime, "https://localhost:2379")
if err != nil {
return err
}
defer etcdClient.Close()
values, err := sqliteClient.List(ctx, "/registry/", 0)
if err != nil {
return err
}
for _, value := range values {
logrus.Infof("Migrating etcd key %s", value.Key)
_, err := etcdClient.Put(ctx, string(value.Key), string(value.Data))
if err != nil {
return err
}
}
return os.Rename(sqliteFile(e.config), sqliteFile(e.config)+".migrated")
} }
// peerURL returns the peer access address for the local node // peerURL returns the peer access address for the local node