update kine to v0.4.1

Signed-off-by: Brian Downs <brian.downs@gmail.com>
This commit is contained in:
Brian Downs 2020-10-15 10:34:24 -07:00
parent 6b11d86037
commit 299fe83a1f
14 changed files with 76 additions and 48 deletions

2
go.mod
View File

@ -88,7 +88,7 @@ require (
github.com/pkg/errors v0.9.1 github.com/pkg/errors v0.9.1
github.com/rancher/dynamiclistener v0.2.1 github.com/rancher/dynamiclistener v0.2.1
github.com/rancher/helm-controller v0.7.3 github.com/rancher/helm-controller v0.7.3
github.com/rancher/kine v0.4.0 github.com/rancher/kine v0.4.1
github.com/rancher/remotedialer v0.2.0 github.com/rancher/remotedialer v0.2.0
github.com/rancher/wrangler v0.6.1 github.com/rancher/wrangler v0.6.1
github.com/rancher/wrangler-api v0.6.0 github.com/rancher/wrangler-api v0.6.0

2
go.sum
View File

@ -679,6 +679,8 @@ github.com/rancher/helm-controller v0.7.3 h1:WTQHcNF2vl9w6Xd1eBtXDe0JUsYMFFstqX9
github.com/rancher/helm-controller v0.7.3/go.mod h1:ZylsxIMGNADRPRNW+NiBWhrwwks9vnKLQiCHYWb6Bi0= github.com/rancher/helm-controller v0.7.3/go.mod h1:ZylsxIMGNADRPRNW+NiBWhrwwks9vnKLQiCHYWb6Bi0=
github.com/rancher/kine v0.4.0 h1:1IhWy3TzjExG8xnj46eyUEWdzqNAD1WrgL4eEBKm6Uc= github.com/rancher/kine v0.4.0 h1:1IhWy3TzjExG8xnj46eyUEWdzqNAD1WrgL4eEBKm6Uc=
github.com/rancher/kine v0.4.0/go.mod h1:IImtCJ68AIkE+VY/kUI0NkyJL5q5WzO8QvMsSXqbrpA= github.com/rancher/kine v0.4.0/go.mod h1:IImtCJ68AIkE+VY/kUI0NkyJL5q5WzO8QvMsSXqbrpA=
github.com/rancher/kine v0.4.1 h1:CPtGDXsov5t5onXwhZ97VBpaxDoj1MBHeQwB0TSrUu8=
github.com/rancher/kine v0.4.1/go.mod h1:IImtCJ68AIkE+VY/kUI0NkyJL5q5WzO8QvMsSXqbrpA=
github.com/rancher/kubernetes v1.19.2-k3s1 h1:/oTv57BwDcf8kapnr1ViYH98Fwk3vnklWmQdlI3vJE0= github.com/rancher/kubernetes v1.19.2-k3s1 h1:/oTv57BwDcf8kapnr1ViYH98Fwk3vnklWmQdlI3vJE0=
github.com/rancher/kubernetes v1.19.2-k3s1/go.mod h1:yhT1/ltQajQsha3tnYc9QPFYSumGM45nlZdjf7WqE1A= github.com/rancher/kubernetes v1.19.2-k3s1/go.mod h1:yhT1/ltQajQsha3tnYc9QPFYSumGM45nlZdjf7WqE1A=
github.com/rancher/kubernetes/staging/src/k8s.io/api v1.19.2-k3s1 h1:OPBCfsjKfgMaMt0mtWaoy+IirLeD+/CeVxoHXdP5bTE= github.com/rancher/kubernetes/staging/src/k8s.io/api v1.19.2-k3s1 h1:OPBCfsjKfgMaMt0mtWaoy+IirLeD+/CeVxoHXdP5bTE=

View File

@ -15,6 +15,7 @@ import (
"github.com/canonical/go-dqlite/client" "github.com/canonical/go-dqlite/client"
"github.com/canonical/go-dqlite/driver" "github.com/canonical/go-dqlite/driver"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/rancher/kine/pkg/drivers/generic"
"github.com/rancher/kine/pkg/drivers/sqlite" "github.com/rancher/kine/pkg/drivers/sqlite"
"github.com/rancher/kine/pkg/server" "github.com/rancher/kine/pkg/server"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
@ -66,7 +67,7 @@ outer:
return nil return nil
} }
func New(ctx context.Context, datasourceName string) (server.Backend, error) { func New(ctx context.Context, datasourceName string, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, error) {
opts, err := parseOpts(datasourceName) opts, err := parseOpts(datasourceName)
if err != nil { if err != nil {
return nil, err return nil, err
@ -95,7 +96,7 @@ func New(ctx context.Context, datasourceName string) (server.Backend, error) {
} }
sql.Register("dqlite", d) sql.Register("dqlite", d)
backend, generic, err := sqlite.NewVariant(ctx, "dqlite", opts.dsn) backend, generic, err := sqlite.NewVariant(ctx, "dqlite", opts.dsn, connPoolConfig)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "sqlite client") return nil, errors.Wrap(err, "sqlite client")
} }

