diff --git a/go.mod b/go.mod index 151603c1d0..b6abc94a05 100644 --- a/go.mod +++ b/go.mod @@ -31,7 +31,7 @@ replace ( github.com/prometheus/client_model => github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 github.com/prometheus/common => github.com/prometheus/common v0.0.0-20181126121408-4724e9255275 github.com/prometheus/procfs => github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a - github.com/rancher/kine => github.com/ibuildthecloud/kine v0.1.0 + github.com/rancher/kine => github.com/ibuildthecloud/kine v0.1.1 k8s.io/api => github.com/rancher/kubernetes/staging/src/k8s.io/api v1.16.2-k3s.1 k8s.io/apiextensions-apiserver => github.com/rancher/kubernetes/staging/src/k8s.io/apiextensions-apiserver v1.16.2-k3s.1 k8s.io/apimachinery => github.com/rancher/kubernetes/staging/src/k8s.io/apimachinery v1.16.2-k3s.1 diff --git a/go.sum b/go.sum index 742aa9c1fc..dc5641123b 100644 --- a/go.sum +++ b/go.sum @@ -375,8 +375,8 @@ github.com/heketi/tests v0.0.0-20151005000721-f3775cbcefd6/go.mod h1:xGMAM8JLi7U github.com/heketi/utils v0.0.0-20170317161834-435bc5bdfa64/go.mod h1:RYlF4ghFZPPmk2TC5REt5OFwvfb6lzxFWrTWB+qs28s= github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= -github.com/ibuildthecloud/kine v0.1.0 h1:nF/wtPm7qECzZeagEyZgy8seFcwpJz8LRF6pc7hYSl0= -github.com/ibuildthecloud/kine v0.1.0/go.mod h1:TTWUtUeu7dHQan9BrCtlRbKr9eK7epHqrBFOAae15Bg= +github.com/ibuildthecloud/kine v0.1.1 h1:HAqWHrjRDoqQ3+pKuJrk7Sx9nzoW3zotzhxMfAZ4YME= +github.com/ibuildthecloud/kine v0.1.1/go.mod h1:TTWUtUeu7dHQan9BrCtlRbKr9eK7epHqrBFOAae15Bg= github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= github.com/imdario/mergo v0.3.7 h1:Y+UAYTZ7gDEuOfhxKWy+dvb5dRQ6rJjFSdX2HZY1/gI= github.com/imdario/mergo v0.3.7/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= diff --git a/vendor/github.com/rancher/kine/pkg/drivers/generic/generic.go b/vendor/github.com/rancher/kine/pkg/drivers/generic/generic.go index b5262b50aa..eeabc83667 100644 --- a/vendor/github.com/rancher/kine/pkg/drivers/generic/generic.go +++ b/vendor/github.com/rancher/kine/pkg/drivers/generic/generic.go @@ -70,6 +70,7 @@ type Generic struct { DeleteSQL string UpdateCompactSQL string InsertSQL string + FillSQL string InsertLastInsertIDSQL string } @@ -162,6 +163,9 @@ func Open(driverName, dataSourceName string, paramCharacter string, numbered boo InsertSQL: q(`INSERT INTO kine(name, created, deleted, create_revision, prev_revision, lease, value, old_value) values(?, ?, ?, ?, ?, ?, ?, ?) RETURNING id`, paramCharacter, numbered), + + FillSQL: q(`INSERT INTO kine(id, name, created, deleted, create_revision, prev_revision, lease, value, old_value) + values(?, ?, ?, ?, ?, ?, ?, ?, ?)`, paramCharacter, numbered), }, nil } @@ -265,6 +269,15 @@ func (d *Generic) After(ctx context.Context, prefix string, rev int64) (*sql.Row return d.query(ctx, sql, prefix, rev) } +func (d *Generic) Fill(ctx context.Context, revision int64) error { + _, err := d.execute(ctx, d.FillSQL, revision, fmt.Sprintf("gap-%d", revision), 0, 1, 0, 0, 0, nil, nil) + return err +} + +func (d *Generic) IsFill(key string) bool { + return strings.HasPrefix(key, "gap-") +} + func (d *Generic) Insert(ctx context.Context, key string, create, delete bool, createRevision, previousRevision int64, ttl int64, value, prevValue []byte) (id int64, err error) { cVal := 0 dVal := 0 diff --git a/vendor/github.com/rancher/kine/pkg/drivers/mysql/mysql.go b/vendor/github.com/rancher/kine/pkg/drivers/mysql/mysql.go index a5f8f3121f..c1efe6772d 100644 --- a/vendor/github.com/rancher/kine/pkg/drivers/mysql/mysql.go +++ b/vendor/github.com/rancher/kine/pkg/drivers/mysql/mysql.go @@ -29,8 +29,8 @@ var ( create_revision INTEGER, prev_revision INTEGER, lease INTEGER, - value BLOB, - old_value BLOB, + value MEDIUMBLOB, + old_value MEDIUMBLOB, PRIMARY KEY (id) );`, } diff --git a/vendor/github.com/rancher/kine/pkg/logstructured/logstructured.go b/vendor/github.com/rancher/kine/pkg/logstructured/logstructured.go index 520bb19e10..fe7f64e4f9 100644 --- a/vendor/github.com/rancher/kine/pkg/logstructured/logstructured.go +++ b/vendor/github.com/rancher/kine/pkg/logstructured/logstructured.go @@ -2,6 +2,7 @@ package logstructured import ( "context" + "sync" "time" "github.com/rancher/kine/pkg/server" @@ -106,7 +107,13 @@ func (l *LogStructured) Create(ctx context.Context, key string, value []byte, le createEvent.PrevKV = prevEvent.KV } - return l.log.Append(ctx, createEvent) + revRet, errRet = l.log.Append(ctx, createEvent) + if errRet != nil { + if _, prevEvent, err := l.get(ctx, key, 0, true); err == nil && prevEvent != nil && !prevEvent.Delete { + return 0, server.ErrKeyExists + } + } + return } func (l *LogStructured) Delete(ctx context.Context, key string, revision int64) (revRet int64, kvRet *server.KeyValue, deletedRet bool, errRet error) { @@ -222,22 +229,60 @@ func (l *LogStructured) Update(ctx context.Context, key string, value []byte, re return rev, updateEvent.KV, true, err } -func (l *LogStructured) ttl(ctx context.Context) { - // very naive TTL support - for events := range l.log.Watch(ctx, "/") { - for _, event := range events { - if event.KV.Lease <= 0 { - continue +func (l *LogStructured) ttlEvents(ctx context.Context) chan *server.Event { + result := make(chan *server.Event) + wg := sync.WaitGroup{} + wg.Add(2) + + go func() { + wg.Wait() + close(result) + }() + + go func() { + defer wg.Done() + rev, events, err := l.log.List(ctx, "/", "", 1000, 0, false) + for len(events) > 0 { + if err != nil { + logrus.Errorf("failed to read old events for ttl") + return } - go func(event *server.Event) { - select { - case <-ctx.Done(): - return - case <-time.After(time.Duration(event.KV.Lease) * time.Second): + + for _, event := range events { + if event.KV.Lease > 0 { + result <- event } - l.Delete(ctx, event.KV.Key, event.KV.ModRevision) - }(event) + } + + _, events, err = l.log.List(ctx, "/", events[len(events)-1].KV.Key, 1000, rev, false) } + }() + + go func() { + defer wg.Done() + for events := range l.log.Watch(ctx, "/") { + for _, event := range events { + if event.KV.Lease > 0 { + result <- event + } + } + } + }() + + return result +} + +func (l *LogStructured) ttl(ctx context.Context) { + // vary naive TTL support + for event := range l.ttlEvents(ctx) { + go func(event *server.Event) { + select { + case <-ctx.Done(): + return + case <-time.After(time.Duration(event.KV.Lease) * time.Second): + } + l.Delete(ctx, event.KV.Key, event.KV.ModRevision) + }(event) } } @@ -253,7 +298,7 @@ func (l *LogStructured) Watch(ctx context.Context, prefix string, revision int64 revision -= 1 } - result := make(chan []*server.Event) + result := make(chan []*server.Event, 100) rev, kvs, err := l.log.After(ctx, prefix, revision) if err != nil { diff --git a/vendor/github.com/rancher/kine/pkg/logstructured/sqllog/sql.go b/vendor/github.com/rancher/kine/pkg/logstructured/sqllog/sql.go index 93c06d9ebd..c777331a5f 100644 --- a/vendor/github.com/rancher/kine/pkg/logstructured/sqllog/sql.go +++ b/vendor/github.com/rancher/kine/pkg/logstructured/sqllog/sql.go @@ -37,6 +37,8 @@ type Dialect interface { DeleteRevision(ctx context.Context, revision int64) error GetCompactRevision(ctx context.Context) (int64, error) SetCompactRevision(ctx context.Context, revision int64) error + Fill(ctx context.Context, revision int64) error + IsFill(key string) bool } func (s *SQLLog) Start(ctx context.Context) error { @@ -46,7 +48,11 @@ func (s *SQLLog) Start(ctx context.Context) error { } func (s *SQLLog) compact() { - t := time.NewTicker(2 * time.Second) + var ( + nextEnd int64 + ) + t := time.NewTicker(5 * time.Minute) + nextEnd, _ = s.d.CurrentRevision(s.ctx) outer: for { @@ -56,22 +62,23 @@ outer: case <-t.C: } - end, err := s.d.CurrentRevision(s.ctx) + currentRev, err := s.d.CurrentRevision(s.ctx) if err != nil { logrus.Errorf("failed to get current revision: %v", err) continue } + end := nextEnd + nextEnd = currentRev + cursor, err := s.d.GetCompactRevision(s.ctx) if err != nil { logrus.Errorf("failed to get compact revision: %v", err) continue } - if end-cursor < 100 { - // Only run if we have at least 100 rows to process - continue - } + // leave the last 1000 + end = end - 1000 savedCursor := cursor // Purposefully start at the current and redo the current as @@ -224,7 +231,7 @@ func RowsToEvents(rows *sql.Rows) (int64, int64, []*server.Event, error) { } func (s *SQLLog) Watch(ctx context.Context, prefix string) <-chan []*server.Event { - res := make(chan []*server.Event) + res := make(chan []*server.Event, 100) values, err := s.broadcaster.Subscribe(ctx, s.startWatch) if err != nil { return nil @@ -266,10 +273,12 @@ func (s *SQLLog) startWatch() (chan interface{}, error) { func (s *SQLLog) poll(result chan interface{}) { var ( - last int64 + last int64 + skip int64 + skipTime time.Time ) - wait := time.NewTicker(120 * time.Second) + wait := time.NewTicker(time.Second) defer wait.Stop() defer close(result) @@ -284,13 +293,22 @@ func (s *SQLLog) poll(result chan interface{}) { case <-wait.C: } + if last == 0 { + if currentRev, err := s.CurrentRevision(s.ctx); err != nil { + logrus.Errorf("failed to find current revision: %v", err) + continue + } else { + last = currentRev + } + } + rows, err := s.d.After(s.ctx, "%", last) if err != nil { logrus.Errorf("fail to list latest changes: %v", err) continue } - rev, _, events, err := RowsToEvents(rows) + _, _, events, err := RowsToEvents(rows) if err != nil { logrus.Errorf("fail to convert rows changes: %v", err) continue @@ -300,15 +318,73 @@ func (s *SQLLog) poll(result chan interface{}) { continue } + rev := last + var ( + sequential []*server.Event + saveLast bool + ) + for _, event := range events { - logrus.Debugf("TRIGGERED %s, revision=%d, delete=%v", event.KV.Key, event.KV.ModRevision, event.Delete) + next := rev + 1 + // Ensure that we are notifying events in a sequential fashion. For example if we find row 4 before 3 + // we don't want to notify row 4 because 3 is essentially dropped forever. + if event.KV.ModRevision != next { + if canSkipRevision(next, skip, skipTime) { + // This situation should never happen, but we have it here as a fallback just for unknown reasons + // we don't want to pause all watches forever + logrus.Errorf("GAP %s, revision=%d, delete=%v, next=%d", event.KV.Key, event.KV.ModRevision, event.Delete, next) + } else if skip != next { + // This is the first time we have encountered this missing revision, so record time start + // and trigger a quick retry for simple out of order events + skip = next + skipTime = time.Now() + select { + case s.notify <- next: + default: + } + break + } else { + if err := s.d.Fill(s.ctx, next); err == nil { + logrus.Debugf("FILL, revision=%d, err=%v", next, err) + select { + case s.notify <- next: + default: + } + } else { + logrus.Debugf("FILL FAILED, revision=%d, err=%v", next, err) + } + break + } + } + + // we have done something now that we should save the last revision. We don't save here now because + // the next loop could fail leading to saving the reported revision without reporting it. In practice this + // loop right now has no error exit so the next loop shouldn't fail, but if we for some reason add a method + // that returns error, that would be a tricky bug to find. So instead we only save the last revision at + // the same time we write to the channel. + saveLast = true + rev = event.KV.ModRevision + if s.d.IsFill(event.KV.Key) { + logrus.Debugf("NOT TRIGGER FILL %s, revision=%d, delete=%v", event.KV.Key, event.KV.ModRevision, event.Delete) + } else { + sequential = append(sequential, event) + logrus.Debugf("TRIGGERED %s, revision=%d, delete=%v", event.KV.Key, event.KV.ModRevision, event.Delete) + } } - result <- events - last = rev + if saveLast { + last = rev + if len(sequential) > 0 { + result <- sequential + } + } } } +func canSkipRevision(rev, skip int64, skipTime time.Time) bool { + return rev == skip && time.Now().Sub(skipTime) > time.Second +} + func (s *SQLLog) Count(ctx context.Context, prefix string) (int64, int64, error) { if strings.HasSuffix(prefix, "/") { prefix += "%" diff --git a/vendor/github.com/rancher/kine/pkg/server/watch.go b/vendor/github.com/rancher/kine/pkg/server/watch.go index 2eb9eb1c02..151c94fd3f 100644 --- a/vendor/github.com/rancher/kine/pkg/server/watch.go +++ b/vendor/github.com/rancher/kine/pkg/server/watch.go @@ -87,7 +87,7 @@ func (w *watcher) Start(r *etcdserverpb.WatchCreateRequest) { Events: toEvents(events...), }); err != nil { w.Cancel(id) - return + continue } } logrus.Debugf("WATCH CLOSE id=%d, key=%s", id, key) diff --git a/vendor/modules.txt b/vendor/modules.txt index bb1e4d46bf..c032f16d01 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -743,7 +743,7 @@ github.com/rancher/helm-controller/pkg/generated/informers/externalversions/helm github.com/rancher/helm-controller/pkg/generated/listers/helm.cattle.io/v1 github.com/rancher/helm-controller/pkg/generated/informers/externalversions/internalinterfaces github.com/rancher/helm-controller/pkg/apis/helm.cattle.io -# github.com/rancher/kine v0.0.0-00010101000000-000000000000 => github.com/ibuildthecloud/kine v0.1.0 +# github.com/rancher/kine v0.0.0-00010101000000-000000000000 => github.com/ibuildthecloud/kine v0.1.1 github.com/rancher/kine/pkg/endpoint github.com/rancher/kine/pkg/client github.com/rancher/kine/pkg/drivers/mysql