diff --git a/go.mod b/go.mod index 319dab61a2..ac665970ce 100644 --- a/go.mod +++ b/go.mod @@ -90,7 +90,7 @@ require ( github.com/gorilla/mux v1.8.0 github.com/gorilla/websocket v1.4.2 github.com/k3s-io/helm-controller v0.11.3 - github.com/k3s-io/kine v0.7.3 + github.com/k3s-io/kine v0.8.0 github.com/klauspost/compress v1.12.2 github.com/kubernetes-sigs/cri-tools v0.0.0-00010101000000-000000000000 github.com/lib/pq v1.10.2 diff --git a/go.sum b/go.sum index c60fcb2e72..7701ee732c 100644 --- a/go.sum +++ b/go.sum @@ -562,8 +562,8 @@ github.com/k3s-io/etcd/server/v3 v3.5.0-k3s1 h1:be2d2LS1w+OxyHE/PF6tmGVzF72+7L2F github.com/k3s-io/etcd/server/v3 v3.5.0-k3s1/go.mod h1:3Ah5ruV+M+7RZr0+Y/5mNLwC+eQlni+mQmOVdCRJoS4= github.com/k3s-io/helm-controller v0.11.3 h1:DSPAOCGHxF5pmF4vzQP5AgPT3tiGRJRZu+08hwWtbGI= github.com/k3s-io/helm-controller v0.11.3/go.mod h1:z0ExsRRIkTO/QC//3/Esn5ItTD6AiQSluwzMaS7RI/4= -github.com/k3s-io/kine v0.7.3 h1:YHj4DQmTGuF4GyriKyFx5gH9ywchCadHhl3Llj5oyuw= -github.com/k3s-io/kine v0.7.3/go.mod h1:tksdHCn3Vof8ZKHd+QJkLRi4LbyQglKBrokaBBrHIcY= +github.com/k3s-io/kine v0.8.0 h1:k6T9bI9DID7lIbktukXxg1QfeFoAQK4EIvAHoyPAe08= +github.com/k3s-io/kine v0.8.0/go.mod h1:gaezUQ9c8iw8vxDV/DI8vc93h2rCpTvY37kMdYPMsyc= github.com/k3s-io/kubernetes v1.22.1-k3s1 h1:evi+JkMnC0HDEJ8A7aLqPKZ+0cP/CK6udFquf7KVq1I= github.com/k3s-io/kubernetes v1.22.1-k3s1/go.mod h1:IGQZrV02n2IBp52+/YwLVMurCEQPKXJ/k8hU3mqEOuA= github.com/k3s-io/kubernetes/staging/src/k8s.io/api v1.22.1-k3s1 h1:BlTOjlJwCV9ZNgTgyByLJXn7emlDAQC/duN14pUtWLk= diff --git a/pkg/cli/server/server.go b/pkg/cli/server/server.go index f3fc780eef..8beb533b06 100644 --- a/pkg/cli/server/server.go +++ b/pkg/cli/server/server.go @@ -111,9 +111,9 @@ func run(app *cli.Context, cfg *cmds.Server, leaderControllers server.CustomCont serverConfig.ControlConfig.ExtraSchedulerAPIArgs = cfg.ExtraSchedulerArgs serverConfig.ControlConfig.ClusterDomain = cfg.ClusterDomain serverConfig.ControlConfig.Datastore.Endpoint = cfg.DatastoreEndpoint - serverConfig.ControlConfig.Datastore.CAFile = cfg.DatastoreCAFile - serverConfig.ControlConfig.Datastore.CertFile = cfg.DatastoreCertFile - serverConfig.ControlConfig.Datastore.KeyFile = cfg.DatastoreKeyFile + serverConfig.ControlConfig.Datastore.BackendTLSConfig.CAFile = cfg.DatastoreCAFile + serverConfig.ControlConfig.Datastore.BackendTLSConfig.CertFile = cfg.DatastoreCertFile + serverConfig.ControlConfig.Datastore.BackendTLSConfig.KeyFile = cfg.DatastoreKeyFile serverConfig.ControlConfig.AdvertiseIP = cfg.AdvertiseIP serverConfig.ControlConfig.AdvertisePort = cfg.AdvertisePort serverConfig.ControlConfig.FlannelBackend = cfg.FlannelBackend diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 58a3f372e7..8708668c2d 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -147,7 +147,7 @@ func (c *Cluster) startStorage(ctx context.Context) error { // based on what the kine wrapper tells us about the datastore. Single-node datastores like sqlite don't require // leader election, while basically all others (etcd, external database, etc) do since they allow multiple servers. c.etcdConfig = etcdConfig - c.config.Datastore.Config = etcdConfig.TLSConfig + c.config.Datastore.BackendTLSConfig = etcdConfig.TLSConfig c.config.Datastore.Endpoint = strings.Join(etcdConfig.Endpoints, ",") c.config.NoLeaderElect = !etcdConfig.LeaderElect return nil diff --git a/pkg/daemons/control/server.go b/pkg/daemons/control/server.go index 80838cc39a..8e8db3c634 100644 --- a/pkg/daemons/control/server.go +++ b/pkg/daemons/control/server.go @@ -272,14 +272,14 @@ func setupStorageBackend(argsMap map[string]string, cfg *config.Control) { argsMap["etcd-servers"] = cfg.Datastore.Endpoint } // storage backend tls configuration - if len(cfg.Datastore.CAFile) > 0 { - argsMap["etcd-cafile"] = cfg.Datastore.CAFile + if len(cfg.Datastore.BackendTLSConfig.CAFile) > 0 { + argsMap["etcd-cafile"] = cfg.Datastore.BackendTLSConfig.CAFile } - if len(cfg.Datastore.CertFile) > 0 { - argsMap["etcd-certfile"] = cfg.Datastore.CertFile + if len(cfg.Datastore.BackendTLSConfig.CertFile) > 0 { + argsMap["etcd-certfile"] = cfg.Datastore.BackendTLSConfig.CertFile } - if len(cfg.Datastore.KeyFile) > 0 { - argsMap["etcd-keyfile"] = cfg.Datastore.KeyFile + if len(cfg.Datastore.BackendTLSConfig.KeyFile) > 0 { + argsMap["etcd-keyfile"] = cfg.Datastore.BackendTLSConfig.KeyFile } } diff --git a/pkg/etcd/etcd.go b/pkg/etcd/etcd.go index f1084232dc..50abed7beb 100644 --- a/pkg/etcd/etcd.go +++ b/pkg/etcd/etcd.go @@ -363,9 +363,9 @@ func (e *ETCD) Register(ctx context.Context, config *config.Control, handler htt } e.address = address e.config.Datastore.Endpoint = endpoint - e.config.Datastore.Config.CAFile = e.runtime.ETCDServerCA - e.config.Datastore.Config.CertFile = e.runtime.ClientETCDCert - e.config.Datastore.Config.KeyFile = e.runtime.ClientETCDKey + e.config.Datastore.BackendTLSConfig.CAFile = e.runtime.ETCDServerCA + e.config.Datastore.BackendTLSConfig.CertFile = e.runtime.ClientETCDCert + e.config.Datastore.BackendTLSConfig.KeyFile = e.runtime.ClientETCDKey if err := e.setName(false); err != nil { return nil, err diff --git a/vendor/github.com/google/uuid/null.go b/vendor/github.com/google/uuid/null.go new file mode 100644 index 0000000000..d7fcbf2865 --- /dev/null +++ b/vendor/github.com/google/uuid/null.go @@ -0,0 +1,118 @@ +// Copyright 2021 Google Inc. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package uuid + +import ( + "bytes" + "database/sql/driver" + "encoding/json" + "fmt" +) + +var jsonNull = []byte("null") + +// NullUUID represents a UUID that may be null. +// NullUUID implements the SQL driver.Scanner interface so +// it can be used as a scan destination: +// +// var u uuid.NullUUID +// err := db.QueryRow("SELECT name FROM foo WHERE id=?", id).Scan(&u) +// ... +// if u.Valid { +// // use u.UUID +// } else { +// // NULL value +// } +// +type NullUUID struct { + UUID UUID + Valid bool // Valid is true if UUID is not NULL +} + +// Scan implements the SQL driver.Scanner interface. +func (nu *NullUUID) Scan(value interface{}) error { + if value == nil { + nu.UUID, nu.Valid = Nil, false + return nil + } + + err := nu.UUID.Scan(value) + if err != nil { + nu.Valid = false + return err + } + + nu.Valid = true + return nil +} + +// Value implements the driver Valuer interface. +func (nu NullUUID) Value() (driver.Value, error) { + if !nu.Valid { + return nil, nil + } + // Delegate to UUID Value function + return nu.UUID.Value() +} + +// MarshalBinary implements encoding.BinaryMarshaler. +func (nu NullUUID) MarshalBinary() ([]byte, error) { + if nu.Valid { + return nu.UUID[:], nil + } + + return []byte(nil), nil +} + +// UnmarshalBinary implements encoding.BinaryUnmarshaler. +func (nu *NullUUID) UnmarshalBinary(data []byte) error { + if len(data) != 16 { + return fmt.Errorf("invalid UUID (got %d bytes)", len(data)) + } + copy(nu.UUID[:], data) + nu.Valid = true + return nil +} + +// MarshalText implements encoding.TextMarshaler. +func (nu NullUUID) MarshalText() ([]byte, error) { + if nu.Valid { + return nu.UUID.MarshalText() + } + + return jsonNull, nil +} + +// UnmarshalText implements encoding.TextUnmarshaler. +func (nu *NullUUID) UnmarshalText(data []byte) error { + id, err := ParseBytes(data) + if err != nil { + nu.Valid = false + return err + } + nu.UUID = id + nu.Valid = true + return nil +} + +// MarshalJSON implements json.Marshaler. +func (nu NullUUID) MarshalJSON() ([]byte, error) { + if nu.Valid { + return json.Marshal(nu.UUID) + } + + return jsonNull, nil +} + +// UnmarshalJSON implements json.Unmarshaler. +func (nu *NullUUID) UnmarshalJSON(data []byte) error { + if bytes.Equal(data, jsonNull) { + *nu = NullUUID{} + return nil // valid null UUID + } + err := json.Unmarshal(data, &nu.UUID) + nu.Valid = err == nil + return err +} diff --git a/vendor/github.com/k3s-io/kine/pkg/drivers/generic/generic.go b/vendor/github.com/k3s-io/kine/pkg/drivers/generic/generic.go index c76db0e8e8..e7f7efe056 100644 --- a/vendor/github.com/k3s-io/kine/pkg/drivers/generic/generic.go +++ b/vendor/github.com/k3s-io/kine/pkg/drivers/generic/generic.go @@ -3,6 +3,7 @@ package generic import ( "context" "database/sql" + "errors" "fmt" "regexp" "strconv" @@ -12,6 +13,7 @@ import ( "github.com/Rican7/retry/backoff" "github.com/Rican7/retry/strategy" + "github.com/k3s-io/kine/pkg/server" "github.com/sirupsen/logrus" ) @@ -19,6 +21,9 @@ const ( defaultMaxIdleConns = 2 // copied from database/sql ) +// explicit interface check +var _ server.Dialect = (*Generic)(nil) + var ( columns = "kv.id AS theid, kv.name, kv.created, kv.deleted, kv.create_revision, kv.prev_revision, kv.lease, kv.value, kv.old_value" revSQL = ` @@ -92,6 +97,7 @@ type Generic struct { InsertSQL string FillSQL string InsertLastInsertIDSQL string + GetSizeSQL string Retry ErrRetry TranslateErr TranslateErr } @@ -393,3 +399,15 @@ func (d *Generic) Insert(ctx context.Context, key string, create, delete bool, c err = row.Scan(&id) return id, err } + +func (d *Generic) GetSize(ctx context.Context) (int64, error) { + if d.GetSizeSQL == "" { + return 0, errors.New("driver does not support size reporting") + } + var size int64 + row := d.queryRow(ctx, d.GetSizeSQL) + if err := row.Scan(&size); err != nil { + return 0, err + } + return size, nil +} diff --git a/vendor/github.com/k3s-io/kine/pkg/drivers/generic/tx.go b/vendor/github.com/k3s-io/kine/pkg/drivers/generic/tx.go index 166af372a7..092e8107d9 100644 --- a/vendor/github.com/k3s-io/kine/pkg/drivers/generic/tx.go +++ b/vendor/github.com/k3s-io/kine/pkg/drivers/generic/tx.go @@ -4,15 +4,19 @@ import ( "context" "database/sql" + "github.com/k3s-io/kine/pkg/server" "github.com/sirupsen/logrus" ) +// explicit interface check +var _ server.Transaction = (*Tx)(nil) + type Tx struct { x *sql.Tx d *Generic } -func (d *Generic) BeginTx(ctx context.Context, opts *sql.TxOptions) (*Tx, error) { +func (d *Generic) BeginTx(ctx context.Context, opts *sql.TxOptions) (server.Transaction, error) { logrus.Tracef("TX BEGIN") x, err := d.DB.BeginTx(ctx, opts) if err != nil { diff --git a/vendor/github.com/k3s-io/kine/pkg/drivers/mysql/mysql.go b/vendor/github.com/k3s-io/kine/pkg/drivers/mysql/mysql.go index 9a40c33e4a..0e7c1629a6 100644 --- a/vendor/github.com/k3s-io/kine/pkg/drivers/mysql/mysql.go +++ b/vendor/github.com/k3s-io/kine/pkg/drivers/mysql/mysql.go @@ -68,6 +68,10 @@ func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoo } dialect.LastInsertID = true + dialect.GetSizeSQL = ` + SELECT SUM(data_length + index_length) + FROM information_schema.TABLES + WHERE table_schema = DATABASE() AND table_name = 'kine'` dialect.CompactSQL = ` DELETE kv FROM kine AS kv INNER JOIN ( diff --git a/vendor/github.com/k3s-io/kine/pkg/drivers/pgsql/pgsql.go b/vendor/github.com/k3s-io/kine/pkg/drivers/pgsql/pgsql.go index 8d2669f0dd..328b800f18 100644 --- a/vendor/github.com/k3s-io/kine/pkg/drivers/pgsql/pgsql.go +++ b/vendor/github.com/k3s-io/kine/pkg/drivers/pgsql/pgsql.go @@ -58,6 +58,7 @@ func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoo if err != nil { return nil, err } + dialect.GetSizeSQL = `SELECT pg_total_relation_size('kine')` dialect.CompactSQL = ` DELETE FROM kine AS kv USING ( diff --git a/vendor/github.com/k3s-io/kine/pkg/drivers/sqlite/sqlite.go b/vendor/github.com/k3s-io/kine/pkg/drivers/sqlite/sqlite.go index 86d1086167..6061e44f7b 100644 --- a/vendor/github.com/k3s-io/kine/pkg/drivers/sqlite/sqlite.go +++ b/vendor/github.com/k3s-io/kine/pkg/drivers/sqlite/sqlite.go @@ -61,6 +61,7 @@ func NewVariant(ctx context.Context, driverName, dataSourceName string, connPool } dialect.LastInsertID = true + dialect.GetSizeSQL = `SELECT SUM(pgsize) FROM dbstat` dialect.CompactSQL = ` DELETE FROM kine AS kv WHERE diff --git a/vendor/github.com/k3s-io/kine/pkg/endpoint/endpoint.go b/vendor/github.com/k3s-io/kine/pkg/endpoint/endpoint.go index e135729cec..f9af22da6c 100644 --- a/vendor/github.com/k3s-io/kine/pkg/endpoint/endpoint.go +++ b/vendor/github.com/k3s-io/kine/pkg/endpoint/endpoint.go @@ -16,8 +16,10 @@ import ( "github.com/k3s-io/kine/pkg/tls" "github.com/pkg/errors" "github.com/sirupsen/logrus" + "github.com/soheilhy/cmux" "go.etcd.io/etcd/server/v3/embed" "google.golang.org/grpc" + "google.golang.org/grpc/credentials" "google.golang.org/grpc/keepalive" ) @@ -35,8 +37,8 @@ type Config struct { Listener string Endpoint string ConnectionPoolConfig generic.ConnectionPoolConfig - - tls.Config + ServerTLSConfig tls.Config + BackendTLSConfig tls.Config } type ETCDConfig struct { @@ -50,7 +52,7 @@ func Listen(ctx context.Context, config Config) (ETCDConfig, error) { if driver == ETCDBackend { return ETCDConfig{ Endpoints: strings.Split(config.Endpoint, ","), - TLSConfig: config.Config, + TLSConfig: config.BackendTLSConfig, LeaderElect: true, }, nil } @@ -64,38 +66,108 @@ func Listen(ctx context.Context, config Config) (ETCDConfig, error) { return ETCDConfig{}, errors.Wrap(err, "starting kine backend") } - listen := config.Listener - if listen == "" { - listen = KineSocket + // set up GRPC server and register services + b := server.New(backend, endpointScheme(config)) + grpcServer, err := grpcServer(config) + if err != nil { + return ETCDConfig{}, errors.Wrap(err, "creating GRPC server") } - - b := server.New(backend) - grpcServer := grpcServer(config) b.Register(grpcServer) - listener, err := createListener(listen) + // set up HTTP server with basic mux + httpServer := httpServer() + + // Create raw listener and wrap in cmux for protocol switching + listener, err := createListener(config) if err != nil { - return ETCDConfig{}, err + return ETCDConfig{}, errors.Wrap(err, "creating listener") + } + m := cmux.New(listener) + + if config.ServerTLSConfig.CertFile != "" && config.ServerTLSConfig.KeyFile != "" { + // If using TLS, wrap handler in GRPC/HTTP switching handler and serve TLS + httpServer.Handler = grpcHandlerFunc(grpcServer, httpServer.Handler) + anyl := m.Match(cmux.Any()) + go func() { + if err := httpServer.ServeTLS(anyl, config.ServerTLSConfig.CertFile, config.ServerTLSConfig.KeyFile); err != nil { + logrus.Errorf("Kine TLS server shutdown: %v", err) + } + }() + } else { + // If using plaintext, use cmux matching for GRPC/HTTP switching + grpcl := m.Match(cmux.HTTP2()) + go func() { + if err := grpcServer.Serve(grpcl); err != nil { + logrus.Errorf("Kine GRPC server shutdown: %v", err) + } + }() + httpl := m.Match(cmux.HTTP1()) + go func() { + if err := httpServer.Serve(httpl); err != nil { + logrus.Errorf("Kine HTTP server shutdown: %v", err) + } + }() } go func() { - if err := grpcServer.Serve(listener); err != nil { - logrus.Errorf("Kine server shutdown: %v", err) + if err := m.Serve(); err != nil { + logrus.Errorf("Kine listener shutdown: %v", err) + grpcServer.Stop() } - <-ctx.Done() - grpcServer.Stop() - listener.Close() }() + endpoint := endpointURL(config, listener) + logrus.Infof("Kine available at %s", endpoint) + return ETCDConfig{ LeaderElect: leaderelect, - Endpoints: []string{listen}, + Endpoints: []string{endpoint}, TLSConfig: tls.Config{}, }, nil } -func createListener(listen string) (ret net.Listener, rerr error) { - network, address := networkAndAddress(listen) +// endpointURL returns a URI string suitable for use as a local etcd endpoint. +// For TCP sockets, it is assumed that the port can be reached via the loopback address. +func endpointURL(config Config, listener net.Listener) string { + scheme := endpointScheme(config) + address := listener.Addr().String() + if !strings.HasPrefix(scheme, "unix") { + _, port, err := net.SplitHostPort(address) + if err != nil { + logrus.Warnf("failed to get listener port: %v", err) + port = "2379" + } + address = "127.0.0.1:" + port + } + + return scheme + "://" + address +} + +// endpointScheme returns the URI scheme for the listener specified by the configuration. +func endpointScheme(config Config) string { + if config.Listener == "" { + config.Listener = KineSocket + } + + network, _ := networkAndAddress(config.Listener) + if network != "unix" { + network = "http" + } + + if config.ServerTLSConfig.CertFile != "" && config.ServerTLSConfig.KeyFile != "" { + // yes, etcd supports the "unixs" scheme for TLS over unix sockets + network += "s" + } + + return network +} + +// createListener returns a listener bound to the requested protocol and address. +func createListener(config Config) (ret net.Listener, rerr error) { + if config.Listener == "" { + config.Listener = KineSocket + } + network, address := networkAndAddress(config.Listener) if network == "unix" { if err := os.Remove(address); err != nil && !os.IsNotExist(err) { @@ -106,16 +178,20 @@ func createListener(listen string) (ret net.Listener, rerr error) { rerr = err } }() + } else { + network = "tcp" } - logrus.Infof("Kine listening on %s://%s", network, address) return net.Listen(network, address) } -func grpcServer(config Config) *grpc.Server { +// grpcServer returns either a preconfigured GRPC server, or builds a new GRPC +// server using upstream keepalive defaults plus the local Server TLS configuration. +func grpcServer(config Config) (*grpc.Server, error) { if config.GRPCServer != nil { - return config.GRPCServer + return config.GRPCServer, nil } + gopts := []grpc.ServerOption{ grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ MinTime: embed.DefaultGRPCKeepAliveMinTime, @@ -127,9 +203,20 @@ func grpcServer(config Config) *grpc.Server { }), } - return grpc.NewServer(gopts...) + if config.ServerTLSConfig.CertFile != "" && config.ServerTLSConfig.KeyFile != "" { + creds, err := credentials.NewServerTLSFromFile(config.ServerTLSConfig.CertFile, config.ServerTLSConfig.KeyFile) + if err != nil { + return nil, err + } + gopts = append(gopts, grpc.Creds(creds)) + } + + return grpc.NewServer(gopts...), nil } +// getKineStorageBackend parses the driver string, and returns a bool +// indicating whether the backend requires leader election, and a suitable +// backend datastore connection. func getKineStorageBackend(ctx context.Context, driver, dsn string, cfg Config) (bool, server.Backend, error) { var ( backend server.Backend @@ -143,9 +230,9 @@ func getKineStorageBackend(ctx context.Context, driver, dsn string, cfg Config) case DQLiteBackend: backend, err = dqlite.New(ctx, dsn, cfg.ConnectionPoolConfig) case PostgresBackend: - backend, err = pgsql.New(ctx, dsn, cfg.Config, cfg.ConnectionPoolConfig) + backend, err = pgsql.New(ctx, dsn, cfg.BackendTLSConfig, cfg.ConnectionPoolConfig) case MySQLBackend: - backend, err = mysql.New(ctx, dsn, cfg.Config, cfg.ConnectionPoolConfig) + backend, err = mysql.New(ctx, dsn, cfg.BackendTLSConfig, cfg.ConnectionPoolConfig) default: return false, nil, fmt.Errorf("storage backend is not defined") } @@ -153,6 +240,7 @@ func getKineStorageBackend(ctx context.Context, driver, dsn string, cfg Config) return leaderElect, backend, err } +// ParseStorageEndpoint returns the driver name and endpoint string from a datastore endpoint URL. func ParseStorageEndpoint(storageEndpoint string) (string, string) { network, address := networkAndAddress(storageEndpoint) switch network { @@ -166,6 +254,8 @@ func ParseStorageEndpoint(storageEndpoint string) (string, string) { return network, address } +// networkAndAddress crudely splits a URL string into network (scheme) and address, +// where the address includes everything after the scheme/authority separator. func networkAndAddress(str string) (string, string) { parts := strings.SplitN(str, "://", 2) if len(parts) > 1 { diff --git a/vendor/github.com/k3s-io/kine/pkg/endpoint/http.go b/vendor/github.com/k3s-io/kine/pkg/endpoint/http.go new file mode 100644 index 0000000000..f66058da3b --- /dev/null +++ b/vendor/github.com/k3s-io/kine/pkg/endpoint/http.go @@ -0,0 +1,66 @@ +package endpoint + +import ( + "log" + "net/http" + "strings" + + "github.com/sirupsen/logrus" + "google.golang.org/grpc" +) + +var ( + etcdVersion = []byte(`{"etcdserver":"3.5.0","etcdcluster":"3.5.0"}`) + versionPath = "/version" +) + +// httpServer returns a HTTP server with the basic mux handler. +func httpServer() *http.Server { + // Set up root HTTP mux with basic response handlers + mux := http.NewServeMux() + handleBasic(mux) + + return &http.Server{ + Handler: mux, + ErrorLog: log.New(logrus.StandardLogger().Writer(), "kinehttp ", log.LstdFlags), + } +} + +// handleBasic binds basic HTTP response handlers to a mux. +func handleBasic(mux *http.ServeMux) { + mux.HandleFunc(versionPath, serveVersion) +} + +// serveVersion responds with a canned JSON version response. +func serveVersion(w http.ResponseWriter, r *http.Request) { + if !allowMethod(w, r, http.MethodGet) { + return + } + w.Header().Set("Content-Type", "application/json") + w.Write(etcdVersion) +} + +// allowMethod returns true if a method is allowed, or false (after sending a +// MethodNotAllowed error to the client) if it is not. +func allowMethod(w http.ResponseWriter, r *http.Request, m string) bool { + if m == r.Method { + return true + } + w.Header().Set("Allow", m) + http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) + return false +} + +// grpcHandlerFunc takes a GRPC server and HTTP handler, and returns a handler +// function that will route GRPC requests to the GRPC server, and everything +// else to the HTTP handler. This is based on sample code provided in the GRPC +// ServeHTTP documentation for sharing a port between GRPC and HTTP handlers. +func grpcHandlerFunc(grpcServer *grpc.Server, httpHandler http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.ProtoMajor == 2 && strings.HasPrefix(r.Header.Get("Content-Type"), "application/grpc") { + grpcServer.ServeHTTP(w, r) + } else { + httpHandler.ServeHTTP(w, r) + } + }) +} diff --git a/vendor/github.com/k3s-io/kine/pkg/logstructured/logstructured.go b/vendor/github.com/k3s-io/kine/pkg/logstructured/logstructured.go index 3db92c125c..ceb867354b 100644 --- a/vendor/github.com/k3s-io/kine/pkg/logstructured/logstructured.go +++ b/vendor/github.com/k3s-io/kine/pkg/logstructured/logstructured.go @@ -17,6 +17,7 @@ type Log interface { Watch(ctx context.Context, prefix string) <-chan []*server.Event Count(ctx context.Context, prefix string) (int64, int64, error) Append(ctx context.Context, event *server.Event) (int64, error) + DbSize(ctx context.Context) (int64, error) } type LogStructured struct { @@ -367,3 +368,7 @@ func filter(events []*server.Event, rev int64) []*server.Event { return events } + +func (l *LogStructured) DbSize(ctx context.Context) (int64, error) { + return l.log.DbSize(ctx) +} diff --git a/vendor/github.com/k3s-io/kine/pkg/logstructured/sqllog/sql.go b/vendor/github.com/k3s-io/kine/pkg/logstructured/sqllog/sql.go index 5aa115ff28..2a1b4c7d92 100644 --- a/vendor/github.com/k3s-io/kine/pkg/logstructured/sqllog/sql.go +++ b/vendor/github.com/k3s-io/kine/pkg/logstructured/sqllog/sql.go @@ -7,7 +7,6 @@ import ( "time" "github.com/k3s-io/kine/pkg/broadcaster" - "github.com/k3s-io/kine/pkg/drivers/generic" "github.com/k3s-io/kine/pkg/server" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -22,13 +21,13 @@ const ( ) type SQLLog struct { - d Dialect + d server.Dialect broadcaster broadcaster.Broadcaster ctx context.Context notify chan int64 } -func New(d Dialect) *SQLLog { +func New(d server.Dialect) *SQLLog { l := &SQLLog{ d: d, notify: make(chan int64, 1024), @@ -36,23 +35,6 @@ func New(d Dialect) *SQLLog { return l } -type Dialect interface { - ListCurrent(ctx context.Context, prefix string, limit int64, includeDeleted bool) (*sql.Rows, error) - List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeleted bool) (*sql.Rows, error) - Count(ctx context.Context, prefix string) (int64, int64, error) - CurrentRevision(ctx context.Context) (int64, error) - After(ctx context.Context, prefix string, rev, limit int64) (*sql.Rows, error) - Insert(ctx context.Context, key string, create, delete bool, createRevision, previousRevision int64, ttl int64, value, prevValue []byte) (int64, error) - GetRevision(ctx context.Context, revision int64) (*sql.Rows, error) - DeleteRevision(ctx context.Context, revision int64) error - GetCompactRevision(ctx context.Context) (int64, error) - SetCompactRevision(ctx context.Context, revision int64) error - Compact(ctx context.Context, revision int64) (int64, error) - Fill(ctx context.Context, revision int64) error - IsFill(key string) bool - BeginTx(ctx context.Context, opts *sql.TxOptions) (*generic.Tx, error) -} - func (s *SQLLog) Start(ctx context.Context) error { s.ctx = ctx return s.compactStart(s.ctx) @@ -568,3 +550,7 @@ func safeCompactRev(targetCompactRev int64, currentRev int64) int64 { } return safeRev } + +func (s *SQLLog) DbSize(ctx context.Context) (int64, error) { + return s.d.GetSize(ctx) +} diff --git a/vendor/github.com/k3s-io/kine/pkg/server/cluster.go b/vendor/github.com/k3s-io/kine/pkg/server/cluster.go new file mode 100644 index 0000000000..718e105617 --- /dev/null +++ b/vendor/github.com/k3s-io/kine/pkg/server/cluster.go @@ -0,0 +1,64 @@ +package server + +import ( + "context" + "fmt" + "strings" + + "go.etcd.io/etcd/api/v3/etcdserverpb" + "google.golang.org/grpc/metadata" +) + +// explicit interface check +var _ etcdserverpb.ClusterServer = (*KVServerBridge)(nil) + +func (s *KVServerBridge) MemberAdd(context.Context, *etcdserverpb.MemberAddRequest) (*etcdserverpb.MemberAddResponse, error) { + return nil, fmt.Errorf("member add is not supported") +} + +func (s *KVServerBridge) MemberRemove(context.Context, *etcdserverpb.MemberRemoveRequest) (*etcdserverpb.MemberRemoveResponse, error) { + return nil, fmt.Errorf("member remove is not supported") +} + +func (s *KVServerBridge) MemberUpdate(context.Context, *etcdserverpb.MemberUpdateRequest) (*etcdserverpb.MemberUpdateResponse, error) { + return nil, fmt.Errorf("member update is not supported") +} + +func (s *KVServerBridge) MemberList(ctx context.Context, r *etcdserverpb.MemberListRequest) (*etcdserverpb.MemberListResponse, error) { + listenURL := authorityURL(ctx, s.limited.scheme) + return &etcdserverpb.MemberListResponse{ + Header: &etcdserverpb.ResponseHeader{}, + Members: []*etcdserverpb.Member{ + { + Name: "kine", + ClientURLs: []string{listenURL}, + PeerURLs: []string{listenURL}, + }, + }, + }, nil +} + +func (s *KVServerBridge) MemberPromote(context.Context, *etcdserverpb.MemberPromoteRequest) (*etcdserverpb.MemberPromoteResponse, error) { + return nil, fmt.Errorf("member promote is not supported") +} + +// authorityURL returns the URL of the authority (host) that the client connected to. +// If no scheme is included in the authority data, the provided scheme is used. If no +// authority data is provided, the default etcd endpoint is used. +func authorityURL(ctx context.Context, scheme string) string { + authority := "127.0.0.1:2379" + if md, ok := metadata.FromIncomingContext(ctx); ok { + authList := md.Get(":authority") + if len(authList) > 0 { + authority = authList[0] + // etcd v3.5 encodes the endpoint address list as "#initially=[ADDRESS1;ADDRESS2]" + if strings.HasPrefix(authority, "#initially=[") { + authority = strings.TrimPrefix(authority, "#initially=[") + authority = strings.TrimSuffix(authority, "]") + authority = strings.ReplaceAll(authority, ";", ",") + return authority + } + } + } + return scheme + "://" + authority +} diff --git a/vendor/github.com/k3s-io/kine/pkg/server/dbsize.go b/vendor/github.com/k3s-io/kine/pkg/server/dbsize.go new file mode 100644 index 0000000000..d638e352b3 --- /dev/null +++ b/vendor/github.com/k3s-io/kine/pkg/server/dbsize.go @@ -0,0 +1,7 @@ +package server + +import "context" + +func (l *LimitedServer) dbSize(ctx context.Context) (int64, error) { + return l.backend.DbSize(ctx) +} diff --git a/vendor/github.com/k3s-io/kine/pkg/server/kv.go b/vendor/github.com/k3s-io/kine/pkg/server/kv.go new file mode 100644 index 0000000000..05d48524dd --- /dev/null +++ b/vendor/github.com/k3s-io/kine/pkg/server/kv.go @@ -0,0 +1,126 @@ +package server + +import ( + "context" + "fmt" + + "github.com/sirupsen/logrus" + "go.etcd.io/etcd/api/v3/etcdserverpb" + "go.etcd.io/etcd/api/v3/mvccpb" +) + +// explicit interface check +var _ etcdserverpb.KVServer = (*KVServerBridge)(nil) + +func (k *KVServerBridge) Range(ctx context.Context, r *etcdserverpb.RangeRequest) (*etcdserverpb.RangeResponse, error) { + if r.KeysOnly { + return nil, unsupported("keysOnly") + } + + if r.MaxCreateRevision != 0 { + return nil, unsupported("maxCreateRevision") + } + + if r.SortOrder != 0 { + return nil, unsupported("sortOrder") + } + + if r.SortTarget != 0 { + return nil, unsupported("sortTarget") + } + + if r.Serializable { + return nil, unsupported("serializable") + } + + if r.KeysOnly { + return nil, unsupported("keysOnly") + } + + if r.MinModRevision != 0 { + return nil, unsupported("minModRevision") + } + + if r.MinCreateRevision != 0 { + return nil, unsupported("minCreateRevision") + } + + if r.MaxCreateRevision != 0 { + return nil, unsupported("maxCreateRevision") + } + + if r.MaxModRevision != 0 { + return nil, unsupported("maxModRevision") + } + + resp, err := k.limited.Range(ctx, r) + if err != nil { + logrus.Errorf("error while range on %s %s: %v", r.Key, r.RangeEnd, err) + return nil, err + } + + rangeResponse := &etcdserverpb.RangeResponse{ + More: resp.More, + Count: resp.Count, + Header: resp.Header, + Kvs: toKVs(resp.Kvs...), + } + + return rangeResponse, nil +} + +func toKVs(kvs ...*KeyValue) []*mvccpb.KeyValue { + if len(kvs) == 0 || kvs[0] == nil { + return nil + } + + ret := make([]*mvccpb.KeyValue, 0, len(kvs)) + for _, kv := range kvs { + newKV := toKV(kv) + if newKV != nil { + ret = append(ret, newKV) + } + } + return ret +} + +func toKV(kv *KeyValue) *mvccpb.KeyValue { + if kv == nil { + return nil + } + return &mvccpb.KeyValue{ + Key: []byte(kv.Key), + Value: kv.Value, + Lease: kv.Lease, + CreateRevision: kv.CreateRevision, + ModRevision: kv.ModRevision, + } +} + +func (k *KVServerBridge) Put(ctx context.Context, r *etcdserverpb.PutRequest) (*etcdserverpb.PutResponse, error) { + return nil, fmt.Errorf("put is not supported") +} + +func (k *KVServerBridge) DeleteRange(ctx context.Context, r *etcdserverpb.DeleteRangeRequest) (*etcdserverpb.DeleteRangeResponse, error) { + return nil, fmt.Errorf("delete is not supported") +} + +func (k *KVServerBridge) Txn(ctx context.Context, r *etcdserverpb.TxnRequest) (*etcdserverpb.TxnResponse, error) { + res, err := k.limited.Txn(ctx, r) + if err != nil { + logrus.Errorf("error in txn: %v", err) + } + return res, err +} + +func (k *KVServerBridge) Compact(ctx context.Context, r *etcdserverpb.CompactionRequest) (*etcdserverpb.CompactionResponse, error) { + return &etcdserverpb.CompactionResponse{ + Header: &etcdserverpb.ResponseHeader{ + Revision: r.Revision, + }, + }, nil +} + +func unsupported(field string) error { + return fmt.Errorf("%s is unsupported", field) +} diff --git a/vendor/github.com/k3s-io/kine/pkg/server/lease.go b/vendor/github.com/k3s-io/kine/pkg/server/lease.go index c2c8024b20..62a9f5c74d 100644 --- a/vendor/github.com/k3s-io/kine/pkg/server/lease.go +++ b/vendor/github.com/k3s-io/kine/pkg/server/lease.go @@ -7,6 +7,9 @@ import ( "go.etcd.io/etcd/api/v3/etcdserverpb" ) +// explicit interface check +var _ etcdserverpb.LeaseServer = (*KVServerBridge)(nil) + func (s *KVServerBridge) LeaseGrant(ctx context.Context, req *etcdserverpb.LeaseGrantRequest) (*etcdserverpb.LeaseGrantResponse, error) { return &etcdserverpb.LeaseGrantResponse{ Header: &etcdserverpb.ResponseHeader{}, diff --git a/vendor/github.com/k3s-io/kine/pkg/server/limited.go b/vendor/github.com/k3s-io/kine/pkg/server/limited.go index 2d362fc328..93e2212f9d 100644 --- a/vendor/github.com/k3s-io/kine/pkg/server/limited.go +++ b/vendor/github.com/k3s-io/kine/pkg/server/limited.go @@ -9,6 +9,7 @@ import ( type LimitedServer struct { backend Backend + scheme string } func (l *LimitedServer) Range(ctx context.Context, r *etcdserverpb.RangeRequest) (*RangeResponse, error) { diff --git a/vendor/github.com/k3s-io/kine/pkg/server/maintenance.go b/vendor/github.com/k3s-io/kine/pkg/server/maintenance.go new file mode 100644 index 0000000000..2f2feacbf9 --- /dev/null +++ b/vendor/github.com/k3s-io/kine/pkg/server/maintenance.go @@ -0,0 +1,50 @@ +package server + +import ( + "context" + "fmt" + + "go.etcd.io/etcd/api/v3/etcdserverpb" +) + +// explicit interface check +var _ etcdserverpb.MaintenanceServer = (*KVServerBridge)(nil) + +func (s *KVServerBridge) Alarm(context.Context, *etcdserverpb.AlarmRequest) (*etcdserverpb.AlarmResponse, error) { + return nil, fmt.Errorf("alarm is not supported") +} + +func (s *KVServerBridge) Status(ctx context.Context, r *etcdserverpb.StatusRequest) (*etcdserverpb.StatusResponse, error) { + size, err := s.limited.dbSize(ctx) + if err != nil { + return nil, err + } + return &etcdserverpb.StatusResponse{ + Header: &etcdserverpb.ResponseHeader{}, + DbSize: size, + }, nil +} + +func (s *KVServerBridge) Defragment(context.Context, *etcdserverpb.DefragmentRequest) (*etcdserverpb.DefragmentResponse, error) { + return nil, fmt.Errorf("defragment is not supported") +} + +func (s *KVServerBridge) Hash(context.Context, *etcdserverpb.HashRequest) (*etcdserverpb.HashResponse, error) { + return nil, fmt.Errorf("hash is not supported") +} + +func (s *KVServerBridge) HashKV(context.Context, *etcdserverpb.HashKVRequest) (*etcdserverpb.HashKVResponse, error) { + return nil, fmt.Errorf("hash kv is not supported") +} + +func (s *KVServerBridge) Snapshot(*etcdserverpb.SnapshotRequest, etcdserverpb.Maintenance_SnapshotServer) error { + return fmt.Errorf("snapshot is not supported") +} + +func (s *KVServerBridge) MoveLeader(context.Context, *etcdserverpb.MoveLeaderRequest) (*etcdserverpb.MoveLeaderResponse, error) { + return nil, fmt.Errorf("move leader is not supported") +} + +func (s *KVServerBridge) Downgrade(context.Context, *etcdserverpb.DowngradeRequest) (*etcdserverpb.DowngradeResponse, error) { + return nil, fmt.Errorf("downgrade is not supported") +} diff --git a/vendor/github.com/k3s-io/kine/pkg/server/server.go b/vendor/github.com/k3s-io/kine/pkg/server/server.go index 481584600f..e75a79c179 100644 --- a/vendor/github.com/k3s-io/kine/pkg/server/server.go +++ b/vendor/github.com/k3s-io/kine/pkg/server/server.go @@ -1,30 +1,21 @@ package server import ( - "context" - "fmt" - - "github.com/sirupsen/logrus" "go.etcd.io/etcd/api/v3/etcdserverpb" - "go.etcd.io/etcd/api/v3/mvccpb" "google.golang.org/grpc" "google.golang.org/grpc/health" healthpb "google.golang.org/grpc/health/grpc_health_v1" ) -var ( - _ etcdserverpb.KVServer = (*KVServerBridge)(nil) - _ etcdserverpb.WatchServer = (*KVServerBridge)(nil) -) - type KVServerBridge struct { limited *LimitedServer } -func New(backend Backend) *KVServerBridge { +func New(backend Backend, scheme string) *KVServerBridge { return &KVServerBridge{ limited: &LimitedServer{ backend: backend, + scheme: scheme, }, } } @@ -33,121 +24,10 @@ func (k *KVServerBridge) Register(server *grpc.Server) { etcdserverpb.RegisterLeaseServer(server, k) etcdserverpb.RegisterWatchServer(server, k) etcdserverpb.RegisterKVServer(server, k) + etcdserverpb.RegisterClusterServer(server, k) + etcdserverpb.RegisterMaintenanceServer(server, k) hsrv := health.NewServer() hsrv.SetServingStatus("", healthpb.HealthCheckResponse_SERVING) healthpb.RegisterHealthServer(server, hsrv) } - -func (k *KVServerBridge) Range(ctx context.Context, r *etcdserverpb.RangeRequest) (*etcdserverpb.RangeResponse, error) { - if r.KeysOnly { - return nil, unsupported("keysOnly") - } - - if r.MaxCreateRevision != 0 { - return nil, unsupported("maxCreateRevision") - } - - if r.SortOrder != 0 { - return nil, unsupported("sortOrder") - } - - if r.SortTarget != 0 { - return nil, unsupported("sortTarget") - } - - if r.Serializable { - return nil, unsupported("serializable") - } - - if r.KeysOnly { - return nil, unsupported("keysOnly") - } - - if r.MinModRevision != 0 { - return nil, unsupported("minModRevision") - } - - if r.MinCreateRevision != 0 { - return nil, unsupported("minCreateRevision") - } - - if r.MaxCreateRevision != 0 { - return nil, unsupported("maxCreateRevision") - } - - if r.MaxModRevision != 0 { - return nil, unsupported("maxModRevision") - } - - resp, err := k.limited.Range(ctx, r) - if err != nil { - logrus.Errorf("error while range on %s %s: %v", r.Key, r.RangeEnd, err) - return nil, err - } - - rangeResponse := &etcdserverpb.RangeResponse{ - More: resp.More, - Count: resp.Count, - Header: resp.Header, - Kvs: toKVs(resp.Kvs...), - } - - return rangeResponse, nil -} - -func toKVs(kvs ...*KeyValue) []*mvccpb.KeyValue { - if len(kvs) == 0 || kvs[0] == nil { - return nil - } - - ret := make([]*mvccpb.KeyValue, 0, len(kvs)) - for _, kv := range kvs { - newKV := toKV(kv) - if newKV != nil { - ret = append(ret, newKV) - } - } - return ret -} - -func toKV(kv *KeyValue) *mvccpb.KeyValue { - if kv == nil { - return nil - } - return &mvccpb.KeyValue{ - Key: []byte(kv.Key), - Value: kv.Value, - Lease: kv.Lease, - CreateRevision: kv.CreateRevision, - ModRevision: kv.ModRevision, - } -} - -func (k *KVServerBridge) Put(ctx context.Context, r *etcdserverpb.PutRequest) (*etcdserverpb.PutResponse, error) { - return nil, fmt.Errorf("put is not supported") -} - -func (k *KVServerBridge) DeleteRange(ctx context.Context, r *etcdserverpb.DeleteRangeRequest) (*etcdserverpb.DeleteRangeResponse, error) { - return nil, fmt.Errorf("delete is not supported") -} - -func (k *KVServerBridge) Txn(ctx context.Context, r *etcdserverpb.TxnRequest) (*etcdserverpb.TxnResponse, error) { - res, err := k.limited.Txn(ctx, r) - if err != nil { - logrus.Errorf("error in txn: %v", err) - } - return res, err -} - -func (k *KVServerBridge) Compact(ctx context.Context, r *etcdserverpb.CompactionRequest) (*etcdserverpb.CompactionResponse, error) { - return &etcdserverpb.CompactionResponse{ - Header: &etcdserverpb.ResponseHeader{ - Revision: r.Revision, - }, - }, nil -} - -func unsupported(field string) error { - return fmt.Errorf("%s is unsupported", field) -} diff --git a/vendor/github.com/k3s-io/kine/pkg/server/types.go b/vendor/github.com/k3s-io/kine/pkg/server/types.go index 0cd0c13415..768b66de13 100644 --- a/vendor/github.com/k3s-io/kine/pkg/server/types.go +++ b/vendor/github.com/k3s-io/kine/pkg/server/types.go @@ -2,6 +2,7 @@ package server import ( "context" + "database/sql" "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" ) @@ -20,6 +21,38 @@ type Backend interface { Count(ctx context.Context, prefix string) (int64, int64, error) Update(ctx context.Context, key string, value []byte, revision, lease int64) (int64, *KeyValue, bool, error) Watch(ctx context.Context, key string, revision int64) <-chan []*Event + DbSize(ctx context.Context) (int64, error) +} + +type Dialect interface { + ListCurrent(ctx context.Context, prefix string, limit int64, includeDeleted bool) (*sql.Rows, error) + List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeleted bool) (*sql.Rows, error) + Count(ctx context.Context, prefix string) (int64, int64, error) + CurrentRevision(ctx context.Context) (int64, error) + After(ctx context.Context, prefix string, rev, limit int64) (*sql.Rows, error) + Insert(ctx context.Context, key string, create, delete bool, createRevision, previousRevision int64, ttl int64, value, prevValue []byte) (int64, error) + GetRevision(ctx context.Context, revision int64) (*sql.Rows, error) + DeleteRevision(ctx context.Context, revision int64) error + GetCompactRevision(ctx context.Context) (int64, error) + SetCompactRevision(ctx context.Context, revision int64) error + Compact(ctx context.Context, revision int64) (int64, error) + Fill(ctx context.Context, revision int64) error + IsFill(key string) bool + BeginTx(ctx context.Context, opts *sql.TxOptions) (Transaction, error) + GetSize(ctx context.Context) (int64, error) +} + +type Transaction interface { + Commit() error + MustCommit() + Rollback() error + MustRollback() + GetCompactRevision(ctx context.Context) (int64, error) + SetCompactRevision(ctx context.Context, revision int64) error + Compact(ctx context.Context, revision int64) (int64, error) + GetRevision(ctx context.Context, revision int64) (*sql.Rows, error) + DeleteRevision(ctx context.Context, revision int64) error + CurrentRevision(ctx context.Context) (int64, error) } type KeyValue struct { diff --git a/vendor/github.com/k3s-io/kine/pkg/server/watch.go b/vendor/github.com/k3s-io/kine/pkg/server/watch.go index 4735185081..cc38c83c43 100644 --- a/vendor/github.com/k3s-io/kine/pkg/server/watch.go +++ b/vendor/github.com/k3s-io/kine/pkg/server/watch.go @@ -10,9 +10,10 @@ import ( "go.etcd.io/etcd/api/v3/mvccpb" ) -var ( - watchID int64 -) +var watchID int64 + +// explicit interface check +var _ etcdserverpb.WatchServer = (*KVServerBridge)(nil) func (s *KVServerBridge) Watch(ws etcdserverpb.Watch_WatchServer) error { w := watcher{ diff --git a/vendor/modules.txt b/vendor/modules.txt index e109f871fa..ceafb33513 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -712,7 +712,7 @@ github.com/k3s-io/helm-controller/pkg/apis/helm.cattle.io/v1 github.com/k3s-io/helm-controller/pkg/generated/controllers/helm.cattle.io github.com/k3s-io/helm-controller/pkg/generated/controllers/helm.cattle.io/v1 github.com/k3s-io/helm-controller/pkg/helm -# github.com/k3s-io/kine v0.7.3 +# github.com/k3s-io/kine v0.8.0 ## explicit github.com/k3s-io/kine/pkg/broadcaster github.com/k3s-io/kine/pkg/client