View File

@ -4,11 +4,12 @@ package dqlite
import ( import (
"context" "context"
"fmt" "errors"
"github.com/rancher/kine/pkg/drivers/generic"
"github.com/rancher/kine/pkg/server" "github.com/rancher/kine/pkg/server"
) )
func New(ctx context.Context, datasourceName string) (server.Backend, error) { func New(ctx context.Context, datasourceName string, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, error) {
return nil, fmt.Errorf("dqlite is not support, compile with \"-tags dqlite\"") return nil, errors.New(`this binary is built without dqlite support, compile with "-tags dqlite"`)
} }

View File

@ -15,6 +15,10 @@ import (
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
const (
defaultMaxIdleConns = 2 // copied from database/sql
)
var ( var (
columns = "kv.id as theid, kv.name, kv.created, kv.deleted, kv.create_revision, kv.prev_revision, kv.lease, kv.value, kv.old_value" columns = "kv.id as theid, kv.name, kv.created, kv.deleted, kv.create_revision, kv.prev_revision, kv.lease, kv.value, kv.old_value"
revSQL = ` revSQL = `
@ -64,6 +68,12 @@ func (s Stripped) String() string {
type ErrRetry func(error) bool type ErrRetry func(error) bool
type TranslateErr func(error) error type TranslateErr func(error) error
type ConnectionPoolConfig struct {
MaxIdle int // zero means defaultMaxIdleConns; negative means 0
MaxOpen int // <= 0 means unlimited
MaxLifetime time.Duration // maximum amount of time a connection may be reused
}
type Generic struct { type Generic struct {
sync.Mutex sync.Mutex
@ -128,6 +138,20 @@ func (d *Generic) Migrate(ctx context.Context) {
} }
} }
func configureConnectionPooling(connPoolConfig ConnectionPoolConfig, db *sql.DB) {
// behavior copied from database/sql - zero means defaultMaxIdleConns; negative means 0
if connPoolConfig.MaxIdle < 0 {
connPoolConfig.MaxIdle = 0
} else if connPoolConfig.MaxIdle == 0 {
connPoolConfig.MaxIdle = defaultMaxIdleConns
}
logrus.Infof("Configuring DB connection pooling: maxIdleConns=%d, maxOpenConns=%d, connMaxLifetime=%s", connPoolConfig.MaxIdle, connPoolConfig.MaxOpen, connPoolConfig.MaxLifetime)
db.SetMaxIdleConns(connPoolConfig.MaxIdle)
db.SetMaxOpenConns(connPoolConfig.MaxOpen)
db.SetConnMaxLifetime(connPoolConfig.MaxLifetime)
}
func openAndTest(driverName, dataSourceName string) (*sql.DB, error) { func openAndTest(driverName, dataSourceName string) (*sql.DB, error) {
db, err := sql.Open(driverName, dataSourceName) db, err := sql.Open(driverName, dataSourceName)
if err != nil { if err != nil {
@ -144,7 +168,7 @@ func openAndTest(driverName, dataSourceName string) (*sql.DB, error) {
return db, nil return db, nil
} }
func Open(ctx context.Context, driverName, dataSourceName string, paramCharacter string, numbered bool) (*Generic, error) { func Open(ctx context.Context, driverName, dataSourceName string, connPoolConfig ConnectionPoolConfig, paramCharacter string, numbered bool) (*Generic, error) {
var ( var (
db *sql.DB db *sql.DB
err error err error
@ -164,6 +188,8 @@ func Open(ctx context.Context, driverName, dataSourceName string, paramCharacter
} }
} }
configureConnectionPooling(connPoolConfig, db)
return &Generic{ return &Generic{
DB: db, DB: db,
@ -229,11 +255,7 @@ func (d *Generic) execute(ctx context.Context, sql string, args ...interface{})
wait := strategy.Backoff(backoff.Linear(100 + time.Millisecond)) wait := strategy.Backoff(backoff.Linear(100 + time.Millisecond))
for i := uint(0); i < 20; i++ { for i := uint(0); i < 20; i++ {
if i > 2 {
logrus.Debugf("EXEC (try: %d) %v : %s", i, args, Stripped(sql))
} else {
logrus.Tracef("EXEC (try: %d) %v : %s", i, args, Stripped(sql)) logrus.Tracef("EXEC (try: %d) %v : %s", i, args, Stripped(sql))
}
result, err = d.DB.ExecContext(ctx, sql, args...) result, err = d.DB.ExecContext(ctx, sql, args...)
if err != nil && d.Retry != nil && d.Retry(err) { if err != nil && d.Retry != nil && d.Retry(err) {
wait(i) wait(i)

View File

@ -40,7 +40,7 @@ var (
createDB = "create database if not exists " createDB = "create database if not exists "
) )
func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config) (server.Backend, error) { func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, error) {
tlsConfig, err := tlsInfo.ClientConfig() tlsConfig, err := tlsInfo.ClientConfig()
if err != nil { if err != nil {
return nil, err return nil, err
@ -59,7 +59,7 @@ func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config) (server
return nil, err return nil, err
} }
dialect, err := generic.Open(ctx, "mysql", parsedDSN, "?", false) dialect, err := generic.Open(ctx, "mysql", parsedDSN, connPoolConfig, "?", false)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -41,7 +41,7 @@ var (
createDB = "create database " createDB = "create database "
) )
func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config) (server.Backend, error) { func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, error) {
parsedDSN, err := prepareDSN(dataSourceName, tlsInfo) parsedDSN, err := prepareDSN(dataSourceName, tlsInfo)
if err != nil { if err != nil {
return nil, err return nil, err
@ -51,7 +51,7 @@ func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config) (server
return nil, err return nil, err
} }
dialect, err := generic.Open(ctx, "postgres", parsedDSN, "$", true) dialect, err := generic.Open(ctx, "postgres", parsedDSN, connPoolConfig, "$", true)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -39,12 +39,12 @@ var (
} }
) )
func New(ctx context.Context, dataSourceName string) (server.Backend, error) { func New(ctx context.Context, dataSourceName string, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, error) {
backend, _, err := NewVariant(ctx, "sqlite3", dataSourceName) backend, _, err := NewVariant(ctx, "sqlite3", dataSourceName, connPoolConfig)
return backend, err return backend, err
} }
func NewVariant(ctx context.Context, driverName, dataSourceName string) (server.Backend, *generic.Generic, error) { func NewVariant(ctx context.Context, driverName, dataSourceName string, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, *generic.Generic, error) {
if dataSourceName == "" { if dataSourceName == "" {
if err := os.MkdirAll("./db", 0700); err != nil { if err := os.MkdirAll("./db", 0700); err != nil {
return nil, nil, err return nil, nil, err
@ -52,7 +52,7 @@ func NewVariant(ctx context.Context, driverName, dataSourceName string) (server.
dataSourceName = "./db/state.db?_journal=WAL&cache=shared" dataSourceName = "./db/state.db?_journal=WAL&cache=shared"
} }
dialect, err := generic.Open(ctx, driverName, dataSourceName, "?", false) dialect, err := generic.Open(ctx, driverName, dataSourceName, connPoolConfig, "?", false)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }

View File

@ -13,11 +13,11 @@ import (
var errNoCgo = errors.New("this binary is built without CGO, sqlite is disabled") var errNoCgo = errors.New("this binary is built without CGO, sqlite is disabled")
func New(ctx context.Context, dataSourceName string) (server.Backend, error) { func New(ctx context.Context, dataSourceName string, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, error) {
return nil, errNoCgo return nil, errNoCgo
} }
func NewVariant(driverName, dataSourceName string) (server.Backend, *generic.Generic, error) { func NewVariant(driverName, dataSourceName string, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, *generic.Generic, error) {
return nil, nil, errNoCgo return nil, nil, errNoCgo
} }

View File

@ -9,6 +9,7 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/rancher/kine/pkg/drivers/dqlite" "github.com/rancher/kine/pkg/drivers/dqlite"
"github.com/rancher/kine/pkg/drivers/generic"
"github.com/rancher/kine/pkg/drivers/mysql" "github.com/rancher/kine/pkg/drivers/mysql"
"github.com/rancher/kine/pkg/drivers/pgsql" "github.com/rancher/kine/pkg/drivers/pgsql"
"github.com/rancher/kine/pkg/drivers/sqlite" "github.com/rancher/kine/pkg/drivers/sqlite"
@ -31,6 +32,7 @@ type Config struct {
GRPCServer *grpc.Server GRPCServer *grpc.Server
Listener string Listener string
Endpoint string Endpoint string
ConnectionPoolConfig generic.ConnectionPoolConfig
tls.Config tls.Config
} }
@ -124,13 +126,13 @@ func getKineStorageBackend(ctx context.Context, driver, dsn string, cfg Config)
switch driver { switch driver {
case SQLiteBackend: case SQLiteBackend:
leaderElect = false leaderElect = false
backend, err = sqlite.New(ctx, dsn) backend, err = sqlite.New(ctx, dsn, cfg.ConnectionPoolConfig)
case DQLiteBackend: case DQLiteBackend:
backend, err = dqlite.New(ctx, dsn) backend, err = dqlite.New(ctx, dsn, cfg.ConnectionPoolConfig)
case PostgresBackend: case PostgresBackend:
backend, err = pgsql.New(ctx, dsn, cfg.Config) backend, err = pgsql.New(ctx, dsn, cfg.Config, cfg.ConnectionPoolConfig)
case MySQLBackend: case MySQLBackend:
backend, err = mysql.New(ctx, dsn, cfg.Config) backend, err = mysql.New(ctx, dsn, cfg.Config, cfg.ConnectionPoolConfig)
default: default:
return false, nil, fmt.Errorf("storage backend is not defined") return false, nil, fmt.Errorf("storage backend is not defined")
} }

View File

@ -41,7 +41,7 @@ func (l *LogStructured) Start(ctx context.Context) error {
func (l *LogStructured) Get(ctx context.Context, key string, revision int64) (revRet int64, kvRet *server.KeyValue, errRet error) { func (l *LogStructured) Get(ctx context.Context, key string, revision int64) (revRet int64, kvRet *server.KeyValue, errRet error) {
defer func() { defer func() {
l.adjustRevision(ctx, &revRet) l.adjustRevision(ctx, &revRet)
logrus.Debugf("GET %s, rev=%d => rev=%d, kv=%v, err=%v", key, revision, revRet, kvRet != nil, errRet) logrus.Tracef("GET %s, rev=%d => rev=%d, kv=%v, err=%v", key, revision, revRet, kvRet != nil, errRet)
}() }()
rev, event, err := l.get(ctx, key, revision, false) rev, event, err := l.get(ctx, key, revision, false)
@ -82,7 +82,7 @@ func (l *LogStructured) adjustRevision(ctx context.Context, rev *int64) {
func (l *LogStructured) Create(ctx context.Context, key string, value []byte, lease int64) (revRet int64, errRet error) { func (l *LogStructured) Create(ctx context.Context, key string, value []byte, lease int64) (revRet int64, errRet error) {
defer func() { defer func() {
l.adjustRevision(ctx, &revRet) l.adjustRevision(ctx, &revRet)
logrus.Debugf("CREATE %s, size=%d, lease=%d => rev=%d, err=%v", key, len(value), lease, revRet, errRet) logrus.Tracef("CREATE %s, size=%d, lease=%d => rev=%d, err=%v", key, len(value), lease, revRet, errRet)
}() }()
rev, prevEvent, err := l.get(ctx, key, 0, true) rev, prevEvent, err := l.get(ctx, key, 0, true)
@ -114,7 +114,7 @@ func (l *LogStructured) Create(ctx context.Context, key string, value []byte, le
func (l *LogStructured) Delete(ctx context.Context, key string, revision int64) (revRet int64, kvRet *server.KeyValue, deletedRet bool, errRet error) { func (l *LogStructured) Delete(ctx context.Context, key string, revision int64) (revRet int64, kvRet *server.KeyValue, deletedRet bool, errRet error) {
defer func() { defer func() {
l.adjustRevision(ctx, &revRet) l.adjustRevision(ctx, &revRet)
logrus.Debugf("DELETE %s, rev=%d => rev=%d, kv=%v, deleted=%v, err=%v", key, revision, revRet, kvRet != nil, deletedRet, errRet) logrus.Tracef("DELETE %s, rev=%d => rev=%d, kv=%v, deleted=%v, err=%v", key, revision, revRet, kvRet != nil, deletedRet, errRet)
}() }()
rev, event, err := l.get(ctx, key, 0, true) rev, event, err := l.get(ctx, key, 0, true)
@ -155,7 +155,7 @@ func (l *LogStructured) Delete(ctx context.Context, key string, revision int64)
func (l *LogStructured) List(ctx context.Context, prefix, startKey string, limit, revision int64) (revRet int64, kvRet []*server.KeyValue, errRet error) { func (l *LogStructured) List(ctx context.Context, prefix, startKey string, limit, revision int64) (revRet int64, kvRet []*server.KeyValue, errRet error) {
defer func() { defer func() {
logrus.Debugf("LIST %s, start=%s, limit=%d, rev=%d => rev=%d, kvs=%d, err=%v", prefix, startKey, limit, revision, revRet, len(kvRet), errRet) logrus.Tracef("LIST %s, start=%s, limit=%d, rev=%d => rev=%d, kvs=%d, err=%v", prefix, startKey, limit, revision, revRet, len(kvRet), errRet)
}() }()
rev, events, err := l.log.List(ctx, prefix, startKey, limit, revision, false) rev, events, err := l.log.List(ctx, prefix, startKey, limit, revision, false)
@ -185,7 +185,7 @@ func (l *LogStructured) List(ctx context.Context, prefix, startKey string, limit
func (l *LogStructured) Count(ctx context.Context, prefix string) (revRet int64, count int64, err error) { func (l *LogStructured) Count(ctx context.Context, prefix string) (revRet int64, count int64, err error) {
defer func() { defer func() {
logrus.Debugf("COUNT %s => rev=%d, count=%d, err=%v", prefix, revRet, count, err) logrus.Tracef("COUNT %s => rev=%d, count=%d, err=%v", prefix, revRet, count, err)
}() }()
rev, count, err := l.log.Count(ctx, prefix) rev, count, err := l.log.Count(ctx, prefix)
if err != nil { if err != nil {
@ -211,7 +211,7 @@ func (l *LogStructured) Update(ctx context.Context, key string, value []byte, re
if kvRet != nil { if kvRet != nil {
kvRev = kvRet.ModRevision kvRev = kvRet.ModRevision
} }
logrus.Debugf("UPDATE %s, value=%d, rev=%d, lease=%v => rev=%d, kvrev=%d, updated=%v, err=%v", key, len(value), revision, lease, revRet, kvRev, updateRet, errRet) logrus.Tracef("UPDATE %s, value=%d, rev=%d, lease=%v => rev=%d, kvrev=%d, updated=%v, err=%v", key, len(value), revision, lease, revRet, kvRev, updateRet, errRet)
}() }()
rev, event, err := l.get(ctx, key, 0, false) rev, event, err := l.get(ctx, key, 0, false)
@ -311,7 +311,7 @@ func (l *LogStructured) ttl(ctx context.Context) {
} }
func (l *LogStructured) Watch(ctx context.Context, prefix string, revision int64) <-chan []*server.Event { func (l *LogStructured) Watch(ctx context.Context, prefix string, revision int64) <-chan []*server.Event {
logrus.Debugf("WATCH %s, revision=%d", prefix, revision) logrus.Tracef("WATCH %s, revision=%d", prefix, revision)
// starting watching right away so we don't miss anything // starting watching right away so we don't miss anything
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
@ -319,7 +319,7 @@ func (l *LogStructured) Watch(ctx context.Context, prefix string, revision int64
// include the current revision in list // include the current revision in list
if revision > 0 { if revision > 0 {
revision -= 1 revision--
} }
result := make(chan []*server.Event, 100) result := make(chan []*server.Event, 100)
@ -330,7 +330,7 @@ func (l *LogStructured) Watch(ctx context.Context, prefix string, revision int64
cancel() cancel()
} }
logrus.Debugf("WATCH LIST key=%s rev=%d => rev=%d kvs=%d", prefix, revision, rev, len(kvs)) logrus.Tracef("WATCH LIST key=%s rev=%d => rev=%d kvs=%d", prefix, revision, rev, len(kvs))
go func() { go func() {
lastRevision := revision lastRevision := revision

View File

@ -411,13 +411,13 @@ func (s *SQLLog) poll(result chan interface{}, pollStart int64) {
break break
} else { } else {
if err := s.d.Fill(s.ctx, next); err == nil { if err := s.d.Fill(s.ctx, next); err == nil {
logrus.Debugf("FILL, revision=%d, err=%v", next, err) logrus.Tracef("FILL, revision=%d, err=%v", next, err)
select { select {
case s.notify <- next: case s.notify <- next:
default: default:
} }
} else { } else {
logrus.Debugf("FILL FAILED, revision=%d, err=%v", next, err) logrus.Tracef("FILL FAILED, revision=%d, err=%v", next, err)
} }
break break
} }
@ -431,10 +431,10 @@ func (s *SQLLog) poll(result chan interface{}, pollStart int64) {
saveLast = true saveLast = true
rev = event.KV.ModRevision rev = event.KV.ModRevision
if s.d.IsFill(event.KV.Key) { if s.d.IsFill(event.KV.Key) {
logrus.Debugf("NOT TRIGGER FILL %s, revision=%d, delete=%v", event.KV.Key, event.KV.ModRevision, event.Delete) logrus.Tracef("NOT TRIGGER FILL %s, revision=%d, delete=%v", event.KV.Key, event.KV.ModRevision, event.Delete)
} else { } else {
sequential = append(sequential, event) sequential = append(sequential, event)
logrus.Debugf("TRIGGERED %s, revision=%d, delete=%v", event.KV.Key, event.KV.ModRevision, event.Delete) logrus.Tracef("TRIGGERED %s, revision=%d, delete=%v", event.KV.Key, event.KV.ModRevision, event.Delete)
} }
} }

View File

@ -31,7 +31,7 @@ func (s *KVServerBridge) Watch(ws etcdserverpb.Watch_WatchServer) error {
if msg.GetCreateRequest() != nil { if msg.GetCreateRequest() != nil {
w.Start(ws.Context(), msg.GetCreateRequest()) w.Start(ws.Context(), msg.GetCreateRequest())
} else if msg.GetCancelRequest() != nil { } else if msg.GetCancelRequest() != nil {
logrus.Debugf("WATCH CANCEL REQ id=%d", msg.GetCancelRequest().GetWatchId()) logrus.Tracef("WATCH CANCEL REQ id=%d", msg.GetCancelRequest().GetWatchId())
w.Cancel(msg.GetCancelRequest().WatchId, nil) w.Cancel(msg.GetCancelRequest().WatchId, nil)
} }
} }
@ -58,7 +58,7 @@ func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest)
key := string(r.Key) key := string(r.Key)
logrus.Debugf("WATCH START id=%d, count=%d, key=%s, revision=%d", id, len(w.watches), key, r.StartRevision) logrus.Tracef("WATCH START id=%d, count=%d, key=%s, revision=%d", id, len(w.watches), key, r.StartRevision)
go func() { go func() {
defer w.wg.Done() defer w.wg.Done()
@ -78,7 +78,7 @@ func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest)
if logrus.IsLevelEnabled(logrus.DebugLevel) { if logrus.IsLevelEnabled(logrus.DebugLevel) {
for _, event := range events { for _, event := range events {
logrus.Debugf("WATCH READ id=%d, key=%s, revision=%d", id, event.KV.Key, event.KV.ModRevision) logrus.Tracef("WATCH READ id=%d, key=%s, revision=%d", id, event.KV.Key, event.KV.ModRevision)
} }
} }
@ -92,7 +92,7 @@ func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest)
} }
} }
w.Cancel(id, nil) w.Cancel(id, nil)
logrus.Debugf("WATCH CLOSE id=%d, key=%s", id, key) logrus.Tracef("WATCH CLOSE id=%d, key=%s", id, key)
}() }()
} }
@ -130,7 +130,7 @@ func (w *watcher) Cancel(watchID int64, err error) {
if err != nil { if err != nil {
reason = err.Error() reason = err.Error()
} }
logrus.Debugf("WATCH CANCEL id=%d reason=%s", watchID, reason) logrus.Tracef("WATCH CANCEL id=%d reason=%s", watchID, reason)
serr := w.server.Send(&etcdserverpb.WatchResponse{ serr := w.server.Send(&etcdserverpb.WatchResponse{
Header: &etcdserverpb.ResponseHeader{}, Header: &etcdserverpb.ResponseHeader{},
Canceled: true, Canceled: true,

2
vendor/modules.txt vendored
View File

@ -836,7 +836,7 @@ github.com/rancher/helm-controller/pkg/generated/informers/externalversions/helm
github.com/rancher/helm-controller/pkg/generated/informers/externalversions/internalinterfaces github.com/rancher/helm-controller/pkg/generated/informers/externalversions/internalinterfaces
github.com/rancher/helm-controller/pkg/generated/listers/helm.cattle.io/v1 github.com/rancher/helm-controller/pkg/generated/listers/helm.cattle.io/v1
github.com/rancher/helm-controller/pkg/helm github.com/rancher/helm-controller/pkg/helm
# github.com/rancher/kine v0.4.0 # github.com/rancher/kine v0.4.1
## explicit ## explicit
github.com/rancher/kine/pkg/broadcaster github.com/rancher/kine/pkg/broadcaster
github.com/rancher/kine/pkg/client github.com/rancher/kine/pkg/client