diff --git a/go.mod b/go.mod index d247c143b0..9f8d9b2ed0 100644 --- a/go.mod +++ b/go.mod @@ -102,7 +102,7 @@ require ( github.com/rakelkar/gonetsh v0.0.0-20190719023240-501daadcadf8 // indirect github.com/rancher/dynamiclistener v0.1.1-0.20191113144757-736b5d5d8b51 github.com/rancher/helm-controller v0.2.2 - github.com/rancher/kine v0.2.2 + github.com/rancher/kine v0.2.3 github.com/rancher/remotedialer v0.2.0 github.com/rancher/wrangler v0.2.0 github.com/rancher/wrangler-api v0.2.0 diff --git a/go.sum b/go.sum index 1299d4c280..8910129af1 100644 --- a/go.sum +++ b/go.sum @@ -592,8 +592,8 @@ github.com/rancher/go-dqlite v1.1.0-k3s.1 h1:w3ghNkY5vqRnnrcqxvHkpBQr6E+R/nIwJfa github.com/rancher/go-dqlite v1.1.0-k3s.1/go.mod h1:lj8UhpkZddn/Ag0tBsnkbELbxHGMpzrZLMs/nW9/DX4= github.com/rancher/helm-controller v0.2.2 h1:MUqisy53/Ay1EYOF2uTCYBbGpgtZLNKKrI01BdxIbQo= github.com/rancher/helm-controller v0.2.2/go.mod h1:0JkL0UjxddNbT4FmLoESarD4Mz8xzA5YlejqJ/U4g+8= -github.com/rancher/kine v0.2.2 h1:dN5jZK1x3t5CqqEz05EImBjtxQm7ANeLmNJ8rA2jB90= -github.com/rancher/kine v0.2.2/go.mod h1:SdBUuE7e3XyrJvdBxCl9TMMapF+wyZnMZSP/H59OqNE= +github.com/rancher/kine v0.2.3 h1:ln4Pmtb3ReqCLf6hF7aF1QmMLgnYPbq4/VZ1UyJ9v3A= +github.com/rancher/kine v0.2.3/go.mod h1:SdBUuE7e3XyrJvdBxCl9TMMapF+wyZnMZSP/H59OqNE= github.com/rancher/kubernetes v1.16.2-k3s.1 h1:+oJEecXgQDkEOD/X8z2YUdYVonbXZtGzXsmtKDPYesg= github.com/rancher/kubernetes v1.16.2-k3s.1/go.mod h1:SmhGgKfQ30imqjFVj8AI+iW+zSyFsswNErKYeTfgoH0= github.com/rancher/kubernetes/staging/src/k8s.io/api v1.16.2-k3s.1 h1:2kK5KD6MU86txBYKG+tM6j5zbey02DaIDtwpG5JsfnI= diff --git a/vendor/github.com/rancher/kine/pkg/drivers/generic/generic.go b/vendor/github.com/rancher/kine/pkg/drivers/generic/generic.go index da9a3df3ac..ef0facaa53 100644 --- a/vendor/github.com/rancher/kine/pkg/drivers/generic/generic.go +++ b/vendor/github.com/rancher/kine/pkg/drivers/generic/generic.go @@ -294,8 +294,11 @@ func (d *Generic) CurrentRevision(ctx context.Context) (int64, error) { return id, err } -func (d *Generic) After(ctx context.Context, prefix string, rev int64) (*sql.Rows, error) { +func (d *Generic) After(ctx context.Context, prefix string, rev, limit int64) (*sql.Rows, error) { sql := d.AfterSQL + if limit > 0 { + sql = fmt.Sprintf("%s LIMIT %d", sql, limit) + } return d.query(ctx, sql, prefix, rev) } diff --git a/vendor/github.com/rancher/kine/pkg/logstructured/logstructured.go b/vendor/github.com/rancher/kine/pkg/logstructured/logstructured.go index 83b116b9ca..fd52922f3f 100644 --- a/vendor/github.com/rancher/kine/pkg/logstructured/logstructured.go +++ b/vendor/github.com/rancher/kine/pkg/logstructured/logstructured.go @@ -13,7 +13,7 @@ type Log interface { Start(ctx context.Context) error CurrentRevision(ctx context.Context) (int64, error) List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeletes bool) (int64, []*server.Event, error) - After(ctx context.Context, prefix string, revision int64) (int64, []*server.Event, error) + After(ctx context.Context, prefix string, revision, limit int64) (int64, []*server.Event, error) Watch(ctx context.Context, prefix string) <-chan []*server.Event Count(ctx context.Context, prefix string) (int64, int64, error) Append(ctx context.Context, event *server.Event) (int64, error) @@ -185,10 +185,14 @@ func (l *LogStructured) Count(ctx context.Context, prefix string) (revRet int64, func (l *LogStructured) Update(ctx context.Context, key string, value []byte, revision, lease int64) (revRet int64, kvRet *server.KeyValue, updateRet bool, errRet error) { defer func() { l.adjustRevision(ctx, &revRet) - logrus.Debugf("UPDATE %s, value=%d, rev=%d, lease=%v => rev=%d, kv=%v, updated=%v, err=%v", key, len(value), revision, lease, revRet, kvRet != nil, updateRet, errRet) + kvRev := int64(0) + if kvRet != nil { + kvRev = kvRet.ModRevision + } + logrus.Debugf("UPDATE %s, value=%d, rev=%d, lease=%v => rev=%d, kvrev=%d, updated=%v, err=%v", key, len(value), revision, lease, revRet, kvRev, updateRet, errRet) }() - rev, event, err := l.get(ctx, key, revision, false) + rev, event, err := l.get(ctx, key, 0, false) if err != nil { return 0, nil, false, err } @@ -295,7 +299,7 @@ func (l *LogStructured) Watch(ctx context.Context, prefix string, revision int64 result := make(chan []*server.Event, 100) - rev, kvs, err := l.log.After(ctx, prefix, revision) + rev, kvs, err := l.log.After(ctx, prefix, revision, 0) if err != nil { logrus.Errorf("failed to list %s for revision %d", prefix, revision) cancel() diff --git a/vendor/github.com/rancher/kine/pkg/logstructured/sqllog/sql.go b/vendor/github.com/rancher/kine/pkg/logstructured/sqllog/sql.go index 9a82853d25..ede63b12d1 100644 --- a/vendor/github.com/rancher/kine/pkg/logstructured/sqllog/sql.go +++ b/vendor/github.com/rancher/kine/pkg/logstructured/sqllog/sql.go @@ -31,7 +31,7 @@ type Dialect interface { List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeleted bool) (*sql.Rows, error) Count(ctx context.Context, prefix string) (int64, int64, error) CurrentRevision(ctx context.Context) (int64, error) - After(ctx context.Context, prefix string, rev int64) (*sql.Rows, error) + After(ctx context.Context, prefix string, rev, limit int64) (*sql.Rows, error) Insert(ctx context.Context, key string, create, delete bool, createRevision, previousRevision int64, ttl int64, value, prevValue []byte) (int64, error) GetRevision(ctx context.Context, revision int64) (*sql.Rows, error) DeleteRevision(ctx context.Context, revision int64) error @@ -152,12 +152,12 @@ func (s *SQLLog) CurrentRevision(ctx context.Context) (int64, error) { return s.d.CurrentRevision(ctx) } -func (s *SQLLog) After(ctx context.Context, prefix string, revision int64) (int64, []*server.Event, error) { +func (s *SQLLog) After(ctx context.Context, prefix string, revision, limit int64) (int64, []*server.Event, error) { if strings.HasSuffix(prefix, "/") { prefix += "%" } - rows, err := s.d.After(ctx, prefix, revision) + rows, err := s.d.After(ctx, prefix, revision, limit) if err != nil { return 0, nil, err } @@ -280,9 +280,10 @@ func (s *SQLLog) startWatch() (chan interface{}, error) { func (s *SQLLog) poll(result chan interface{}, pollStart int64) { var ( - last = pollStart - skip int64 - skipTime time.Time + last = pollStart + skip int64 + skipTime time.Time + waitForMore = true ) wait := time.NewTicker(time.Second) @@ -290,17 +291,20 @@ func (s *SQLLog) poll(result chan interface{}, pollStart int64) { defer close(result) for { - select { - case <-s.ctx.Done(): - return - case check := <-s.notify: - if check <= last { - continue + if waitForMore { + select { + case <-s.ctx.Done(): + return + case check := <-s.notify: + if check <= last { + continue + } + case <-wait.C: } - case <-wait.C: } + waitForMore = true - rows, err := s.d.After(s.ctx, "%", last) + rows, err := s.d.After(s.ctx, "%", last, 500) if err != nil { logrus.Errorf("fail to list latest changes: %v", err) continue @@ -316,6 +320,8 @@ func (s *SQLLog) poll(result chan interface{}, pollStart int64) { continue } + waitForMore = len(events) < 100 + rev := last var ( sequential []*server.Event diff --git a/vendor/github.com/rancher/kine/pkg/server/server.go b/vendor/github.com/rancher/kine/pkg/server/server.go index f930e2193e..9e080ab795 100644 --- a/vendor/github.com/rancher/kine/pkg/server/server.go +++ b/vendor/github.com/rancher/kine/pkg/server/server.go @@ -104,10 +104,8 @@ func toKVs(kvs ...*KeyValue) []*mvccpb.KeyValue { ret := make([]*mvccpb.KeyValue, 0, len(kvs)) for _, kv := range kvs { newKV := toKV(kv) - if newKV == nil { - fmt.Println("HIHIHIH") - } else { - ret = append(ret, toKV(kv)) + if newKV != nil { + ret = append(ret, newKV) } } return ret diff --git a/vendor/modules.txt b/vendor/modules.txt index a088b96515..c2da59a45b 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -769,7 +769,7 @@ github.com/rancher/helm-controller/pkg/generated/informers/externalversions/helm github.com/rancher/helm-controller/pkg/generated/listers/helm.cattle.io/v1 github.com/rancher/helm-controller/pkg/generated/informers/externalversions/internalinterfaces github.com/rancher/helm-controller/pkg/apis/helm.cattle.io -# github.com/rancher/kine v0.2.2 +# github.com/rancher/kine v0.2.3 github.com/rancher/kine/pkg/client github.com/rancher/kine/pkg/endpoint github.com/rancher/kine/pkg/drivers/dqlite