k3s/vendor/github.com/k3s-io/kine/pkg/client/client.go

134 lines
2.7 KiB
Go
Raw Normal View History

2019-11-11 22:21:21 +00:00
package client
import (
"context"
"errors"
"fmt"
"time"
"github.com/k3s-io/kine/pkg/endpoint"
2019-12-12 01:27:03 +00:00
"go.etcd.io/etcd/clientv3"
2019-11-11 22:21:21 +00:00
)
type Value struct {
2019-12-17 05:20:00 +00:00
Key []byte
2019-11-11 22:21:21 +00:00
Data []byte
Modified int64
}
var (
ErrNotFound = errors.New("etcdwrapper: key not found")
)
type Client interface {
2019-12-17 05:20:00 +00:00
List(ctx context.Context, key string, rev int) ([]Value, error)
2019-11-11 22:21:21 +00:00
Get(ctx context.Context, key string) (Value, error)
Put(ctx context.Context, key string, value []byte) error
Create(ctx context.Context, key string, value []byte) error
Update(ctx context.Context, key string, revision int64, value []byte) error
Close() error
}
type client struct {
c *clientv3.Client
}
func New(config endpoint.ETCDConfig) (Client, error) {
tlsConfig, err := config.TLSConfig.ClientConfig()
if err != nil {
return nil, err
}
c, err := clientv3.New(clientv3.Config{
Endpoints: config.Endpoints,
DialTimeout: 5 * time.Second,
TLS: tlsConfig,
})
if err != nil {
return nil, err
}
return &client{
c: c,
}, nil
}
2019-12-17 05:20:00 +00:00
func (c *client) List(ctx context.Context, key string, rev int) ([]Value, error) {
resp, err := c.c.Get(ctx, key, clientv3.WithPrefix(), clientv3.WithRev(int64(rev)))
if err != nil {
return nil, err
}
var vals []Value
for _, kv := range resp.Kvs {
vals = append(vals, Value{
Key: kv.Key,
Data: kv.Value,
Modified: kv.ModRevision,
})
}
return vals, nil
}
2019-11-11 22:21:21 +00:00
func (c *client) Get(ctx context.Context, key string) (Value, error) {
resp, err := c.c.Get(ctx, key)
if err != nil {
return Value{}, err
}
if len(resp.Kvs) == 1 {
return Value{
2019-12-17 05:20:00 +00:00
Key: resp.Kvs[0].Key,
2019-11-11 22:21:21 +00:00
Data: resp.Kvs[0].Value,
Modified: resp.Kvs[0].ModRevision,
}, nil
}
return Value{}, ErrNotFound
}
func (c *client) Put(ctx context.Context, key string, value []byte) error {
val, err := c.Get(ctx, key)
if err != nil {
return err
}
if val.Modified == 0 {
return c.Create(ctx, key, value)
}
return c.Update(ctx, key, val.Modified, value)
}
func (c *client) Create(ctx context.Context, key string, value []byte) error {
resp, err := c.c.Txn(ctx).
If(clientv3.Compare(clientv3.ModRevision(key), "=", 0)).
Then(clientv3.OpPut(key, string(value))).
Commit()
if err != nil {
return err
}
if !resp.Succeeded {
return fmt.Errorf("key exists")
}
return nil
}
func (c *client) Update(ctx context.Context, key string, revision int64, value []byte) error {
resp, err := c.c.Txn(ctx).
If(clientv3.Compare(clientv3.ModRevision(key), "=", revision)).
Then(clientv3.OpPut(key, string(value))).
Else(clientv3.OpGet(key)).
Commit()
if err != nil {
return err
}
if !resp.Succeeded {
return fmt.Errorf("revision %d doesnt match", revision)
}
return nil
}
func (c *client) Close() error {
return c.c.Close()
}