mirror of
https://github.com/k3s-io/k3s.git
synced 2024-06-07 19:41:36 +00:00
ad41fb8c96
Fixes an issue where CRDs were being created without schema, allowing resources with invalid content to be created, later stalling the controller ListWatch event channel when the invalid resources could not be deserialized. This also requires moving Addon GVK tracking from a status field to an annotation, as the GroupVersionKind type has special handling internal to Kubernetes that prevents it from being serialized to the CRD when schema validation is enabled. Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
502 lines
16 KiB
Go
502 lines
16 KiB
Go
package deploy
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"context"
|
|
"crypto/sha256"
|
|
"encoding/hex"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"path/filepath"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/k3s-io/k3s/pkg/agent/util"
|
|
apisv1 "github.com/k3s-io/k3s/pkg/apis/k3s.cattle.io/v1"
|
|
controllersv1 "github.com/k3s-io/k3s/pkg/generated/controllers/k3s.cattle.io/v1"
|
|
pkgutil "github.com/k3s-io/k3s/pkg/util"
|
|
errors2 "github.com/pkg/errors"
|
|
"github.com/rancher/wrangler/pkg/apply"
|
|
"github.com/rancher/wrangler/pkg/kv"
|
|
"github.com/rancher/wrangler/pkg/merr"
|
|
"github.com/rancher/wrangler/pkg/objectset"
|
|
"github.com/sirupsen/logrus"
|
|
corev1 "k8s.io/api/core/v1"
|
|
"k8s.io/apimachinery/pkg/api/errors"
|
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
|
"k8s.io/apimachinery/pkg/runtime"
|
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
|
yamlDecoder "k8s.io/apimachinery/pkg/util/yaml"
|
|
"k8s.io/client-go/discovery"
|
|
"k8s.io/client-go/kubernetes"
|
|
"k8s.io/client-go/tools/record"
|
|
)
|
|
|
|
// Exported identifiers of the deploy controller.
const (
	// ControllerName is the component name handed to the event recorder.
	ControllerName = "deploy"
	// GVKAnnotation is the Addon annotation key under which the GroupVersionKinds
	// applied from the Addon's manifest are recorded.
	GVKAnnotation = "addon.k3s.cattle.io/gvks"
)

