kine v0.1.1

This commit is contained in:
Darren Shepherd 2019-11-01 17:05:00 -07:00
parent 8833bfd961
commit 6290029e6d
8 changed files with 169 additions and 35 deletions

2
go.mod
View File

@ -31,7 +31,7 @@ replace (
github.com/prometheus/client_model => github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910
github.com/prometheus/common => github.com/prometheus/common v0.0.0-20181126121408-4724e9255275
github.com/prometheus/procfs => github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a
github.com/rancher/kine => github.com/ibuildthecloud/kine v0.1.0
github.com/rancher/kine => github.com/ibuildthecloud/kine v0.1.1
k8s.io/api => github.com/rancher/kubernetes/staging/src/k8s.io/api v1.16.2-k3s.1
k8s.io/apiextensions-apiserver => github.com/rancher/kubernetes/staging/src/k8s.io/apiextensions-apiserver v1.16.2-k3s.1
k8s.io/apimachinery => github.com/rancher/kubernetes/staging/src/k8s.io/apimachinery v1.16.2-k3s.1

4
go.sum
View File

@ -375,8 +375,8 @@ github.com/heketi/tests v0.0.0-20151005000721-f3775cbcefd6/go.mod h1:xGMAM8JLi7U
github.com/heketi/utils v0.0.0-20170317161834-435bc5bdfa64/go.mod h1:RYlF4ghFZPPmk2TC5REt5OFwvfb6lzxFWrTWB+qs28s=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/ibuildthecloud/kine v0.1.0 h1:nF/wtPm7qECzZeagEyZgy8seFcwpJz8LRF6pc7hYSl0=
github.com/ibuildthecloud/kine v0.1.0/go.mod h1:TTWUtUeu7dHQan9BrCtlRbKr9eK7epHqrBFOAae15Bg=
github.com/ibuildthecloud/kine v0.1.1 h1:HAqWHrjRDoqQ3+pKuJrk7Sx9nzoW3zotzhxMfAZ4YME=
github.com/ibuildthecloud/kine v0.1.1/go.mod h1:TTWUtUeu7dHQan9BrCtlRbKr9eK7epHqrBFOAae15Bg=
github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/imdario/mergo v0.3.7 h1:Y+UAYTZ7gDEuOfhxKWy+dvb5dRQ6rJjFSdX2HZY1/gI=
github.com/imdario/mergo v0.3.7/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=

View File

@ -70,6 +70,7 @@ type Generic struct {
DeleteSQL string
UpdateCompactSQL string
InsertSQL string
FillSQL string
InsertLastInsertIDSQL string
}
@ -162,6 +163,9 @@ func Open(driverName, dataSourceName string, paramCharacter string, numbered boo
InsertSQL: q(`INSERT INTO kine(name, created, deleted, create_revision, prev_revision, lease, value, old_value)
values(?, ?, ?, ?, ?, ?, ?, ?) RETURNING id`, paramCharacter, numbered),
FillSQL: q(`INSERT INTO kine(id, name, created, deleted, create_revision, prev_revision, lease, value, old_value)
values(?, ?, ?, ?, ?, ?, ?, ?, ?)`, paramCharacter, numbered),
}, nil
}
@ -265,6 +269,15 @@ func (d *Generic) After(ctx context.Context, prefix string, rev int64) (*sql.Row
return d.query(ctx, sql, prefix, rev)
}
func (d *Generic) Fill(ctx context.Context, revision int64) error {
_, err := d.execute(ctx, d.FillSQL, revision, fmt.Sprintf("gap-%d", revision), 0, 1, 0, 0, 0, nil, nil)
return err
}
func (d *Generic) IsFill(key string) bool {
return strings.HasPrefix(key, "gap-")
}
func (d *Generic) Insert(ctx context.Context, key string, create, delete bool, createRevision, previousRevision int64, ttl int64, value, prevValue []byte) (id int64, err error) {
cVal := 0
dVal := 0

View File

@ -29,8 +29,8 @@ var (
create_revision INTEGER,
prev_revision INTEGER,
lease INTEGER,
value BLOB,
old_value BLOB,
value MEDIUMBLOB,
old_value MEDIUMBLOB,
PRIMARY KEY (id)
);`,
}

View File

@ -2,6 +2,7 @@ package logstructured
import (
"context"
"sync"
"time"
"github.com/rancher/kine/pkg/server"
@ -106,7 +107,13 @@ func (l *LogStructured) Create(ctx context.Context, key string, value []byte, le
createEvent.PrevKV = prevEvent.KV
}
return l.log.Append(ctx, createEvent)
revRet, errRet = l.log.Append(ctx, createEvent)
if errRet != nil {
if _, prevEvent, err := l.get(ctx, key, 0, true); err == nil && prevEvent != nil && !prevEvent.Delete {
return 0, server.ErrKeyExists
}
}
return
}
func (l *LogStructured) Delete(ctx context.Context, key string, revision int64) (revRet int64, kvRet *server.KeyValue, deletedRet bool, errRet error) {
@ -222,22 +229,60 @@ func (l *LogStructured) Update(ctx context.Context, key string, value []byte, re
return rev, updateEvent.KV, true, err
}
func (l *LogStructured) ttl(ctx context.Context) {
// very naive TTL support
for events := range l.log.Watch(ctx, "/") {
for _, event := range events {
if event.KV.Lease <= 0 {
continue
func (l *LogStructured) ttlEvents(ctx context.Context) chan *server.Event {
result := make(chan *server.Event)
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
wg.Wait()
close(result)
}()
go func() {
defer wg.Done()
rev, events, err := l.log.List(ctx, "/", "", 1000, 0, false)
for len(events) > 0 {
if err != nil {
logrus.Errorf("failed to read old events for ttl")
return
}
go func(event *server.Event) {
select {
case <-ctx.Done():
return
case <-time.After(time.Duration(event.KV.Lease) * time.Second):
for _, event := range events {
if event.KV.Lease > 0 {
result <- event
}
l.Delete(ctx, event.KV.Key, event.KV.ModRevision)
}(event)
}
_, events, err = l.log.List(ctx, "/", events[len(events)-1].KV.Key, 1000, rev, false)
}
}()
go func() {
defer wg.Done()
for events := range l.log.Watch(ctx, "/") {
for _, event := range events {
if event.KV.Lease > 0 {
result <- event
}
}
}
}()
return result
}
func (l *LogStructured) ttl(ctx context.Context) {
// vary naive TTL support
for event := range l.ttlEvents(ctx) {
go func(event *server.Event) {
select {
case <-ctx.Done():
return
case <-time.After(time.Duration(event.KV.Lease) * time.Second):
}
l.Delete(ctx, event.KV.Key, event.KV.ModRevision)
}(event)
}
}
@ -253,7 +298,7 @@ func (l *LogStructured) Watch(ctx context.Context, prefix string, revision int64
revision -= 1
}
result := make(chan []*server.Event)
result := make(chan []*server.Event, 100)
rev, kvs, err := l.log.After(ctx, prefix, revision)
if err != nil {

View File

@ -37,6 +37,8 @@ type Dialect interface {
DeleteRevision(ctx context.Context, revision int64) error
GetCompactRevision(ctx context.Context) (int64, error)
SetCompactRevision(ctx context.Context, revision int64) error
Fill(ctx context.Context, revision int64) error
IsFill(key string) bool
}
func (s *SQLLog) Start(ctx context.Context) error {
@ -46,7 +48,11 @@ func (s *SQLLog) Start(ctx context.Context) error {
}
func (s *SQLLog) compact() {
t := time.NewTicker(2 * time.Second)
var (
nextEnd int64
)
t := time.NewTicker(5 * time.Minute)
nextEnd, _ = s.d.CurrentRevision(s.ctx)
outer:
for {
@ -56,22 +62,23 @@ outer:
case <-t.C:
}
end, err := s.d.CurrentRevision(s.ctx)
currentRev, err := s.d.CurrentRevision(s.ctx)
if err != nil {
logrus.Errorf("failed to get current revision: %v", err)
continue
}
end := nextEnd
nextEnd = currentRev
cursor, err := s.d.GetCompactRevision(s.ctx)
if err != nil {
logrus.Errorf("failed to get compact revision: %v", err)
continue
}
if end-cursor < 100 {
// Only run if we have at least 100 rows to process
continue
}
// leave the last 1000
end = end - 1000
savedCursor := cursor
// Purposefully start at the current and redo the current as
@ -224,7 +231,7 @@ func RowsToEvents(rows *sql.Rows) (int64, int64, []*server.Event, error) {
}
func (s *SQLLog) Watch(ctx context.Context, prefix string) <-chan []*server.Event {
res := make(chan []*server.Event)
res := make(chan []*server.Event, 100)
values, err := s.broadcaster.Subscribe(ctx, s.startWatch)
if err != nil {
return nil
@ -266,10 +273,12 @@ func (s *SQLLog) startWatch() (chan interface{}, error) {
func (s *SQLLog) poll(result chan interface{}) {
var (
last int64
last int64
skip int64
skipTime time.Time
)
wait := time.NewTicker(120 * time.Second)
wait := time.NewTicker(time.Second)
defer wait.Stop()
defer close(result)
@ -284,13 +293,22 @@ func (s *SQLLog) poll(result chan interface{}) {
case <-wait.C:
}
if last == 0 {
if currentRev, err := s.CurrentRevision(s.ctx); err != nil {
logrus.Errorf("failed to find current revision: %v", err)
continue
} else {
last = currentRev
}
}
rows, err := s.d.After(s.ctx, "%", last)
if err != nil {
logrus.Errorf("fail to list latest changes: %v", err)
continue
}
rev, _, events, err := RowsToEvents(rows)
_, _, events, err := RowsToEvents(rows)
if err != nil {
logrus.Errorf("fail to convert rows changes: %v", err)
continue
@ -300,15 +318,73 @@ func (s *SQLLog) poll(result chan interface{}) {
continue
}
rev := last
var (
sequential []*server.Event
saveLast bool
)
for _, event := range events {
logrus.Debugf("TRIGGERED %s, revision=%d, delete=%v", event.KV.Key, event.KV.ModRevision, event.Delete)
next := rev + 1
// Ensure that we are notifying events in a sequential fashion. For example if we find row 4 before 3
// we don't want to notify row 4 because 3 is essentially dropped forever.
if event.KV.ModRevision != next {
if canSkipRevision(next, skip, skipTime) {
// This situation should never happen, but we have it here as a fallback just for unknown reasons
// we don't want to pause all watches forever
logrus.Errorf("GAP %s, revision=%d, delete=%v, next=%d", event.KV.Key, event.KV.ModRevision, event.Delete, next)
} else if skip != next {
// This is the first time we have encountered this missing revision, so record time start
// and trigger a quick retry for simple out of order events
skip = next
skipTime = time.Now()
select {
case s.notify <- next:
default:
}
break
} else {
if err := s.d.Fill(s.ctx, next); err == nil {
logrus.Debugf("FILL, revision=%d, err=%v", next, err)
select {
case s.notify <- next:
default:
}
} else {
logrus.Debugf("FILL FAILED, revision=%d, err=%v", next, err)
}
break
}
}
// we have done something now that we should save the last revision. We don't save here now because
// the next loop could fail leading to saving the reported revision without reporting it. In practice this
// loop right now has no error exit so the next loop shouldn't fail, but if we for some reason add a method
// that returns error, that would be a tricky bug to find. So instead we only save the last revision at
// the same time we write to the channel.
saveLast = true
rev = event.KV.ModRevision
if s.d.IsFill(event.KV.Key) {
logrus.Debugf("NOT TRIGGER FILL %s, revision=%d, delete=%v", event.KV.Key, event.KV.ModRevision, event.Delete)
} else {
sequential = append(sequential, event)
logrus.Debugf("TRIGGERED %s, revision=%d, delete=%v", event.KV.Key, event.KV.ModRevision, event.Delete)
}
}
result <- events
last = rev
if saveLast {
last = rev
if len(sequential) > 0 {
result <- sequential
}
}
}
}
func canSkipRevision(rev, skip int64, skipTime time.Time) bool {
return rev == skip && time.Now().Sub(skipTime) > time.Second
}
func (s *SQLLog) Count(ctx context.Context, prefix string) (int64, int64, error) {
if strings.HasSuffix(prefix, "/") {
prefix += "%"

View File

@ -87,7 +87,7 @@ func (w *watcher) Start(r *etcdserverpb.WatchCreateRequest) {
Events: toEvents(events...),
}); err != nil {
w.Cancel(id)
return
continue
}
}
logrus.Debugf("WATCH CLOSE id=%d, key=%s", id, key)

2
vendor/modules.txt vendored
View File

@ -743,7 +743,7 @@ github.com/rancher/helm-controller/pkg/generated/informers/externalversions/helm
github.com/rancher/helm-controller/pkg/generated/listers/helm.cattle.io/v1
github.com/rancher/helm-controller/pkg/generated/informers/externalversions/internalinterfaces
github.com/rancher/helm-controller/pkg/apis/helm.cattle.io
# github.com/rancher/kine v0.0.0-00010101000000-000000000000 => github.com/ibuildthecloud/kine v0.1.0
# github.com/rancher/kine v0.0.0-00010101000000-000000000000 => github.com/ibuildthecloud/kine v0.1.1
github.com/rancher/kine/pkg/endpoint
github.com/rancher/kine/pkg/client
github.com/rancher/kine/pkg/drivers/mysql