Add etcd members as learners (#2066)

* Add etcd members as learners

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* Ignore errors in promote member

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>
This commit is contained in:
Hussein Galal 2020-07-29 22:52:49 +02:00 committed by GitHub
parent a33494802b
commit 169ee63907
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 86 additions and 25 deletions

View File

@ -22,7 +22,7 @@ func (c *Cluster) testClusterDB(ctx context.Context) (<-chan struct{}, error) {
go func() {
defer close(result)
for {
if err := c.managedDB.Test(ctx); err != nil {
if err := c.managedDB.Test(ctx, c.clientAccessInfo); err != nil {
logrus.Infof("Failed to test data store connection: %v", err)
} else {
logrus.Infof("Data store connection OK")
@ -46,7 +46,7 @@ func (c *Cluster) start(ctx context.Context) error {
}
if c.config.ClusterReset {
return c.managedDB.Reset(ctx)
return c.managedDB.Reset(ctx, c.clientAccessInfo)
}
return c.managedDB.Start(ctx, c.clientAccessInfo)

View File

@ -17,9 +17,9 @@ var (
type Driver interface {
IsInitialized(ctx context.Context, config *config.Control) (bool, error)
Register(ctx context.Context, config *config.Control, l net.Listener, handler http.Handler) (net.Listener, http.Handler, error)
Reset(ctx context.Context) error
Start(ctx context.Context, clientAccess *clientaccess.Info) error
Test(ctx context.Context) error
Reset(ctx context.Context, clientAccessInfo *clientaccess.Info) error
Start(ctx context.Context, clientAccessInfo *clientaccess.Info) error
Test(ctx context.Context, clientAccessInfo *clientaccess.Info) error
EndpointName() string
}

View File

@ -43,9 +43,19 @@ func (e *ETCD) EndpointName() string {
return "etcd"
}
func (e *ETCD) Test(ctx context.Context) error {
func (e *ETCD) Test(ctx context.Context, clientAccessInfo *clientaccess.Info) error {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
status, err := e.client.Status(ctx, "https://127.0.0.1:2379")
if err != nil {
return err
}
if status.IsLearner {
if err := e.promoteMember(ctx, clientAccessInfo); err != nil {
return err
}
}
members, err := e.client.MemberList(ctx)
if err != nil {
return err
@ -89,11 +99,12 @@ func (e *ETCD) IsInitialized(ctx context.Context, config *config.Control) (bool,
}
}
func (e *ETCD) Reset(ctx context.Context) error {
func (e *ETCD) Reset(ctx context.Context, clientAccessInfo *clientaccess.Info) error {
go func() {
for {
time.Sleep(5 * time.Second)
if err := e.Test(ctx); err == nil {
t := time.NewTicker(5 * time.Second)
defer t.Stop()
for range t.C {
if err := e.Test(ctx, clientAccessInfo); err == nil {
members, err := e.client.MemberList(ctx)
if err != nil {
continue
@ -109,7 +120,7 @@ func (e *ETCD) Reset(ctx context.Context) error {
return e.newCluster(ctx, true)
}
func (e *ETCD) Start(ctx context.Context, clientAccess *clientaccess.Info) error {
func (e *ETCD) Start(ctx context.Context, clientAccessInfo *clientaccess.Info) error {
existingCluster, err := e.IsInitialized(ctx, e.config)
if err != nil {
return errors.Wrapf(err, "failed to validation")
@ -128,29 +139,19 @@ func (e *ETCD) Start(ctx context.Context, clientAccess *clientaccess.Info) error
return e.cluster(ctx, false, opt)
}
if clientAccess == nil {
if clientAccessInfo == nil {
return e.newCluster(ctx, false)
}
err = e.join(ctx, clientAccess)
err = e.join(ctx, clientAccessInfo)
return errors.Wrap(err, "joining etcd cluster")
}
func (e *ETCD) join(ctx context.Context, clientAccessInfo *clientaccess.Info) error {
resp, err := clientaccess.Get("/db/info", clientAccessInfo)
clientURLs, memberList, err := e.clientURLs(ctx, clientAccessInfo)
if err != nil {
return err
}
var memberList Members
if err := json.Unmarshal(resp, &memberList); err != nil {
return err
}
var clientURLs []string
for _, member := range memberList.Members {
clientURLs = append(clientURLs, member.ClientURLs...)
}
client, err := joinClient(ctx, e.runtime, clientURLs)
if err != nil {
return err
@ -197,12 +198,14 @@ func (e *ETCD) join(ctx context.Context, clientAccessInfo *clientaccess.Info) er
if add {
logrus.Infof("Adding %s to etcd cluster %v", e.peerURL(), cluster)
if _, err = client.MemberAdd(ctx, []string{e.peerURL()}); err != nil {
if _, err = client.MemberAddAsLearner(ctx, []string{e.peerURL()}); err != nil {
return err
}
cluster = append(cluster, fmt.Sprintf("%s=%s", e.name, e.peerURL()))
}
go e.promoteMember(ctx, clientAccessInfo)
logrus.Infof("Starting etcd for cluster %v", cluster)
return e.cluster(ctx, false, executor.InitialOptions{
Cluster: strings.Join(cluster, ","),
@ -419,3 +422,61 @@ func (e *ETCD) removePeer(ctx context.Context, id, address string) error {
return nil
}
// promoteMember keeps trying to promote learner members of the etcd cluster
// until one attempt completes without a promotion error.
//
// The client endpoints are resolved once up front through e.clientURLs; an
// error from that lookup is returned immediately. After that, one attempt is
// made on every tick of a 5-second ticker. Failures to build a client, list
// the members, or promote a learner are deliberately not returned: the
// attempt is abandoned and retried on the next tick.
//
// It returns nil once an attempt finishes with no promotion failure (which
// includes the case where no learner is listed), and ctx.Err() if ctx is
// cancelled or its deadline passes first.
func (e *ETCD) promoteMember(ctx context.Context, clientAccessInfo *clientaccess.Info) error {
	clientURLs, _, err := e.clientURLs(ctx, clientAccessInfo)
	if err != nil {
		return err
	}

	t := time.NewTicker(5 * time.Second)
	defer t.Stop()
	for {
		// Wait for the next tick, but stop as soon as the caller's context is
		// done; otherwise a cancelled or timed-out caller would block forever.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-t.C:
		}

		// Continue on errors to keep trying to promote the member.
		// gRPC errors are already shown, so there is no need to log them again.
		// NOTE(review): a new client is created on every attempt and is not
		// released here — confirm whether joinClient's result needs closing.
		client, err := joinClient(ctx, e.runtime, clientURLs)
		if err != nil {
			continue
		}
		members, err := client.MemberList(ctx)
		if err != nil {
			continue
		}

		// The flag is scoped to this attempt so that a failure in an earlier
		// attempt cannot prevent a later, successful attempt from finishing.
		memberPromoted := true
		for _, member := range members.Members {
			// only one learner can exist in the cluster
			if !member.IsLearner {
				continue
			}
			if _, err := client.MemberPromote(ctx, member.ID); err != nil {
				memberPromoted = false
				break
			}
		}
		if memberPromoted {
			return nil
		}
	}
}
// clientURLs fetches the member list from the "/db/info" endpoint using
// clientAccessInfo and decodes it as JSON into a Members value.
//
// It returns the client URLs of every listed member that is not a learner,
// together with the decoded member list. If the request or the decoding
// fails, the URL slice is nil and the error is returned alongside whatever
// the member list holds at that point. ctx is currently unused.
func (e *ETCD) clientURLs(ctx context.Context, clientAccessInfo *clientaccess.Info) ([]string, Members, error) {
	var members Members

	body, err := clientaccess.Get("/db/info", clientAccessInfo)
	if err != nil {
		return nil, members, err
	}
	if err = json.Unmarshal(body, &members); err != nil {
		return nil, members, err
	}

	var urls []string
	for _, m := range members.Members {
		// Learner members are left out of the client list.
		if !m.IsLearner {
			urls = append(urls, m.ClientURLs...)
		}
	}
	return urls, members, nil
}