mirror of
https://github.com/k3s-io/k3s.git
synced 2024-06-07 19:41:36 +00:00
Create CRDs with schema
Fixes an issue where CRDs were being created without schema, allowing resources with invalid content to be created, later stalling the controller ListWatch event channel when the invalid resources could not be deserialized. This also requires moving Addon GVK tracking from a status field to an annotation, as the GroupVersionKind type has special handling internal to Kubernetes that prevents it from being serialized to the CRD when schema validation is enabled. Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
This commit is contained in:
parent
bc5b42c279
commit
ad41fb8c96
2
go.mod
2
go.mod
@ -124,7 +124,7 @@ require (
|
||||
github.com/rancher/lasso v0.0.0-20221227210133-6ea88ca2fbcc
|
||||
github.com/rancher/remotedialer v0.3.0
|
||||
github.com/rancher/wharfie v0.5.3
|
||||
github.com/rancher/wrangler v1.1.1
|
||||
github.com/rancher/wrangler v1.1.1-0.20230419173538-80fdf092be3b
|
||||
github.com/robfig/cron/v3 v3.0.1
|
||||
github.com/rootless-containers/rootlesskit v1.0.1
|
||||
github.com/sirupsen/logrus v1.9.0
|
||||
|
4
go.sum
4
go.sum
@ -939,8 +939,8 @@ github.com/rancher/remotedialer v0.3.0 h1:y1EO8JCsgZo0RcqTUp6U8FXcBAv27R+TLnWRcp
|
||||
github.com/rancher/remotedialer v0.3.0/go.mod h1:BwwztuvViX2JrLLUwDlsYt5DiyUwHLlzynRwkZLAY0Q=
|
||||
github.com/rancher/wharfie v0.5.3 h1:6hiO26H7YTgChbLAE6JppxFRjaH3tbKfMItv/LqV0Q0=
|
||||
github.com/rancher/wharfie v0.5.3/go.mod h1:Ebpai7digxegLroBseeC54XRBt5we3DgFS6kAE2ho+o=
|
||||
github.com/rancher/wrangler v1.1.1 h1:wmqUwqc2M7ADfXnBCJTFkTB5ZREWpD78rnZMzmxwMvM=
|
||||
github.com/rancher/wrangler v1.1.1/go.mod h1:ioVbKupzcBOdzsl55MvEDN0R1wdGggj8iNCYGFI5JvM=
|
||||
github.com/rancher/wrangler v1.1.1-0.20230419173538-80fdf092be3b h1:rs3WYld8iaRcSzCmM/CrCIVz9uVgfd96o7FsufIdoVI=
|
||||
github.com/rancher/wrangler v1.1.1-0.20230419173538-80fdf092be3b/go.mod h1:D6Tu6oVX8aGtCHsMCtYaysgVK3ad920MTSeAu7rzb5U=
|
||||
github.com/rasky/go-xdr v0.0.0-20170217172119-4930550ba2e2/go.mod h1:Nfe4efndBz4TibWycNE+lqyJZiMX4ycx+QKV8Ta0f/o=
|
||||
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
|
||||
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
|
||||
|
@ -2,7 +2,6 @@ package v1
|
||||
|
||||
import (
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
)
|
||||
|
||||
// +genclient
|
||||
@ -12,15 +11,10 @@ type Addon struct {
|
||||
metav1.TypeMeta `json:",inline"`
|
||||
metav1.ObjectMeta `json:"metadata,omitempty"`
|
||||
|
||||
Spec AddonSpec `json:"spec,omitempty"`
|
||||
Status AddonStatus `json:"status,omitempty"`
|
||||
Spec AddonSpec `json:"spec,omitempty"`
|
||||
}
|
||||
|
||||
type AddonSpec struct {
|
||||
Source string `json:"source,omitempty"`
|
||||
Checksum string `json:"checksum,omitempty"`
|
||||
}
|
||||
|
||||
type AddonStatus struct {
|
||||
GVKs []schema.GroupVersionKind `json:"gvks,omitempty"`
|
||||
}
|
||||
|
@ -23,7 +23,6 @@ package v1
|
||||
|
||||
import (
|
||||
runtime "k8s.io/apimachinery/pkg/runtime"
|
||||
schema "k8s.io/apimachinery/pkg/runtime/schema"
|
||||
)
|
||||
|
||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||
@ -32,7 +31,6 @@ func (in *Addon) DeepCopyInto(out *Addon) {
|
||||
out.TypeMeta = in.TypeMeta
|
||||
in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
|
||||
out.Spec = in.Spec
|
||||
in.Status.DeepCopyInto(&out.Status)
|
||||
return
|
||||
}
|
||||
|
||||
@ -102,24 +100,3 @@ func (in *AddonSpec) DeepCopy() *AddonSpec {
|
||||
in.DeepCopyInto(out)
|
||||
return out
|
||||
}
|
||||
|
||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||
func (in *AddonStatus) DeepCopyInto(out *AddonStatus) {
|
||||
*out = *in
|
||||
if in.GVKs != nil {
|
||||
in, out := &in.GVKs, &out.GVKs
|
||||
*out = make([]schema.GroupVersionKind, len(*in))
|
||||
copy(*out, *in)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AddonStatus.
|
||||
func (in *AddonStatus) DeepCopy() *AddonStatus {
|
||||
if in == nil {
|
||||
return nil
|
||||
}
|
||||
out := new(AddonStatus)
|
||||
in.DeepCopyInto(out)
|
||||
return out
|
||||
}
|
||||
|
15
pkg/crd/crds.go
Normal file
15
pkg/crd/crds.go
Normal file
@ -0,0 +1,15 @@
|
||||
package crd
|
||||
|
||||
import (
|
||||
v1 "github.com/k3s-io/k3s/pkg/apis/k3s.cattle.io/v1"
|
||||
"github.com/rancher/wrangler/pkg/crd"
|
||||
)
|
||||
|
||||
func List() []crd.CRD {
|
||||
addon := crd.NamespacedType("Addon.k3s.cattle.io/v1").
|
||||
WithSchemaFromStruct(v1.Addon{}).
|
||||
WithColumn("Source", ".spec.source").
|
||||
WithColumn("Checksum", ".spec.checksum")
|
||||
|
||||
return []crd.CRD{addon}
|
||||
}
|
@ -6,6 +6,7 @@ import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
@ -20,6 +21,7 @@ import (
|
||||
pkgutil "github.com/k3s-io/k3s/pkg/util"
|
||||
errors2 "github.com/pkg/errors"
|
||||
"github.com/rancher/wrangler/pkg/apply"
|
||||
"github.com/rancher/wrangler/pkg/kv"
|
||||
"github.com/rancher/wrangler/pkg/merr"
|
||||
"github.com/rancher/wrangler/pkg/objectset"
|
||||
"github.com/sirupsen/logrus"
|
||||
@ -38,7 +40,9 @@ import (
|
||||
|
||||
const (
|
||||
ControllerName = "deploy"
|
||||
GVKAnnotation = "addon.k3s.cattle.io/gvks"
|
||||
startKey = "_start_"
|
||||
gvkSep = ";"
|
||||
)
|
||||
|
||||
// WatchFiles sets up an OnChange callback to start a periodic goroutine to watch files for changes once the controller has started up.
|
||||
@ -206,11 +210,17 @@ func (w *watcher) deploy(path string, compareChecksum bool) error {
|
||||
}
|
||||
|
||||
// Merge GVK list early for validation
|
||||
addon.Status.GVKs = append(addon.Status.GVKs, objects.GVKs()...)
|
||||
addonGVKs := objects.GVKs()
|
||||
for _, gvkString := range strings.Split(addon.Annotations[GVKAnnotation], gvkSep) {
|
||||
if gvk, err := getGVK(gvkString); err == nil {
|
||||
addonGVKs = append(addonGVKs, *gvk)
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure that we don't try to prune using GVKs that the server doesn't have.
|
||||
// This can happen when CRDs are removed or when core types are removed - PodSecurityPolicy, for example.
|
||||
if err := w.validateGVKs(&addon); err != nil {
|
||||
addonGVKs, err = w.validateGVKs(addonGVKs)
|
||||
if err != nil {
|
||||
w.recorder.Eventf(&addon, corev1.EventTypeWarning, "ValidateManifestFailed", "Validate GVKs for manifest at %q failed: %v", path, err)
|
||||
return err
|
||||
}
|
||||
@ -222,15 +232,18 @@ func (w *watcher) deploy(path string, compareChecksum bool) error {
|
||||
// doesn't know to search that GVK for owner references, it won't find and delete them.
|
||||
w.recorder.Eventf(&addon, corev1.EventTypeNormal, "ApplyingManifest", "Applying manifest at %q", path)
|
||||
|
||||
if err := w.apply.WithOwner(&addon).WithGVK(addon.Status.GVKs...).Apply(objects); err != nil {
|
||||
if err := w.apply.WithOwner(&addon).WithGVK(addonGVKs...).Apply(objects); err != nil {
|
||||
w.recorder.Eventf(&addon, corev1.EventTypeWarning, "ApplyManifestFailed", "Applying manifest at %q failed: %v", path, err)
|
||||
return err
|
||||
}
|
||||
|
||||
// Emit event, Update Addon checksum and GVKs only if apply was successful
|
||||
w.recorder.Eventf(&addon, corev1.EventTypeNormal, "AppliedManifest", "Applied manifest at %q", path)
|
||||
if addon.Annotations == nil {
|
||||
addon.Annotations = map[string]string{}
|
||||
}
|
||||
addon.Spec.Checksum = checksum
|
||||
addon.Status.GVKs = objects.GVKs()
|
||||
addon.Annotations[GVKAnnotation] = getGVKString(objects.GVKs())
|
||||
_, err = w.addons.Update(&addon)
|
||||
return err
|
||||
}
|
||||
@ -244,6 +257,13 @@ func (w *watcher) delete(path string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
addonGVKs := []schema.GroupVersionKind{}
|
||||
for _, gvkString := range strings.Split(addon.Annotations[GVKAnnotation], gvkSep) {
|
||||
if gvk, err := getGVK(gvkString); err == nil {
|
||||
addonGVKs = append(addonGVKs, *gvk)
|
||||
}
|
||||
}
|
||||
|
||||
content, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
w.recorder.Eventf(&addon, corev1.EventTypeWarning, "ReadManifestFailed", "Read manifest at %q failed: %v", path, err)
|
||||
@ -253,13 +273,14 @@ func (w *watcher) delete(path string) error {
|
||||
} else {
|
||||
// Search for objects using both GVKs currently listed in the file, as well as GVKs previously applied.
|
||||
// This ensures that any conflicts between competing deploy controllers are handled properly.
|
||||
addon.Status.GVKs = append(addon.Status.GVKs, o.GVKs()...)
|
||||
addonGVKs = append(addonGVKs, o.GVKs()...)
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure that we don't try to delete using GVKs that the server doesn't have.
|
||||
// This can happen when CRDs are removed or when core types are removed - PodSecurityPolicy, for example.
|
||||
if err := w.validateGVKs(&addon); err != nil {
|
||||
addonGVKs, err = w.validateGVKs(addonGVKs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -271,7 +292,7 @@ func (w *watcher) delete(path string) error {
|
||||
}
|
||||
|
||||
// apply an empty set with owner & gvk data to delete
|
||||
if err := w.apply.WithOwner(&addon).WithGVK(addon.Status.GVKs...).ApplyObjects(); err != nil {
|
||||
if err := w.apply.WithOwner(&addon).WithGVK(addonGVKs...).ApplyObjects(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -290,22 +311,19 @@ func (w *watcher) getOrCreateAddon(name string) (apisv1.Addon, error) {
|
||||
return *addon, nil
|
||||
}
|
||||
|
||||
// validateGVKs removes from the Addon status any GVKs that the server does not support
|
||||
func (w *watcher) validateGVKs(addon *apisv1.Addon) error {
|
||||
// validateGVKs removes from the list any GVKs that the server does not support
|
||||
func (w *watcher) validateGVKs(addonGVKs []schema.GroupVersionKind) ([]schema.GroupVersionKind, error) {
|
||||
gvks := []schema.GroupVersionKind{}
|
||||
for _, gvk := range addon.Status.GVKs {
|
||||
for _, gvk := range addonGVKs {
|
||||
found, err := w.serverHasGVK(gvk)
|
||||
if err != nil {
|
||||
return err
|
||||
return gvks, err
|
||||
}
|
||||
if found {
|
||||
gvks = append(gvks, gvk)
|
||||
} else {
|
||||
logrus.Warnf("Pruned unknown GVK from %s %s/%s: %s", addon.TypeMeta.GroupVersionKind(), addon.Namespace, addon.Name, gvk)
|
||||
}
|
||||
}
|
||||
addon.Status.GVKs = gvks
|
||||
return nil
|
||||
return gvks, nil
|
||||
}
|
||||
|
||||
// serverHasGVK uses a positive cache of GVKs that the cluster is known to have supported at some
|
||||
@ -462,3 +480,22 @@ func shouldDisableFile(base, fileName string, disables map[string]bool) bool {
|
||||
baseName := strings.TrimSuffix(baseFile, suffix)
|
||||
return disables[baseName]
|
||||
}
|
||||
|
||||
func getGVK(s string) (*schema.GroupVersionKind, error) {
|
||||
parts := strings.Split(s, ", Kind=")
|
||||
if len(parts) != 2 {
|
||||
return nil, fmt.Errorf("invalid GVK format: %s", s)
|
||||
}
|
||||
gvk := &schema.GroupVersionKind{}
|
||||
gvk.Group, gvk.Version = kv.Split(parts[0], "/")
|
||||
gvk.Kind = parts[1]
|
||||
return gvk, nil
|
||||
}
|
||||
|
||||
func getGVKString(gvks []schema.GroupVersionKind) string {
|
||||
strs := make([]string, len(gvks))
|
||||
for i, gvk := range gvks {
|
||||
strs[i] = gvk.String()
|
||||
}
|
||||
return strings.Join(strs, gvkSep)
|
||||
}
|
||||
|
@ -40,7 +40,6 @@ type AddonsGetter interface {
|
||||
type AddonInterface interface {
|
||||
Create(ctx context.Context, addon *v1.Addon, opts metav1.CreateOptions) (*v1.Addon, error)
|
||||
Update(ctx context.Context, addon *v1.Addon, opts metav1.UpdateOptions) (*v1.Addon, error)
|
||||
UpdateStatus(ctx context.Context, addon *v1.Addon, opts metav1.UpdateOptions) (*v1.Addon, error)
|
||||
Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error
|
||||
DeleteCollection(ctx context.Context, opts metav1.DeleteOptions, listOpts metav1.ListOptions) error
|
||||
Get(ctx context.Context, name string, opts metav1.GetOptions) (*v1.Addon, error)
|
||||
@ -136,22 +135,6 @@ func (c *addons) Update(ctx context.Context, addon *v1.Addon, opts metav1.Update
|
||||
return
|
||||
}
|
||||
|
||||
// UpdateStatus was generated because the type contains a Status member.
|
||||
// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus().
|
||||
func (c *addons) UpdateStatus(ctx context.Context, addon *v1.Addon, opts metav1.UpdateOptions) (result *v1.Addon, err error) {
|
||||
result = &v1.Addon{}
|
||||
err = c.client.Put().
|
||||
Namespace(c.ns).
|
||||
Resource("addons").
|
||||
Name(addon.Name).
|
||||
SubResource("status").
|
||||
VersionedParams(&opts, scheme.ParameterCodec).
|
||||
Body(addon).
|
||||
Do(ctx).
|
||||
Into(result)
|
||||
return
|
||||
}
|
||||
|
||||
// Delete takes name of the addon and deletes it. Returns an error if one occurs.
|
||||
func (c *addons) Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error {
|
||||
return c.client.Delete().
|
||||
|
@ -101,18 +101,6 @@ func (c *FakeAddons) Update(ctx context.Context, addon *v1.Addon, opts metav1.Up
|
||||
return obj.(*v1.Addon), err
|
||||
}
|
||||
|
||||
// UpdateStatus was generated because the type contains a Status member.
|
||||
// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus().
|
||||
func (c *FakeAddons) UpdateStatus(ctx context.Context, addon *v1.Addon, opts metav1.UpdateOptions) (*v1.Addon, error) {
|
||||
obj, err := c.Fake.
|
||||
Invokes(testing.NewUpdateSubresourceAction(addonsResource, "status", c.ns, addon), &v1.Addon{})
|
||||
|
||||
if obj == nil {
|
||||
return nil, err
|
||||
}
|
||||
return obj.(*v1.Addon), err
|
||||
}
|
||||
|
||||
// Delete takes name of the addon and deletes it. Returns an error if one occurs.
|
||||
func (c *FakeAddons) Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error {
|
||||
_, err := c.Fake.
|
||||
|
@ -23,354 +23,110 @@ import (
|
||||
"time"
|
||||
|
||||
v1 "github.com/k3s-io/k3s/pkg/apis/k3s.cattle.io/v1"
|
||||
"github.com/rancher/lasso/pkg/client"
|
||||
"github.com/rancher/lasso/pkg/controller"
|
||||
"github.com/rancher/wrangler/pkg/apply"
|
||||
"github.com/rancher/wrangler/pkg/condition"
|
||||
"github.com/rancher/wrangler/pkg/generic"
|
||||
"github.com/rancher/wrangler/pkg/kv"
|
||||
"k8s.io/apimachinery/pkg/api/equality"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
)
|
||||
|
||||
type AddonHandler func(string, *v1.Addon) (*v1.Addon, error)
|
||||
|
||||
// AddonController interface for managing Addon resources.
|
||||
type AddonController interface {
|
||||
generic.ControllerMeta
|
||||
AddonClient
|
||||
|
||||
// OnChange runs the given handler when the controller detects a resource was changed.
|
||||
OnChange(ctx context.Context, name string, sync AddonHandler)
|
||||
|
||||
// OnRemove runs the given handler when the controller detects a resource was changed.
|
||||
OnRemove(ctx context.Context, name string, sync AddonHandler)
|
||||
|
||||
// Enqueue adds the resource with the given name to the worker queue of the controller.
|
||||
Enqueue(namespace, name string)
|
||||
|
||||
// EnqueueAfter runs Enqueue after the provided duration.
|
||||
EnqueueAfter(namespace, name string, duration time.Duration)
|
||||
|
||||
// Cache returns a cache for the resource type T.
|
||||
Cache() AddonCache
|
||||
}
|
||||
|
||||
// AddonClient interface for managing Addon resources in Kubernetes.
|
||||
type AddonClient interface {
|
||||
// Create creates a new object and return the newly created Object or an error.
|
||||
Create(*v1.Addon) (*v1.Addon, error)
|
||||
|
||||
// Update updates the object and return the newly updated Object or an error.
|
||||
Update(*v1.Addon) (*v1.Addon, error)
|
||||
UpdateStatus(*v1.Addon) (*v1.Addon, error)
|
||||
|
||||
// Delete deletes the Object in the given name.
|
||||
Delete(namespace, name string, options *metav1.DeleteOptions) error
|
||||
|
||||
// Get will attempt to retrieve the resource with the specified name.
|
||||
Get(namespace, name string, options metav1.GetOptions) (*v1.Addon, error)
|
||||
|
||||
// List will attempt to find multiple resources.
|
||||
List(namespace string, opts metav1.ListOptions) (*v1.AddonList, error)
|
||||
|
||||
// Watch will start watching resources.
|
||||
Watch(namespace string, opts metav1.ListOptions) (watch.Interface, error)
|
||||
|
||||
// Patch will patch the resource with the matching name.
|
||||
Patch(namespace, name string, pt types.PatchType, data []byte, subresources ...string) (result *v1.Addon, err error)
|
||||
}
|
||||
|
||||
// AddonCache interface for retrieving Addon resources in memory.
|
||||
type AddonCache interface {
|
||||
// Get returns the resources with the specified name from the cache.
|
||||
Get(namespace, name string) (*v1.Addon, error)
|
||||
|
||||
// List will attempt to find resources from the Cache.
|
||||
List(namespace string, selector labels.Selector) ([]*v1.Addon, error)
|
||||
|
||||
// AddIndexer adds a new Indexer to the cache with the provided name.
|
||||
// If you call this after you already have data in the store, the results are undefined.
|
||||
AddIndexer(indexName string, indexer AddonIndexer)
|
||||
|
||||
// GetByIndex returns the stored objects whose set of indexed values
|
||||
// for the named index includes the given indexed value.
|
||||
GetByIndex(indexName, key string) ([]*v1.Addon, error)
|
||||
}
|
||||
|
||||
// AddonHandler is function for performing any potential modifications to a Addon resource.
|
||||
type AddonHandler func(string, *v1.Addon) (*v1.Addon, error)
|
||||
|
||||
// AddonIndexer computes a set of indexed values for the provided object.
|
||||
type AddonIndexer func(obj *v1.Addon) ([]string, error)
|
||||
|
||||
type addonController struct {
|
||||
controller controller.SharedController
|
||||
client *client.Client
|
||||
gvk schema.GroupVersionKind
|
||||
groupResource schema.GroupResource
|
||||
// AddonGenericController wraps wrangler/pkg/generic.Controller so that the function definitions adhere to AddonController interface.
|
||||
type AddonGenericController struct {
|
||||
generic.ControllerInterface[*v1.Addon, *v1.AddonList]
|
||||
}
|
||||
|
||||
func NewAddonController(gvk schema.GroupVersionKind, resource string, namespaced bool, controller controller.SharedControllerFactory) AddonController {
|
||||
c := controller.ForResourceKind(gvk.GroupVersion().WithResource(resource), gvk.Kind, namespaced)
|
||||
return &addonController{
|
||||
controller: c,
|
||||
client: c.Client(),
|
||||
gvk: gvk,
|
||||
groupResource: schema.GroupResource{
|
||||
Group: gvk.Group,
|
||||
Resource: resource,
|
||||
},
|
||||
// OnChange runs the given resource handler when the controller detects a resource was changed.
|
||||
func (c *AddonGenericController) OnChange(ctx context.Context, name string, sync AddonHandler) {
|
||||
c.ControllerInterface.OnChange(ctx, name, generic.ObjectHandler[*v1.Addon](sync))
|
||||
}
|
||||
|
||||
// OnRemove runs the given object handler when the controller detects a resource was changed.
|
||||
func (c *AddonGenericController) OnRemove(ctx context.Context, name string, sync AddonHandler) {
|
||||
c.ControllerInterface.OnRemove(ctx, name, generic.ObjectHandler[*v1.Addon](sync))
|
||||
}
|
||||
|
||||
// Cache returns a cache of resources in memory.
|
||||
func (c *AddonGenericController) Cache() AddonCache {
|
||||
return &AddonGenericCache{
|
||||
c.ControllerInterface.Cache(),
|
||||
}
|
||||
}
|
||||
|
||||
func FromAddonHandlerToHandler(sync AddonHandler) generic.Handler {
|
||||
return func(key string, obj runtime.Object) (ret runtime.Object, err error) {
|
||||
var v *v1.Addon
|
||||
if obj == nil {
|
||||
v, err = sync(key, nil)
|
||||
} else {
|
||||
v, err = sync(key, obj.(*v1.Addon))
|
||||
}
|
||||
if v == nil {
|
||||
return nil, err
|
||||
}
|
||||
return v, err
|
||||
}
|
||||
// AddonGenericCache wraps wrangler/pkg/generic.Cache so the function definitions adhere to AddonCache interface.
|
||||
type AddonGenericCache struct {
|
||||
generic.CacheInterface[*v1.Addon]
|
||||
}
|
||||
|
||||
func (c *addonController) Updater() generic.Updater {
|
||||
return func(obj runtime.Object) (runtime.Object, error) {
|
||||
newObj, err := c.Update(obj.(*v1.Addon))
|
||||
if newObj == nil {
|
||||
return nil, err
|
||||
}
|
||||
return newObj, err
|
||||
}
|
||||
}
|
||||
|
||||
func UpdateAddonDeepCopyOnChange(client AddonClient, obj *v1.Addon, handler func(obj *v1.Addon) (*v1.Addon, error)) (*v1.Addon, error) {
|
||||
if obj == nil {
|
||||
return obj, nil
|
||||
}
|
||||
|
||||
copyObj := obj.DeepCopy()
|
||||
newObj, err := handler(copyObj)
|
||||
if newObj != nil {
|
||||
copyObj = newObj
|
||||
}
|
||||
if obj.ResourceVersion == copyObj.ResourceVersion && !equality.Semantic.DeepEqual(obj, copyObj) {
|
||||
return client.Update(copyObj)
|
||||
}
|
||||
|
||||
return copyObj, err
|
||||
}
|
||||
|
||||
func (c *addonController) AddGenericHandler(ctx context.Context, name string, handler generic.Handler) {
|
||||
c.controller.RegisterHandler(ctx, name, controller.SharedControllerHandlerFunc(handler))
|
||||
}
|
||||
|
||||
func (c *addonController) AddGenericRemoveHandler(ctx context.Context, name string, handler generic.Handler) {
|
||||
c.AddGenericHandler(ctx, name, generic.NewRemoveHandler(name, c.Updater(), handler))
|
||||
}
|
||||
|
||||
func (c *addonController) OnChange(ctx context.Context, name string, sync AddonHandler) {
|
||||
c.AddGenericHandler(ctx, name, FromAddonHandlerToHandler(sync))
|
||||
}
|
||||
|
||||
func (c *addonController) OnRemove(ctx context.Context, name string, sync AddonHandler) {
|
||||
c.AddGenericHandler(ctx, name, generic.NewRemoveHandler(name, c.Updater(), FromAddonHandlerToHandler(sync)))
|
||||
}
|
||||
|
||||
func (c *addonController) Enqueue(namespace, name string) {
|
||||
c.controller.Enqueue(namespace, name)
|
||||
}
|
||||
|
||||
func (c *addonController) EnqueueAfter(namespace, name string, duration time.Duration) {
|
||||
c.controller.EnqueueAfter(namespace, name, duration)
|
||||
}
|
||||
|
||||
func (c *addonController) Informer() cache.SharedIndexInformer {
|
||||
return c.controller.Informer()
|
||||
}
|
||||
|
||||
func (c *addonController) GroupVersionKind() schema.GroupVersionKind {
|
||||
return c.gvk
|
||||
}
|
||||
|
||||
func (c *addonController) Cache() AddonCache {
|
||||
return &addonCache{
|
||||
indexer: c.Informer().GetIndexer(),
|
||||
resource: c.groupResource,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *addonController) Create(obj *v1.Addon) (*v1.Addon, error) {
|
||||
result := &v1.Addon{}
|
||||
return result, c.client.Create(context.TODO(), obj.Namespace, obj, result, metav1.CreateOptions{})
|
||||
}
|
||||
|
||||
func (c *addonController) Update(obj *v1.Addon) (*v1.Addon, error) {
|
||||
result := &v1.Addon{}
|
||||
return result, c.client.Update(context.TODO(), obj.Namespace, obj, result, metav1.UpdateOptions{})
|
||||
}
|
||||
|
||||
func (c *addonController) UpdateStatus(obj *v1.Addon) (*v1.Addon, error) {
|
||||
result := &v1.Addon{}
|
||||
return result, c.client.UpdateStatus(context.TODO(), obj.Namespace, obj, result, metav1.UpdateOptions{})
|
||||
}
|
||||
|
||||
func (c *addonController) Delete(namespace, name string, options *metav1.DeleteOptions) error {
|
||||
if options == nil {
|
||||
options = &metav1.DeleteOptions{}
|
||||
}
|
||||
return c.client.Delete(context.TODO(), namespace, name, *options)
|
||||
}
|
||||
|
||||
func (c *addonController) Get(namespace, name string, options metav1.GetOptions) (*v1.Addon, error) {
|
||||
result := &v1.Addon{}
|
||||
return result, c.client.Get(context.TODO(), namespace, name, result, options)
|
||||
}
|
||||
|
||||
func (c *addonController) List(namespace string, opts metav1.ListOptions) (*v1.AddonList, error) {
|
||||
result := &v1.AddonList{}
|
||||
return result, c.client.List(context.TODO(), namespace, result, opts)
|
||||
}
|
||||
|
||||
func (c *addonController) Watch(namespace string, opts metav1.ListOptions) (watch.Interface, error) {
|
||||
return c.client.Watch(context.TODO(), namespace, opts)
|
||||
}
|
||||
|
||||
func (c *addonController) Patch(namespace, name string, pt types.PatchType, data []byte, subresources ...string) (*v1.Addon, error) {
|
||||
result := &v1.Addon{}
|
||||
return result, c.client.Patch(context.TODO(), namespace, name, pt, data, result, metav1.PatchOptions{}, subresources...)
|
||||
}
|
||||
|
||||
type addonCache struct {
|
||||
indexer cache.Indexer
|
||||
resource schema.GroupResource
|
||||
}
|
||||
|
||||
func (c *addonCache) Get(namespace, name string) (*v1.Addon, error) {
|
||||
obj, exists, err := c.indexer.GetByKey(namespace + "/" + name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !exists {
|
||||
return nil, errors.NewNotFound(c.resource, name)
|
||||
}
|
||||
return obj.(*v1.Addon), nil
|
||||
}
|
||||
|
||||
func (c *addonCache) List(namespace string, selector labels.Selector) (ret []*v1.Addon, err error) {
|
||||
|
||||
err = cache.ListAllByNamespace(c.indexer, namespace, selector, func(m interface{}) {
|
||||
ret = append(ret, m.(*v1.Addon))
|
||||
})
|
||||
|
||||
return ret, err
|
||||
}
|
||||
|
||||
func (c *addonCache) AddIndexer(indexName string, indexer AddonIndexer) {
|
||||
utilruntime.Must(c.indexer.AddIndexers(map[string]cache.IndexFunc{
|
||||
indexName: func(obj interface{}) (strings []string, e error) {
|
||||
return indexer(obj.(*v1.Addon))
|
||||
},
|
||||
}))
|
||||
}
|
||||
|
||||
func (c *addonCache) GetByIndex(indexName, key string) (result []*v1.Addon, err error) {
|
||||
objs, err := c.indexer.ByIndex(indexName, key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result = make([]*v1.Addon, 0, len(objs))
|
||||
for _, obj := range objs {
|
||||
result = append(result, obj.(*v1.Addon))
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
type AddonStatusHandler func(obj *v1.Addon, status v1.AddonStatus) (v1.AddonStatus, error)
|
||||
|
||||
type AddonGeneratingHandler func(obj *v1.Addon, status v1.AddonStatus) ([]runtime.Object, v1.AddonStatus, error)
|
||||
|
||||
func RegisterAddonStatusHandler(ctx context.Context, controller AddonController, condition condition.Cond, name string, handler AddonStatusHandler) {
|
||||
statusHandler := &addonStatusHandler{
|
||||
client: controller,
|
||||
condition: condition,
|
||||
handler: handler,
|
||||
}
|
||||
controller.AddGenericHandler(ctx, name, FromAddonHandlerToHandler(statusHandler.sync))
|
||||
}
|
||||
|
||||
func RegisterAddonGeneratingHandler(ctx context.Context, controller AddonController, apply apply.Apply,
|
||||
condition condition.Cond, name string, handler AddonGeneratingHandler, opts *generic.GeneratingHandlerOptions) {
|
||||
statusHandler := &addonGeneratingHandler{
|
||||
AddonGeneratingHandler: handler,
|
||||
apply: apply,
|
||||
name: name,
|
||||
gvk: controller.GroupVersionKind(),
|
||||
}
|
||||
if opts != nil {
|
||||
statusHandler.opts = *opts
|
||||
}
|
||||
controller.OnChange(ctx, name, statusHandler.Remove)
|
||||
RegisterAddonStatusHandler(ctx, controller, condition, name, statusHandler.Handle)
|
||||
}
|
||||
|
||||
type addonStatusHandler struct {
|
||||
client AddonClient
|
||||
condition condition.Cond
|
||||
handler AddonStatusHandler
|
||||
}
|
||||
|
||||
func (a *addonStatusHandler) sync(key string, obj *v1.Addon) (*v1.Addon, error) {
|
||||
if obj == nil {
|
||||
return obj, nil
|
||||
}
|
||||
|
||||
origStatus := obj.Status.DeepCopy()
|
||||
obj = obj.DeepCopy()
|
||||
newStatus, err := a.handler(obj, obj.Status)
|
||||
if err != nil {
|
||||
// Revert to old status on error
|
||||
newStatus = *origStatus.DeepCopy()
|
||||
}
|
||||
|
||||
if a.condition != "" {
|
||||
if errors.IsConflict(err) {
|
||||
a.condition.SetError(&newStatus, "", nil)
|
||||
} else {
|
||||
a.condition.SetError(&newStatus, "", err)
|
||||
}
|
||||
}
|
||||
if !equality.Semantic.DeepEqual(origStatus, &newStatus) {
|
||||
if a.condition != "" {
|
||||
// Since status has changed, update the lastUpdatedTime
|
||||
a.condition.LastUpdated(&newStatus, time.Now().UTC().Format(time.RFC3339))
|
||||
}
|
||||
|
||||
var newErr error
|
||||
obj.Status = newStatus
|
||||
newObj, newErr := a.client.UpdateStatus(obj)
|
||||
if err == nil {
|
||||
err = newErr
|
||||
}
|
||||
if newErr == nil {
|
||||
obj = newObj
|
||||
}
|
||||
}
|
||||
return obj, err
|
||||
}
|
||||
|
||||
type addonGeneratingHandler struct {
|
||||
AddonGeneratingHandler
|
||||
apply apply.Apply
|
||||
opts generic.GeneratingHandlerOptions
|
||||
gvk schema.GroupVersionKind
|
||||
name string
|
||||
}
|
||||
|
||||
func (a *addonGeneratingHandler) Remove(key string, obj *v1.Addon) (*v1.Addon, error) {
|
||||
if obj != nil {
|
||||
return obj, nil
|
||||
}
|
||||
|
||||
obj = &v1.Addon{}
|
||||
obj.Namespace, obj.Name = kv.RSplit(key, "/")
|
||||
obj.SetGroupVersionKind(a.gvk)
|
||||
|
||||
return nil, generic.ConfigureApplyForObject(a.apply, obj, &a.opts).
|
||||
WithOwner(obj).
|
||||
WithSetID(a.name).
|
||||
ApplyObjects()
|
||||
}
|
||||
|
||||
func (a *addonGeneratingHandler) Handle(obj *v1.Addon, status v1.AddonStatus) (v1.AddonStatus, error) {
|
||||
if !obj.DeletionTimestamp.IsZero() {
|
||||
return status, nil
|
||||
}
|
||||
|
||||
objs, newStatus, err := a.AddonGeneratingHandler(obj, status)
|
||||
if err != nil {
|
||||
return newStatus, err
|
||||
}
|
||||
|
||||
return newStatus, generic.ConfigureApplyForObject(a.apply, obj, &a.opts).
|
||||
WithOwner(obj).
|
||||
WithSetID(a.name).
|
||||
ApplyObjects(objs...)
|
||||
// AddIndexer adds a new Indexer to the cache with the provided name.
|
||||
// If you call this after you already have data in the store, the results are undefined.
|
||||
func (c AddonGenericCache) AddIndexer(indexName string, indexer AddonIndexer) {
|
||||
c.CacheInterface.AddIndexer(indexName, generic.Indexer[*v1.Addon](indexer))
|
||||
}
|
||||
|
@ -21,6 +21,7 @@ package v1
|
||||
import (
|
||||
v1 "github.com/k3s-io/k3s/pkg/apis/k3s.cattle.io/v1"
|
||||
"github.com/rancher/lasso/pkg/controller"
|
||||
"github.com/rancher/wrangler/pkg/generic"
|
||||
"github.com/rancher/wrangler/pkg/schemes"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
)
|
||||
@ -43,6 +44,8 @@ type version struct {
|
||||
controllerFactory controller.SharedControllerFactory
|
||||
}
|
||||
|
||||
func (c *version) Addon() AddonController {
|
||||
return NewAddonController(schema.GroupVersionKind{Group: "k3s.cattle.io", Version: "v1", Kind: "Addon"}, "addons", true, c.controllerFactory)
|
||||
func (v *version) Addon() AddonController {
|
||||
return &AddonGenericController{
|
||||
generic.NewController[*v1.Addon, *v1.AddonList](schema.GroupVersionKind{Group: "k3s.cattle.io", Version: "v1", Kind: "Addon"}, "addons", true, v.controllerFactory),
|
||||
}
|
||||
}
|
||||
|
@ -6,7 +6,9 @@ import (
|
||||
"os"
|
||||
"runtime"
|
||||
|
||||
helmcrd "github.com/k3s-io/helm-controller/pkg/crd"
|
||||
"github.com/k3s-io/helm-controller/pkg/generated/controllers/helm.cattle.io"
|
||||
addoncrd "github.com/k3s-io/k3s/pkg/crd"
|
||||
"github.com/k3s-io/k3s/pkg/deploy"
|
||||
"github.com/k3s-io/k3s/pkg/generated/controllers/k3s.cattle.io"
|
||||
"github.com/k3s-io/k3s/pkg/version"
|
||||
@ -82,10 +84,8 @@ func crds(ctx context.Context, config *rest.Config) error {
|
||||
return err
|
||||
}
|
||||
|
||||
factory.BatchCreateCRDs(ctx, crd.NamespacedTypes(
|
||||
"Addon.k3s.cattle.io/v1",
|
||||
"HelmChart.helm.cattle.io/v1",
|
||||
"HelmChartConfig.helm.cattle.io/v1")...)
|
||||
types := append(helmcrd.List(), addoncrd.List()...)
|
||||
factory.BatchCreateCRDs(ctx, types...)
|
||||
|
||||
return factory.BatchWait()
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user