Add jitter to client config retry

Also:
* Replace labeled for/continue RETRY loops with wait helpers for improved readability (a sketch of the pattern follows below)
* Pull secrets and nodes from cache for node password verification
* Migrate nodepassword tests to wrangler mocks for better code reuse

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
Brad Davidson 2023-11-15 01:24:58 +00:00 committed by Brad Davidson
parent fa4c180637
commit 6c544a4679
6 changed files with 132 additions and 150 deletions
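For orientation, here is a rough standalone sketch of the retry pattern this change moves to (illustrative only; getWithJitter, fetch, and the simulated failure are hypothetical, not code from this commit). wait.JitterUntilWithContext from k8s.io/apimachinery/pkg/util/wait re-runs a callback on a jittered period, and cancelling the derived context is what stops it once a result has been obtained:

package main

import (
    "context"
    "errors"
    "fmt"
    "log"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
)

// getWithJitter retries fetch on a jittered 5-10 second period until it
// succeeds, then cancels the derived context to break out of the helper.
func getWithJitter(ctx context.Context, fetch func(context.Context) (string, error)) string {
    var cfg string
    var err error
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    wait.JitterUntilWithContext(ctx, func(ctx context.Context) {
        cfg, err = fetch(ctx)
        if err != nil {
            log.Printf("waiting for configuration: %v", err)
            return
        }
        cancel() // success: stop retrying
    }, 5*time.Second, 1.0, true) // base period, jitter factor, sliding
    return cfg
}

func main() {
    attempts := 0
    // Hypothetical fetch that fails twice before succeeding.
    fetch := func(ctx context.Context) (string, error) {
        attempts++
        if attempts < 3 {
            return "", errors.New("server is not ready")
        }
        return "agent-config", nil
    }
    fmt.Println(getWithJitter(context.Background(), fetch))
}

With a jitter factor of 1.0 the wait between attempts lands anywhere in the 5-10 second window, so a fleet of agents restarting at the same time does not poll the server in lock-step.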

go.mod
View File

@ -104,6 +104,7 @@ require (
github.com/go-bindata/go-bindata v3.1.2+incompatible
github.com/go-sql-driver/mysql v1.7.1
github.com/go-test/deep v1.0.7
github.com/golang/mock v1.6.0
github.com/google/cadvisor v0.47.3
github.com/google/uuid v1.3.0
github.com/gorilla/mux v1.8.0
@ -257,7 +258,6 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt/v4 v4.5.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/btree v1.1.2 // indirect
github.com/google/cel-go v0.16.1 // indirect

View File

@ -33,6 +33,7 @@ import (
"github.com/rancher/wrangler/pkg/slice"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/json"
"k8s.io/apimachinery/pkg/util/wait"
utilsnet "k8s.io/utils/net"
)
@ -42,22 +43,28 @@ const (
// Get returns a pointer to a completed Node configuration struct,
// containing a merging of the local CLI configuration with settings from the server.
// Node configuration includes client certificates, which requires node password verification,
// so this is somewhat computationally expensive on the server side, and is retried with jitter
// to avoid having clients hammer on the server at fixed periods.
// A call to this will block until agent configuration is successfully returned by the
// server.
func Get(ctx context.Context, agent cmds.Agent, proxy proxy.Proxy) *config.Node {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
RETRY:
for {
agentConfig, err := get(ctx, &agent, proxy)
var agentConfig *config.Node
var err error
// This would be more clear as wait.PollImmediateUntilWithContext, but that function
// does not support jittering, so we instead use wait.JitterUntilWithContext, and cancel
// the context on success.
ctx, cancel := context.WithCancel(ctx)
wait.JitterUntilWithContext(ctx, func(ctx context.Context) {
agentConfig, err = get(ctx, &agent, proxy)
if err != nil {
logrus.Infof("Waiting to retrieve agent configuration; server is not ready: %v", err)
for range ticker.C {
continue RETRY
}
} else {
cancel()
}
return agentConfig
}
}, 5*time.Second, 1.0, true)
return agentConfig
}
// KubeProxyDisabled returns a bool indicating whether or not kube-proxy has been disabled in the
@ -65,42 +72,40 @@ RETRY:
// after all startup hooks have completed, so a call to this will block until after the server's
// readyz endpoint returns OK.
func KubeProxyDisabled(ctx context.Context, node *config.Node, proxy proxy.Proxy) bool {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
RETRY:
for {
disabled, err := getKubeProxyDisabled(ctx, node, proxy)
var disabled bool
var err error
wait.PollImmediateUntilWithContext(ctx, 5*time.Second, func(ctx context.Context) (bool, error) {
disabled, err = getKubeProxyDisabled(ctx, node, proxy)
if err != nil {
logrus.Infof("Waiting to retrieve kube-proxy configuration; server is not ready: %v", err)
for range ticker.C {
continue RETRY
}
return false, nil
}
return disabled
}
return true, nil
})
return disabled
}
// APIServers returns a list of apiserver endpoints, suitable for seeding client loadbalancer configurations.
// This function will block until it can return a populated list of apiservers, or until the remote server returns
// an error (indicating that it does not support this functionality).
func APIServers(ctx context.Context, node *config.Node, proxy proxy.Proxy) []string {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
RETRY:
for {
addresses, err := getAPIServers(ctx, node, proxy)
var addresses []string
var err error
wait.PollImmediateUntilWithContext(ctx, 5*time.Second, func(ctx context.Context) (bool, error) {
addresses, err = getAPIServers(ctx, node, proxy)
if err != nil {
logrus.Infof("Failed to retrieve list of apiservers from server: %v", err)
return nil
return false, err
}
if len(addresses) == 0 {
logrus.Infof("Waiting for apiserver addresses")
for range ticker.C {
continue RETRY
}
return false, nil
}
return addresses
}
return true, nil
})
return addresses
}
type HTTPRequester func(u string, client *http.Client, username, password, token string) ([]byte, error)

View File

@ -13,15 +13,14 @@ import (
func Register(ctx context.Context,
modCoreDNS bool,
secretClient coreclient.SecretClient,
configMap coreclient.ConfigMapController,
secrets coreclient.SecretController,
configMaps coreclient.ConfigMapController,
nodes coreclient.NodeController,
) error {
h := &handler{
modCoreDNS: modCoreDNS,
secretClient: secretClient,
configCache: configMap.Cache(),
configClient: configMap,
modCoreDNS: modCoreDNS,
secrets: secrets,
configMaps: configMaps,
}
nodes.OnChange(ctx, "node", h.onChange)
nodes.OnRemove(ctx, "node", h.onRemove)
@ -30,10 +29,9 @@ func Register(ctx context.Context,
}
type handler struct {
modCoreDNS bool
secretClient coreclient.SecretClient
configCache coreclient.ConfigMapCache
configClient coreclient.ConfigMapClient
modCoreDNS bool
secrets coreclient.SecretController
configMaps coreclient.ConfigMapController
}
func (h *handler) onChange(key string, node *core.Node) (*core.Node, error) {
@ -78,7 +76,7 @@ func (h *handler) updateCoreDNSConfigMap(nodeName, nodeAddress string, removed b
return nil
}
configMapCache, err := h.configCache.Get("kube-system", "coredns")
configMapCache, err := h.configMaps.Cache().Get("kube-system", "coredns")
if err != nil || configMapCache == nil {
logrus.Warn(errors.Wrap(err, "Unable to fetch coredns config map"))
return nil
@ -120,7 +118,7 @@ func (h *handler) updateCoreDNSConfigMap(nodeName, nodeAddress string, removed b
}
configMap.Data["NodeHosts"] = newHosts
if _, err := h.configClient.Update(configMap); err != nil {
if _, err := h.configMaps.Update(configMap); err != nil {
return err
}
@ -135,5 +133,5 @@ func (h *handler) updateCoreDNSConfigMap(nodeName, nodeAddress string, removed b
}
func (h *handler) removeNodePassword(nodeName string) error {
return nodepassword.Delete(h.secretClient, nodeName)
return nodepassword.Delete(h.secrets, nodeName)
}

View File

@ -51,9 +51,9 @@ func getSecretName(nodeName string) string {
return strings.ToLower(nodeName + ".node-password." + version.Program)
}
func verifyHash(secretClient coreclient.SecretClient, nodeName, pass string) error {
func verifyHash(secretClient coreclient.SecretController, nodeName, pass string) error {
name := getSecretName(nodeName)
secret, err := secretClient.Get(metav1.NamespaceSystem, name, metav1.GetOptions{})
secret, err := secretClient.Cache().Get(metav1.NamespaceSystem, name)
if err != nil {
return &passwordError{node: nodeName, err: err}
}
@ -67,7 +67,7 @@ func verifyHash(secretClient coreclient.SecretClient, nodeName, pass string) err
}
// Ensure will verify a node-password secret if it exists, otherwise it will create one
func Ensure(secretClient coreclient.SecretClient, nodeName, pass string) error {
func Ensure(secretClient coreclient.SecretController, nodeName, pass string) error {
err := verifyHash(secretClient, nodeName, pass)
if apierrors.IsNotFound(err) {
var hash string
@ -88,12 +88,12 @@ func Ensure(secretClient coreclient.SecretClient, nodeName, pass string) error {
}
// Delete will remove a node-password secret
func Delete(secretClient coreclient.SecretClient, nodeName string) error {
func Delete(secretClient coreclient.SecretController, nodeName string) error {
return secretClient.Delete(metav1.NamespaceSystem, getSecretName(nodeName), &metav1.DeleteOptions{})
}
// MigrateFile moves password file entries to secrets
func MigrateFile(secretClient coreclient.SecretClient, nodeClient coreclient.NodeClient, passwordFile string) error {
func MigrateFile(secretClient coreclient.SecretController, nodeClient coreclient.NodeController, passwordFile string) error {
_, err := os.Stat(passwordFile)
if os.IsNotExist(err) {
return nil
@ -108,11 +108,9 @@ func MigrateFile(secretClient coreclient.SecretClient, nodeClient coreclient.Nod
}
nodeNames := []string{}
nodeList, _ := nodeClient.List(metav1.ListOptions{})
if nodeList != nil {
for _, node := range nodeList.Items {
nodeNames = append(nodeNames, node.Name)
}
nodeList, _ := nodeClient.Cache().List(nil)
for _, node := range nodeList {
nodeNames = append(nodeNames, node.Name)
}
if len(nodeNames) == 0 {
nodeNames = append(nodeNames, passwd.Users()...)
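As a design note on the cache reads introduced above: resolving secrets and nodes through the controllers' shared informer caches saves an apiserver round trip on every node password check, at the cost of tolerating briefly stale data. A rough helper in the same spirit might look like the following (getNodeHash, the "hash" data key access, and the fmt import are assumptions for illustration, not code from this commit):

// getNodeHash reads a node-password secret via the controller's informer
// cache instead of issuing a live API GET against the apiserver.
func getNodeHash(secrets coreclient.SecretController, nodeName string) ([]byte, error) {
    secret, err := secrets.Cache().Get(metav1.NamespaceSystem, getSecretName(nodeName))
    if err != nil {
        return nil, err
    }
    if hash, ok := secret.Data["hash"]; ok {
        return hash, nil
    }
    return nil, fmt.Errorf("node-password secret for %s has no hash key", nodeName)
}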

View File

@ -8,14 +8,13 @@ import (
"runtime"
"testing"
"github.com/rancher/wrangler/pkg/generic"
"github.com/golang/mock/gomock"
"github.com/rancher/wrangler/pkg/generic/fake"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/rest"
)
const migrateNumNodes = 10
@ -29,20 +28,29 @@ func Test_UnitAsserts(t *testing.T) {
func Test_UnitEnsureDelete(t *testing.T) {
logMemUsage(t)
secretClient := &mockSecretClient{}
ctrl := gomock.NewController(t)
secretClient := fake.NewMockControllerInterface[*v1.Secret, *v1.SecretList](ctrl)
secretCache := fake.NewMockCacheInterface[*v1.Secret](ctrl)
secretStore := &mockSecretStore{}
// Set up expected call counts for tests
// Expect to see 2 creates, any number of cache gets, and 2 deletes.
secretClient.EXPECT().Create(gomock.Any()).Times(2).DoAndReturn(secretStore.Create)
secretClient.EXPECT().Delete(gomock.Any(), gomock.Any(), gomock.Any()).Times(2).DoAndReturn(secretStore.Delete)
secretClient.EXPECT().Cache().AnyTimes().Return(secretCache)
secretCache.EXPECT().Get(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(secretStore.Get)
// Run tests
assertEqual(t, Ensure(secretClient, "node1", "Hello World"), nil)
assertEqual(t, Ensure(secretClient, "node1", "Hello World"), nil)
assertNotEqual(t, Ensure(secretClient, "node1", "Goodbye World"), nil)
assertEqual(t, secretClient.created, 1)
assertEqual(t, Delete(secretClient, "node1"), nil)
assertNotEqual(t, Delete(secretClient, "node1"), nil)
assertEqual(t, secretClient.deleted, 1)
assertEqual(t, Ensure(secretClient, "node1", "Hello Universe"), nil)
assertNotEqual(t, Ensure(secretClient, "node1", "Hello World"), nil)
assertEqual(t, Ensure(secretClient, "node1", "Hello Universe"), nil)
assertEqual(t, secretClient.created, 2)
logMemUsage(t)
}
@ -51,16 +59,32 @@ func Test_UnitMigrateFile(t *testing.T) {
nodePasswordFile := generateNodePasswordFile(migrateNumNodes)
defer os.Remove(nodePasswordFile)
secretClient := &mockSecretClient{}
nodeClient := &mockNodeClient{}
ctrl := gomock.NewController(t)
secretClient := fake.NewMockControllerInterface[*v1.Secret, *v1.SecretList](ctrl)
secretCache := fake.NewMockCacheInterface[*v1.Secret](ctrl)
secretStore := &mockSecretStore{}
nodeClient := fake.NewMockNonNamespacedControllerInterface[*v1.Node, *v1.NodeList](ctrl)
nodeCache := fake.NewMockNonNamespacedCacheInterface[*v1.Node](ctrl)
nodeStore := &mockNodeStore{}
// Set up expected call counts for tests
// Expect to see 1 node list, any number of cache gets, and as many
// creates as there are nodes to migrate.
secretClient.EXPECT().Create(gomock.Any()).Times(migrateNumNodes).DoAndReturn(secretStore.Create)
secretClient.EXPECT().Cache().AnyTimes().Return(secretCache)
secretCache.EXPECT().Get(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(secretStore.Get)
nodeClient.EXPECT().Cache().AnyTimes().Return(nodeCache)
nodeCache.EXPECT().List(gomock.Any()).Times(1).DoAndReturn(nodeStore.List)
// Run tests
logMemUsage(t)
if err := MigrateFile(secretClient, nodeClient, nodePasswordFile); err != nil {
log.Fatal(err)
t.Fatal(err)
}
logMemUsage(t)
assertEqual(t, secretClient.created, migrateNumNodes)
assertNotEqual(t, Ensure(secretClient, "node1", "Hello World"), nil)
assertEqual(t, Ensure(secretClient, "node1", "node1"), nil)
}
@ -69,25 +93,43 @@ func Test_UnitMigrateFileNodes(t *testing.T) {
nodePasswordFile := generateNodePasswordFile(migrateNumNodes)
defer os.Remove(nodePasswordFile)
secretClient := &mockSecretClient{}
nodeClient := &mockNodeClient{}
nodeClient.nodes = make([]v1.Node, createNumNodes, createNumNodes)
for i := range nodeClient.nodes {
nodeClient.nodes[i].Name = fmt.Sprintf("node%d", i+1)
ctrl := gomock.NewController(t)
secretClient := fake.NewMockControllerInterface[*v1.Secret, *v1.SecretList](ctrl)
secretCache := fake.NewMockCacheInterface[*v1.Secret](ctrl)
secretStore := &mockSecretStore{}
nodeClient := fake.NewMockNonNamespacedControllerInterface[*v1.Node, *v1.NodeList](ctrl)
nodeCache := fake.NewMockNonNamespacedCacheInterface[*v1.Node](ctrl)
nodeStore := &mockNodeStore{}
nodeStore.nodes = make([]v1.Node, createNumNodes, createNumNodes)
for i := range nodeStore.nodes {
nodeStore.nodes[i].Name = fmt.Sprintf("node%d", i+1)
}
// Set up expected call counts for tests
// Expect to see 1 node list, any number of cache gets, and as many creates
// as there are nodes to migrate, plus one for the extra new node added at the end.
secretClient.EXPECT().Create(gomock.Any()).Times(migrateNumNodes + 1).DoAndReturn(secretStore.Create)
secretClient.EXPECT().Cache().AnyTimes().Return(secretCache)
secretCache.EXPECT().Get(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(secretStore.Get)
nodeClient.EXPECT().Cache().AnyTimes().Return(nodeCache)
nodeCache.EXPECT().List(gomock.Any()).Times(1).DoAndReturn(nodeStore.List)
// Run tests
logMemUsage(t)
if err := MigrateFile(secretClient, nodeClient, nodePasswordFile); err != nil {
log.Fatal(err)
t.Fatal(err)
}
logMemUsage(t)
assertEqual(t, secretClient.created, createNumNodes)
for _, node := range nodeClient.nodes {
for _, node := range nodeStore.nodes {
assertNotEqual(t, Ensure(secretClient, node.Name, "wrong-password"), nil)
assertEqual(t, Ensure(secretClient, node.Name, node.Name), nil)
}
newNode := fmt.Sprintf("node%d", createNumNodes+1)
newNode := fmt.Sprintf("node%d", migrateNumNodes+1)
assertEqual(t, Ensure(secretClient, newNode, "new-password"), nil)
assertNotEqual(t, Ensure(secretClient, newNode, "wrong-password"), nil)
}
@ -100,16 +142,13 @@ func Test_PasswordError(t *testing.T) {
}
// --------------------------
// mock secret store interface
// mock secret client interface
type mockSecretClient struct {
type mockSecretStore struct {
entries map[string]map[string]v1.Secret
created int
deleted int
}
func (m *mockSecretClient) Create(secret *v1.Secret) (*v1.Secret, error) {
func (m *mockSecretStore) Create(secret *v1.Secret) (*v1.Secret, error) {
if m.entries == nil {
m.entries = map[string]map[string]v1.Secret{}
}
@ -119,16 +158,11 @@ func (m *mockSecretClient) Create(secret *v1.Secret) (*v1.Secret, error) {
if _, ok := m.entries[secret.Namespace][secret.Name]; ok {
return nil, errorAlreadyExists()
}
m.created++
m.entries[secret.Namespace][secret.Name] = *secret
return secret, nil
}
func (m *mockSecretClient) Update(secret *v1.Secret) (*v1.Secret, error) {
return nil, errorNotImplemented()
}
func (m *mockSecretClient) Delete(namespace, name string, options *metav1.DeleteOptions) error {
func (m *mockSecretStore) Delete(namespace, name string, options *metav1.DeleteOptions) error {
if m.entries == nil {
return errorNotFound()
}
@ -138,12 +172,11 @@ func (m *mockSecretClient) Delete(namespace, name string, options *metav1.Delete
if _, ok := m.entries[namespace][name]; !ok {
return errorNotFound()
}
m.deleted++
delete(m.entries[namespace], name)
return nil
}
func (m *mockSecretClient) Get(namespace, name string, options metav1.GetOptions) (*v1.Secret, error) {
func (m *mockSecretStore) Get(namespace, name string) (*v1.Secret, error) {
if m.entries == nil {
return nil, errorNotFound()
}
@ -156,65 +189,18 @@ func (m *mockSecretClient) Get(namespace, name string, options metav1.GetOptions
return nil, errorNotFound()
}
func (m *mockSecretClient) List(namespace string, opts metav1.ListOptions) (*v1.SecretList, error) {
return nil, errorNotImplemented()
}
func (m *mockSecretClient) Watch(namespace string, opts metav1.ListOptions) (watch.Interface, error) {
return nil, errorNotImplemented()
}
func (m *mockSecretClient) Patch(namespace, name string, pt types.PatchType, data []byte, subresources ...string) (result *v1.Secret, err error) {
return nil, errorNotImplemented()
}
func (m *mockSecretClient) UpdateStatus(secret *v1.Secret) (*v1.Secret, error) {
return nil, errorNotImplemented()
}
func (m *mockSecretClient) WithImpersonation(rest.ImpersonationConfig) (generic.ClientInterface[*v1.Secret, *v1.SecretList], error) {
return nil, errorNotImplemented()
}
// --------------------------
// mock node store interface
// mock node client interface
type mockNodeClient struct {
type mockNodeStore struct {
nodes []v1.Node
}
func (m *mockNodeClient) Create(node *v1.Node) (*v1.Node, error) {
return nil, errorNotImplemented()
}
func (m *mockNodeClient) Update(node *v1.Node) (*v1.Node, error) {
return nil, errorNotImplemented()
}
func (m *mockNodeClient) UpdateStatus(node *v1.Node) (*v1.Node, error) {
return nil, errorNotImplemented()
}
func (m *mockNodeClient) Delete(name string, options *metav1.DeleteOptions) error {
return errorNotImplemented()
}
func (m *mockNodeClient) Get(name string, options metav1.GetOptions) (*v1.Node, error) {
return nil, errorNotImplemented()
}
func (m *mockNodeClient) List(opts metav1.ListOptions) (*v1.NodeList, error) {
return &v1.NodeList{Items: m.nodes}, nil
}
func (m *mockNodeClient) Watch(opts metav1.ListOptions) (watch.Interface, error) {
return nil, errorNotImplemented()
}
func (m *mockNodeClient) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1.Node, err error) {
return nil, errorNotImplemented()
}
func (m *mockNodeClient) WithImpersonation(rest.ImpersonationConfig) (generic.NonNamespacedClientInterface[*v1.Node, *v1.NodeList], error) {
return nil, errorNotImplemented()
func (m *mockNodeStore) List(ls labels.Selector) ([]v1.Node, error) {
return m.nodes, nil
}
// --------------------------
// utility functions
func assertEqual(t *testing.T, a interface{}, b interface{}) {
@ -255,11 +241,6 @@ func errorAlreadyExists() error {
return apierrors.NewAlreadyExists(schema.GroupResource{}, "already-exists")
}
func errorNotImplemented() error {
log.Fatal("not implemented")
return apierrors.NewMethodNotSupported(schema.GroupResource{}, "not-implemented")
}
func logMemUsage(t *testing.T) {
var stats runtime.MemStats
runtime.ReadMemStats(&stats)
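To round out the mock migration above, here is a minimal standalone sketch of the wrangler generated-fake pattern the tests now rely on (an assumed example, not part of this commit): a gomock controller wires a fake SecretController to a fake cache so code that calls Cache().Get can be exercised without an apiserver.

package nodepassword_example

import (
    "testing"

    "github.com/golang/mock/gomock"
    "github.com/rancher/wrangler/pkg/generic/fake"
    v1 "k8s.io/api/core/v1"
)

func Test_ExampleSecretCacheMock(t *testing.T) {
    ctrl := gomock.NewController(t)
    secrets := fake.NewMockControllerInterface[*v1.Secret, *v1.SecretList](ctrl)
    cache := fake.NewMockCacheInterface[*v1.Secret](ctrl)

    // The fake controller hands out the fake cache, and the cache returns a
    // canned secret for any namespace/name it is asked about.
    secrets.EXPECT().Cache().AnyTimes().Return(cache)
    cache.EXPECT().Get(gomock.Any(), gomock.Any()).AnyTimes().Return(
        &v1.Secret{Data: map[string][]byte{"hash": []byte("example")}}, nil)

    secret, err := secrets.Cache().Get("kube-system", "node1.node-password.k3s")
    if err != nil || string(secret.Data["hash"]) != "example" {
        t.Fatalf("unexpected result: %v, %v", secret, err)
    }
}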

View File

@ -431,8 +431,8 @@ type nodeInfo struct {
func passwordBootstrap(ctx context.Context, config *Config) nodePassBootstrapper {
runtime := config.ControlConfig.Runtime
deferredNodes := map[string]bool{}
var secretClient coreclient.SecretClient
var nodeClient coreclient.NodeClient
var secretClient coreclient.SecretController
var nodeClient coreclient.NodeController
var mu sync.Mutex
return nodePassBootstrapper(func(req *http.Request) (string, int, error) {
@ -535,9 +535,9 @@ func verifyRemotePassword(ctx context.Context, config *Config, mu *sync.Mutex, d
return node.Name, http.StatusOK, nil
}
func verifyNode(ctx context.Context, nodeClient coreclient.NodeClient, node *nodeInfo) error {
func verifyNode(ctx context.Context, nodeClient coreclient.NodeController, node *nodeInfo) error {
if nodeName, isNodeAuth := identifier.NodeIdentity(node.User); isNodeAuth {
if _, err := nodeClient.Get(nodeName, metav1.GetOptions{}); err != nil {
if _, err := nodeClient.Cache().Get(nodeName); err != nil {
return errors.Wrap(err, "unable to verify node identity")
}
}