Merge pull request #1002 from ibuildthecloud/kine

kine v0.1.1
This commit is contained in:
Erik Wilson 2019-11-03 22:26:44 -07:00 committed by GitHub
commit 9823f60605
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 169 additions and 35 deletions

2
go.mod
View File

@ -31,7 +31,7 @@ replace (
github.com/prometheus/client_model => github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 github.com/prometheus/client_model => github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910
github.com/prometheus/common => github.com/prometheus/common v0.0.0-20181126121408-4724e9255275 github.com/prometheus/common => github.com/prometheus/common v0.0.0-20181126121408-4724e9255275
github.com/prometheus/procfs => github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a github.com/prometheus/procfs => github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a
github.com/rancher/kine => github.com/ibuildthecloud/kine v0.1.0 github.com/rancher/kine => github.com/ibuildthecloud/kine v0.1.1
k8s.io/api => github.com/rancher/kubernetes/staging/src/k8s.io/api v1.16.2-k3s.1 k8s.io/api => github.com/rancher/kubernetes/staging/src/k8s.io/api v1.16.2-k3s.1
k8s.io/apiextensions-apiserver => github.com/rancher/kubernetes/staging/src/k8s.io/apiextensions-apiserver v1.16.2-k3s.1 k8s.io/apiextensions-apiserver => github.com/rancher/kubernetes/staging/src/k8s.io/apiextensions-apiserver v1.16.2-k3s.1
k8s.io/apimachinery => github.com/rancher/kubernetes/staging/src/k8s.io/apimachinery v1.16.2-k3s.1 k8s.io/apimachinery => github.com/rancher/kubernetes/staging/src/k8s.io/apimachinery v1.16.2-k3s.1

4
go.sum
View File

@ -375,8 +375,8 @@ github.com/heketi/tests v0.0.0-20151005000721-f3775cbcefd6/go.mod h1:xGMAM8JLi7U
github.com/heketi/utils v0.0.0-20170317161834-435bc5bdfa64/go.mod h1:RYlF4ghFZPPmk2TC5REt5OFwvfb6lzxFWrTWB+qs28s= github.com/heketi/utils v0.0.0-20170317161834-435bc5bdfa64/go.mod h1:RYlF4ghFZPPmk2TC5REt5OFwvfb6lzxFWrTWB+qs28s=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/ibuildthecloud/kine v0.1.0 h1:nF/wtPm7qECzZeagEyZgy8seFcwpJz8LRF6pc7hYSl0= github.com/ibuildthecloud/kine v0.1.1 h1:HAqWHrjRDoqQ3+pKuJrk7Sx9nzoW3zotzhxMfAZ4YME=
github.com/ibuildthecloud/kine v0.1.0/go.mod h1:TTWUtUeu7dHQan9BrCtlRbKr9eK7epHqrBFOAae15Bg= github.com/ibuildthecloud/kine v0.1.1/go.mod h1:TTWUtUeu7dHQan9BrCtlRbKr9eK7epHqrBFOAae15Bg=
github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/imdario/mergo v0.3.7 h1:Y+UAYTZ7gDEuOfhxKWy+dvb5dRQ6rJjFSdX2HZY1/gI= github.com/imdario/mergo v0.3.7 h1:Y+UAYTZ7gDEuOfhxKWy+dvb5dRQ6rJjFSdX2HZY1/gI=
github.com/imdario/mergo v0.3.7/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= github.com/imdario/mergo v0.3.7/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=

View File

@ -70,6 +70,7 @@ type Generic struct {
DeleteSQL string DeleteSQL string
UpdateCompactSQL string UpdateCompactSQL string
InsertSQL string InsertSQL string
FillSQL string
InsertLastInsertIDSQL string InsertLastInsertIDSQL string
} }
@ -162,6 +163,9 @@ func Open(driverName, dataSourceName string, paramCharacter string, numbered boo
InsertSQL: q(`INSERT INTO kine(name, created, deleted, create_revision, prev_revision, lease, value, old_value) InsertSQL: q(`INSERT INTO kine(name, created, deleted, create_revision, prev_revision, lease, value, old_value)
values(?, ?, ?, ?, ?, ?, ?, ?) RETURNING id`, paramCharacter, numbered), values(?, ?, ?, ?, ?, ?, ?, ?) RETURNING id`, paramCharacter, numbered),
FillSQL: q(`INSERT INTO kine(id, name, created, deleted, create_revision, prev_revision, lease, value, old_value)
values(?, ?, ?, ?, ?, ?, ?, ?, ?)`, paramCharacter, numbered),
}, nil }, nil
} }
@ -265,6 +269,15 @@ func (d *Generic) After(ctx context.Context, prefix string, rev int64) (*sql.Row
return d.query(ctx, sql, prefix, rev) return d.query(ctx, sql, prefix, rev)
} }
func (d *Generic) Fill(ctx context.Context, revision int64) error {
_, err := d.execute(ctx, d.FillSQL, revision, fmt.Sprintf("gap-%d", revision), 0, 1, 0, 0, 0, nil, nil)
return err
}
func (d *Generic) IsFill(key string) bool {
return strings.HasPrefix(key, "gap-")
}
func (d *Generic) Insert(ctx context.Context, key string, create, delete bool, createRevision, previousRevision int64, ttl int64, value, prevValue []byte) (id int64, err error) { func (d *Generic) Insert(ctx context.Context, key string, create, delete bool, createRevision, previousRevision int64, ttl int64, value, prevValue []byte) (id int64, err error) {
cVal := 0 cVal := 0
dVal := 0 dVal := 0

View File

@ -29,8 +29,8 @@ var (
create_revision INTEGER, create_revision INTEGER,
prev_revision INTEGER, prev_revision INTEGER,
lease INTEGER, lease INTEGER,
value BLOB, value MEDIUMBLOB,
old_value BLOB, old_value MEDIUMBLOB,
PRIMARY KEY (id) PRIMARY KEY (id)
);`, );`,
} }

