k3s/pkg/dqlite/server.go
2019-12-21 08:51:39 -07:00

246 lines
5.9 KiB
Go

package dqlite
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"math/rand"
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/canonical/go-dqlite"
"github.com/canonical/go-dqlite/client"
"github.com/pkg/errors"
controllerclient "github.com/rancher/k3s/pkg/dqlite/controller/client"
"github.com/rancher/k3s/pkg/dqlite/dialer"
dqlitedriver "github.com/rancher/kine/pkg/drivers/dqlite"
v1 "github.com/rancher/wrangler-api/pkg/generated/controllers/core/v1"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/net"
)
const (
PeersFile = "peers.db"
NodeIDFile = "node-id"
)
var (
ignoreFile = map[string]bool{
PeersFile: true,
NodeIDFile: true,
}
)
type Certs struct {
ServerTrust *x509.Certificate
ClientTrust *x509.Certificate
ClientCert tls.Certificate
}
type DQLite struct {
ClientCA string
ClientCAKey string
ClientCert string
ClientCertKey string
ServerCA string
ServerCAKey string
AdvertiseIP string
AdvertisePort int
DataDir string
NodeStore client.NodeStore
NodeInfo client.NodeInfo
node *dqlite.Node
StorageEndpoint string
NodeControllerGetter NodeControllerGetter
clientOpts []client.Option
}
type NodeControllerGetter func() v1.NodeController
func New(dataDir, advertiseIP string, advertisePort int, getter NodeControllerGetter) *DQLite {
return &DQLite{
AdvertiseIP: advertiseIP,
AdvertisePort: advertisePort,
DataDir: dataDir,
NodeControllerGetter: getter,
}
}
func (d *DQLite) Start(ctx context.Context, initCluster, resetCluster bool, certs *Certs, next http.Handler) (http.Handler, error) {
bindAddress := d.getBindAddress()
clientTLSConfig, err := getClientTLSConfig(certs.ClientCert, certs.ServerTrust)
if err != nil {
return nil, err
}
advertise, err := getAdvertiseAddress(d.AdvertiseIP, d.AdvertisePort)
if err != nil {
return nil, errors.Wrap(err, "get advertise address")
}
dial, err := getDialer(advertise, bindAddress, clientTLSConfig)
if err != nil {
return nil, err
}
dqlitedriver.Dialer = dial
dqlitedriver.Logger = log()
d.clientOpts = append(d.clientOpts, client.WithDialFunc(dial), client.WithLogFunc(log()))
nodeInfo, node, err := getNode(d.DataDir, advertise, bindAddress, initCluster, dial)
if err != nil {
return nil, err
}
d.NodeInfo = nodeInfo
d.node = node
go func() {
<-ctx.Done()
node.Close()
}()
if err := d.nodeStore(ctx, initCluster); err != nil {
return nil, err
}
go d.startController(ctx)
if !resetCluster {
if err := node.Start(); err != nil {
return nil, err
}
}
return router(ctx, next, nodeInfo, certs.ClientTrust, "kube-apiserver", bindAddress), nil
}
func (d *DQLite) startController(ctx context.Context) {
for {
if nc := d.NodeControllerGetter(); nc != nil {
if os.Getenv("NODE_NAME") == "" {
logrus.Errorf("--disable-agent is not compatible with dqlite")
} else {
break
}
}
time.Sleep(time.Second)
}
controllerclient.Register(ctx, os.Getenv("NODE_NAME"), d.NodeInfo, d.NodeStore, d.NodeControllerGetter(), d.clientOpts)
}
func (d *DQLite) nodeStore(ctx context.Context, initCluster bool) error {
peerDB := filepath.Join(GetDBDir(d.DataDir), PeersFile)
ns, err := client.DefaultNodeStore(peerDB)
if err != nil {
return err
}
d.NodeStore = ns
d.StorageEndpoint = fmt.Sprintf("dqlite://?peer-file=%s", peerDB)
if initCluster {
if err := dqlitedriver.AddPeers(ctx, d.NodeStore, d.NodeInfo); err != nil {
return err
}
}
return nil
}
func getAdvertiseAddress(advertiseIP string, advertisePort int) (string, error) {
ip := advertiseIP
if ip == "" {
ipAddr, err := net.ChooseHostInterface()
if err != nil {
return "", err
}
ip = ipAddr.String()
}
return fmt.Sprintf("%s:%d", ip, advertisePort), nil
}
func getClientTLSConfig(cert tls.Certificate, ca *x509.Certificate) (*tls.Config, error) {
tlsConfig := &tls.Config{
RootCAs: x509.NewCertPool(),
Certificates: []tls.Certificate{
cert,
},
ServerName: "kubernetes",
}
tlsConfig.RootCAs.AddCert(ca)
return tlsConfig, nil
}
func getDialer(advertiseAddress, bindAddress string, tlsConfig *tls.Config) (client.DialFunc, error) {
return dialer.NewHTTPDialer(advertiseAddress, bindAddress, tlsConfig)
}
func GetDBDir(dataDir string) string {
return filepath.Join(dataDir, "db", "state.dqlite")
}
func getNode(dataDir string, advertiseAddress, bindAddress string, initCluster bool, dial client.DialFunc) (dqlite.NodeInfo, *dqlite.Node, error) {
id, err := getClusterID(initCluster, dataDir)
if err != nil {
return dqlite.NodeInfo{}, nil, errors.Wrap(err, "reading cluster id")
}
dbDir := GetDBDir(dataDir)
node, err := dqlite.New(id, advertiseAddress, dbDir,
dqlite.WithBindAddress(bindAddress),
dqlite.WithDialFunc(dial),
dqlite.WithNetworkLatency(20*time.Millisecond))
return dqlite.NodeInfo{
ID: id,
Address: advertiseAddress,
}, node, err
}
func writeClusterID(id uint64, dataDir string) error {
idFile := filepath.Join(GetDBDir(dataDir), NodeIDFile)
if err := os.MkdirAll(filepath.Dir(idFile), 0700); err != nil {
return err
}
return ioutil.WriteFile(idFile, []byte(strconv.FormatUint(id, 10)), 0644)
}
func deleteClusterID(dataDir string) {
os.Remove(filepath.Join(GetDBDir(dataDir), NodeIDFile))
}
func getClusterID(initCluster bool, dataDir string) (uint64, error) {
idFile := filepath.Join(GetDBDir(dataDir), NodeIDFile)
content, err := ioutil.ReadFile(idFile)
if os.IsNotExist(err) {
content = nil
} else if err != nil {
return 0, err
}
idStr := strings.TrimSpace(string(content))
if idStr == "" {
id := uint64(rand.Intn(1 << 20))
if initCluster {
id = 1
}
return id, writeClusterID(id, dataDir)
}
return strconv.ParseUint(idStr, 10, 64)
}
func (d *DQLite) getBindAddress() string {
// only anonymous works???
return "@" + filepath.Join(GetDBDir(d.DataDir), "dqlite.sock")
}