Bump kine to fix too many conflicts on mysql

This commit is contained in:
Darren Shepherd 2019-11-14 00:45:05 +00:00 committed by Craig Jellick
parent 7b76db1ad4
commit 19e64c8869
7 changed files with 38 additions and 27 deletions

2
go.mod
View File

@ -102,7 +102,7 @@ require (
github.com/rakelkar/gonetsh v0.0.0-20190719023240-501daadcadf8 // indirect
github.com/rancher/dynamiclistener v0.1.1-0.20191113144757-736b5d5d8b51
github.com/rancher/helm-controller v0.2.2
github.com/rancher/kine v0.2.2
github.com/rancher/kine v0.2.3
github.com/rancher/remotedialer v0.2.0
github.com/rancher/wrangler v0.2.0
github.com/rancher/wrangler-api v0.2.0

4
go.sum
View File

@ -592,8 +592,8 @@ github.com/rancher/go-dqlite v1.1.0-k3s.1 h1:w3ghNkY5vqRnnrcqxvHkpBQr6E+R/nIwJfa
github.com/rancher/go-dqlite v1.1.0-k3s.1/go.mod h1:lj8UhpkZddn/Ag0tBsnkbELbxHGMpzrZLMs/nW9/DX4=
github.com/rancher/helm-controller v0.2.2 h1:MUqisy53/Ay1EYOF2uTCYBbGpgtZLNKKrI01BdxIbQo=
github.com/rancher/helm-controller v0.2.2/go.mod h1:0JkL0UjxddNbT4FmLoESarD4Mz8xzA5YlejqJ/U4g+8=
github.com/rancher/kine v0.2.2 h1:dN5jZK1x3t5CqqEz05EImBjtxQm7ANeLmNJ8rA2jB90=
github.com/rancher/kine v0.2.2/go.mod h1:SdBUuE7e3XyrJvdBxCl9TMMapF+wyZnMZSP/H59OqNE=
github.com/rancher/kine v0.2.3 h1:ln4Pmtb3ReqCLf6hF7aF1QmMLgnYPbq4/VZ1UyJ9v3A=
github.com/rancher/kine v0.2.3/go.mod h1:SdBUuE7e3XyrJvdBxCl9TMMapF+wyZnMZSP/H59OqNE=
github.com/rancher/kubernetes v1.16.2-k3s.1 h1:+oJEecXgQDkEOD/X8z2YUdYVonbXZtGzXsmtKDPYesg=
github.com/rancher/kubernetes v1.16.2-k3s.1/go.mod h1:SmhGgKfQ30imqjFVj8AI+iW+zSyFsswNErKYeTfgoH0=
github.com/rancher/kubernetes/staging/src/k8s.io/api v1.16.2-k3s.1 h1:2kK5KD6MU86txBYKG+tM6j5zbey02DaIDtwpG5JsfnI=

View File

@ -294,8 +294,11 @@ func (d *Generic) CurrentRevision(ctx context.Context) (int64, error) {
return id, err
}
func (d *Generic) After(ctx context.Context, prefix string, rev int64) (*sql.Rows, error) {
func (d *Generic) After(ctx context.Context, prefix string, rev, limit int64) (*sql.Rows, error) {
sql := d.AfterSQL
if limit > 0 {
sql = fmt.Sprintf("%s LIMIT %d", sql, limit)
}
return d.query(ctx, sql, prefix, rev)
}

View File

@ -13,7 +13,7 @@ type Log interface {
Start(ctx context.Context) error
CurrentRevision(ctx context.Context) (int64, error)
List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeletes bool) (int64, []*server.Event, error)
After(ctx context.Context, prefix string, revision int64) (int64, []*server.Event, error)
After(ctx context.Context, prefix string, revision, limit int64) (int64, []*server.Event, error)
Watch(ctx context.Context, prefix string) <-chan []*server.Event
Count(ctx context.Context, prefix string) (int64, int64, error)
Append(ctx context.Context, event *server.Event) (int64, error)
@ -185,10 +185,14 @@ func (l *LogStructured) Count(ctx context.Context, prefix string) (revRet int64,
func (l *LogStructured) Update(ctx context.Context, key string, value []byte, revision, lease int64) (revRet int64, kvRet *server.KeyValue, updateRet bool, errRet error) {
defer func() {
l.adjustRevision(ctx, &revRet)
logrus.Debugf("UPDATE %s, value=%d, rev=%d, lease=%v => rev=%d, kv=%v, updated=%v, err=%v", key, len(value), revision, lease, revRet, kvRet != nil, updateRet, errRet)
kvRev := int64(0)
if kvRet != nil {
kvRev = kvRet.ModRevision
}
logrus.Debugf("UPDATE %s, value=%d, rev=%d, lease=%v => rev=%d, kvrev=%d, updated=%v, err=%v", key, len(value), revision, lease, revRet, kvRev, updateRet, errRet)
}()
rev, event, err := l.get(ctx, key, revision, false)
rev, event, err := l.get(ctx, key, 0, false)
if err != nil {
return 0, nil, false, err
}
@ -295,7 +299,7 @@ func (l *LogStructured) Watch(ctx context.Context, prefix string, revision int64
result := make(chan []*server.Event, 100)
rev, kvs, err := l.log.After(ctx, prefix, revision)
rev, kvs, err := l.log.After(ctx, prefix, revision, 0)
if err != nil {
logrus.Errorf("failed to list %s for revision %d", prefix, revision)
cancel()

View File

@ -31,7 +31,7 @@ type Dialect interface {
List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeleted bool) (*sql.Rows, error)
Count(ctx context.Context, prefix string) (int64, int64, error)
CurrentRevision(ctx context.Context) (int64, error)
After(ctx context.Context, prefix string, rev int64) (*sql.Rows, error)
After(ctx context.Context, prefix string, rev, limit int64) (*sql.Rows, error)
Insert(ctx context.Context, key string, create, delete bool, createRevision, previousRevision int64, ttl int64, value, prevValue []byte) (int64, error)
GetRevision(ctx context.Context, revision int64) (*sql.Rows, error)
DeleteRevision(ctx context.Context, revision int64) error
@ -152,12 +152,12 @@ func (s *SQLLog) CurrentRevision(ctx context.Context) (int64, error) {
return s.d.CurrentRevision(ctx)
}
func (s *SQLLog) After(ctx context.Context, prefix string, revision int64) (int64, []*server.Event, error) {
func (s *SQLLog) After(ctx context.Context, prefix string, revision, limit int64) (int64, []*server.Event, error) {
if strings.HasSuffix(prefix, "/") {
prefix += "%"
}
rows, err := s.d.After(ctx, prefix, revision)
rows, err := s.d.After(ctx, prefix, revision, limit)
if err != nil {
return 0, nil, err
}
@ -280,9 +280,10 @@ func (s *SQLLog) startWatch() (chan interface{}, error) {
func (s *SQLLog) poll(result chan interface{}, pollStart int64) {
var (
last = pollStart
skip int64
skipTime time.Time
last = pollStart
skip int64
skipTime time.Time
waitForMore = true
)
wait := time.NewTicker(time.Second)
@ -290,17 +291,20 @@ func (s *SQLLog) poll(result chan interface{}, pollStart int64) {
defer close(result)
for {
select {
case <-s.ctx.Done():
return
case check := <-s.notify:
if check <= last {
continue
if waitForMore {
select {
case <-s.ctx.Done():
return
case check := <-s.notify:
if check <= last {
continue
}
case <-wait.C:
}
case <-wait.C:
}
waitForMore = true
rows, err := s.d.After(s.ctx, "%", last)
rows, err := s.d.After(s.ctx, "%", last, 500)
if err != nil {
logrus.Errorf("fail to list latest changes: %v", err)
continue
@ -316,6 +320,8 @@ func (s *SQLLog) poll(result chan interface{}, pollStart int64) {
continue
}
waitForMore = len(events) < 100
rev := last
var (
sequential []*server.Event

View File

@ -104,10 +104,8 @@ func toKVs(kvs ...*KeyValue) []*mvccpb.KeyValue {
ret := make([]*mvccpb.KeyValue, 0, len(kvs))
for _, kv := range kvs {
newKV := toKV(kv)
if newKV == nil {
fmt.Println("HIHIHIH")
} else {
ret = append(ret, toKV(kv))
if newKV != nil {
ret = append(ret, newKV)
}
}
return ret

2
vendor/modules.txt vendored
View File

@ -769,7 +769,7 @@ github.com/rancher/helm-controller/pkg/generated/informers/externalversions/helm
github.com/rancher/helm-controller/pkg/generated/listers/helm.cattle.io/v1
github.com/rancher/helm-controller/pkg/generated/informers/externalversions/internalinterfaces
github.com/rancher/helm-controller/pkg/apis/helm.cattle.io
# github.com/rancher/kine v0.2.2
# github.com/rancher/kine v0.2.3
github.com/rancher/kine/pkg/client
github.com/rancher/kine/pkg/endpoint
github.com/rancher/kine/pkg/drivers/dqlite