View File

@ -2,6 +2,7 @@ package logstructured
import ( import (
"context" "context"
"sync"
"time" "time"
"github.com/rancher/kine/pkg/server" "github.com/rancher/kine/pkg/server"
@ -106,7 +107,13 @@ func (l *LogStructured) Create(ctx context.Context, key string, value []byte, le
createEvent.PrevKV = prevEvent.KV createEvent.PrevKV = prevEvent.KV
} }
return l.log.Append(ctx, createEvent) revRet, errRet = l.log.Append(ctx, createEvent)
if errRet != nil {
if _, prevEvent, err := l.get(ctx, key, 0, true); err == nil && prevEvent != nil && !prevEvent.Delete {
return 0, server.ErrKeyExists
}
}
return
} }
func (l *LogStructured) Delete(ctx context.Context, key string, revision int64) (revRet int64, kvRet *server.KeyValue, deletedRet bool, errRet error) { func (l *LogStructured) Delete(ctx context.Context, key string, revision int64) (revRet int64, kvRet *server.KeyValue, deletedRet bool, errRet error) {
@ -222,22 +229,60 @@ func (l *LogStructured) Update(ctx context.Context, key string, value []byte, re
return rev, updateEvent.KV, true, err return rev, updateEvent.KV, true, err
} }
func (l *LogStructured) ttl(ctx context.Context) { func (l *LogStructured) ttlEvents(ctx context.Context) chan *server.Event {
// very naive TTL support result := make(chan *server.Event)
for events := range l.log.Watch(ctx, "/") { wg := sync.WaitGroup{}
for _, event := range events { wg.Add(2)
if event.KV.Lease <= 0 {
continue go func() {
wg.Wait()
close(result)
}()
go func() {
defer wg.Done()
rev, events, err := l.log.List(ctx, "/", "", 1000, 0, false)
for len(events) > 0 {
if err != nil {
logrus.Errorf("failed to read old events for ttl")
return
} }
go func(event *server.Event) {
select { for _, event := range events {
case <-ctx.Done(): if event.KV.Lease > 0 {
return result <- event
case <-time.After(time.Duration(event.KV.Lease) * time.Second):
} }
l.Delete(ctx, event.KV.Key, event.KV.ModRevision) }
}(event)
_, events, err = l.log.List(ctx, "/", events[len(events)-1].KV.Key, 1000, rev, false)
} }
}()
go func() {
defer wg.Done()
for events := range l.log.Watch(ctx, "/") {
for _, event := range events {
if event.KV.Lease > 0 {
result <- event
}
}
}
}()
return result
}
func (l *LogStructured) ttl(ctx context.Context) {
// vary naive TTL support
for event := range l.ttlEvents(ctx) {
go func(event *server.Event) {
select {
case <-ctx.Done():
return
case <-time.After(time.Duration(event.KV.Lease) * time.Second):
}
l.Delete(ctx, event.KV.Key, event.KV.ModRevision)
}(event)
} }
} }
@ -253,7 +298,7 @@ func (l *LogStructured) Watch(ctx context.Context, prefix string, revision int64
revision -= 1 revision -= 1
} }
result := make(chan []*server.Event) result := make(chan []*server.Event, 100)
rev, kvs, err := l.log.After(ctx, prefix, revision) rev, kvs, err := l.log.After(ctx, prefix, revision)
if err != nil { if err != nil {

View File

@ -37,6 +37,8 @@ type Dialect interface {
DeleteRevision(ctx context.Context, revision int64) error DeleteRevision(ctx context.Context, revision int64) error
GetCompactRevision(ctx context.Context) (int64, error) GetCompactRevision(ctx context.Context) (int64, error)
SetCompactRevision(ctx context.Context, revision int64) error SetCompactRevision(ctx context.Context, revision int64) error
Fill(ctx context.Context, revision int64) error
IsFill(key string) bool
} }
func (s *SQLLog) Start(ctx context.Context) error { func (s *SQLLog) Start(ctx context.Context) error {
@ -46,7 +48,11 @@ func (s *SQLLog) Start(ctx context.Context) error {
} }
func (s *SQLLog) compact() { func (s *SQLLog) compact() {
t := time.NewTicker(2 * time.Second) var (
nextEnd int64
)
t := time.NewTicker(5 * time.Minute)
nextEnd, _ = s.d.CurrentRevision(s.ctx)
outer: outer:
for { for {
@ -56,22 +62,23 @@ outer:
case <-t.C: case <-t.C:
} }
end, err := s.d.CurrentRevision(s.ctx) currentRev, err := s.d.CurrentRevision(s.ctx)
if err != nil { if err != nil {
logrus.Errorf("failed to get current revision: %v", err) logrus.Errorf("failed to get current revision: %v", err)
continue continue
} }
end := nextEnd
nextEnd = currentRev
cursor, err := s.d.GetCompactRevision(s.ctx) cursor, err := s.d.GetCompactRevision(s.ctx)
if err != nil { if err != nil {
logrus.Errorf("failed to get compact revision: %v", err) logrus.Errorf("failed to get compact revision: %v", err)
continue continue
} }
if end-cursor < 100 { // leave the last 1000
// Only run if we have at least 100 rows to process end = end - 1000
continue
}
savedCursor := cursor savedCursor := cursor
// Purposefully start at the current and redo the current as // Purposefully start at the current and redo the current as
@ -224,7 +231,7 @@ func RowsToEvents(rows *sql.Rows) (int64, int64, []*server.Event, error) {
} }
func (s *SQLLog) Watch(ctx context.Context, prefix string) <-chan []*server.Event { func (s *SQLLog) Watch(ctx context.Context, prefix string) <-chan []*server.Event {
res := make(chan []*server.Event) res := make(chan []*server.Event, 100)
values, err := s.broadcaster.Subscribe(ctx, s.startWatch) values, err := s.broadcaster.Subscribe(ctx, s.startWatch)
if err != nil { if err != nil {
return nil return nil
@ -266,10 +273,12 @@ func (s *SQLLog) startWatch() (chan interface{}, error) {
func (s *SQLLog) poll(result chan interface{}) { func (s *SQLLog) poll(result chan interface{}) {
var ( var (
last int64 last int64
skip int64
skipTime time.Time
) )
wait := time.NewTicker(120 * time.Second) wait := time.NewTicker(time.Second)
defer wait.Stop() defer wait.Stop()
defer close(result) defer close(result)
@ -284,13 +293,22 @@ func (s *SQLLog) poll(result chan interface{}) {
case <-wait.C: case <-wait.C:
} }
if last == 0 {
if currentRev, err := s.CurrentRevision(s.ctx); err != nil {
logrus.Errorf("failed to find current revision: %v", err)
continue
} else {
last = currentRev
}
}
rows, err := s.d.After(s.ctx, "%", last) rows, err := s.d.After(s.ctx, "%", last)
if err != nil { if err != nil {
logrus.Errorf("fail to list latest changes: %v", err) logrus.Errorf("fail to list latest changes: %v", err)
continue continue
} }
rev, _, events, err := RowsToEvents(rows) _, _, events, err := RowsToEvents(rows)
if err != nil { if err != nil {
logrus.Errorf("fail to convert rows changes: %v", err) logrus.Errorf("fail to convert rows changes: %v", err)
continue continue
@ -300,15 +318,73 @@ func (s *SQLLog) poll(result chan interface{}) {
continue continue
} }
rev := last
var (
sequential []*server.Event
saveLast bool
)
for _, event := range events { for _, event := range events {
logrus.Debugf("TRIGGERED %s, revision=%d, delete=%v", event.KV.Key, event.KV.ModRevision, event.Delete) next := rev + 1
// Ensure that we are notifying events in a sequential fashion. For example if we find row 4 before 3
// we don't want to notify row 4 because 3 is essentially dropped forever.
if event.KV.ModRevision != next {
if canSkipRevision(next, skip, skipTime) {
// This situation should never happen, but we have it here as a fallback just for unknown reasons
// we don't want to pause all watches forever
logrus.Errorf("GAP %s, revision=%d, delete=%v, next=%d", event.KV.Key, event.KV.ModRevision, event.Delete, next)
} else if skip != next {
// This is the first time we have encountered this missing revision, so record time start
// and trigger a quick retry for simple out of order events
skip = next
skipTime = time.Now()
select {
case s.notify <- next:
default:
}
break
} else {
if err := s.d.Fill(s.ctx, next); err == nil {
logrus.Debugf("FILL, revision=%d, err=%v", next, err)
select {
case s.notify <- next:
default:
}
} else {
logrus.Debugf("FILL FAILED, revision=%d, err=%v", next, err)
}
break
}
}
// we have done something now that we should save the last revision. We don't save here now because
// the next loop could fail leading to saving the reported revision without reporting it. In practice this
// loop right now has no error exit so the next loop shouldn't fail, but if we for some reason add a method
// that returns error, that would be a tricky bug to find. So instead we only save the last revision at
// the same time we write to the channel.
saveLast = true
rev = event.KV.ModRevision
if s.d.IsFill(event.KV.Key) {
logrus.Debugf("NOT TRIGGER FILL %s, revision=%d, delete=%v", event.KV.Key, event.KV.ModRevision, event.Delete)
} else {
sequential = append(sequential, event)
logrus.Debugf("TRIGGERED %s, revision=%d, delete=%v", event.KV.Key, event.KV.ModRevision, event.Delete)
}
} }
result <- events if saveLast {
last = rev last = rev
if len(sequential) > 0 {
result <- sequential
}
}
} }
} }
func canSkipRevision(rev, skip int64, skipTime time.Time) bool {
return rev == skip && time.Now().Sub(skipTime) > time.Second
}
func (s *SQLLog) Count(ctx context.Context, prefix string) (int64, int64, error) { func (s *SQLLog) Count(ctx context.Context, prefix string) (int64, int64, error) {
if strings.HasSuffix(prefix, "/") { if strings.HasSuffix(prefix, "/") {
prefix += "%" prefix += "%"

View File

@ -87,7 +87,7 @@ func (w *watcher) Start(r *etcdserverpb.WatchCreateRequest) {
Events: toEvents(events...), Events: toEvents(events...),
}); err != nil { }); err != nil {
w.Cancel(id) w.Cancel(id)
return continue
} }
} }
logrus.Debugf("WATCH CLOSE id=%d, key=%s", id, key) logrus.Debugf("WATCH CLOSE id=%d, key=%s", id, key)

2
vendor/modules.txt vendored
View File

@ -743,7 +743,7 @@ github.com/rancher/helm-controller/pkg/generated/informers/externalversions/helm
github.com/rancher/helm-controller/pkg/generated/listers/helm.cattle.io/v1 github.com/rancher/helm-controller/pkg/generated/listers/helm.cattle.io/v1
github.com/rancher/helm-controller/pkg/generated/informers/externalversions/internalinterfaces github.com/rancher/helm-controller/pkg/generated/informers/externalversions/internalinterfaces
github.com/rancher/helm-controller/pkg/apis/helm.cattle.io github.com/rancher/helm-controller/pkg/apis/helm.cattle.io
# github.com/rancher/kine v0.0.0-00010101000000-000000000000 => github.com/ibuildthecloud/kine v0.1.0 # github.com/rancher/kine v0.0.0-00010101000000-000000000000 => github.com/ibuildthecloud/kine v0.1.1
github.com/rancher/kine/pkg/endpoint github.com/rancher/kine/pkg/endpoint
github.com/rancher/kine/pkg/client github.com/rancher/kine/pkg/client
github.com/rancher/kine/pkg/drivers/mysql github.com/rancher/kine/pkg/drivers/mysql