Bump kine for metrics/tls changes

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
This commit is contained in:
Brad Davidson 2021-08-30 13:43:25 -07:00 committed by Brad Davidson
parent ad1a40a96c
commit b8add39b07
26 changed files with 648 additions and 190 deletions

2
go.mod
View File

@ -90,7 +90,7 @@ require (
github.com/gorilla/mux v1.8.0
github.com/gorilla/websocket v1.4.2
github.com/k3s-io/helm-controller v0.11.3
github.com/k3s-io/kine v0.7.3
github.com/k3s-io/kine v0.8.0
github.com/klauspost/compress v1.12.2
github.com/kubernetes-sigs/cri-tools v0.0.0-00010101000000-000000000000
github.com/lib/pq v1.10.2

4
go.sum
View File

@ -562,8 +562,8 @@ github.com/k3s-io/etcd/server/v3 v3.5.0-k3s1 h1:be2d2LS1w+OxyHE/PF6tmGVzF72+7L2F
github.com/k3s-io/etcd/server/v3 v3.5.0-k3s1/go.mod h1:3Ah5ruV+M+7RZr0+Y/5mNLwC+eQlni+mQmOVdCRJoS4=
github.com/k3s-io/helm-controller v0.11.3 h1:DSPAOCGHxF5pmF4vzQP5AgPT3tiGRJRZu+08hwWtbGI=
github.com/k3s-io/helm-controller v0.11.3/go.mod h1:z0ExsRRIkTO/QC//3/Esn5ItTD6AiQSluwzMaS7RI/4=
github.com/k3s-io/kine v0.7.3 h1:YHj4DQmTGuF4GyriKyFx5gH9ywchCadHhl3Llj5oyuw=
github.com/k3s-io/kine v0.7.3/go.mod h1:tksdHCn3Vof8ZKHd+QJkLRi4LbyQglKBrokaBBrHIcY=
github.com/k3s-io/kine v0.8.0 h1:k6T9bI9DID7lIbktukXxg1QfeFoAQK4EIvAHoyPAe08=
github.com/k3s-io/kine v0.8.0/go.mod h1:gaezUQ9c8iw8vxDV/DI8vc93h2rCpTvY37kMdYPMsyc=
github.com/k3s-io/kubernetes v1.22.1-k3s1 h1:evi+JkMnC0HDEJ8A7aLqPKZ+0cP/CK6udFquf7KVq1I=
github.com/k3s-io/kubernetes v1.22.1-k3s1/go.mod h1:IGQZrV02n2IBp52+/YwLVMurCEQPKXJ/k8hU3mqEOuA=
github.com/k3s-io/kubernetes/staging/src/k8s.io/api v1.22.1-k3s1 h1:BlTOjlJwCV9ZNgTgyByLJXn7emlDAQC/duN14pUtWLk=

View File

@ -111,9 +111,9 @@ func run(app *cli.Context, cfg *cmds.Server, leaderControllers server.CustomCont
serverConfig.ControlConfig.ExtraSchedulerAPIArgs = cfg.ExtraSchedulerArgs
serverConfig.ControlConfig.ClusterDomain = cfg.ClusterDomain
serverConfig.ControlConfig.Datastore.Endpoint = cfg.DatastoreEndpoint
serverConfig.ControlConfig.Datastore.CAFile = cfg.DatastoreCAFile
serverConfig.ControlConfig.Datastore.CertFile = cfg.DatastoreCertFile
serverConfig.ControlConfig.Datastore.KeyFile = cfg.DatastoreKeyFile
serverConfig.ControlConfig.Datastore.BackendTLSConfig.CAFile = cfg.DatastoreCAFile
serverConfig.ControlConfig.Datastore.BackendTLSConfig.CertFile = cfg.DatastoreCertFile
serverConfig.ControlConfig.Datastore.BackendTLSConfig.KeyFile = cfg.DatastoreKeyFile
serverConfig.ControlConfig.AdvertiseIP = cfg.AdvertiseIP
serverConfig.ControlConfig.AdvertisePort = cfg.AdvertisePort
serverConfig.ControlConfig.FlannelBackend = cfg.FlannelBackend

View File

@ -147,7 +147,7 @@ func (c *Cluster) startStorage(ctx context.Context) error {
// based on what the kine wrapper tells us about the datastore. Single-node datastores like sqlite don't require
// leader election, while basically all others (etcd, external database, etc) do since they allow multiple servers.
c.etcdConfig = etcdConfig
c.config.Datastore.Config = etcdConfig.TLSConfig
c.config.Datastore.BackendTLSConfig = etcdConfig.TLSConfig
c.config.Datastore.Endpoint = strings.Join(etcdConfig.Endpoints, ",")
c.config.NoLeaderElect = !etcdConfig.LeaderElect
return nil

View File

@ -272,14 +272,14 @@ func setupStorageBackend(argsMap map[string]string, cfg *config.Control) {
argsMap["etcd-servers"] = cfg.Datastore.Endpoint
}
// storage backend tls configuration
if len(cfg.Datastore.CAFile) > 0 {
argsMap["etcd-cafile"] = cfg.Datastore.CAFile
if len(cfg.Datastore.BackendTLSConfig.CAFile) > 0 {
argsMap["etcd-cafile"] = cfg.Datastore.BackendTLSConfig.CAFile
}
if len(cfg.Datastore.CertFile) > 0 {
argsMap["etcd-certfile"] = cfg.Datastore.CertFile
if len(cfg.Datastore.BackendTLSConfig.CertFile) > 0 {
argsMap["etcd-certfile"] = cfg.Datastore.BackendTLSConfig.CertFile
}
if len(cfg.Datastore.KeyFile) > 0 {
argsMap["etcd-keyfile"] = cfg.Datastore.KeyFile
if len(cfg.Datastore.BackendTLSConfig.KeyFile) > 0 {
argsMap["etcd-keyfile"] = cfg.Datastore.BackendTLSConfig.KeyFile
}
}

View File

@ -363,9 +363,9 @@ func (e *ETCD) Register(ctx context.Context, config *config.Control, handler htt
}
e.address = address
e.config.Datastore.Endpoint = endpoint
e.config.Datastore.Config.CAFile = e.runtime.ETCDServerCA
e.config.Datastore.Config.CertFile = e.runtime.ClientETCDCert
e.config.Datastore.Config.KeyFile = e.runtime.ClientETCDKey
e.config.Datastore.BackendTLSConfig.CAFile = e.runtime.ETCDServerCA
e.config.Datastore.BackendTLSConfig.CertFile = e.runtime.ClientETCDCert
e.config.Datastore.BackendTLSConfig.KeyFile = e.runtime.ClientETCDKey
if err := e.setName(false); err != nil {
return nil, err

118
vendor/github.com/google/uuid/null.go generated vendored Normal file
View File

@ -0,0 +1,118 @@
// Copyright 2021 Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package uuid
import (
"bytes"
"database/sql/driver"
"encoding/json"
"fmt"
)
var jsonNull = []byte("null")
// NullUUID represents a UUID that may be null.
// NullUUID implements the SQL driver.Scanner interface so
// it can be used as a scan destination:
//
// var u uuid.NullUUID
// err := db.QueryRow("SELECT name FROM foo WHERE id=?", id).Scan(&u)
// ...
// if u.Valid {
// // use u.UUID
// } else {
// // NULL value
// }
//
type NullUUID struct {
UUID UUID
Valid bool // Valid is true if UUID is not NULL
}
// Scan implements the SQL driver.Scanner interface.
func (nu *NullUUID) Scan(value interface{}) error {
if value == nil {
nu.UUID, nu.Valid = Nil, false
return nil
}
err := nu.UUID.Scan(value)
if err != nil {
nu.Valid = false
return err
}
nu.Valid = true
return nil
}
// Value implements the driver Valuer interface.
func (nu NullUUID) Value() (driver.Value, error) {
if !nu.Valid {
return nil, nil
}
// Delegate to UUID Value function
return nu.UUID.Value()
}
// MarshalBinary implements encoding.BinaryMarshaler.
func (nu NullUUID) MarshalBinary() ([]byte, error) {
if nu.Valid {
return nu.UUID[:], nil
}
return []byte(nil), nil
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler.
func (nu *NullUUID) UnmarshalBinary(data []byte) error {
if len(data) != 16 {
return fmt.Errorf("invalid UUID (got %d bytes)", len(data))
}
copy(nu.UUID[:], data)
nu.Valid = true
return nil
}
// MarshalText implements encoding.TextMarshaler.
func (nu NullUUID) MarshalText() ([]byte, error) {
if nu.Valid {
return nu.UUID.MarshalText()
}
return jsonNull, nil
}
// UnmarshalText implements encoding.TextUnmarshaler.
func (nu *NullUUID) UnmarshalText(data []byte) error {
id, err := ParseBytes(data)
if err != nil {
nu.Valid = false
return err
}
nu.UUID = id
nu.Valid = true
return nil
}
// MarshalJSON implements json.Marshaler.
func (nu NullUUID) MarshalJSON() ([]byte, error) {
if nu.Valid {
return json.Marshal(nu.UUID)
}
return jsonNull, nil
}
// UnmarshalJSON implements json.Unmarshaler.
func (nu *NullUUID) UnmarshalJSON(data []byte) error {
if bytes.Equal(data, jsonNull) {
*nu = NullUUID{}
return nil // valid null UUID
}
err := json.Unmarshal(data, &nu.UUID)
nu.Valid = err == nil
return err
}

View File

@ -3,6 +3,7 @@ package generic
import (
"context"
"database/sql"
"errors"
"fmt"
"regexp"
"strconv"
@ -12,6 +13,7 @@ import (
"github.com/Rican7/retry/backoff"
"github.com/Rican7/retry/strategy"
"github.com/k3s-io/kine/pkg/server"
"github.com/sirupsen/logrus"
)
@ -19,6 +21,9 @@ const (
defaultMaxIdleConns = 2 // copied from database/sql
)
// explicit interface check
var _ server.Dialect = (*Generic)(nil)
var (
columns = "kv.id AS theid, kv.name, kv.created, kv.deleted, kv.create_revision, kv.prev_revision, kv.lease, kv.value, kv.old_value"
revSQL = `
@ -92,6 +97,7 @@ type Generic struct {
InsertSQL string
FillSQL string
InsertLastInsertIDSQL string
GetSizeSQL string
Retry ErrRetry
TranslateErr TranslateErr
}
@ -393,3 +399,15 @@ func (d *Generic) Insert(ctx context.Context, key string, create, delete bool, c
err = row.Scan(&id)
return id, err
}
func (d *Generic) GetSize(ctx context.Context) (int64, error) {
if d.GetSizeSQL == "" {
return 0, errors.New("driver does not support size reporting")
}
var size int64
row := d.queryRow(ctx, d.GetSizeSQL)
if err := row.Scan(&size); err != nil {
return 0, err
}
return size, nil
}

View File

@ -4,15 +4,19 @@ import (
"context"
"database/sql"
"github.com/k3s-io/kine/pkg/server"
"github.com/sirupsen/logrus"
)
// explicit interface check
var _ server.Transaction = (*Tx)(nil)
type Tx struct {
x *sql.Tx
d *Generic
}
func (d *Generic) BeginTx(ctx context.Context, opts *sql.TxOptions) (*Tx, error) {
func (d *Generic) BeginTx(ctx context.Context, opts *sql.TxOptions) (server.Transaction, error) {
logrus.Tracef("TX BEGIN")
x, err := d.DB.BeginTx(ctx, opts)
if err != nil {

View File

@ -68,6 +68,10 @@ func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoo
}
dialect.LastInsertID = true
dialect.GetSizeSQL = `
SELECT SUM(data_length + index_length)
FROM information_schema.TABLES
WHERE table_schema = DATABASE() AND table_name = 'kine'`
dialect.CompactSQL = `
DELETE kv FROM kine AS kv
INNER JOIN (

View File

@ -58,6 +58,7 @@ func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoo
if err != nil {
return nil, err
}
dialect.GetSizeSQL = `SELECT pg_total_relation_size('kine')`
dialect.CompactSQL = `
DELETE FROM kine AS kv
USING (

View File

@ -61,6 +61,7 @@ func NewVariant(ctx context.Context, driverName, dataSourceName string, connPool
}
dialect.LastInsertID = true
dialect.GetSizeSQL = `SELECT SUM(pgsize) FROM dbstat`
dialect.CompactSQL = `
DELETE FROM kine AS kv
WHERE

View File

@ -16,8 +16,10 @@ import (
"github.com/k3s-io/kine/pkg/tls"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/soheilhy/cmux"
"go.etcd.io/etcd/server/v3/embed"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
)
@ -35,8 +37,8 @@ type Config struct {
Listener string
Endpoint string
ConnectionPoolConfig generic.ConnectionPoolConfig
tls.Config
ServerTLSConfig tls.Config
BackendTLSConfig tls.Config
}
type ETCDConfig struct {
@ -50,7 +52,7 @@ func Listen(ctx context.Context, config Config) (ETCDConfig, error) {
if driver == ETCDBackend {
return ETCDConfig{
Endpoints: strings.Split(config.Endpoint, ","),
TLSConfig: config.Config,
TLSConfig: config.BackendTLSConfig,
LeaderElect: true,
}, nil
}
@ -64,38 +66,108 @@ func Listen(ctx context.Context, config Config) (ETCDConfig, error) {
return ETCDConfig{}, errors.Wrap(err, "starting kine backend")
}
listen := config.Listener
if listen == "" {
listen = KineSocket
// set up GRPC server and register services
b := server.New(backend, endpointScheme(config))
grpcServer, err := grpcServer(config)
if err != nil {
return ETCDConfig{}, errors.Wrap(err, "creating GRPC server")
}
b := server.New(backend)
grpcServer := grpcServer(config)
b.Register(grpcServer)
listener, err := createListener(listen)
// set up HTTP server with basic mux
httpServer := httpServer()
// Create raw listener and wrap in cmux for protocol switching
listener, err := createListener(config)
if err != nil {
return ETCDConfig{}, err
return ETCDConfig{}, errors.Wrap(err, "creating listener")
}
m := cmux.New(listener)
if config.ServerTLSConfig.CertFile != "" && config.ServerTLSConfig.KeyFile != "" {
// If using TLS, wrap handler in GRPC/HTTP switching handler and serve TLS
httpServer.Handler = grpcHandlerFunc(grpcServer, httpServer.Handler)
anyl := m.Match(cmux.Any())
go func() {
if err := httpServer.ServeTLS(anyl, config.ServerTLSConfig.CertFile, config.ServerTLSConfig.KeyFile); err != nil {
logrus.Errorf("Kine TLS server shutdown: %v", err)
}
}()
} else {
// If using plaintext, use cmux matching for GRPC/HTTP switching
grpcl := m.Match(cmux.HTTP2())
go func() {
if err := grpcServer.Serve(grpcl); err != nil {
logrus.Errorf("Kine GRPC server shutdown: %v", err)
}
}()
httpl := m.Match(cmux.HTTP1())
go func() {
if err := httpServer.Serve(httpl); err != nil {
logrus.Errorf("Kine HTTP server shutdown: %v", err)
}
}()
}
go func() {
if err := grpcServer.Serve(listener); err != nil {
logrus.Errorf("Kine server shutdown: %v", err)
}
<-ctx.Done()
if err := m.Serve(); err != nil {
logrus.Errorf("Kine listener shutdown: %v", err)
grpcServer.Stop()
listener.Close()
}
}()
endpoint := endpointURL(config, listener)
logrus.Infof("Kine available at %s", endpoint)
return ETCDConfig{
LeaderElect: leaderelect,
Endpoints: []string{listen},
Endpoints: []string{endpoint},
TLSConfig: tls.Config{},
}, nil
}
func createListener(listen string) (ret net.Listener, rerr error) {
network, address := networkAndAddress(listen)
// endpointURL returns a URI string suitable for use as a local etcd endpoint.
// For TCP sockets, it is assumed that the port can be reached via the loopback address.
func endpointURL(config Config, listener net.Listener) string {
scheme := endpointScheme(config)
address := listener.Addr().String()
if !strings.HasPrefix(scheme, "unix") {
_, port, err := net.SplitHostPort(address)
if err != nil {
logrus.Warnf("failed to get listener port: %v", err)
port = "2379"
}
address = "127.0.0.1:" + port
}
return scheme + "://" + address
}
// endpointScheme returns the URI scheme for the listener specified by the configuration.
func endpointScheme(config Config) string {
if config.Listener == "" {
config.Listener = KineSocket
}
network, _ := networkAndAddress(config.Listener)
if network != "unix" {
network = "http"
}
if config.ServerTLSConfig.CertFile != "" && config.ServerTLSConfig.KeyFile != "" {
// yes, etcd supports the "unixs" scheme for TLS over unix sockets
network += "s"
}
return network
}
// createListener returns a listener bound to the requested protocol and address.
func createListener(config Config) (ret net.Listener, rerr error) {
if config.Listener == "" {
config.Listener = KineSocket
}
network, address := networkAndAddress(config.Listener)
if network == "unix" {
if err := os.Remove(address); err != nil && !os.IsNotExist(err) {
@ -106,16 +178,20 @@ func createListener(listen string) (ret net.Listener, rerr error) {
rerr = err
}
}()
} else {
network = "tcp"
}
logrus.Infof("Kine listening on %s://%s", network, address)
return net.Listen(network, address)
}
func grpcServer(config Config) *grpc.Server {
// grpcServer returns either a preconfigured GRPC server, or builds a new GRPC
// server using upstream keepalive defaults plus the local Server TLS configuration.
func grpcServer(config Config) (*grpc.Server, error) {
if config.GRPCServer != nil {
return config.GRPCServer
return config.GRPCServer, nil
}
gopts := []grpc.ServerOption{
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: embed.DefaultGRPCKeepAliveMinTime,
@ -127,9 +203,20 @@ func grpcServer(config Config) *grpc.Server {
}),
}
return grpc.NewServer(gopts...)
if config.ServerTLSConfig.CertFile != "" && config.ServerTLSConfig.KeyFile != "" {
creds, err := credentials.NewServerTLSFromFile(config.ServerTLSConfig.CertFile, config.ServerTLSConfig.KeyFile)
if err != nil {
return nil, err
}
gopts = append(gopts, grpc.Creds(creds))
}
return grpc.NewServer(gopts...), nil
}
// getKineStorageBackend parses the driver string, and returns a bool
// indicating whether the backend requires leader election, and a suitable
// backend datastore connection.
func getKineStorageBackend(ctx context.Context, driver, dsn string, cfg Config) (bool, server.Backend, error) {
var (
backend server.Backend
@ -143,9 +230,9 @@ func getKineStorageBackend(ctx context.Context, driver, dsn string, cfg Config)
case DQLiteBackend:
backend, err = dqlite.New(ctx, dsn, cfg.ConnectionPoolConfig)
case PostgresBackend:
backend, err = pgsql.New(ctx, dsn, cfg.Config, cfg.ConnectionPoolConfig)
backend, err = pgsql.New(ctx, dsn, cfg.BackendTLSConfig, cfg.ConnectionPoolConfig)
case MySQLBackend:
backend, err = mysql.New(ctx, dsn, cfg.Config, cfg.ConnectionPoolConfig)
backend, err = mysql.New(ctx, dsn, cfg.BackendTLSConfig, cfg.ConnectionPoolConfig)
default:
return false, nil, fmt.Errorf("storage backend is not defined")
}
@ -153,6 +240,7 @@ func getKineStorageBackend(ctx context.Context, driver, dsn string, cfg Config)
return leaderElect, backend, err
}
// ParseStorageEndpoint returns the driver name and endpoint string from a datastore endpoint URL.
func ParseStorageEndpoint(storageEndpoint string) (string, string) {
network, address := networkAndAddress(storageEndpoint)
switch network {
@ -166,6 +254,8 @@ func ParseStorageEndpoint(storageEndpoint string) (string, string) {
return network, address
}
// networkAndAddress crudely splits a URL string into network (scheme) and address,
// where the address includes everything after the scheme/authority separator.
func networkAndAddress(str string) (string, string) {
parts := strings.SplitN(str, "://", 2)
if len(parts) > 1 {

66
vendor/github.com/k3s-io/kine/pkg/endpoint/http.go generated vendored Normal file
View File

@ -0,0 +1,66 @@
package endpoint
import (
"log"
"net/http"
"strings"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
)
var (
etcdVersion = []byte(`{"etcdserver":"3.5.0","etcdcluster":"3.5.0"}`)
versionPath = "/version"
)
// httpServer returns a HTTP server with the basic mux handler.
func httpServer() *http.Server {
// Set up root HTTP mux with basic response handlers
mux := http.NewServeMux()
handleBasic(mux)
return &http.Server{
Handler: mux,
ErrorLog: log.New(logrus.StandardLogger().Writer(), "kinehttp ", log.LstdFlags),
}
}
// handleBasic binds basic HTTP response handlers to a mux.
func handleBasic(mux *http.ServeMux) {
mux.HandleFunc(versionPath, serveVersion)
}
// serveVersion responds with a canned JSON version response.
func serveVersion(w http.ResponseWriter, r *http.Request) {
if !allowMethod(w, r, http.MethodGet) {
return
}
w.Header().Set("Content-Type", "application/json")
w.Write(etcdVersion)
}
// allowMethod returns true if a method is allowed, or false (after sending a
// MethodNotAllowed error to the client) if it is not.
func allowMethod(w http.ResponseWriter, r *http.Request, m string) bool {
if m == r.Method {
return true
}
w.Header().Set("Allow", m)
http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
return false
}
// grpcHandlerFunc takes a GRPC server and HTTP handler, and returns a handler
// function that will route GRPC requests to the GRPC server, and everything
// else to the HTTP handler. This is based on sample code provided in the GRPC
// ServeHTTP documentation for sharing a port between GRPC and HTTP handlers.
func grpcHandlerFunc(grpcServer *grpc.Server, httpHandler http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.ProtoMajor == 2 && strings.HasPrefix(r.Header.Get("Content-Type"), "application/grpc") {
grpcServer.ServeHTTP(w, r)
} else {
httpHandler.ServeHTTP(w, r)
}
})
}

View File

@ -17,6 +17,7 @@ type Log interface {
Watch(ctx context.Context, prefix string) <-chan []*server.Event
Count(ctx context.Context, prefix string) (int64, int64, error)
Append(ctx context.Context, event *server.Event) (int64, error)
DbSize(ctx context.Context) (int64, error)
}
type LogStructured struct {
@ -367,3 +368,7 @@ func filter(events []*server.Event, rev int64) []*server.Event {
return events
}
func (l *LogStructured) DbSize(ctx context.Context) (int64, error) {
return l.log.DbSize(ctx)
}

View File

@ -7,7 +7,6 @@ import (
"time"
"github.com/k3s-io/kine/pkg/broadcaster"
"github.com/k3s-io/kine/pkg/drivers/generic"
"github.com/k3s-io/kine/pkg/server"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
@ -22,13 +21,13 @@ const (
)
type SQLLog struct {
d Dialect
d server.Dialect
broadcaster broadcaster.Broadcaster
ctx context.Context
notify chan int64
}
func New(d Dialect) *SQLLog {
func New(d server.Dialect) *SQLLog {
l := &SQLLog{
d: d,
notify: make(chan int64, 1024),
@ -36,23 +35,6 @@ func New(d Dialect) *SQLLog {
return l
}
type Dialect interface {
ListCurrent(ctx context.Context, prefix string, limit int64, includeDeleted bool) (*sql.Rows, error)
List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeleted bool) (*sql.Rows, error)
Count(ctx context.Context, prefix string) (int64, int64, error)
CurrentRevision(ctx context.Context) (int64, error)
After(ctx context.Context, prefix string, rev, limit int64) (*sql.Rows, error)
Insert(ctx context.Context, key string, create, delete bool, createRevision, previousRevision int64, ttl int64, value, prevValue []byte) (int64, error)
GetRevision(ctx context.Context, revision int64) (*sql.Rows, error)
DeleteRevision(ctx context.Context, revision int64) error
GetCompactRevision(ctx context.Context) (int64, error)
SetCompactRevision(ctx context.Context, revision int64) error
Compact(ctx context.Context, revision int64) (int64, error)
Fill(ctx context.Context, revision int64) error
IsFill(key string) bool
BeginTx(ctx context.Context, opts *sql.TxOptions) (*generic.Tx, error)
}
func (s *SQLLog) Start(ctx context.Context) error {
s.ctx = ctx
return s.compactStart(s.ctx)
@ -568,3 +550,7 @@ func safeCompactRev(targetCompactRev int64, currentRev int64) int64 {
}
return safeRev
}
func (s *SQLLog) DbSize(ctx context.Context) (int64, error) {
return s.d.GetSize(ctx)
}

64
vendor/github.com/k3s-io/kine/pkg/server/cluster.go generated vendored Normal file
View File

@ -0,0 +1,64 @@
package server
import (
"context"
"fmt"
"strings"
"go.etcd.io/etcd/api/v3/etcdserverpb"
"google.golang.org/grpc/metadata"
)
// explicit interface check
var _ etcdserverpb.ClusterServer = (*KVServerBridge)(nil)
func (s *KVServerBridge) MemberAdd(context.Context, *etcdserverpb.MemberAddRequest) (*etcdserverpb.MemberAddResponse, error) {
return nil, fmt.Errorf("member add is not supported")
}
func (s *KVServerBridge) MemberRemove(context.Context, *etcdserverpb.MemberRemoveRequest) (*etcdserverpb.MemberRemoveResponse, error) {
return nil, fmt.Errorf("member remove is not supported")
}
func (s *KVServerBridge) MemberUpdate(context.Context, *etcdserverpb.MemberUpdateRequest) (*etcdserverpb.MemberUpdateResponse, error) {
return nil, fmt.Errorf("member update is not supported")
}
func (s *KVServerBridge) MemberList(ctx context.Context, r *etcdserverpb.MemberListRequest) (*etcdserverpb.MemberListResponse, error) {
listenURL := authorityURL(ctx, s.limited.scheme)
return &etcdserverpb.MemberListResponse{
Header: &etcdserverpb.ResponseHeader{},
Members: []*etcdserverpb.Member{
{
Name: "kine",
ClientURLs: []string{listenURL},
PeerURLs: []string{listenURL},
},
},
}, nil
}
func (s *KVServerBridge) MemberPromote(context.Context, *etcdserverpb.MemberPromoteRequest) (*etcdserverpb.MemberPromoteResponse, error) {
return nil, fmt.Errorf("member promote is not supported")
}
// authorityURL returns the URL of the authority (host) that the client connected to.
// If no scheme is included in the authority data, the provided scheme is used. If no
// authority data is provided, the default etcd endpoint is used.
func authorityURL(ctx context.Context, scheme string) string {
authority := "127.0.0.1:2379"
if md, ok := metadata.FromIncomingContext(ctx); ok {
authList := md.Get(":authority")
if len(authList) > 0 {
authority = authList[0]
// etcd v3.5 encodes the endpoint address list as "#initially=[ADDRESS1;ADDRESS2]"
if strings.HasPrefix(authority, "#initially=[") {
authority = strings.TrimPrefix(authority, "#initially=[")
authority = strings.TrimSuffix(authority, "]")
authority = strings.ReplaceAll(authority, ";", ",")
return authority
}
}
}
return scheme + "://" + authority
}

7
vendor/github.com/k3s-io/kine/pkg/server/dbsize.go generated vendored Normal file
View File

@ -0,0 +1,7 @@
package server
import "context"
func (l *LimitedServer) dbSize(ctx context.Context) (int64, error) {
return l.backend.DbSize(ctx)
}

126
vendor/github.com/k3s-io/kine/pkg/server/kv.go generated vendored Normal file
View File

@ -0,0 +1,126 @@
package server
import (
"context"
"fmt"
"github.com/sirupsen/logrus"
"go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/mvccpb"
)
// explicit interface check
var _ etcdserverpb.KVServer = (*KVServerBridge)(nil)
func (k *KVServerBridge) Range(ctx context.Context, r *etcdserverpb.RangeRequest) (*etcdserverpb.RangeResponse, error) {
if r.KeysOnly {
return nil, unsupported("keysOnly")
}
if r.MaxCreateRevision != 0 {
return nil, unsupported("maxCreateRevision")
}
if r.SortOrder != 0 {
return nil, unsupported("sortOrder")
}
if r.SortTarget != 0 {
return nil, unsupported("sortTarget")
}
if r.Serializable {
return nil, unsupported("serializable")
}
if r.KeysOnly {
return nil, unsupported("keysOnly")
}
if r.MinModRevision != 0 {
return nil, unsupported("minModRevision")
}
if r.MinCreateRevision != 0 {
return nil, unsupported("minCreateRevision")
}
if r.MaxCreateRevision != 0 {
return nil, unsupported("maxCreateRevision")
}
if r.MaxModRevision != 0 {
return nil, unsupported("maxModRevision")
}
resp, err := k.limited.Range(ctx, r)
if err != nil {
logrus.Errorf("error while range on %s %s: %v", r.Key, r.RangeEnd, err)
return nil, err
}
rangeResponse := &etcdserverpb.RangeResponse{
More: resp.More,
Count: resp.Count,
Header: resp.Header,
Kvs: toKVs(resp.Kvs...),
}
return rangeResponse, nil
}
func toKVs(kvs ...*KeyValue) []*mvccpb.KeyValue {
if len(kvs) == 0 || kvs[0] == nil {
return nil
}
ret := make([]*mvccpb.KeyValue, 0, len(kvs))
for _, kv := range kvs {
newKV := toKV(kv)
if newKV != nil {
ret = append(ret, newKV)
}
}
return ret
}
func toKV(kv *KeyValue) *mvccpb.KeyValue {
if kv == nil {
return nil
}
return &mvccpb.KeyValue{
Key: []byte(kv.Key),
Value: kv.Value,
Lease: kv.Lease,
CreateRevision: kv.CreateRevision,
ModRevision: kv.ModRevision,
}
}
func (k *KVServerBridge) Put(ctx context.Context, r *etcdserverpb.PutRequest) (*etcdserverpb.PutResponse, error) {
return nil, fmt.Errorf("put is not supported")
}
func (k *KVServerBridge) DeleteRange(ctx context.Context, r *etcdserverpb.DeleteRangeRequest) (*etcdserverpb.DeleteRangeResponse, error) {
return nil, fmt.Errorf("delete is not supported")
}
func (k *KVServerBridge) Txn(ctx context.Context, r *etcdserverpb.TxnRequest) (*etcdserverpb.TxnResponse, error) {
res, err := k.limited.Txn(ctx, r)
if err != nil {
logrus.Errorf("error in txn: %v", err)
}
return res, err
}
func (k *KVServerBridge) Compact(ctx context.Context, r *etcdserverpb.CompactionRequest) (*etcdserverpb.CompactionResponse, error) {
return &etcdserverpb.CompactionResponse{
Header: &etcdserverpb.ResponseHeader{
Revision: r.Revision,
},
}, nil
}
func unsupported(field string) error {
return fmt.Errorf("%s is unsupported", field)
}

View File

@ -7,6 +7,9 @@ import (
"go.etcd.io/etcd/api/v3/etcdserverpb"
)
// explicit interface check
var _ etcdserverpb.LeaseServer = (*KVServerBridge)(nil)
func (s *KVServerBridge) LeaseGrant(ctx context.Context, req *etcdserverpb.LeaseGrantRequest) (*etcdserverpb.LeaseGrantResponse, error) {
return &etcdserverpb.LeaseGrantResponse{
Header: &etcdserverpb.ResponseHeader{},

View File

@ -9,6 +9,7 @@ import (
type LimitedServer struct {
backend Backend
scheme string
}
func (l *LimitedServer) Range(ctx context.Context, r *etcdserverpb.RangeRequest) (*RangeResponse, error) {

View File

@ -0,0 +1,50 @@
package server
import (
"context"
"fmt"
"go.etcd.io/etcd/api/v3/etcdserverpb"
)
// explicit interface check
var _ etcdserverpb.MaintenanceServer = (*KVServerBridge)(nil)
func (s *KVServerBridge) Alarm(context.Context, *etcdserverpb.AlarmRequest) (*etcdserverpb.AlarmResponse, error) {
return nil, fmt.Errorf("alarm is not supported")
}
func (s *KVServerBridge) Status(ctx context.Context, r *etcdserverpb.StatusRequest) (*etcdserverpb.StatusResponse, error) {
size, err := s.limited.dbSize(ctx)
if err != nil {
return nil, err
}
return &etcdserverpb.StatusResponse{
Header: &etcdserverpb.ResponseHeader{},
DbSize: size,
}, nil
}
func (s *KVServerBridge) Defragment(context.Context, *etcdserverpb.DefragmentRequest) (*etcdserverpb.DefragmentResponse, error) {
return nil, fmt.Errorf("defragment is not supported")
}
func (s *KVServerBridge) Hash(context.Context, *etcdserverpb.HashRequest) (*etcdserverpb.HashResponse, error) {
return nil, fmt.Errorf("hash is not supported")
}
func (s *KVServerBridge) HashKV(context.Context, *etcdserverpb.HashKVRequest) (*etcdserverpb.HashKVResponse, error) {
return nil, fmt.Errorf("hash kv is not supported")
}
func (s *KVServerBridge) Snapshot(*etcdserverpb.SnapshotRequest, etcdserverpb.Maintenance_SnapshotServer) error {
return fmt.Errorf("snapshot is not supported")
}
func (s *KVServerBridge) MoveLeader(context.Context, *etcdserverpb.MoveLeaderRequest) (*etcdserverpb.MoveLeaderResponse, error) {
return nil, fmt.Errorf("move leader is not supported")
}
func (s *KVServerBridge) Downgrade(context.Context, *etcdserverpb.DowngradeRequest) (*etcdserverpb.DowngradeResponse, error) {
return nil, fmt.Errorf("downgrade is not supported")
}

View File

@ -1,30 +1,21 @@
package server
import (
"context"
"fmt"
"github.com/sirupsen/logrus"
"go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/mvccpb"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
)
var (
_ etcdserverpb.KVServer = (*KVServerBridge)(nil)
_ etcdserverpb.WatchServer = (*KVServerBridge)(nil)
)
type KVServerBridge struct {
limited *LimitedServer
}
func New(backend Backend) *KVServerBridge {
func New(backend Backend, scheme string) *KVServerBridge {
return &KVServerBridge{
limited: &LimitedServer{
backend: backend,
scheme: scheme,
},
}
}
@ -33,121 +24,10 @@ func (k *KVServerBridge) Register(server *grpc.Server) {
etcdserverpb.RegisterLeaseServer(server, k)
etcdserverpb.RegisterWatchServer(server, k)
etcdserverpb.RegisterKVServer(server, k)
etcdserverpb.RegisterClusterServer(server, k)
etcdserverpb.RegisterMaintenanceServer(server, k)
hsrv := health.NewServer()
hsrv.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
healthpb.RegisterHealthServer(server, hsrv)
}
func (k *KVServerBridge) Range(ctx context.Context, r *etcdserverpb.RangeRequest) (*etcdserverpb.RangeResponse, error) {
if r.KeysOnly {
return nil, unsupported("keysOnly")
}
if r.MaxCreateRevision != 0 {
return nil, unsupported("maxCreateRevision")
}
if r.SortOrder != 0 {
return nil, unsupported("sortOrder")
}
if r.SortTarget != 0 {
return nil, unsupported("sortTarget")
}
if r.Serializable {
return nil, unsupported("serializable")
}
if r.KeysOnly {
return nil, unsupported("keysOnly")
}
if r.MinModRevision != 0 {
return nil, unsupported("minModRevision")
}
if r.MinCreateRevision != 0 {
return nil, unsupported("minCreateRevision")
}
if r.MaxCreateRevision != 0 {
return nil, unsupported("maxCreateRevision")
}
if r.MaxModRevision != 0 {
return nil, unsupported("maxModRevision")
}
resp, err := k.limited.Range(ctx, r)
if err != nil {
logrus.Errorf("error while range on %s %s: %v", r.Key, r.RangeEnd, err)
return nil, err
}
rangeResponse := &etcdserverpb.RangeResponse{
More: resp.More,
Count: resp.Count,
Header: resp.Header,
Kvs: toKVs(resp.Kvs...),
}
return rangeResponse, nil
}
func toKVs(kvs ...*KeyValue) []*mvccpb.KeyValue {
if len(kvs) == 0 || kvs[0] == nil {
return nil
}
ret := make([]*mvccpb.KeyValue, 0, len(kvs))
for _, kv := range kvs {
newKV := toKV(kv)
if newKV != nil {
ret = append(ret, newKV)
}
}
return ret
}
func toKV(kv *KeyValue) *mvccpb.KeyValue {
if kv == nil {
return nil
}
return &mvccpb.KeyValue{
Key: []byte(kv.Key),
Value: kv.Value,
Lease: kv.Lease,
CreateRevision: kv.CreateRevision,
ModRevision: kv.ModRevision,
}
}
func (k *KVServerBridge) Put(ctx context.Context, r *etcdserverpb.PutRequest) (*etcdserverpb.PutResponse, error) {
return nil, fmt.Errorf("put is not supported")
}
func (k *KVServerBridge) DeleteRange(ctx context.Context, r *etcdserverpb.DeleteRangeRequest) (*etcdserverpb.DeleteRangeResponse, error) {
return nil, fmt.Errorf("delete is not supported")
}
func (k *KVServerBridge) Txn(ctx context.Context, r *etcdserverpb.TxnRequest) (*etcdserverpb.TxnResponse, error) {
res, err := k.limited.Txn(ctx, r)
if err != nil {
logrus.Errorf("error in txn: %v", err)
}
return res, err
}
func (k *KVServerBridge) Compact(ctx context.Context, r *etcdserverpb.CompactionRequest) (*etcdserverpb.CompactionResponse, error) {
return &etcdserverpb.CompactionResponse{
Header: &etcdserverpb.ResponseHeader{
Revision: r.Revision,
},
}, nil
}
func unsupported(field string) error {
return fmt.Errorf("%s is unsupported", field)
}

View File

@ -2,6 +2,7 @@ package server
import (
"context"
"database/sql"
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
)
@ -20,6 +21,38 @@ type Backend interface {
Count(ctx context.Context, prefix string) (int64, int64, error)
Update(ctx context.Context, key string, value []byte, revision, lease int64) (int64, *KeyValue, bool, error)
Watch(ctx context.Context, key string, revision int64) <-chan []*Event
DbSize(ctx context.Context) (int64, error)
}
type Dialect interface {
ListCurrent(ctx context.Context, prefix string, limit int64, includeDeleted bool) (*sql.Rows, error)
List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeleted bool) (*sql.Rows, error)
Count(ctx context.Context, prefix string) (int64, int64, error)
CurrentRevision(ctx context.Context) (int64, error)
After(ctx context.Context, prefix string, rev, limit int64) (*sql.Rows, error)
Insert(ctx context.Context, key string, create, delete bool, createRevision, previousRevision int64, ttl int64, value, prevValue []byte) (int64, error)
GetRevision(ctx context.Context, revision int64) (*sql.Rows, error)
DeleteRevision(ctx context.Context, revision int64) error
GetCompactRevision(ctx context.Context) (int64, error)
SetCompactRevision(ctx context.Context, revision int64) error
Compact(ctx context.Context, revision int64) (int64, error)
Fill(ctx context.Context, revision int64) error
IsFill(key string) bool
BeginTx(ctx context.Context, opts *sql.TxOptions) (Transaction, error)
GetSize(ctx context.Context) (int64, error)
}
type Transaction interface {
Commit() error
MustCommit()
Rollback() error
MustRollback()
GetCompactRevision(ctx context.Context) (int64, error)
SetCompactRevision(ctx context.Context, revision int64) error
Compact(ctx context.Context, revision int64) (int64, error)
GetRevision(ctx context.Context, revision int64) (*sql.Rows, error)
DeleteRevision(ctx context.Context, revision int64) error
CurrentRevision(ctx context.Context) (int64, error)
}
type KeyValue struct {

View File

@ -10,9 +10,10 @@ import (
"go.etcd.io/etcd/api/v3/mvccpb"
)
var (
watchID int64
)
var watchID int64
// explicit interface check
var _ etcdserverpb.WatchServer = (*KVServerBridge)(nil)
func (s *KVServerBridge) Watch(ws etcdserverpb.Watch_WatchServer) error {
w := watcher{

2
vendor/modules.txt vendored
View File

@ -712,7 +712,7 @@ github.com/k3s-io/helm-controller/pkg/apis/helm.cattle.io/v1
github.com/k3s-io/helm-controller/pkg/generated/controllers/helm.cattle.io
github.com/k3s-io/helm-controller/pkg/generated/controllers/helm.cattle.io/v1
github.com/k3s-io/helm-controller/pkg/helm
# github.com/k3s-io/kine v0.7.3
# github.com/k3s-io/kine v0.8.0
## explicit
github.com/k3s-io/kine/pkg/broadcaster
github.com/k3s-io/kine/pkg/client