// Internal constants.
const (
	// startKey is the synthetic key enqueued by WatchFiles to kick off the file watcher.
	startKey = "_start_"
	// gvkSep separates the individual GVK strings stored in the GVKAnnotation value.
	gvkSep = ";"
)
|
|
|
|
// WatchFiles sets up an OnChange callback to start a periodic goroutine to watch files for changes once the controller has started up.
|
|
func WatchFiles(ctx context.Context, client kubernetes.Interface, apply apply.Apply, addons controllersv1.AddonController, disables map[string]bool, bases ...string) error {
|
|
w := &watcher{
|
|
apply: apply,
|
|
addonCache: addons.Cache(),
|
|
addons: addons,
|
|
bases: bases,
|
|
disables: disables,
|
|
modTime: map[string]time.Time{},
|
|
gvkCache: map[schema.GroupVersionKind]bool{},
|
|
discovery: client.Discovery(),
|
|
}
|
|
|
|
addons.Enqueue(metav1.NamespaceNone, startKey)
|
|
addons.OnChange(ctx, "addon-start", func(key string, _ *apisv1.Addon) (*apisv1.Addon, error) {
|
|
if key == startKey {
|
|
go w.start(ctx, client)
|
|
}
|
|
return nil, nil
|
|
})
|
|
|
|
return nil
|
|
}
|
|
|
|
type watcher struct {
|
|
sync.Mutex
|
|
|
|
apply apply.Apply
|
|
addonCache controllersv1.AddonCache
|
|
addons controllersv1.AddonClient
|
|
bases []string
|
|
disables map[string]bool
|
|
modTime map[string]time.Time
|
|
gvkCache map[schema.GroupVersionKind]bool
|
|
recorder record.EventRecorder
|
|
discovery discovery.DiscoveryInterface
|
|
}
|
|
|
|
// start calls listFiles at regular intervals to trigger application of manifests that have changed on disk.
|
|
func (w *watcher) start(ctx context.Context, client kubernetes.Interface) {
|
|
w.recorder = pkgutil.BuildControllerEventRecorder(client, ControllerName, metav1.NamespaceSystem)
|
|
force := true
|
|
for {
|
|
if err := w.listFiles(force); err == nil {
|
|
force = false
|
|
} else {
|
|
logrus.Errorf("Failed to process config: %v", err)
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-time.After(15 * time.Second):
|
|
}
|
|
}
|
|
}
|
|
|
|
// listFiles calls listFilesIn on a list of paths.
|
|
func (w *watcher) listFiles(force bool) error {
|
|
var errs []error
|
|
for _, base := range w.bases {
|
|
if err := w.listFilesIn(base, force); err != nil {
|
|
errs = append(errs, err)
|
|
}
|
|
}
|
|
return merr.NewErrors(errs...)
|
|
}
|
|
|
|
// listFilesIn recursively processes all files within a path, and checks them against the disable and skip lists. Files found that
|
|
// are not on either list are loaded as Addons and applied to the cluster.
|
|
func (w *watcher) listFilesIn(base string, force bool) error {
|
|
files := map[string]os.FileInfo{}
|
|
if err := filepath.Walk(base, func(path string, info os.FileInfo, err error) error {
|
|
if err != nil {
|
|
return err
|
|
}
|
|
files[path] = info
|
|
return nil
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Make a map of .skip files - these are used later to indicate that a given file should be ignored
|
|
// For example, 'addon.yaml.skip' will cause 'addon.yaml' to be ignored completely - unless it is also
|
|
// disabled, since disable processing happens first.
|
|
skips := map[string]bool{}
|
|
keys := make([]string, len(files))
|
|
keyIndex := 0
|
|
for path, file := range files {
|
|
if strings.HasSuffix(file.Name(), ".skip") {
|
|
skips[strings.TrimSuffix(file.Name(), ".skip")] = true
|
|
}
|
|
keys[keyIndex] = path
|
|
keyIndex++
|
|
}
|
|
sort.Strings(keys)
|
|
|
|
var errs []error
|
|
for _, path := range keys {
|
|
// Disabled files are not just skipped, but actively deleted from the filesystem
|
|
if shouldDisableFile(base, path, w.disables) {
|
|
if err := w.delete(path); err != nil {
|
|
errs = append(errs, errors2.Wrapf(err, "failed to delete %s", path))
|
|
}
|
|
continue
|
|
}
|
|
// Skipped files are just ignored
|
|
if shouldSkipFile(files[path].Name(), skips) {
|
|
continue
|
|
}
|
|
modTime := files[path].ModTime()
|
|
if !force && modTime.Equal(w.modTime[path]) {
|
|
continue
|
|
}
|
|
if err := w.deploy(path, !force); err != nil {
|
|
errs = append(errs, errors2.Wrapf(err, "failed to process %s", path))
|
|
} else {
|
|
w.modTime[path] = modTime
|
|
}
|
|
}
|
|
|
|
return merr.NewErrors(errs...)
|
|
}
|
|
|
|
// deploy loads yaml from a manifest on disk, creates an AddOn resource to track its application, and then applies
|
|
// all resources contained within to the cluster.
|
|
func (w *watcher) deploy(path string, compareChecksum bool) error {
|
|
name := basename(path)
|
|
addon, err := w.getOrCreateAddon(name)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
addon.Spec.Source = path
|
|
|
|
// Create the new Addon now so that we can use it to report Events when parsing/applying the manifest
|
|
// Events need the UID and ObjectRevision set to function properly
|
|
if addon.UID == "" {
|
|
newAddon, err := w.addons.Create(&addon)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
addon = *newAddon
|
|
}
|
|
|
|
content, err := os.ReadFile(path)
|
|
if err != nil {
|
|
w.recorder.Eventf(&addon, corev1.EventTypeWarning, "ReadManifestFailed", "Read manifest at %q failed: %v", path, err)
|
|
return err
|
|
}
|
|
|
|
checksum := checksum(content)
|
|
if compareChecksum && checksum == addon.Spec.Checksum {
|
|
logrus.Debugf("Skipping existing deployment of %s, check=%v, checksum %s=%s", path, compareChecksum, checksum, addon.Spec.Checksum)
|
|
return nil
|
|
}
|
|
|
|
// Attempt to parse the YAML/JSON into objects. Failure at this point would be due to bad file content - not YAML/JSON,
|
|
// YAML/JSON that can't be converted to Kubernetes objects, etc.
|
|
objects, err := objectSet(content)
|
|
if err != nil {
|
|
w.recorder.Eventf(&addon, corev1.EventTypeWarning, "ParseManifestFailed", "Parse manifest at %q failed: %v", path, err)
|
|
return err
|
|
}
|
|
|
|
// Merge GVK list early for validation
|
|
addonGVKs := objects.GVKs()
|
|
for _, gvkString := range strings.Split(addon.Annotations[GVKAnnotation], gvkSep) {
|
|
if gvk, err := getGVK(gvkString); err == nil {
|
|
addonGVKs = append(addonGVKs, *gvk)
|
|
}
|
|
}
|
|
|
|
// Ensure that we don't try to prune using GVKs that the server doesn't have.
|
|
// This can happen when CRDs are removed or when core types are removed - PodSecurityPolicy, for example.
|
|
addonGVKs, err = w.validateGVKs(addonGVKs)
|
|
if err != nil {
|
|
w.recorder.Eventf(&addon, corev1.EventTypeWarning, "ValidateManifestFailed", "Validate GVKs for manifest at %q failed: %v", path, err)
|
|
return err
|
|
}
|
|
|
|
// Attempt to apply the changes. Failure at this point would be due to more complicated issues - invalid changes to
|
|
// existing objects, rejected by validating webhooks, etc.
|
|
// WithGVK searches for objects using both GVKs currently listed in the manifest, as well as GVKs previously
|
|
// applied. This ensures that objects don't get orphaned when they are removed from the file - if the apply
|
|
// doesn't know to search that GVK for owner references, it won't find and delete them.
|
|
w.recorder.Eventf(&addon, corev1.EventTypeNormal, "ApplyingManifest", "Applying manifest at %q", path)
|
|
|
|
if err := w.apply.WithOwner(&addon).WithGVK(addonGVKs...).Apply(objects); err != nil {
|
|
w.recorder.Eventf(&addon, corev1.EventTypeWarning, "ApplyManifestFailed", "Applying manifest at %q failed: %v", path, err)
|
|
return err
|
|
}
|
|
|
|
// Emit event, Update Addon checksum and GVKs only if apply was successful
|
|
w.recorder.Eventf(&addon, corev1.EventTypeNormal, "AppliedManifest", "Applied manifest at %q", path)
|
|
if addon.Annotations == nil {
|
|
addon.Annotations = map[string]string{}
|
|
}
|
|
addon.Spec.Checksum = checksum
|
|
addon.Annotations[GVKAnnotation] = getGVKString(objects.GVKs())
|
|
_, err = w.addons.Update(&addon)
|
|
return err
|
|
}
|
|
|
|
// delete completely removes both a manifest, and any resources that it did or would have created. The manifest is
|
|
// parsed, and any resources it specified are deleted. Finally, the file itself is removed from disk.
|
|
func (w *watcher) delete(path string) error {
|
|
name := basename(path)
|
|
addon, err := w.getOrCreateAddon(name)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
addonGVKs := []schema.GroupVersionKind{}
|
|
for _, gvkString := range strings.Split(addon.Annotations[GVKAnnotation], gvkSep) {
|
|
if gvk, err := getGVK(gvkString); err == nil {
|
|
addonGVKs = append(addonGVKs, *gvk)
|
|
}
|
|
}
|
|
|
|
content, err := os.ReadFile(path)
|
|
if err != nil {
|
|
w.recorder.Eventf(&addon, corev1.EventTypeWarning, "ReadManifestFailed", "Read manifest at %q failed: %v", path, err)
|
|
} else {
|
|
if o, err := objectSet(content); err != nil {
|
|
w.recorder.Eventf(&addon, corev1.EventTypeWarning, "ParseManifestFailed", "Parse manifest at %q failed: %v", path, err)
|
|
} else {
|
|
// Search for objects using both GVKs currently listed in the file, as well as GVKs previously applied.
|
|
// This ensures that any conflicts between competing deploy controllers are handled properly.
|
|
addonGVKs = append(addonGVKs, o.GVKs()...)
|
|
}
|
|
}
|
|
|
|
// Ensure that we don't try to delete using GVKs that the server doesn't have.
|
|
// This can happen when CRDs are removed or when core types are removed - PodSecurityPolicy, for example.
|
|
addonGVKs, err = w.validateGVKs(addonGVKs)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// ensure that the addon is completely removed before deleting the objectSet,
|
|
// so return when err == nil, otherwise pods may get stuck terminating
|
|
w.recorder.Eventf(&addon, corev1.EventTypeNormal, "DeletingManifest", "Deleting manifest at %q", path)
|
|
if err := w.addons.Delete(addon.Namespace, addon.Name, &metav1.DeleteOptions{}); err == nil || !errors.IsNotFound(err) {
|
|
return err
|
|
}
|
|
|
|
// apply an empty set with owner & gvk data to delete
|
|
if err := w.apply.WithOwner(&addon).WithGVK(addonGVKs...).ApplyObjects(); err != nil {
|
|
return err
|
|
}
|
|
|
|
return os.Remove(path)
|
|
}
|
|
|
|
// getOrCreateAddon attempts to get an Addon by name from the addon namespace, and creates a new one
|
|
// if it cannot be found.
|
|
func (w *watcher) getOrCreateAddon(name string) (apisv1.Addon, error) {
|
|
addon, err := w.addonCache.Get(metav1.NamespaceSystem, name)
|
|
if errors.IsNotFound(err) {
|
|
addon = apisv1.NewAddon(metav1.NamespaceSystem, name, apisv1.Addon{})
|
|
} else if err != nil {
|
|
return apisv1.Addon{}, err
|
|
}
|
|
return *addon, nil
|
|
}
|
|
|
|
// validateGVKs removes from the list any GVKs that the server does not support
|
|
func (w *watcher) validateGVKs(addonGVKs []schema.GroupVersionKind) ([]schema.GroupVersionKind, error) {
|
|
gvks := []schema.GroupVersionKind{}
|
|
for _, gvk := range addonGVKs {
|
|
found, err := w.serverHasGVK(gvk)
|
|
if err != nil {
|
|
return gvks, err
|
|
}
|
|
if found {
|
|
gvks = append(gvks, gvk)
|
|
}
|
|
}
|
|
return gvks, nil
|
|
}
|
|
|
|
// serverHasGVK uses a positive cache of GVKs that the cluster is known to have supported at some
|
|
// point in time. Note this may fail to filter out GVKs that are removed from the cluster after
|
|
// startup (for example, if CRDs are deleted) - but the Wrangler DesiredSet cache has the same issue,
|
|
// so it should be fine.
|
|
func (w *watcher) serverHasGVK(gvk schema.GroupVersionKind) (bool, error) {
|
|
w.Lock()
|
|
defer w.Unlock()
|
|
|
|
if found, ok := w.gvkCache[gvk]; ok {
|
|
return found, nil
|
|
}
|
|
|
|
resources, err := w.discovery.ServerResourcesForGroupVersion(gvk.GroupVersion().String())
|
|
if err != nil {
|
|
if apierrors.IsNotFound(err) {
|
|
return false, nil
|
|
}
|
|
return false, err
|
|
}
|
|
|
|
// Cache all Kinds for this GroupVersion to save on future lookups
|
|
for _, resource := range resources.APIResources {
|
|
// Resources in the requested GV are returned with empty GroupVersion.
|
|
// Subresources with different GV may also be returned, but we aren't interested in those.
|
|
if resource.Group == "" && resource.Version == "" {
|
|
w.gvkCache[gvk.GroupVersion().WithKind(resource.Kind)] = true
|
|
}
|
|
}
|
|
|
|
return w.gvkCache[gvk], nil
|
|
}
|
|
|
|
// objectSet returns a new ObjectSet containing all resources from a given yaml chunk
|
|
func objectSet(content []byte) (*objectset.ObjectSet, error) {
|
|
objs, err := yamlToObjects(bytes.NewBuffer(content))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return objectset.NewObjectSet(objs...), nil
|
|
}
|
|
|
|
// basename returns the final path element truncated at its first period, so that
// "dir/addon.yaml.skip" yields "addon". A name starting with a period yields "".
func basename(path string) string {
	file := filepath.Base(path)
	if dot := strings.IndexByte(file, '.'); dot >= 0 {
		return file[:dot]
	}
	return file
}
|
|
|
|
// checksum computes the SHA256 digest of the given bytes and returns it hex-encoded.
func checksum(bytes []byte) string {
	hasher := sha256.New()
	hasher.Write(bytes) // hash.Hash.Write never returns an error
	return hex.EncodeToString(hasher.Sum(nil))
}
|
|
|
|
// isEmptyYaml reports whether a chunk of YAML consists solely of blank lines, comment lines,
// and "---" document separators (each judged after trimming surrounding whitespace).
func isEmptyYaml(yaml []byte) bool {
	for _, line := range bytes.Split(yaml, []byte("\n")) {
		trimmed := bytes.TrimSpace(line)
		switch {
		case len(trimmed) == 0:
			// blank line
		case bytes.HasPrefix(trimmed, []byte("#")):
			// comment
		case string(trimmed) == "---":
			// document separator
		default:
			// Anything else is real content.
			return false
		}
	}
	return true
}
|
|
|
|
// yamlToObjects returns an object slice yielded from documents in a chunk of YAML
|
|
func yamlToObjects(in io.Reader) ([]runtime.Object, error) {
|
|
var result []runtime.Object
|
|
reader := yamlDecoder.NewYAMLReader(bufio.NewReaderSize(in, 4096))
|
|
for {
|
|
raw, err := reader.Read()
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if !isEmptyYaml(raw) {
|
|
obj, err := toObjects(raw)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
result = append(result, obj...)
|
|
}
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// Returns one or more objects from a single YAML document
|
|
func toObjects(bytes []byte) ([]runtime.Object, error) {
|
|
bytes, err := yamlDecoder.ToJSON(bytes)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
obj, _, err := unstructured.UnstructuredJSONScheme.Decode(bytes, nil, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if l, ok := obj.(*unstructured.UnstructuredList); ok {
|
|
var result []runtime.Object
|
|
for _, obj := range l.Items {
|
|
copy := obj
|
|
result = append(result, ©)
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
return []runtime.Object{obj}, nil
|
|
}
|
|
|
|
// Returns true if a file should be skipped. Skips anything from the provided skip map,
|
|
// anything that is a dotfile, and anything that does not have a json/yaml/yml extension.
|
|
func shouldSkipFile(fileName string, skips map[string]bool) bool {
|
|
switch {
|
|
case strings.HasPrefix(fileName, "."):
|
|
return true
|
|
case skips[fileName]:
|
|
return true
|
|
case util.HasSuffixI(fileName, ".yaml", ".yml", ".json"):
|
|
return false
|
|
default:
|
|
return true
|
|
}
|
|
}
|
|
|
|
// Returns true if a file should be disabled, by checking the file basename against a disables map.
|
|
// only json/yaml files are checked.
|
|
func shouldDisableFile(base, fileName string, disables map[string]bool) bool {
|
|
// Check to see if the file is in a subdirectory that is in the disables map.
|
|
// If a file is nested several levels deep, checks 'parent1', 'parent1/parent2', 'parent1/parent2/parent3', etc.
|
|
relFile := strings.TrimPrefix(fileName, base)
|
|
namePath := strings.Split(relFile, string(os.PathSeparator))
|
|
for i := 1; i < len(namePath); i++ {
|
|
subPath := filepath.Join(namePath[0:i]...)
|
|
if disables[subPath] {
|
|
return true
|
|
}
|
|
}
|
|
if !util.HasSuffixI(fileName, ".yaml", ".yml", ".json") {
|
|
return false
|
|
}
|
|
// Check the basename against the disables map
|
|
baseFile := filepath.Base(fileName)
|
|
suffix := filepath.Ext(baseFile)
|
|
baseName := strings.TrimSuffix(baseFile, suffix)
|
|
return disables[baseName]
|
|
}
|
|
|
|
func getGVK(s string) (*schema.GroupVersionKind, error) {
|
|
parts := strings.Split(s, ", Kind=")
|
|
if len(parts) != 2 {
|
|
return nil, fmt.Errorf("invalid GVK format: %s", s)
|
|
}
|
|
gvk := &schema.GroupVersionKind{}
|
|
gvk.Group, gvk.Version = kv.Split(parts[0], "/")
|
|
gvk.Kind = parts[1]
|
|
return gvk, nil
|
|
}
|
|
|
|
func getGVKString(gvks []schema.GroupVersionKind) string {
|
|
strs := make([]string, len(gvks))
|
|
for i, gvk := range gvks {
|
|
strs[i] = gvk.String()
|
|
}
|
|
return strings.Join(strs, gvkSep)
|
|
}
|