package ctrlfwk
import (
"context"
"sigs.k8s.io/controller-runtime/pkg/client"
corev1 "k8s.io/api/core/v1"
)
// Context is the reconciliation context used throughout the framework.
// It combines a standard context.Context with the accessors for the custom
// resource of type K being reconciled (see ImplementsCustomResource).
type Context[K client.Object] interface {
context.Context
ImplementsCustomResource[K]
}
// baseContext is the Context implementation returned by NewContext.
// It embeds the caller's context.Context and a CustomResource[K] that stores
// the reconciled object; both method sets are promoted to *baseContext.
type baseContext[K client.Object] struct {
context.Context
CustomResource[K]
}
// NewContext wraps ctx into a Context for the custom resource type K.
//
// The reconciler argument is never read: it only lets the compiler infer K at
// the call site. The returned context starts with an empty CustomResource.
//
// Example:
//
//	func (reconciler *SecretReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
//		logger := logf.FromContext(ctx)
//		context := ctrlfwk.NewContext(ctx, reconciler)
//		...
//	}
func NewContext[K client.Object](ctx context.Context, reconciler Reconciler[K]) Context[K] {
	base := new(baseContext[K])
	base.Context = ctx
	return base
}
// Compile-time check that *baseContext satisfies Context.
var _ Context[*corev1.Secret] = (*baseContext[*corev1.Secret])(nil)
// ContextWithData is a Context that additionally carries a user-defined payload.
//
// K is the type of the custom resource being reconciled.
// D is the type of the payload stored in Data. Data is kept exactly as it was
// given to NewContextWithData, so use a pointer type for D when steps need to
// share mutations.
type ContextWithData[K client.Object, D any] struct {
Context[K]
Data D
}
// NewContextWithData wraps ctx into a ContextWithData carrying data.
//
// K is the type of the custom resource being reconciled and D the type of the
// payload. The reconciler argument is never read: it only lets the compiler
// infer K at the call site.
//
// Example:
//
//	func (reconciler *TestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
//		logger := logf.FromContext(ctx)
//		context := ctrlfwk.NewContextWithData(ctx, reconciler, &MyDataType{})
//		...
//	}
func NewContextWithData[K client.Object, D any](ctx context.Context, reconciler Reconciler[K], data D) *ContextWithData[K, D] {
	wrapped := new(ContextWithData[K, D])
	wrapped.Context = &baseContext[K]{Context: ctx}
	wrapped.Data = data
	return wrapped
}
package ctrlfwk
import (
"fmt"
"reflect"
"time"
"github.com/go-viper/mapstructure/v2"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)
// GetContract decodes the map stored under "status" followed by path in object
// into a new value of type K and returns a pointer to it.
//
// Struct fields are matched through their "json" tag, and fields of type
// metav1.Time are filled from RFC 3339 strings via DecodeMetaTime.
//
// An error is returned when object is nil, when the value at the path is not a
// map, when nothing exists at the path, or when decoding into K fails.
func GetContract[K any](object *unstructured.Unstructured, path ...string) (*K, error) {
	// A nil object would otherwise panic on the field access below.
	if object == nil {
		return nil, fmt.Errorf("contract lookup on nil object")
	}

	// Contracts always live under the status stanza.
	path = append([]string{"status"}, path...)

	contractMap, found, err := unstructured.NestedMap(object.Object, path...)
	if err != nil {
		return nil, fmt.Errorf("reading contract at path %v: %w", path, err)
	}
	if !found {
		return nil, fmt.Errorf("contract not found at path: %s", path)
	}

	// Convert the generic map into K using mapstructure.
	var result K
	dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
		DecodeHook: DecodeMetaTime(),
		TagName:    "json",
		Result:     &result,
	})
	if err != nil {
		return nil, fmt.Errorf("building contract decoder: %w", err)
	}
	if err := dec.Decode(contractMap); err != nil {
		return nil, fmt.Errorf("decoding contract at path %v: %w", path, err)
	}
	return &result, nil
}
// metaTime is the reflect.Type of metav1.Time, computed once for the decode hook.
var metaTime = reflect.TypeOf(metav1.Time{})

// DecodeMetaTime returns a mapstructure decode hook that fills metav1.Time
// targets from RFC 3339 strings.
//
// Values aimed at any other target type are passed through untouched. When the
// target is metav1.Time, a non-string input or an unparsable string yields an
// error.
func DecodeMetaTime() mapstructure.DecodeHookFuncType {
	return func(from, to reflect.Type, i any) (any, error) {
		// Only metav1.Time targets are handled here.
		if to != metaTime {
			return i, nil
		}

		raw, isString := i.(string)
		if !isString {
			return nil, fmt.Errorf("expected string, got %T", i)
		}

		parsed, err := time.Parse(time.RFC3339, raw)
		if err != nil {
			return nil, err
		}
		return metav1.NewTime(parsed), nil
	}
}
package ctrlfwk
import (
"reflect"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// ImplementsCustomResource describes anything that stores the custom resource
// of type K being reconciled. CustomResource is the implementation in this
// package.
type ImplementsCustomResource[K client.Object] interface {
// GetCleanCustomResource returns the resource as it was when first stored.
GetCleanCustomResource() K
// GetCustomResource returns the stored, mutable resource.
GetCustomResource() K
// SetCustomResource stores the resource being reconciled.
SetCustomResource(key K)
}
// CustomResource holds the object being reconciled together with a pristine
// snapshot of it. Its methods use pointer receivers and mutate these fields.
type CustomResource[K client.Object] struct {
// cleanObject is the snapshot handed out (as copies) by GetCleanCustomResource.
cleanObject K
// CR is the working copy returned by GetCustomResource.
CR K
// cleanObjectInitialized records that cleanObject has been populated.
cleanObjectInitialized bool
// crInitialized records that CR has been populated.
crInitialized bool
}
// GetCleanCustomResource returns a deep copy of the snapshot taken when the
// resource was first stored, unaffected by later edits to the working copy.
// It is especially useful to compute patches between the original state and
// the current one.
//
// When no snapshot exists yet, one is taken from GetCustomResource on the
// first call.
func (cr *CustomResource[K]) GetCleanCustomResource() K {
	if !cr.cleanObjectInitialized {
		// Lazily snapshot the current resource when nothing was stored yet.
		if reflect.ValueOf(cr.cleanObject).IsNil() {
			cr.cleanObject = cr.GetCustomResource().DeepCopyObject().(K)
		}
		cr.cleanObjectInitialized = true
	}
	return cr.cleanObject.DeepCopyObject().(K)
}
// GetCustomResource returns the stored resource. The same pointer is returned
// on every call, so callers may edit it in place.
//
// When nothing was stored yet, a zero value of the type K points to is
// allocated via reflection on the first call.
func (cr *CustomResource[K]) GetCustomResource() K {
	if !cr.crInitialized {
		if reflect.ValueOf(cr.CR).IsNil() {
			pointee := reflect.TypeOf(cr.CR).Elem()
			cr.CR = reflect.New(pointee).Interface().(K)
		}
		cr.crInitialized = true
	}
	return cr.CR
}
// SetCustomResource stores key as the working resource and records a deep copy
// of it as the clean snapshot, replacing any previous snapshot.
// It should only be used once per reconciliation.
func (cr *CustomResource[K]) SetCustomResource(key K) {
	cr.CR = key
	snapshot := key.DeepCopyObject().(K)
	cr.cleanObject = snapshot
	cr.crInitialized, cr.cleanObjectInitialized = true, true
}
package ctrlfwk
import (
"fmt"
"reflect"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// GenericDependency is the type-erased view of a dependency: it exposes the
// dependency through client.Object so that dependencies on different resource
// types can be handled uniformly. Dependency and UntypedDependency implement it.
type GenericDependency[CustomResourceType client.Object, ContextType Context[CustomResourceType]] interface {
// ID returns the identifier of the dependency.
ID() string
// New returns a fresh object of the dependency's resource type.
New() client.Object
// Key returns the name and namespace of the target resource.
Key() types.NamespacedName
// Set copies obj into the dependency's output object.
Set(obj client.Object)
// Get returns the dependency's output object.
Get() client.Object
// ShouldWaitForReady reports whether readiness was requested for this dependency.
ShouldWaitForReady() bool
// ShouldAddManagedByAnnotation reports whether the managed-by annotation was requested.
ShouldAddManagedByAnnotation() bool
// IsReady reports the result of the readiness check on the output object.
IsReady() bool
// IsOptional reports whether the dependency was marked optional.
IsOptional() bool
// Kind returns a short name for the dependency's resource type.
Kind() string
// Hooks
// BeforeReconcile runs the hook registered to execute before resolution.
BeforeReconcile(ctx ContextType) error
// AfterReconcile runs the hook registered to execute after resolution.
AfterReconcile(ctx ContextType, resource client.Object) error
}
// Compile-time check that *Dependency satisfies GenericDependency.
var _ GenericDependency[client.Object, Context[client.Object]] = (*Dependency[client.Object, Context[client.Object], client.Object])(nil)
// Dependency describes a resource of type DependencyType that the custom
// resource relies on. Instances are normally created through DependencyBuilder.
type Dependency[CustomResourceType client.Object, ContextType Context[CustomResourceType], DependencyType client.Object] struct {
// userIdentifier, when non-empty, is returned verbatim by ID.
userIdentifier string
// isReadyF is the readiness check used by IsReady; nil means IsReady reports false.
isReadyF func(obj DependencyType) bool
// output is the object Set copies into and Get returns.
output DependencyType
// isOptional is reported by IsOptional.
isOptional bool
// waitForReady is reported by ShouldWaitForReady.
waitForReady bool
// addManagedBy is reported by ShouldAddManagedByAnnotation.
addManagedBy bool
// name and namespace form the key returned by Key.
name string
namespace string
// Hooks
// beforeReconcileF is invoked by BeforeReconcile when set.
beforeReconcileF func(ctx ContextType) error
// afterReconcileF is invoked by AfterReconcile when set.
afterReconcileF func(ctx ContextType, resource DependencyType) error
}
// New returns the object produced by NewInstanceOf for the dependency's output,
// exposed as a client.Object.
func (c *Dependency[CustomResourceType, ContextType, DependencyType]) New() client.Object {
	instance := NewInstanceOf(c.output)
	return instance
}
// Kind returns the Go type name of the struct DependencyType points to
// (for example "Secret" for *corev1.Secret).
func (c *Dependency[CustomResourceType, ContextType, DependencyType]) Kind() string {
	outputType := reflect.TypeOf(c.output)
	return outputType.Elem().Name()
}
// Set copies the value obj points to into the dependency's output object.
//
// The call is a no-op when obj is nil (untyped nil or a typed nil pointer) or
// when its dynamic type differs from DependencyType. When the output pointer is
// nil, a fresh object is allocated first; that object is only reachable through
// Get, not through the pointer originally handed to the builder.
func (c *Dependency[CustomResourceType, ContextType, DependencyType]) Set(obj client.Object) {
	outputType := reflect.TypeOf(c.output)
	// A nil type means DependencyType is an interface holding nil: there is
	// no concrete type to allocate or copy into.
	if outputType == nil || outputType.Kind() != reflect.Pointer {
		return
	}

	src := reflect.ValueOf(obj)
	// The type equality check guarantees src is a pointer, so IsNil is safe;
	// copying from a nil pointer would panic in reflect.Value.Set.
	if !src.IsValid() || src.Type() != outputType || src.IsNil() {
		return
	}

	if reflect.ValueOf(c.output).IsNil() {
		c.output = reflect.New(outputType.Elem()).Interface().(DependencyType)
	}
	reflect.ValueOf(c.output).Elem().Set(src.Elem())
}
// Get returns the dependency's output object as a client.Object.
//
// NOTE(review): when the output is a nil pointer the returned interface is
// non-nil (it wraps the typed nil pointer), so comparing the result with nil
// does not detect an unset output.
func (c *Dependency[CustomResourceType, ContextType, DependencyType]) Get() client.Object {
	var out client.Object = c.output
	return out
}
// IsOptional reports whether the dependency was marked optional.
func (c *Dependency[CustomResourceType, ContextType, DependencyType]) IsOptional() bool {
	optional := c.isOptional
	return optional
}
// Key returns the configured name and namespace of the target resource.
func (c *Dependency[CustomResourceType, ContextType, DependencyType]) Key() types.NamespacedName {
	var key types.NamespacedName
	key.Namespace = c.namespace
	key.Name = c.name
	return key
}
// ID returns the user supplied identifier when one was set; otherwise it
// returns "<Kind>,<Key>" built from Kind and Key.
func (c *Dependency[CustomResourceType, ContextType, DependencyType]) ID() string {
	if id := c.userIdentifier; id != "" {
		return id
	}
	return fmt.Sprintf("%v,%v", c.Kind(), c.Key())
}
// ShouldWaitForReady reports whether waiting for readiness was requested.
func (c *Dependency[CustomResourceType, ContextType, DependencyType]) ShouldWaitForReady() bool {
	wait := c.waitForReady
	return wait
}
// IsReady runs the configured readiness function against the output object and
// returns its verdict. Without a readiness function it reports false.
func (c *Dependency[CustomResourceType, ContextType, DependencyType]) IsReady() bool {
	check := c.isReadyF
	if check == nil {
		return false
	}
	return check(c.output)
}
// BeforeReconcile invokes the before-reconcile hook and returns its error.
// It returns nil when no hook was registered.
func (c *Dependency[CustomResourceType, ContextType, DependencyType]) BeforeReconcile(ctx ContextType) error {
	hook := c.beforeReconcileF
	if hook == nil {
		return nil
	}
	return hook(ctx)
}
// AfterReconcile invokes the after-reconcile hook and returns its error.
// It returns nil when no hook was registered.
//
// The hook receives resource converted to DependencyType. When resource is nil
// or of another type, the hook is still called, with the zero value of
// DependencyType.
func (c *Dependency[CustomResourceType, ContextType, DependencyType]) AfterReconcile(ctx ContextType, resource client.Object) error {
	hook := c.afterReconcileF
	if hook == nil {
		return nil
	}
	// A failed assertion leaves typed at the zero value of DependencyType.
	typed, _ := resource.(DependencyType)
	return hook(ctx, typed)
}
// ShouldAddManagedByAnnotation reports whether the managed-by annotation was
// requested for this dependency.
func (c *Dependency[CustomResourceType, ContextType, DependencyType]) ShouldAddManagedByAnnotation() bool {
	add := c.addManagedBy
	return add
}
package ctrlfwk
import (
"sigs.k8s.io/controller-runtime/pkg/client"
)
// DependencyBuilder is a fluent builder for Dependency values. Each With*
// method sets one option on the dependency under construction and returns the
// builder so calls can be chained; Build hands the dependency out.
//
// Type parameters:
//   - CustomResourceType: The custom resource that owns this dependency
//   - ContextType: The context type containing the custom resource and additional data
//   - DependencyType: The Kubernetes resource type this dependency represents
type DependencyBuilder[CustomResourceType client.Object, ContextType Context[CustomResourceType], DependencyType client.Object] struct {
// dependency is the instance being configured; Build returns this same pointer.
dependency *Dependency[CustomResourceType, ContextType, DependencyType]
}
// NewDependencyBuilder starts a DependencyBuilder for a dependency on an
// external Kubernetes resource, i.e. a resource the custom resource needs in
// order to function (a Secret, a ConfigMap, another custom resource, ...).
//
// Neither argument is read; both exist only so the type parameters can be
// inferred at the call site:
//   - ctx: fixes ContextType (and with it CustomResourceType)
//   - _: fixes DependencyType, e.g. &corev1.Secret{}
//
// The builder starts from a zero-valued Dependency. Building a dependency does
// not resolve it: resolution is performed by the reconciliation steps that
// consume it (the original documentation names ResolveDependencyStep and
// ResolveDynamicDependenciesStep, which are defined outside this file).
//
// Example:
//
//	// Wait for a secret to contain required data
//	dep := NewDependencyBuilder(ctx, &corev1.Secret{}).
//		WithName("database-credentials").
//		WithNamespace(ctx.GetCustomResource().Namespace).
//		WithWaitForReady(true).
//		WithIsReadyFunc(func(secret *corev1.Secret) bool {
//			return secret.Data["username"] != nil && secret.Data["password"] != nil
//		}).
//		WithOutput(ctx.Data.DatabaseSecret).
//		Build()
func NewDependencyBuilder[
	CustomResourceType client.Object,
	ContextType Context[CustomResourceType],
	DependencyType client.Object,
](
	ctx ContextType,
	_ DependencyType,
) *DependencyBuilder[CustomResourceType, ContextType, DependencyType] {
	builder := new(DependencyBuilder[CustomResourceType, ContextType, DependencyType])
	builder.dependency = new(Dependency[CustomResourceType, ContextType, DependencyType])
	return builder
}
// WithOutput sets the object that receives the resolved dependency.
//
// Dependency.Set copies the resolved resource into this object and
// Dependency.Get returns it, which lets later reconciliation code read the
// dependency's state. Setting an output is optional.
//
// Pass a non-nil pointer, typically a field of the context data. If obj is a
// nil pointer, Dependency.Set allocates a fresh object internally, and that
// object is only reachable through Get, not through the caller's variable.
//
// Example:
//
//	type MyContextData struct {
//		DatabaseSecret *corev1.Secret
//	}
//
//	ctx.Data.DatabaseSecret = &corev1.Secret{}
//	dep := NewDependencyBuilder(ctx, &corev1.Secret{}).
//		WithName("database-creds").
//		WithOutput(ctx.Data.DatabaseSecret). // Store resolved secret here
//		Build()
func (b *DependencyBuilder[CustomResourceType, ContextType, DependencyType]) WithOutput(obj DependencyType) *DependencyBuilder[CustomResourceType, ContextType, DependencyType] {
	dep := b.dependency
	dep.output = obj
	return b
}
// WithIsReadyFunc sets the function that decides whether the dependency is
// ready for use.
//
// Dependency.IsReady calls f with the output object and returns its result.
//
// NOTE(review): without a readiness function Dependency.IsReady reports false.
// Earlier documentation stated that such a dependency counts as ready as soon
// as it exists; confirm how the resolver treats this case.
//
// Common readiness patterns:
//   - Checking for specific data fields in secrets/configmaps
//   - Validating status conditions on custom resources
//   - Ensuring required labels or annotations are present
//
// Example:
//
//	.WithIsReadyFunc(func(secret *corev1.Secret) bool {
//		// Secret is ready when it contains required database credentials
//		return secret.Data["host"] != nil &&
//			secret.Data["username"] != nil &&
//			secret.Data["password"] != nil
//	})
func (b *DependencyBuilder[CustomResourceType, ContextType, DependencyType]) WithIsReadyFunc(f func(obj DependencyType) bool) *DependencyBuilder[CustomResourceType, ContextType, DependencyType] {
	dep := b.dependency
	dep.isReadyF = f
	return b
}
// WithOptional marks the dependency as optional (true) or required (false,
// the default). The value is reported by Dependency.IsOptional.
//
// The intent is that an optional dependency does not block reconciliation when
// it is missing or not ready, whereas a required one causes a requeue; that
// handling lives in the resolver, outside this file.
//
// Example:
//
//	// Optional feature configuration
//	dep := NewDependencyBuilder(ctx, &corev1.ConfigMap{}).
//		WithName("optional-features").
//		WithOptional(true). // Don't block reconciliation if missing
//		Build()
func (b *DependencyBuilder[CustomResourceType, ContextType, DependencyType]) WithOptional(optional bool) *DependencyBuilder[CustomResourceType, ContextType, DependencyType] {
	dep := b.dependency
	dep.isOptional = optional
	return b
}
// WithName sets the name (metadata.name) of the resource to depend on.
// It becomes the Name part of Dependency.Key.
//
// Example:
//
//	.WithName("database-credentials") // Look for resource named "database-credentials"
func (b *DependencyBuilder[CustomResourceType, ContextType, DependencyType]) WithName(name string) *DependencyBuilder[CustomResourceType, ContextType, DependencyType] {
	dep := b.dependency
	dep.name = name
	return b
}
// WithNamespace sets the namespace (metadata.namespace) of the resource to
// depend on. It becomes the Namespace part of Dependency.Key.
//
// The builder itself applies no default: an unset namespace stays empty in the
// key. NOTE(review): earlier documentation states that the custom resource's
// namespace is used in that case and that the value is ignored for
// cluster-scoped resources; both would be handled by the resolver, confirm
// there.
//
// Example:
//
//	.WithNamespace("kube-system")                      // Look in kube-system namespace
//	.WithNamespace(ctx.GetCustomResource().Namespace) // Same namespace as custom resource
func (b *DependencyBuilder[CustomResourceType, ContextType, DependencyType]) WithNamespace(namespace string) *DependencyBuilder[CustomResourceType, ContextType, DependencyType] {
	dep := b.dependency
	dep.namespace = namespace
	return b
}
// WithWaitForReady sets whether reconciliation should wait for the dependency
// to be ready rather than merely exist. The value is reported by
// Dependency.ShouldWaitForReady.
//
// The intent is that, when true, reconciliation requeues while the dependency
// exists but the readiness function reports false; the requeue itself is
// performed by the resolver, outside this file.
//
// Example:
//
//	.WithWaitForReady(true) // Wait for dependency to be ready, don't just check existence
func (b *DependencyBuilder[CustomResourceType, ContextType, DependencyType]) WithWaitForReady(waitForReady bool) *DependencyBuilder[CustomResourceType, ContextType, DependencyType] {
	dep := b.dependency
	dep.waitForReady = waitForReady
	return b
}
// WithUserIdentifier sets a custom identifier for the dependency.
//
// A non-empty identifier is returned verbatim by Dependency.ID; otherwise ID
// falls back to an identifier built from the kind and the key. A custom
// identifier helps tell similar dependencies apart, for instance in logs.
//
// Example:
//
//	.WithUserIdentifier("database-connection-secret") // Custom name for logs
func (b *DependencyBuilder[CustomResourceType, ContextType, DependencyType]) WithUserIdentifier(identifier string) *DependencyBuilder[CustomResourceType, ContextType, DependencyType] {
	dep := b.dependency
	dep.userIdentifier = identifier
	return b
}
// WithBeforeReconcile registers the hook run by Dependency.BeforeReconcile,
// which is meant to execute before the dependency is resolved.
//
// The hook's error is returned unchanged by BeforeReconcile. It can be used
// for validating preconditions, setup work or logging.
//
// Example:
//
//	.WithBeforeReconcile(func(ctx MyContext) error {
//		if ctx.GetCustomResource().GetName() == "" {
//			return errors.New("custom resource has no name")
//		}
//		return nil
//	})
func (b *DependencyBuilder[CustomResourceType, ContextType, DependencyType]) WithBeforeReconcile(f func(ctx ContextType) error) *DependencyBuilder[CustomResourceType, ContextType, DependencyType] {
	dep := b.dependency
	dep.beforeReconcileF = f
	return b
}
// WithAfterReconcile registers the hook run by Dependency.AfterReconcile,
// which is meant to execute after the dependency has been resolved.
//
// The hook receives the resolved resource and its error is returned unchanged
// by AfterReconcile. The resource argument is the zero value of DependencyType
// (a nil pointer) when AfterReconcile is handed nil or an object of another
// type, so guard against nil before dereferencing.
//
// Common use cases:
//   - Extracting and caching dependency data
//   - Updating custom resource status with dependency information
//   - Triggering additional processing based on dependency state
//
// Example:
//
//	.WithAfterReconcile(func(ctx MyContext, secret *corev1.Secret) error {
//		if secret == nil {
//			return nil
//		}
//		// Cache database connection string from secret
//		ctx.Data.DatabaseConnectionString = string(secret.Data["connection-string"])
//		return nil
//	})
func (b *DependencyBuilder[CustomResourceType, ContextType, DependencyType]) WithAfterReconcile(f func(ctx ContextType, resource DependencyType) error) *DependencyBuilder[CustomResourceType, ContextType, DependencyType] {
	dep := b.dependency
	dep.afterReconcileF = f
	return b
}
// WithReadinessCondition is an alias for WithIsReadyFunc: it sets the same
// readiness function under a more descriptive name. Use whichever reads more
// naturally at the call site.
func (b *DependencyBuilder[CustomResourceType, ContextType, DependencyType]) WithReadinessCondition(f func(obj DependencyType) bool) *DependencyBuilder[CustomResourceType, ContextType, DependencyType] {
	return b.WithIsReadyFunc(f)
}
// WithAddManagedByAnnotation sets whether a "managed-by" annotation should be
// added to the dependency resource. The value is reported by
// Dependency.ShouldAddManagedByAnnotation and is false by default, so no
// annotation is requested unless this is called with true.
//
// The annotation is written by code outside this file. According to the
// original documentation, it identifies the controller depending on the
// resource and, when the reconciler has a Watcher configured, also makes
// changes to the dependency trigger reconciliations. Its documented format is:
//
//	"dependencies.ctrlfwk.com/managed-by": "[{'name':'<controller-name>','namespace':'<controller-namespace>','gvk':{'group':'<group>','version':'<version>','kind':'<kind>'}}]"
//
// Example:
//
//	.WithAddManagedByAnnotation(true) // Mark this resource as managed by our controller
func (b *DependencyBuilder[CustomResourceType, ContextType, DependencyType]) WithAddManagedByAnnotation(add bool) *DependencyBuilder[CustomResourceType, ContextType, DependencyType] {
	dep := b.dependency
	dep.addManagedBy = add
	return b
}
// Build returns the Dependency configured through the builder.
//
// The returned pointer is the builder's own instance, not a copy: further
// With* calls on the same builder keep modifying it. The dependency still has
// to be handed to a reconciliation step to actually be resolved.
func (b *DependencyBuilder[CustomResourceType, ContextType, DependencyType]) Build() *Dependency[CustomResourceType, ContextType, DependencyType] {
	built := b.dependency
	return built
}
package ctrlfwk
import (
"fmt"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// UntypedDependency is a dependency on a resource identified only by its
// GroupVersionKind and handled as *unstructured.Unstructured. It embeds a
// Dependency specialised for unstructured objects and overrides the methods
// that need the GVK.
type UntypedDependency[CustomResourceType client.Object, ContextType Context[CustomResourceType]] struct {
*Dependency[CustomResourceType, ContextType, *unstructured.Unstructured]
// gvk is stamped on every object produced by New and stored by Set.
gvk schema.GroupVersionKind
}
// Compile-time check that *UntypedDependency satisfies GenericDependency.
var _ GenericDependency[client.Object, Context[client.Object]] = (*UntypedDependency[client.Object, Context[client.Object]])(nil)
// New returns an empty unstructured object carrying the dependency's
// GroupVersionKind.
func (c *UntypedDependency[CustomResourceType, ContextType]) New() client.Object {
	obj := new(unstructured.Unstructured)
	obj.SetGroupVersionKind(c.gvk)
	return obj
}
// Kind returns the kind of the dependency's GroupVersionKind prefixed with
// "Untyped", for example "UntypedDatabase".
func (c *UntypedDependency[CustomResourceType, ContextType]) Kind() string {
	return fmt.Sprintf("Untyped%s", c.gvk.Kind)
}

// ID returns the user supplied identifier when one was set; otherwise it
// returns "<Kind>,<Key>" using the GVK-based Kind of this type.
//
// It is declared here because the ID promoted from the embedded Dependency
// calls Dependency.Kind, which reports "Unstructured" for every untyped
// dependency and would give dependencies of different kinds but identical
// name and namespace the same default identifier.
func (c *UntypedDependency[CustomResourceType, ContextType]) ID() string {
	if c.userIdentifier != "" {
		return c.userIdentifier
	}
	return fmt.Sprintf("%v,%v", c.Kind(), c.Key())
}
// Set copies obj into the dependency's output object and stamps the
// dependency's GroupVersionKind on it.
//
// The call is a no-op when obj is nil or is not an *unstructured.Unstructured,
// mirroring Dependency.Set, which ignores objects of the wrong type. The copy
// is shallow: the output shares the underlying content map with obj. When the
// output pointer is nil, a fresh object is allocated first.
func (c *UntypedDependency[CustomResourceType, ContextType]) Set(obj client.Object) {
	src, ok := obj.(*unstructured.Unstructured)
	if !ok || src == nil {
		return
	}
	if c.output == nil {
		c.output = &unstructured.Unstructured{}
	}
	*c.output = *src
	// Enforce the configured GVK regardless of what the source carried.
	c.output.SetGroupVersionKind(c.gvk)
}
package ctrlfwk
import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// UntypedDependencyBuilder is a fluent builder for dependencies on resources
// that are not known at compile time or have no Go type definitions. It wraps
// a DependencyBuilder specialised for *unstructured.Unstructured and remembers
// the GroupVersionKind of the target resource.
//
// This builder is useful when working with:
//   - Custom Resource Definitions (CRDs) not included in your Go code
//   - Third-party resources without Go client types
//   - Dynamic resource types determined at runtime
//   - Resources from different API groups or versions
//
// Type parameters:
//   - CustomResourceType: The custom resource that owns this dependency
//   - ContextType: The context type containing the custom resource and additional data
//
// Example:
//
//	// Depend on a custom resource defined by a CRD
//	gvk := schema.GroupVersionKind{
//		Group:   "example.com",
//		Version: "v1",
//		Kind:    "Database",
//	}
//	dep := NewUntypedDependencyBuilder(ctx, gvk).
//		WithName("my-database").
//		WithNamespace("default").
//		WithIsReadyFunc(func(obj *unstructured.Unstructured) bool {
//			// Check custom readiness condition
//			status, found, _ := unstructured.NestedString(obj.Object, "status", "phase")
//			return found && status == "Ready"
//		}).
//		Build()
type UntypedDependencyBuilder[CustomResourceType client.Object, ContextType Context[CustomResourceType]] struct {
// inner is the typed builder every With* method delegates to.
inner *DependencyBuilder[CustomResourceType, ContextType, *unstructured.Unstructured]
// gvk is passed on to the UntypedDependency created by Build.
gvk schema.GroupVersionKind
}
// NewUntypedDependencyBuilder starts an UntypedDependencyBuilder for a
// dependency on a resource that has no compile-time Go type, such as a CRD
// defined only in YAML or a resource owned by a third-party operator.
//
// Parameters:
//   - ctx: forwarded to NewDependencyBuilder, where it fixes the context type
//   - gvk: GroupVersionKind of the resource to depend on; it is stamped on the
//     unstructured objects the resulting dependency produces and stores
//
// Example:
//
//	// Depend on a Prometheus ServiceMonitor resource
//	gvk := schema.GroupVersionKind{
//		Group:   "monitoring.coreos.com",
//		Version: "v1",
//		Kind:    "ServiceMonitor",
//	}
//	dep := NewUntypedDependencyBuilder(ctx, gvk).
//		WithName("my-app-metrics").
//		WithNamespace(ctx.GetCustomResource().Namespace).
//		WithOptional(true). // Don't fail if Prometheus operator not installed
//		Build()
func NewUntypedDependencyBuilder[CustomResourceType client.Object, ContextType Context[CustomResourceType]](ctx ContextType, gvk schema.GroupVersionKind) *UntypedDependencyBuilder[CustomResourceType, ContextType] {
	builder := new(UntypedDependencyBuilder[CustomResourceType, ContextType])
	builder.inner = NewDependencyBuilder(ctx, &unstructured.Unstructured{})
	builder.gvk = gvk
	return builder
}
// Build returns a new UntypedDependency that wraps the dependency configured
// through the builder together with the builder's GroupVersionKind.
//
// The wrapped Dependency is the inner builder's own instance, so further With*
// calls on this builder keep modifying it. The dependency still has to be
// handed to a reconciliation step to actually be resolved.
func (b *UntypedDependencyBuilder[CustomResourceType, ContextType]) Build() *UntypedDependency[CustomResourceType, ContextType] {
	dep := new(UntypedDependency[CustomResourceType, ContextType])
	dep.Dependency = b.inner.Build()
	dep.gvk = b.gvk
	return dep
}
// WithAfterReconcile registers a hook meant to run after the dependency has
// been resolved, by delegating to the inner builder's WithAfterReconcile.
//
// The hook receives the resolved dependency as an unstructured object; use the
// unstructured helper functions to read nested fields.
//
// Example:
//
//	.WithAfterReconcile(func(ctx MyContext, obj *unstructured.Unstructured) error {
//		// Extract status from custom resource
//		status, found, err := unstructured.NestedString(obj.Object, "status", "endpoint")
//		if err != nil {
//			return err
//		}
//		if found {
//			ctx.Data.DatabaseEndpoint = status
//		}
//		return nil
//	})
func (b *UntypedDependencyBuilder[CustomResourceType, ContextType]) WithAfterReconcile(f func(ctx ContextType, resource *unstructured.Unstructured) error) *UntypedDependencyBuilder[CustomResourceType, ContextType] {
	inner := b.inner.WithAfterReconcile(f)
	b.inner = inner
	return b
}
// WithBeforeReconcile registers a hook meant to run before the dependency is
// resolved, by delegating to the inner builder's WithBeforeReconcile.
//
// Common use cases:
//   - Validating that the target CRD is installed
//   - Setting up authentication for third-party resources
//   - Logging dependency resolution attempts for debugging
//
// Example:
//
//	.WithBeforeReconcile(func(ctx MyContext) error {
//		if ctx.GetCustomResource().GetName() == "" {
//			return errors.New("custom resource has no name")
//		}
//		return nil
//	})
func (b *UntypedDependencyBuilder[CustomResourceType, ContextType]) WithBeforeReconcile(f func(ctx ContextType) error) *UntypedDependencyBuilder[CustomResourceType, ContextType] {
	inner := b.inner.WithBeforeReconcile(f)
	b.inner = inner
	return b
}
// WithIsReadyFunc sets the function that decides whether the untyped
// dependency is ready, by delegating to the inner builder's WithIsReadyFunc.
//
// The function receives the dependency as an unstructured object; typical
// checks look at status conditions, phase fields or custom markers.
//
// Example:
//
//	.WithIsReadyFunc(func(obj *unstructured.Unstructured) bool {
//		// Check if a custom resource has reached "Ready" state
//		phase, found, _ := unstructured.NestedString(obj.Object, "status", "phase")
//		if !found {
//			return false
//		}
//		return phase == "Ready" || phase == "Running"
//	})
func (b *UntypedDependencyBuilder[CustomResourceType, ContextType]) WithIsReadyFunc(f func(obj *unstructured.Unstructured) bool) *UntypedDependencyBuilder[CustomResourceType, ContextType] {
	inner := b.inner.WithIsReadyFunc(f)
	b.inner = inner
	return b
}
// WithName sets the name (metadata.name) of the resource to depend on, by
// delegating to the inner builder's WithName.
//
// Example:
//
//	.WithName("my-database-instance") // Look for resource named "my-database-instance"
func (b *UntypedDependencyBuilder[CustomResourceType, ContextType]) WithName(name string) *UntypedDependencyBuilder[CustomResourceType, ContextType] {
	inner := b.inner.WithName(name)
	b.inner = inner
	return b
}
// WithNamespace sets the namespace (metadata.namespace) of the resource to
// depend on, by delegating to the inner builder's WithNamespace.
//
// The builder applies no default of its own. NOTE(review): earlier
// documentation states that the custom resource's namespace is used when none
// is given and that the value is ignored for cluster-scoped resources; both
// would be handled by the resolver, confirm there.
//
// Example:
//
//	.WithNamespace("monitoring")                       // Look in monitoring namespace
//	.WithNamespace(ctx.GetCustomResource().Namespace) // Same namespace as custom resource
func (b *UntypedDependencyBuilder[CustomResourceType, ContextType]) WithNamespace(namespace string) *UntypedDependencyBuilder[CustomResourceType, ContextType] {
	inner := b.inner.WithNamespace(namespace)
	b.inner = inner
	return b
}
// WithOptional marks the dependency as optional (true) or required (false,
// the default), by delegating to the inner builder's WithOptional.
//
// The intent is that an optional dependency does not block reconciliation when
// it is missing or not ready; that handling lives in the resolver, outside
// this file. It suits dependencies on optional operators or CRDs:
//   - Prometheus monitoring resources (when Prometheus operator is optional)
//   - Service mesh resources (when Istio/Linkerd might not be installed)
//   - Third-party integrations that enhance but don't break functionality
//
// Example:
//
//	.WithOptional(true) // Don't fail if the CRD isn't installed
func (b *UntypedDependencyBuilder[CustomResourceType, ContextType]) WithOptional(optional bool) *UntypedDependencyBuilder[CustomResourceType, ContextType] {
	inner := b.inner.WithOptional(optional)
	b.inner = inner
	return b
}
// WithOutput sets the unstructured object that receives the resolved
// dependency, by delegating to the inner builder's WithOutput.
//
// Pass a non-nil pointer, typically a field of the context data. If obj is
// nil, UntypedDependency.Set allocates a fresh object internally, and that
// object is only reachable through Get, not through the caller's variable.
//
// Example:
//
//	type MyContextData struct {
//		DatabaseInstance *unstructured.Unstructured
//	}
//
//	ctx.Data.DatabaseInstance = &unstructured.Unstructured{}
//	dep := NewUntypedDependencyBuilder(ctx, gvk).
//		WithName("my-db").
//		WithOutput(ctx.Data.DatabaseInstance). // Store resolved resource here
//		Build()
func (b *UntypedDependencyBuilder[CustomResourceType, ContextType]) WithOutput(obj *unstructured.Unstructured) *UntypedDependencyBuilder[CustomResourceType, ContextType] {
	inner := b.inner.WithOutput(obj)
	b.inner = inner
	return b
}
// WithReadinessCondition is an alias for WithIsReadyFunc that defines custom readiness logic.
//
// This method provides the same functionality as WithIsReadyFunc but with a more
// descriptive name for untyped dependencies. Use whichever method name feels more
// natural in your context.
//
// See WithIsReadyFunc for detailed documentation and examples of working with
// unstructured.Unstructured objects.
//
// The method returns the same builder so calls can be chained.
func (b *UntypedDependencyBuilder[CustomResourceType, ContextType]) WithReadinessCondition(f func(obj *unstructured.Unstructured) bool) *UntypedDependencyBuilder[CustomResourceType, ContextType] {
// Forward the predicate to the wrapped builder and keep the builder it returns.
b.inner = b.inner.WithReadinessCondition(f)
return b
}
// WithUserIdentifier assigns a custom identifier for this untyped dependency.
//
// This identifier is used for logging, debugging, and distinguishing between
// multiple untyped dependencies. If not provided, a default identifier will be
// generated based on the GroupVersionKind and resource name.
//
// This is especially useful for untyped dependencies since the resource types
// may not be immediately obvious from logs.
//
// Example:
//
// .WithUserIdentifier("prometheus-servicemonitor") // Clear name for logs and debugging
//
// The method returns the same builder so calls can be chained.
func (b *UntypedDependencyBuilder[CustomResourceType, ContextType]) WithUserIdentifier(identifier string) *UntypedDependencyBuilder[CustomResourceType, ContextType] {
// Forward the identifier to the wrapped builder and keep the builder it returns.
b.inner = b.inner.WithUserIdentifier(identifier)
return b
}
// WithWaitForReady determines whether reconciliation should wait for this dependency
// to become ready before proceeding.
//
// When true (recommended), reconciliation will requeue if the dependency exists but
// is not yet ready according to the readiness function. When false, the dependency
// is only checked for existence.
//
// This is particularly important for untyped dependencies on custom resources that
// may have complex initialization or external dependencies.
//
// Example:
//
// .WithWaitForReady(true) // Wait for custom resource to be fully initialized
//
// The method returns the same builder so calls can be chained.
func (b *UntypedDependencyBuilder[CustomResourceType, ContextType]) WithWaitForReady(waitForReady bool) *UntypedDependencyBuilder[CustomResourceType, ContextType] {
// Forward the flag to the wrapped builder and keep the builder it returns.
b.inner = b.inner.WithWaitForReady(waitForReady)
return b
}
// WithAddManagedByAnnotation controls whether to add a "managed-by" annotation
// to the untyped dependency resource.
//
// When enabled, this adds metadata to help identify which controller is managing
// or depending on this resource. This is especially useful for untyped dependencies
// since the relationship between controllers and third-party resources may not be obvious.
//
// The annotation typically follows the format:
//
// "app.kubernetes.io/managed-by": "<controller-name>"
//
// Example:
//
// .WithAddManagedByAnnotation(true) // Mark dependency relationship for debugging
//
// The method returns the same builder so calls can be chained.
func (b *UntypedDependencyBuilder[CustomResourceType, ContextType]) WithAddManagedByAnnotation(add bool) *UntypedDependencyBuilder[CustomResourceType, ContextType] {
// Forward the flag to the wrapped builder and keep the builder it returns.
b.inner = b.inner.WithAddManagedByAnnotation(add)
return b
}
package ctrlfwk
import (
"reflect"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// NewInstanceOf allocates a fresh, zero-valued object of the same dynamic
// type as object.
//
// When object carries no type information (a nil interface value) the zero
// value of T is returned. For pointer types the pointee is allocated and a
// pointer to it is returned; for any other type a pointer to a new value of
// that type is created and asserted back to T.
func NewInstanceOf[T client.Object](object T) T {
	var zero T

	typ := reflect.TypeOf(object)
	switch {
	case typ == nil:
		return zero
	case typ.Kind() == reflect.Ptr:
		// Allocate the pointed-to type so the result has the same pointer type.
		typ = typ.Elem()
	}

	return reflect.New(typ).Interface().(T)
}
// SetAnnotation sets the annotation key to value on obj.
//
// The annotation map is read once, created when absent, updated, and then
// written back with SetAnnotations. Writing the map back (instead of mutating
// whatever GetAnnotations returns) keeps this correct for objects whose
// GetAnnotations returns a copy of the stored map, and avoids writing into a
// nil map when the object does not retain an empty annotation map.
func SetAnnotation(obj client.Object, key, value string) {
	annotations := obj.GetAnnotations()
	if annotations == nil {
		annotations = make(map[string]string)
	}
	annotations[key] = value
	obj.SetAnnotations(annotations)
}
// GetAnnotation returns the value stored under key in obj's annotations, or
// the empty string when the object has no annotations or the key is absent.
func GetAnnotation(obj client.Object, key string) string {
	// Indexing a nil map yields the zero value, so no explicit nil check is needed.
	return obj.GetAnnotations()[key]
}
package instrument
import (
"reflect"
"unsafe"
"github.com/go-logr/logr"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)
// InstrumentedBuilder wraps a controller-runtime builder together with an
// Instrumenter. Its methods forward to the embedded builder and return the
// InstrumentedBuilder itself so that chained calls stay on the wrapper.
type InstrumentedBuilder struct {
// manager is the manager the builder was created for; it is passed to the
// Instrumenter when a queue factory is requested.
manager manager.Manager
*builder.Builder
Instrumenter
}
// forceSetNewController uses reflection and unsafe to set the unexported newController field of a
// controller.Builder. This is necessary because controller.Builder does not provide a way to set
// a custom controller constructor function.
//
// It panics when the field does not exist or when its type differs from the
// type of fn, since either case means the builder's internal layout is not
// the one this code was written against.
func forceSetNewController[request comparable](obj *builder.TypedBuilder[request], fn func(name string, mgr manager.Manager, options controller.TypedOptions[request]) (controller.TypedController[reconcile.Request], error)) {
	field := reflect.ValueOf(obj).Elem().FieldByName("newController")
	if !field.IsValid() {
		// Without this guard field.Type() would panic with an opaque
		// "call of reflect.Value.Type on zero Value" message.
		panic("builder has no field named newController")
	}
	if field.Type() != reflect.TypeOf(fn) {
		panic("field newController has unexpected type")
	}

	// Re-derive a settable value from the field's address: values obtained
	// through an unexported field cannot be set directly.
	settable := reflect.NewAt(field.Type(), unsafe.Pointer(field.UnsafeAddr())).Elem()
	settable.Set(reflect.ValueOf(fn))
}
// InstrumentedControllerManagedBy returns a new instrumented controller
// builder for the provided Manager.
//
// The underlying builder is configured with the queue factory obtained from
// t, and its controller constructor is replaced by the one produced by
// NewTracerControllerFunc.
func InstrumentedControllerManagedBy(t Instrumenter, m manager.Manager) *InstrumentedBuilder {
	inner := builder.TypedControllerManagedBy[reconcile.Request](m)

	queueOptions := controller.TypedOptions[reconcile.Request]{NewQueue: t.NewQueue(m)}
	inner.WithOptions(queueOptions)

	forceSetNewController(inner, NewTracerControllerFunc(t))

	result := &InstrumentedBuilder{manager: m}
	result.Builder = inner
	result.Instrumenter = t
	return result
}
// For forwards object and opts to the embedded builder's For and returns the
// instrumented builder so calls can be chained.
func (blder *InstrumentedBuilder) For(object client.Object, opts ...builder.ForOption) *InstrumentedBuilder {
blder.Builder = blder.Builder.For(object, opts...)
return blder
}
// Build wraps r with NewInstrumentedReconciler and passes the wrapped
// reconciler to the embedded builder's Build, returning its controller and error.
func (blder *InstrumentedBuilder) Build(r reconcile.TypedReconciler[reconcile.Request]) (controller.TypedController[reconcile.Request], error) {
return blder.Builder.Build(NewInstrumentedReconciler(blder.Instrumenter, r))
}
// Complete builds the controller for r and discards the controller value,
// returning only the error.
//
// It goes through InstrumentedBuilder.Build so that r is wrapped with the
// instrumented reconciler; calling the embedded builder's Complete directly
// would register r without any instrumentation.
func (blder *InstrumentedBuilder) Complete(r reconcile.TypedReconciler[reconcile.Request]) error {
	_, err := blder.Build(r)
	return err
}
// Named forwards name to the embedded builder's Named and returns the
// instrumented builder so calls can be chained.
func (blder *InstrumentedBuilder) Named(name string) *InstrumentedBuilder {
blder.Builder = blder.Builder.Named(name)
return blder
}
// Owns forwards object and opts to the embedded builder's Owns and returns
// the instrumented builder so calls can be chained.
func (blder *InstrumentedBuilder) Owns(object client.Object, opts ...builder.OwnsOption) *InstrumentedBuilder {
blder.Builder = blder.Builder.Owns(object, opts...)
return blder
}
// Watches forwards object, eventHandler and opts to the embedded builder's
// Watches and returns the instrumented builder so calls can be chained.
func (blder *InstrumentedBuilder) Watches(object client.Object, eventHandler handler.TypedEventHandler[client.Object, reconcile.Request], opts ...builder.WatchesOption) *InstrumentedBuilder {
blder.Builder = blder.Builder.Watches(object, eventHandler, opts...)
return blder
}
// WatchesMetadata forwards object, eventHandler and opts to the embedded
// builder's WatchesMetadata and returns the instrumented builder so calls can
// be chained.
func (blder *InstrumentedBuilder) WatchesMetadata(object client.Object, eventHandler handler.TypedEventHandler[client.Object, reconcile.Request], opts ...builder.WatchesOption) *InstrumentedBuilder {
blder.Builder = blder.Builder.WatchesMetadata(object, eventHandler, opts...)
return blder
}
// WatchesRawSource forwards src to the embedded builder's WatchesRawSource
// and returns the instrumented builder so calls can be chained.
func (blder *InstrumentedBuilder) WatchesRawSource(src source.TypedSource[reconcile.Request]) *InstrumentedBuilder {
blder.Builder = blder.Builder.WatchesRawSource(src)
return blder
}
// WithEventFilter forwards p to the embedded builder's WithEventFilter and
// returns the instrumented builder so calls can be chained.
func (blder *InstrumentedBuilder) WithEventFilter(p predicate.Predicate) *InstrumentedBuilder {
blder.Builder = blder.Builder.WithEventFilter(p)
return blder
}
// WithLogConstructor forwards logConstructor to the embedded builder's
// WithLogConstructor and returns the instrumented builder so calls can be chained.
func (blder *InstrumentedBuilder) WithLogConstructor(logConstructor func(*reconcile.Request) logr.Logger) *InstrumentedBuilder {
blder.Builder = blder.Builder.WithLogConstructor(logConstructor)
return blder
}
// WithOptions forwards options to the embedded builder's WithOptions and
// returns the instrumented builder so calls can be chained.
//
// options.NewQueue is always overwritten with the Instrumenter's queue
// factory, so a NewQueue supplied by the caller is replaced.
func (blder *InstrumentedBuilder) WithOptions(options controller.TypedOptions[reconcile.Request]) *InstrumentedBuilder {
// options is a by-value copy, so this assignment does not touch the caller's struct.
options.NewQueue = blder.Instrumenter.NewQueue(blder.manager)
blder.Builder = blder.Builder.WithOptions(options)
return blder
}
package instrument
import (
	"context"
	"sync"
	"time"
)
// MergedContext combines two contexts into a single context.Context.
//
// The merged context is done as soon as either parent is done, reports the
// earlier of the two deadlines, reports ctx1's error before ctx2's, and
// resolves values from ctx1 before falling back to ctx2.
type MergedContext struct {
	ctx1 context.Context
	ctx2 context.Context

	// doneOnce guards the lazy construction of done so that every call to
	// Done returns the same channel and at most one goroutine is started
	// per MergedContext.
	doneOnce sync.Once
	done     <-chan struct{}
}

var _ context.Context = (*MergedContext)(nil)

// NewMergedContext returns a context backed by both ctx1 and ctx2.
func NewMergedContext(ctx1, ctx2 context.Context) *MergedContext {
	return &MergedContext{
		ctx1: ctx1,
		ctx2: ctx2,
	}
}

// Deadline returns the earlier of the parents' deadlines. ok is false only
// when neither parent has a deadline.
func (m *MergedContext) Deadline() (deadline time.Time, ok bool) {
	d1, ok1 := m.ctx1.Deadline()
	d2, ok2 := m.ctx2.Deadline()

	switch {
	case !ok1 && !ok2:
		return time.Time{}, false
	case !ok1:
		return d2, true
	case !ok2:
		return d1, true
	case d1.Before(d2):
		return d1, true
	default:
		return d2, true
	}
}

// Done returns a channel that is closed when either parent is done.
//
// The channel is computed on the first call and reused afterwards, as the
// context.Context contract requires successive calls to return the same
// value. When only one parent can be cancelled its channel is returned
// directly; when neither can, the result is nil. Only when both parents can
// be cancelled is a single goroutine started to close the merged channel.
func (m *MergedContext) Done() <-chan struct{} {
	m.doneOnce.Do(func() {
		done1 := m.ctx1.Done()
		done2 := m.ctx2.Done()

		switch {
		case done1 == nil:
			// Also covers the case where both are nil.
			m.done = done2
		case done2 == nil:
			m.done = done1
		default:
			merged := make(chan struct{})
			go func() {
				select {
				case <-done1:
				case <-done2:
				}
				close(merged)
			}()
			m.done = merged
		}
	})
	return m.done
}

// Err returns ctx1's error when it is non-nil, otherwise ctx2's error.
func (m *MergedContext) Err() error {
	if err := m.ctx1.Err(); err != nil {
		return err
	}
	return m.ctx2.Err()
}

// Value looks key up in ctx1 first and falls back to ctx2 when ctx1 yields nil.
func (m *MergedContext) Value(key any) any {
	if val := m.ctx1.Value(key); val != nil {
		return val
	}
	return m.ctx2.Value(key)
}
package instrument
import (
"context"
"reflect"
"unsafe"
"github.com/go-logr/logr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)
// InstrumentedController wraps a controller-runtime controller together with
// an Instrumenter. Its methods forward to the wrapped controller; Watch
// additionally tries to wrap the source's event handler.
type InstrumentedController struct {
// internalController is the controller every call is forwarded to.
internalController controller.TypedController[reconcile.Request]
Instrumenter
}
// Compile-time check that InstrumentedController satisfies controller.TypedController.
var _ controller.TypedController[reconcile.Request] = &InstrumentedController{}
// NewTracerControllerFunc returns a controller constructor that creates a
// regular controller with controller.NewTyped and wraps it in an
// InstrumentedController bound to t. Construction errors are returned
// unchanged together with a nil controller.
func NewTracerControllerFunc(t Instrumenter) func(name string, mgr manager.Manager, options controller.TypedOptions[reconcile.Request]) (controller.TypedController[reconcile.Request], error) {
	return func(name string, mgr manager.Manager, options controller.TypedOptions[reconcile.Request]) (controller.TypedController[reconcile.Request], error) {
		wrapped, err := controller.NewTyped(name, mgr, options)
		if err == nil {
			instrumented := &InstrumentedController{Instrumenter: t}
			instrumented.internalController = wrapped
			return instrumented, nil
		}
		return nil, err
	}
}
// Reconcile forwards the request to the wrapped controller unchanged.
func (t *InstrumentedController) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
return t.internalController.Reconcile(ctx, req)
}
// Watch registers src with the wrapped controller.
//
// Before forwarding, it inspects src with reflection: when src is a non-nil
// pointer to a struct that has a non-nil field named "Handler" of type
// handler.TypedEventHandler[client.Object, reconcile.Request], that handler
// is replaced in place by its instrumented wrapper. Sources of any other
// shape (function-based sources, nil, structs without such a field) are
// forwarded untouched instead of causing a reflection panic.
func (t *InstrumentedController) Watch(src source.TypedSource[reconcile.Request]) error {
	handlerType := reflect.TypeOf((*handler.TypedEventHandler[client.Object, reconcile.Request])(nil)).Elem()

	v := reflect.ValueOf(src)
	if v.Kind() == reflect.Ptr && !v.IsNil() && v.Elem().Kind() == reflect.Struct {
		hdlrField := v.Elem().FieldByName("Handler")
		// IsNil guards the type assertion below, which would panic on a nil interface.
		if hdlrField.IsValid() && hdlrField.Type() == handlerType && !hdlrField.IsNil() {
			// Re-derive the field from its address so it can be read and set.
			ptr := unsafe.Pointer(hdlrField.UnsafeAddr())
			realValue := reflect.NewAt(hdlrField.Type(), ptr).Elem()
			originalHandler := realValue.Interface().(handler.TypedEventHandler[client.Object, reconcile.Request])
			wrappedHandler := t.InstrumentRequestHandler(originalHandler)
			realValue.Set(reflect.ValueOf(wrappedHandler))
		}
	}

	return t.internalController.Watch(src)
}
// Start forwards to the wrapped controller's Start and returns its error.
func (t *InstrumentedController) Start(ctx context.Context) error {
return t.internalController.Start(ctx)
}
// GetLogger returns the wrapped controller's logger.
func (t *InstrumentedController) GetLogger() logr.Logger {
return t.internalController.GetLogger()
}
package instrument
import (
"context"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/embedded"
)
// NilTracer is a Tracer that never starts a span of its own.
type NilTracer struct {
embedded.Tracer
}
// Compile-time check that NilTracer satisfies Tracer.
var _ Tracer = &NilTracer{}
// StartSpan returns localCtx unchanged together with the span that
// trace.SpanFromContext finds in it. globalCtx, spanName and opts are ignored.
func (nt *NilTracer) StartSpan(globalCtx *context.Context, localCtx context.Context, spanName string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
return localCtx, trace.SpanFromContext(localCtx)
}
package instrument
import (
"context"
"crypto/md5"
"encoding/json"
"fmt"
"runtime"
"sync"
"weak"
"github.com/go-logr/logr"
"go.opentelemetry.io/otel/trace"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
// Instrumenter bundles the operations the instrumented builder, controller,
// reconciler and event handlers need: handler wrapping, per-event and
// per-request context lookup, queue construction, cleanup, logging and span
// creation (through the embedded Tracer).
type Instrumenter interface {
// InstrumentRequestHandler returns an instrumented wrapper around handler.
InstrumentRequestHandler(handler handler.TypedEventHandler[client.Object, reconcile.Request]) handler.TypedEventHandler[client.Object, reconcile.Request]
// GetContextForRequest returns the context pointer associated with req and
// whether one was found.
GetContextForRequest(req reconcile.Request) (*context.Context, bool)
// GetContextForEvent returns the context pointer associated with event.
GetContextForEvent(event any) *context.Context
// NewQueue returns a queue factory suitable for controller options.
NewQueue(mgr ctrl.Manager) func(controllerName string, rateLimiter workqueue.TypedRateLimiter[reconcile.Request]) workqueue.TypedRateLimitingInterface[reconcile.Request]
// Cleanup releases whatever is tracked for ctx and req.
Cleanup(ctx *context.Context, req reconcile.Request)
// NewLogger returns a logger for ctx.
NewLogger(ctx context.Context) logr.Logger
Tracer
}
// Tracer starts spans. globalCtx points at a context shared across the
// handling of one event (implementations may read or update it); localCtx is
// the context of the current call, and the returned context is the one the
// caller should continue with.
type Tracer interface {
StartSpan(globalCtx *context.Context, localCtx context.Context, spanName string, opts ...trace.SpanStartOption) (context.Context, trace.Span)
}
// instrumenter is the package's Instrumenter implementation.
type instrumenter struct {
mgr ctrl.Manager
// queue is the most recent queue created by the factory returned from NewQueue;
// it is nil until that factory has been invoked.
queue *InstrumentedQueue[reconcile.Request]
// lock guards ctxCache and ctxCacheReverse.
lock sync.Mutex
// ctxCache maps an event digest to a weak pointer to its context.
ctxCache map[string]weak.Pointer[context.Context]
// ctxCacheReverse maps a context pointer back to its event digest.
ctxCacheReverse map[*context.Context]string
// newLogger builds the logger returned by NewLogger.
newLogger func(ctx context.Context) logr.Logger
Tracer
}
// InstrumentRequestHandlerWithTracer wraps handler, for any object type T, in
// an instrumented event handler bound to t.
func InstrumentRequestHandlerWithTracer[T client.Object](t Instrumenter, handler handler.TypedEventHandler[T, reconcile.Request]) handler.TypedEventHandler[T, reconcile.Request] {
return NewInstrumentedEventHandler(t, handler)
}
// InstrumentRequestHandler wraps handler in an instrumented event handler
// bound to this instrumenter.
func (t *instrumenter) InstrumentRequestHandler(handler handler.TypedEventHandler[client.Object, reconcile.Request]) handler.TypedEventHandler[client.Object, reconcile.Request] {
return NewInstrumentedEventHandler(t, handler)
}
// NewQueue returns a queue factory for controller options.
//
// The factory ignores the rate limiter it is handed and always uses the
// default controller rate limiter. It builds a priority queue when the
// manager's controller options enable UsePriorityQueue and a plain
// rate-limiting queue otherwise, wraps it in an InstrumentedQueue, remembers
// that queue on the instrumenter and returns it.
func (t *instrumenter) NewQueue(mgr ctrl.Manager) func(controllerName string, rateLimiter workqueue.TypedRateLimiter[reconcile.Request]) workqueue.TypedRateLimitingInterface[reconcile.Request] {
	return func(controllerName string, _ workqueue.TypedRateLimiter[reconcile.Request]) workqueue.TypedRateLimitingInterface[reconcile.Request] {
		limiter := workqueue.DefaultTypedControllerRateLimiter[*reconcile.Request]()

		var backing workqueue.TypedRateLimitingInterface[*reconcile.Request]
		if ptr.Deref(mgr.GetControllerOptions().UsePriorityQueue, false) {
			backing = priorityqueue.New(controllerName, func(o *priorityqueue.Opts[*reconcile.Request]) {
				o.Log = mgr.GetLogger().WithValues("controller", controllerName)
				o.RateLimiter = limiter
			})
		} else {
			backing = workqueue.NewTypedRateLimitingQueueWithConfig(limiter, workqueue.TypedRateLimitingQueueConfig[*reconcile.Request]{
				Name: controllerName,
			})
		}

		t.queue = NewInstrumentedQueue(backing)
		return t.queue
	}
}
// GetContextForRequest returns the context pointer recorded for req when it
// was queued, together with true.
//
// When no queue has been created yet, the queue has no backing queue, or
// nothing is recorded for req, it returns a pointer to a fresh background
// context and false.
func (t *instrumenter) GetContextForRequest(req reconcile.Request) (*context.Context, bool) {
	var defaultContext = context.Background()

	// t.queue stays nil until the factory from NewQueue has run; guard it
	// before touching its fields.
	if t.queue == nil || t.queue.internalQueue == nil {
		return &defaultContext, false
	}

	meta, ok := t.queue.GetMetaOf(req)
	if !ok {
		return &defaultContext, false
	}
	return meta.Context, true
}
// GetContextForEvent returns the context pointer associated with event.
//
// Events are identified by the MD5 digest of their JSON encoding, or of their
// %#+v formatting when JSON encoding fails. A cached pointer is returned when
// one is still alive for that digest; otherwise a pointer to a new background
// context is cached (weakly in ctxCache, by key in ctxCacheReverse) and
// returned.
func (t *instrumenter) GetContextForEvent(event any) *context.Context {
	payload, err := json.Marshal(event)
	if err != nil {
		// Fall back to a Go-syntax dump for values JSON cannot encode.
		payload = []byte(fmt.Sprintf("%#+v\n", event))
	}
	digest := fmt.Sprintf("%x", md5.Sum(payload))

	t.lock.Lock()
	defer t.lock.Unlock()

	if cached, ok := t.ctxCache[digest]; ok {
		// Resolve the weak pointer exactly once so the value that was
		// checked is the value that is returned.
		if ctxPtr := cached.Value(); ctxPtr != nil {
			return ctxPtr
		}
	}

	ctx := context.Background()
	ctxPtr := &ctx
	runtime.AddCleanup(ctxPtr, t.cleanupKey, digest)
	t.ctxCache[digest] = weak.Make(ctxPtr)
	// NOTE(review): using ctxPtr as a map key keeps it strongly reachable, so
	// the cleanup above can only fire after Cleanup removes this entry.
	t.ctxCacheReverse[ctxPtr] = digest
	return ctxPtr
}
// cleanupKey removes the ctxCache entry stored under key while holding the
// instrumenter's lock. It is the function registered with runtime.AddCleanup
// in GetContextForEvent. ctxCacheReverse is not touched here.
func (t *instrumenter) cleanupKey(key string) {
t.lock.Lock()
defer t.lock.Unlock()
delete(t.ctxCache, key)
}
// Cleanup drops everything tracked for req and ctx.
//
// It is a no-op while no queue exists. Otherwise the queue's metadata for req
// is removed first and then, under the instrumenter's lock, the cache entries
// for ctx are removed from both directions if ctx is known.
func (t *instrumenter) Cleanup(ctx *context.Context, req reconcile.Request) {
	if t.queue == nil {
		return
	}
	t.queue.cleanupKey(req)

	t.lock.Lock()
	defer t.lock.Unlock()

	if digest, known := t.ctxCacheReverse[ctx]; known {
		delete(t.ctxCache, digest)
		delete(t.ctxCacheReverse, ctx)
	}
}
// NewLogger returns the logger produced by the configured logger function for ctx.
func (t *instrumenter) NewLogger(ctx context.Context) logr.Logger {
return t.newLogger(ctx)
}
package instrument
import (
"context"
"weak"
"github.com/go-logr/logr"
ctrl "sigs.k8s.io/controller-runtime"
)
// instrumenterBuilder collects the settings used by Build to create an Instrumenter.
type instrumenterBuilder struct {
// tracer becomes the Instrumenter's embedded Tracer.
tracer Tracer
// newLogger becomes the Instrumenter's logger function.
newLogger func(ctx context.Context) logr.Logger
mgr ctrl.Manager
}
// NewInstrumenter starts an instrumenter builder for mgr.
//
// The defaults are a NilTracer and a logger function that returns a logger
// constructed with a nil sink; override them with WithTracer and
// WithLoggerFunc before calling Build.
func NewInstrumenter(mgr ctrl.Manager) *instrumenterBuilder {
	b := &instrumenterBuilder{mgr: mgr}
	b.tracer = &NilTracer{}
	b.newLogger = func(context.Context) logr.Logger {
		return logr.New(nil)
	}
	return b
}
// WithTracer sets the tracer used by the built Instrumenter and returns the
// builder so calls can be chained.
func (b *instrumenterBuilder) WithTracer(t Tracer) *instrumenterBuilder {
b.tracer = t
return b
}
// WithLoggerFunc sets the function the built Instrumenter uses to create
// loggers and returns the builder so calls can be chained.
func (b *instrumenterBuilder) WithLoggerFunc(l func(ctx context.Context) logr.Logger) *instrumenterBuilder {
b.newLogger = l
return b
}
// Build creates the Instrumenter from the builder's settings, with empty
// context caches.
func (b *instrumenterBuilder) Build() Instrumenter {
	built := &instrumenter{
		mgr:       b.mgr,
		newLogger: b.newLogger,
		Tracer:    b.tracer,
	}
	built.ctxCache = map[string]weak.Pointer[context.Context]{}
	built.ctxCacheReverse = map[*context.Context]string{}
	return built
}
package instrument
import (
"context"
"github.com/go-logr/logr"
)
// Logger is an interface over logging operations.
// NOTE(review): the method names and signatures appear to mirror the method
// set of logr.Logger — confirm before relying on that.
type Logger interface {
Enabled() bool
Error(err error, msg string, keysAndValues ...any)
GetSink() logr.LogSink
GetV() int
Info(msg string, keysAndValues ...any)
IsZero() bool
V(level int) logr.Logger
WithCallDepth(depth int) logr.Logger
WithCallStackHelper() (func(), logr.Logger)
WithName(name string) logr.Logger
WithSink(sink logr.LogSink) logr.Logger
WithValues(keysAndValues ...any) logr.Logger
}
// NewLoggerFunc returns a logger function that ignores its context and always
// returns logger.
func NewLoggerFunc(logger logr.Logger) func(ctx context.Context) logr.Logger {
return func(ctx context.Context) logr.Logger {
return logger
}
}
package instrument
import (
"context"
"go.opentelemetry.io/otel/trace"
)
// OtelTracer adapts an OpenTelemetry trace.Tracer to the package's Tracer interface.
type OtelTracer struct {
trace.Tracer
}
// NewOtelTracer returns an OtelTracer that embeds t.
func NewOtelTracer(t trace.Tracer) *OtelTracer {
return &OtelTracer{
Tracer: t,
}
}
// StartSpan starts a span named spanName on localCtx using the embedded
// tracer's Start. globalCtx is ignored.
func (t *OtelTracer) StartSpan(globalCtx *context.Context, localCtx context.Context, spanName string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
return t.Start(localCtx, spanName, opts...)
}
package instrument
import (
"context"
"runtime"
"sync"
"time"
"weak"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
// encapsulatedItem is the metadata InstrumentedQueue keeps per queued item.
type encapsulatedItem[T comparable] struct {
// Context is the context pointer that was current on the queue when the item was added.
Context *context.Context
// Object weakly references the pointer that was handed to the backing queue.
Object weak.Pointer[T]
}
// InstrumentedQueue exposes a queue of T on top of a backing queue of *T and
// records, per item, the context pointer that was current when it was added.
//
// Methods use value receivers; copies share the same lock, backing queue and
// metadata map because those fields are pointers or reference types.
type InstrumentedQueue[T comparable] struct {
// lock guards metamap.
lock *sync.Mutex
// currentContext is attached to items added through this queue value.
currentContext *context.Context
internalQueue workqueue.TypedRateLimitingInterface[*T]
// metamap holds the metadata of items that have been added.
metamap map[T]*encapsulatedItem[T]
}
// Compile-time check that InstrumentedQueue satisfies priorityqueue.PriorityQueue.
var _ priorityqueue.PriorityQueue[reconcile.Request] = InstrumentedQueue[reconcile.Request]{}
// NewInstrumentedQueue wraps queue in an InstrumentedQueue with a fresh lock,
// an empty metadata map and no current context.
func NewInstrumentedQueue[T comparable](queue workqueue.TypedRateLimitingInterface[*T]) *InstrumentedQueue[T] {
	wrapped := new(InstrumentedQueue[T])
	wrapped.lock = new(sync.Mutex)
	wrapped.internalQueue = queue
	wrapped.metamap = map[T]*encapsulatedItem[T]{}
	return wrapped
}
// cleanupKey removes the metadata recorded for key while holding the queue's
// lock. It is registered with runtime.AddCleanup by the Add* methods and is
// also called from the instrumenter's Cleanup.
func (q InstrumentedQueue[T]) cleanupKey(key T) {
q.lock.Lock()
defer q.lock.Unlock()
delete(q.metamap, key)
}
// WithContext returns a queue that shares this queue's lock, backing queue
// and metadata map but attaches ctx to every item added through it.
func (q InstrumentedQueue[T]) WithContext(ctx *context.Context) *InstrumentedQueue[T] {
	// q is already a by-value copy of the receiver; only the context differs.
	scoped := q
	scoped.currentContext = ctx
	return &scoped
}
// GetMetaOf returns the metadata recorded for item and whether any exists.
//
// The lookup is done under the queue's lock because the map is written
// concurrently by the Add*, Done, Forget and cleanup paths.
func (q InstrumentedQueue[T]) GetMetaOf(item T) (*encapsulatedItem[T], bool) {
	q.lock.Lock()
	defer q.lock.Unlock()

	val, ok := q.metamap[item]
	if !ok {
		return nil, false
	}
	return val, true
}
// isInQueue reports whether item has live metadata, i.e. an entry whose weak
// pointer still resolves. A stale entry (weak pointer cleared) is removed as
// a side effect. It does not take the lock itself; the Add* callers invoke it
// while holding q.lock.
func (q InstrumentedQueue[T]) isInQueue(item T) bool {
	entry, tracked := q.metamap[item]
	switch {
	case !tracked:
		return false
	case entry.Object.Value() != nil:
		return true
	}

	delete(q.metamap, item)
	return false
}
// Add enqueues item on the backing queue and records the queue's current
// context for it. Items that already have live metadata are skipped.
//
// The cleanup is registered only for a pointer that is actually handed to the
// backing queue. Registering it on a pointer that is then discarded would let
// the garbage collector run the cleanup early and delete the metadata of the
// entry that is still queued.
func (q InstrumentedQueue[T]) Add(item T) {
	q.lock.Lock()
	defer q.lock.Unlock()

	if q.isInQueue(item) {
		return
	}

	pointerToItem := &item
	runtime.AddCleanup(pointerToItem, q.cleanupKey, item)
	q.internalQueue.Add(pointerToItem)
	q.metamap[item] = &encapsulatedItem[T]{
		Context: q.currentContext,
		Object:  weak.Make(pointerToItem),
	}
}
// AddAfter enqueues item on the backing queue after duration and records the
// queue's current context for it. Items that already have live metadata are
// skipped.
//
// The cleanup is registered only for a pointer that is actually handed to the
// backing queue. Registering it on a pointer that is then discarded would let
// the garbage collector run the cleanup early and delete the metadata of the
// entry that is still queued.
func (q InstrumentedQueue[T]) AddAfter(item T, duration time.Duration) {
	q.lock.Lock()
	defer q.lock.Unlock()

	if q.isInQueue(item) {
		return
	}

	pointerToItem := &item
	runtime.AddCleanup(pointerToItem, q.cleanupKey, item)
	q.internalQueue.AddAfter(pointerToItem, duration)
	q.metamap[item] = &encapsulatedItem[T]{
		Context: q.currentContext,
		Object:  weak.Make(pointerToItem),
	}
}
// AddRateLimited enqueues item on the backing queue through its rate limiter
// and records the queue's current context for it. Items that already have
// live metadata are skipped.
//
// The cleanup is registered only for a pointer that is actually handed to the
// backing queue. Registering it on a pointer that is then discarded would let
// the garbage collector run the cleanup early and delete the metadata of the
// entry that is still queued.
func (q InstrumentedQueue[T]) AddRateLimited(item T) {
	q.lock.Lock()
	defer q.lock.Unlock()

	if q.isInQueue(item) {
		return
	}

	pointerToItem := &item
	runtime.AddCleanup(pointerToItem, q.cleanupKey, item)
	q.internalQueue.AddRateLimited(pointerToItem)
	q.metamap[item] = &encapsulatedItem[T]{
		Context: q.currentContext,
		Object:  weak.Make(pointerToItem),
	}
}
// Done marks item as finished on the backing queue and drops its metadata.
// It does nothing when no metadata is recorded for item.
//
// The whole operation runs under the queue's lock so that the metadata lookup
// cannot race with concurrent writers of the map.
func (q InstrumentedQueue[T]) Done(item T) {
	q.lock.Lock()
	defer q.lock.Unlock()

	capsule, ok := q.metamap[item]
	if !ok {
		return
	}

	// The backing queue is keyed by pointer, so it must be given the pointer
	// that was originally enqueued.
	q.internalQueue.Done(capsule.Object.Value())
	delete(q.metamap, item)
}
// Forget tells the backing queue to forget item and drops its metadata.
// It does nothing when no metadata is recorded for item.
//
// The whole operation runs under the queue's lock so that the metadata lookup
// cannot race with concurrent writers of the map.
//
// NOTE(review): because the metadata is removed here, a later Done for the
// same item finds nothing and returns early — confirm this is intended.
func (q InstrumentedQueue[T]) Forget(item T) {
	q.lock.Lock()
	defer q.lock.Unlock()

	capsule, ok := q.metamap[item]
	if !ok {
		return
	}

	q.internalQueue.Forget(capsule.Object.Value())
	delete(q.metamap, item)
}
// Get takes the next pointer from the backing queue and returns the value it
// points to, along with the backing queue's shutdown flag. A nil pointer
// yields the zero value of T.
func (q InstrumentedQueue[T]) Get() (item T, shutdown bool) {
	next, shutdown := q.internalQueue.Get()
	if next != nil {
		item = *next
	}
	return item, shutdown
}
// Len returns the backing queue's length.
func (q InstrumentedQueue[T]) Len() int {
return q.internalQueue.Len()
}
// NumRequeues returns the backing queue's requeue count for item, or 0 when
// no metadata is recorded for it.
//
// The metadata lookup is done under the queue's lock so it cannot race with
// concurrent writers of the map; the backing queue is queried after the lock
// is released.
func (q InstrumentedQueue[T]) NumRequeues(item T) int {
	q.lock.Lock()
	capsule, ok := q.metamap[item]
	q.lock.Unlock()

	if !ok {
		return 0
	}
	return q.internalQueue.NumRequeues(capsule.Object.Value())
}
// ShutDown forwards to the backing queue's ShutDown.
func (q InstrumentedQueue[T]) ShutDown() {
q.internalQueue.ShutDown()
}
// ShutDownWithDrain forwards to the backing queue's ShutDownWithDrain.
func (q InstrumentedQueue[T]) ShutDownWithDrain() {
q.internalQueue.ShutDownWithDrain()
}
// ShuttingDown reports the backing queue's ShuttingDown state.
func (q InstrumentedQueue[T]) ShuttingDown() bool {
return q.internalQueue.ShuttingDown()
}
// AddWithOpts enqueues Items with the given options and records the queue's
// current context for each newly added item. Items that already have live
// metadata are skipped.
//
// When the backing queue is a priority queue, the options are passed through
// to its AddWithOpts unchanged so that every option, including the priority,
// is honoured. Otherwise each item is routed to AddAfter, AddRateLimited or
// Add depending on the options.
//
// The cleanup is registered only for pointers that are actually handed to the
// backing queue; registering it on a discarded pointer would let the garbage
// collector delete the metadata of an entry that is still queued.
func (q InstrumentedQueue[T]) AddWithOpts(o priorityqueue.AddOpts, Items ...T) {
	pq, ok := q.internalQueue.(priorityqueue.PriorityQueue[*T])
	if !ok {
		for _, item := range Items {
			switch {
			case o.After > 0:
				q.AddAfter(item, o.After)
			case o.RateLimited:
				q.AddRateLimited(item)
			default:
				q.Add(item)
			}
		}
		return
	}

	q.lock.Lock()
	defer q.lock.Unlock()

	for _, item := range Items {
		if q.isInQueue(item) {
			continue
		}

		// Take the address of a per-item copy so each entry has its own pointer.
		queued := item
		pointerToItem := &queued
		runtime.AddCleanup(pointerToItem, q.cleanupKey, item)
		pq.AddWithOpts(o, pointerToItem)
		q.metamap[item] = &encapsulatedItem[T]{
			Context: q.currentContext,
			Object:  weak.Make(pointerToItem),
		}
	}
}
// GetWithPriority takes the next pointer from the backing queue and returns
// the value it points to, its priority and the shutdown flag.
//
// When the backing queue is a priority queue its GetWithPriority is used;
// otherwise the plain Get is used and the priority is reported as 0. A nil
// pointer yields the zero value of T.
func (q InstrumentedQueue[T]) GetWithPriority() (item T, priority int, shutdown bool) {
	var next *T
	if pq, isPriority := q.internalQueue.(priorityqueue.PriorityQueue[*T]); isPriority {
		next, priority, shutdown = pq.GetWithPriority()
	} else {
		next, shutdown = q.internalQueue.Get()
	}

	if next != nil {
		item = *next
	}
	return item, priority, shutdown
}
package instrument
import (
"context"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
// InstrumentedReconciler wraps a reconciler so that every Reconcile call gets
// a logger, a span and a cleanup step from the embedded Instrumenter.
type InstrumentedReconciler struct {
// internalReconciler is the reconciler the calls are delegated to.
internalReconciler reconcile.TypedReconciler[reconcile.Request]
Instrumenter
}
// Compile-time check that InstrumentedReconciler satisfies reconcile.TypedReconciler.
var _ reconcile.TypedReconciler[reconcile.Request] = &InstrumentedReconciler{}
// NewInstrumentedReconciler returns a reconciler that delegates to r and
// instruments each call with t.
func NewInstrumentedReconciler(t Instrumenter, r reconcile.TypedReconciler[reconcile.Request]) *InstrumentedReconciler {
	wrapped := &InstrumentedReconciler{Instrumenter: t}
	wrapped.internalReconciler = r
	return wrapped
}
// Reconcile runs the wrapped reconciler for req with instrumentation.
//
// A logger from the Instrumenter is stored in ctx, the context pointer
// recorded for req is looked up, and a span named "reconcile" is started
// around the wrapped call. Errors from the wrapped reconciler are logged and
// returned unchanged. Cleanup and span.End are deferred so they also run when
// the wrapped reconciler panics; Cleanup runs before the span ends.
func (t *InstrumentedReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
	logger := t.NewLogger(ctx)
	ctx = logf.IntoContext(ctx, logger)

	// The found flag is not needed: a fresh background context is returned
	// when nothing is recorded for req.
	ctxPtr, _ := t.GetContextForRequest(req)

	ctx, span := t.StartSpan(ctxPtr, ctx, "reconcile")
	defer span.End()
	defer t.Cleanup(ctxPtr, req)

	result, err := t.internalReconciler.Reconcile(ctx, req)
	if err != nil {
		logger.Error(err, "failed to reconcile")
	}
	return result, err
}
package instrument
import (
"context"
"fmt"
"github.com/wI2L/jsondiff"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
// instrumentedEventHandler wraps an event handler so that each event is
// logged, traced and enqueued with its event context attached.
type instrumentedEventHandler[ObjectType client.Object] struct {
// tracer provides contexts, loggers and spans for the handled events.
tracer Instrumenter
// inner is the handler the events are delegated to.
inner handler.TypedEventHandler[ObjectType, reconcile.Request]
// innerName is the %T formatting of inner, captured at construction time.
innerName string
}
// NewInstrumentedEventHandler wraps inner in an event handler instrumented
// with tracer. The type name of inner is captured once at construction.
func NewInstrumentedEventHandler[ObjectType client.Object](tracer Instrumenter, inner handler.TypedEventHandler[ObjectType, reconcile.Request]) handler.TypedEventHandler[ObjectType, reconcile.Request] {
	wrapped := new(instrumentedEventHandler[ObjectType])
	wrapped.tracer = tracer
	wrapped.inner = inner
	wrapped.innerName = fmt.Sprintf("%T", inner)
	return wrapped
}
// Create is called in response to a create event.
//
// It looks up the event's shared context, starts a span, logs the event and
// delegates to the inner handler. When q is an *InstrumentedQueue the inner
// handler receives a view of that queue bound to the event's context pointer;
// any other queue is passed through unchanged.
func (t *instrumentedEventHandler[ObjectType]) Create(
	ctx context.Context,
	e event.TypedCreateEvent[ObjectType],
	q workqueue.TypedRateLimitingInterface[reconcile.Request],
) {
	ctxPtr := t.tracer.GetContextForEvent(e)
	logger := t.tracer.NewLogger(*ctxPtr)

	ctx, span := t.tracer.StartSpan(ctxPtr, ctx, fmt.Sprintf("event.create.handler.%T", t.inner))
	defer span.End()

	logger.Info("Received create event", "object_type", e.Object.GetObjectKind().GroupVersionKind(), "name", e.Object.GetName(), "namespace", e.Object.GetNamespace())

	target := q
	if tracingQueue, ok := q.(*InstrumentedQueue[reconcile.Request]); ok {
		// Bind the event's context to everything the inner handler enqueues.
		target = tracingQueue.WithContext(ctxPtr)
	}
	t.inner.Create(ctx, e, target)
}
// Update is called in response to an update event - e.g. Pod Updated.
//
// It looks up the event's shared context, starts a span, logs the event
// together with a JSON diff between the old and new object, and delegates to
// the inner handler. When q is an *InstrumentedQueue the inner handler
// receives a view of that queue bound to the event's shared context pointer,
// the same pointer the other event methods use; any other queue is passed
// through unchanged.
func (t *instrumentedEventHandler[ObjectType]) Update(ctx context.Context, e event.TypedUpdateEvent[ObjectType], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
	ctxPtr := t.tracer.GetContextForEvent(e)
	logger := t.tracer.NewLogger(*ctxPtr)

	ctx, span := t.tracer.StartSpan(ctxPtr, ctx, fmt.Sprintf("event.update.handler.%T", t.inner))
	defer span.End()

	// The patch is informational only (it is just attached to the log line),
	// so a comparison error is deliberately dropped.
	patch, _ := jsondiff.Compare(e.ObjectOld, e.ObjectNew,
		jsondiff.Ignores("/metadata/managedFields", "/kind", "/apiVersion"),
	)
	logger.Info("Received update event", "object_type", e.ObjectOld.GetObjectKind().GroupVersionKind(), "name", e.ObjectOld.GetName(), "namespace", e.ObjectOld.GetNamespace(), "patch_data", patch)

	tracingQueue, ok := q.(*InstrumentedQueue[reconcile.Request])
	if !ok {
		// If the provided queue is not an InstrumentedQueue, we cannot attach the context.
		t.inner.Update(ctx, e, q)
		return
	}

	// Attach the shared event context pointer, not the address of the local
	// ctx variable: the instrumenter's Cleanup looks entries up by this pointer.
	tempQueue := tracingQueue.WithContext(ctxPtr)
	t.inner.Update(ctx, e, tempQueue)
}
// Delete is called in response to a delete event - e.g. Pod Deleted.
//
// It looks up the event's shared context, starts a span, logs the event and
// delegates to the inner handler. When q is an *InstrumentedQueue the inner
// handler receives a view of that queue bound to the event's context pointer;
// any other queue is passed through unchanged.
func (t *instrumentedEventHandler[ObjectType]) Delete(ctx context.Context, e event.TypedDeleteEvent[ObjectType], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
	ctxPtr := t.tracer.GetContextForEvent(e)
	logger := t.tracer.NewLogger(*ctxPtr)

	ctx, span := t.tracer.StartSpan(ctxPtr, ctx, fmt.Sprintf("event.delete.handler.%T", t.inner))
	defer span.End()

	logger.Info("Received delete event", "object_type", e.Object.GetObjectKind().GroupVersionKind(), "name", e.Object.GetName(), "namespace", e.Object.GetNamespace())

	target := q
	if tracingQueue, ok := q.(*InstrumentedQueue[reconcile.Request]); ok {
		// Bind the event's context to everything the inner handler enqueues.
		target = tracingQueue.WithContext(ctxPtr)
	}
	t.inner.Delete(ctx, e, target)
}
// Generic is called in response to an event of an unknown type or a synthetic event triggered as a cron or
// external trigger request - e.g. reconcile Autoscaling, or a Webhook.
//
// It looks up the event's shared context, starts a span on the incoming ctx
// (as the other event methods do, so the caller's context is not discarded),
// logs the event and delegates to the inner handler. When q is an
// *InstrumentedQueue the inner handler receives a view of that queue bound to
// the event's context pointer; any other queue is passed through unchanged.
func (t *instrumentedEventHandler[ObjectType]) Generic(ctx context.Context, e event.TypedGenericEvent[ObjectType], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
	ctxPtr := t.tracer.GetContextForEvent(e)
	logger := t.tracer.NewLogger(*ctxPtr)

	ctx, span := t.tracer.StartSpan(ctxPtr, ctx, fmt.Sprintf("event.generic.handler.%T", t.inner))
	defer span.End()

	logger.Info("Received generic event", "object_type", e.Object.GetObjectKind().GroupVersionKind(), "name", e.Object.GetName(), "namespace", e.Object.GetNamespace())

	tracingQueue, ok := q.(*InstrumentedQueue[reconcile.Request])
	if !ok {
		// If the provided queue is not an InstrumentedQueue, we cannot attach the context.
		t.inner.Generic(ctx, e, q)
		return
	}

	// Create a temporary queue bound to the event's context.
	tempQueue := tracingQueue.WithContext(ctxPtr)
	t.inner.Generic(ctx, e, tempQueue)
}
package instrument
import (
"context"
"fmt"
"maps"
"time"
"github.com/getsentry/sentry-go"
"github.com/go-logr/logr"
)
// NewSentryLoggerFunc returns a logger function that, for each context,
// returns logger with its sink replaced by a Sentry sink wrapping the
// original sink and bound to that context.
func NewSentryLoggerFunc(logger logr.Logger) func(ctx context.Context) logr.Logger {
return func(ctx context.Context) logr.Logger {
return logger.WithSink(NewSentrySink(ctx, logger.GetSink()))
}
}
// sentrySink is a logr.LogSink that forwards every call to the wrapped sink
// and additionally reports to the Sentry hub found on its context, if any.
type sentrySink struct {
// Context is the context the Sentry hub is read from.
context.Context
// LogSink is the wrapped sink that receives every log call.
logr.LogSink
// values holds the key/value pairs accumulated through WithValues.
values map[string]any
}
// Compile-time check that sentrySink satisfies logr.LogSink.
var _ logr.LogSink = &sentrySink{}
// NewSentrySink returns a sink that wraps logSink, reads its Sentry hub from
// ctx and starts with no accumulated values.
func NewSentrySink(ctx context.Context, logSink logr.LogSink) *sentrySink {
	sink := new(sentrySink)
	sink.Context = ctx
	sink.LogSink = logSink
	sink.values = map[string]any{}
	return sink
}
// keysAndValuesToMap converts an alternating key/value list into a map.
//
// Keys and values are both rendered with fmt.Sprint. A list of odd length is
// completed with the value "unknown" for its last key.
func keysAndValuesToMap(keysAndValues ...any) sentry.BreadcrumbHint {
	if len(keysAndValues)%2 == 1 {
		keysAndValues = append(keysAndValues, "unknown")
	}

	pairs := make(map[string]any)
	for valueIdx := 1; valueIdx < len(keysAndValues); valueIdx += 2 {
		key := fmt.Sprint(keysAndValues[valueIdx-1])
		pairs[key] = fmt.Sprint(keysAndValues[valueIdx])
	}
	return pairs
}
// Info forwards the log line to the wrapped sink and, when a Sentry hub is
// present on the sink's context, records it as an info-level breadcrumb.
//
// The breadcrumb data is built from the call's key/value pairs with the
// sink's accumulated values copied on top; the same map is passed as the hint.
func (s *sentrySink) Info(level int, msg string, keysAndValues ...any) {
	s.LogSink.Info(level, msg, keysAndValues...)

	if hub := sentry.GetHubFromContext(s.Context); hub != nil {
		data := keysAndValuesToMap(keysAndValues...)
		maps.Copy(data, s.values)

		crumb := &sentry.Breadcrumb{
			Level:     sentry.LevelInfo,
			Message:   msg,
			Data:      data,
			Type:      "log",
			Timestamp: time.Now(),
		}
		hub.AddBreadcrumb(crumb, &data)
	}
}
// Error forwards the error log to the wrapped sink and, when a Sentry hub is
// present on the sink's context, captures err as an exception on that hub.
func (s *sentrySink) Error(err error, msg string, keysAndValues ...any) {
	s.LogSink.Error(err, msg, keysAndValues...)

	if hub := sentry.GetHubFromContext(s.Context); hub != nil {
		hub.CaptureException(err)
	}
}
// Enabled reports whether the wrapped sink is enabled at level.
func (s *sentrySink) Enabled(level int) bool {
return s.LogSink.Enabled(level)
}
// WithValues returns a new sink whose wrapped sink has keysAndValues applied
// and whose accumulated values are the given pairs with the current sink's
// values copied on top. The receiver is left unchanged.
func (s *sentrySink) WithValues(keysAndValues ...any) logr.LogSink {
	merged := keysAndValuesToMap(keysAndValues...)
	maps.Copy(merged, s.values)

	derived := &sentrySink{Context: s.Context, values: merged}
	derived.LogSink = s.LogSink.WithValues(keysAndValues...)
	return derived
}
// WithName returns a new sink whose wrapped sink has name applied. The
// context is carried over, and the values map is shared with the receiver
// (it is not copied).
func (s *sentrySink) WithName(name string) logr.LogSink {
return &sentrySink{
Context: s.Context,
LogSink: s.LogSink.WithName(name),
values: s.values,
}
}
package instrument
import (
"context"
"github.com/getsentry/sentry-go"
"go.opentelemetry.io/otel/trace"
)
// SentryTracer is a Tracer that starts spans with an OpenTelemetry tracer and
// makes a Sentry hub available on the span's context.
type SentryTracer struct {
// tracer is the OpenTelemetry tracer used to start spans.
tracer trace.Tracer
}
// NewSentryTracer returns a SentryTracer that starts its spans with t.
func NewSentryTracer(t trace.Tracer) *SentryTracer {
return &SentryTracer{
tracer: t,
}
}
// StartSpan starts a span named spanName on localCtx after attaching a Sentry
// hub to it.
//
// The hub is taken from *globalCtx; when none is present, a clone of the
// current hub is created and stored back into *globalCtx so later calls that
// share the pointer reuse it. globalCtx must not be nil.
func (t *SentryTracer) StartSpan(globalCtx *context.Context, localCtx context.Context, spanName string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
	sharedHub := sentry.GetHubFromContext(*globalCtx)
	if sharedHub == nil {
		sharedHub = sentry.CurrentHub().Clone()
		*globalCtx = sentry.SetHubOnContext(*globalCtx, sharedHub)
	}

	spanCtx := sentry.SetHubOnContext(localCtx, sharedHub)
	return t.tracer.Start(spanCtx, spanName, opts...)
}
package ctrlfwk
import (
"context"
"encoding/json"
"slices"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
const (
// AnnotationRef is the annotation key under which the JSON-encoded list of
// ManagedBy references is stored on an object.
AnnotationRef = "dependencies.ctrlfwk.com/managed-by"
)
// ManagedBy represents a reference to a controller managing a resource and
// it includes the controller's name, namespace, and GroupVersionKind (GVK)
// information.
//
// Values are stored JSON-encoded in the AnnotationRef annotation.
type ManagedBy struct {
// Name is the name of the referenced object.
Name string `json:"name"`
// Namespace is the namespace of the referenced object.
Namespace string `json:"namespace"`
// GVK is the GroupVersionKind of the referenced object.
GVK schema.GroupVersionKind `json:"gvk"`
}
// GetManagedBy decodes the ManagedBy references stored in obj's AnnotationRef
// annotation. A missing annotation yields an empty, non-nil slice. When the
// annotation value is not valid JSON for a list of references, the
// unmarshalling error is returned together with a nil slice.
func GetManagedBy(obj client.Object) ([]ManagedBy, error) {
	// Looking up a key in a nil annotation map is safe and reports "absent".
	raw, found := obj.GetAnnotations()[AnnotationRef]
	if !found {
		return []ManagedBy{}, nil
	}

	var refs []ManagedBy
	if err := json.Unmarshal([]byte(raw), &refs); err != nil {
		return nil, err
	}
	return refs, nil
}
// AddManagedBy records controlledBy as a manager of obj by appending a
// ManagedBy reference to obj's AnnotationRef annotation.
//
// changed is true when the annotation was written and false when an identical
// reference was already present. An error is returned when the GVK of
// controlledBy cannot be resolved through scheme, when the existing annotation
// cannot be decoded, or when the updated list cannot be encoded; obj is left
// untouched in those cases.
func AddManagedBy(obj client.Object, controlledBy client.Object, scheme *runtime.Scheme) (changed bool, err error) {
	ownerGVK, err := apiutil.GVKForObject(controlledBy, scheme)
	if err != nil {
		return false, err
	}

	existing, err := GetManagedBy(obj)
	if err != nil {
		return false, err
	}

	wanted := ManagedBy{
		Name:      controlledBy.GetName(),
		Namespace: controlledBy.GetNamespace(),
		GVK:       ownerGVK,
	}
	// Nothing to do when the exact same reference is already recorded.
	if slices.Index(existing, wanted) >= 0 {
		return false, nil
	}

	encoded, err := json.Marshal(append(existing, wanted))
	if err != nil {
		return false, err
	}

	annotations := obj.GetAnnotations()
	if annotations == nil {
		annotations = map[string]string{}
	}
	annotations[AnnotationRef] = string(encoded)
	obj.SetAnnotations(annotations)
	return true, nil
}
// RemoveManagedBy removes the ManagedBy reference pointing at controlledBy
// from obj's AnnotationRef annotation.
//
// changed is true when a reference was removed and false when none matched.
// When the last reference is removed the annotation key itself is deleted.
// An error is returned when the GVK of controlledBy cannot be resolved through
// scheme, when the existing annotation cannot be decoded, or when the
// remaining list cannot be encoded.
func RemoveManagedBy(obj client.Object, controlledBy client.Object, scheme *runtime.Scheme) (changed bool, err error) {
	ownerGVK, err := apiutil.GVKForObject(controlledBy, scheme)
	if err != nil {
		return false, err
	}

	existing, err := GetManagedBy(obj)
	if err != nil {
		return false, err
	}

	target := ManagedBy{
		Name:      controlledBy.GetName(),
		Namespace: controlledBy.GetNamespace(),
		GVK:       ownerGVK,
	}
	remaining := slices.DeleteFunc(existing, func(candidate ManagedBy) bool {
		return candidate == target
	})
	// An unchanged length means the reference was not present.
	if len(remaining) == len(existing) {
		return false, nil
	}

	annotations := obj.GetAnnotations()
	if annotations == nil {
		annotations = make(map[string]string)
	}

	if len(remaining) > 0 {
		encoded, err := json.Marshal(remaining)
		if err != nil {
			return false, err
		}
		annotations[AnnotationRef] = string(encoded)
	} else {
		delete(annotations, AnnotationRef)
	}
	obj.SetAnnotations(annotations)
	return true, nil
}
// GetManagedByReconcileRequests builds a mapping function that turns an object
// into reconcile requests for every ManagedBy reference on it whose GVK equals
// the GVK of ownedBy (resolved through scheme).
//
// The mapping function returns nil when the object's annotation cannot be
// decoded or when no reference matches. The error from resolving the GVK is
// returned alongside the mapping function, which is non-nil even in that case.
func GetManagedByReconcileRequests(ownedBy client.Object, scheme *runtime.Scheme) (func(ctx context.Context, obj client.Object) []reconcile.Request, error) {
	ownerGVK, gvkErr := apiutil.GVKForObject(ownedBy, scheme)

	mapper := func(_ context.Context, obj client.Object) []reconcile.Request {
		refs, err := GetManagedBy(obj)
		if err != nil {
			return nil
		}

		var out []reconcile.Request
		for i := range refs {
			// Only references of the owner's kind are relevant.
			if refs[i].GVK == ownerGVK {
				out = append(out, reconcile.Request{
					NamespacedName: types.NamespacedName{
						Namespace: refs[i].Namespace,
						Name:      refs[i].Name,
					},
				})
			}
		}
		return out
	}

	return mapper, gvkErr
}
package ctrlfwk
import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
)
// NotPausedPredicate is TypedNotPausedPredicate instantiated for client.Object.
// It filters out events for objects that carry the LabelReconciliationPaused
// label, so that paused resources do not trigger reconciliation.
type NotPausedPredicate = TypedNotPausedPredicate[client.Object]
// TypedNotPausedPredicate filters reconciliation events for resources marked as paused.
// Its Create, Update and Generic methods return false when the event's object
// carries the LabelReconciliationPaused label key (the label's value is not
// inspected); its Delete method always returns true.
type TypedNotPausedPredicate[object client.Object] struct{}
// Create lets the event through unless the created object carries the
// LabelReconciliationPaused label.
func (p TypedNotPausedPredicate[object]) Create(e event.TypedCreateEvent[object]) bool {
	// Indexing a nil label map is safe and simply reports the key as absent.
	_, paused := e.Object.GetLabels()[LabelReconciliationPaused]
	return !paused
}
// Delete always lets delete events through; the pause label is not consulted.
func (TypedNotPausedPredicate[object]) Delete(_ event.TypedDeleteEvent[object]) bool {
	const allow = true
	return allow
}
// Update lets the event through unless the new version of the object carries
// the LabelReconciliationPaused label. The old version is not inspected.
func (p TypedNotPausedPredicate[object]) Update(e event.TypedUpdateEvent[object]) bool {
	// Indexing a nil label map is safe and simply reports the key as absent.
	_, paused := e.ObjectNew.GetLabels()[LabelReconciliationPaused]
	return !paused
}
// Generic lets the event through unless its object carries the
// LabelReconciliationPaused label.
func (p TypedNotPausedPredicate[object]) Generic(e event.TypedGenericEvent[object]) bool {
	// Indexing a nil label map is safe and simply reports the key as absent.
	_, paused := e.Object.GetLabels()[LabelReconciliationPaused]
	return !paused
}
package ctrlfwk
import (
"fmt"
"reflect"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// Mutator is a function that modifies resource in place and reports failure
// through its returned error.
type Mutator[ResourceType client.Object] func(resource ResourceType) error
// GenericResource is the type-erased view of a managed resource: its methods
// take and return client.Object instead of a concrete resource type. Resource
// is the implementation asserted against it in this file.
type GenericResource[CustomResource client.Object, ContextType Context[CustomResource]] interface {
	// ID returns an identifier for the resource.
	ID() string
	// ObjectMetaGenerator returns the object to work on, a flag telling
	// whether it should be deleted, and an error.
	ObjectMetaGenerator() (obj client.Object, delete bool, err error)
	// ShouldDeleteNow reports whether the resource should currently be deleted.
	ShouldDeleteNow() bool
	// GetMutator returns a closure that applies the resource's mutation to obj.
	GetMutator(obj client.Object) func() error
	// Set stores obj as the resource's output.
	Set(obj client.Object)
	// Get returns the resource's output object.
	Get() client.Object
	// Kind returns the name of the resource's kind.
	Kind() string
	// IsReady reports whether obj is considered ready.
	IsReady(obj client.Object) bool
	// RequiresManualDeletion reports whether obj needs explicit deletion.
	RequiresManualDeletion(obj client.Object) bool
	// CanBePaused reports whether the resource supports being paused.
	CanBePaused() bool
	// Hooks
	// NOTE(review): the hooks are presumably invoked by the reconciler around
	// the lifecycle event each one is named after — confirm against the caller.
	BeforeReconcile(ctx ContextType) error
	AfterReconcile(ctx ContextType, resource client.Object) error
	OnCreate(ctx ContextType, resource client.Object) error
	OnUpdate(ctx ContextType, resource client.Object) error
	OnDelete(ctx ContextType, resource client.Object) error
	OnFinalize(ctx ContextType, resource client.Object) error
}
// Compile-time check that *Resource satisfies GenericResource.
var _ GenericResource[client.Object, Context[client.Object]] = &Resource[client.Object, Context[client.Object], client.Object]{}
// Resource is the typed implementation of GenericResource. It holds a set of
// optional callbacks operating on the concrete ResourceType; its methods adapt
// them to the client.Object based GenericResource interface. Callbacks left
// nil are treated as no-ops (hooks, mutator) or as returning false
// (predicates), except keyF which is called unconditionally.
type Resource[CustomResource client.Object, ContextType Context[CustomResource], ResourceType client.Object] struct {
	// userIdentifier, when non-empty, is returned verbatim by ID.
	userIdentifier string
	// keyF returns the name and namespace applied to the output object.
	keyF func() types.NamespacedName
	// mutateF is the mutation run by the closure returned from GetMutator.
	mutateF Mutator[ResourceType]
	// isReadyF backs IsReady.
	isReadyF func(obj ResourceType) bool
	// shouldDeleteF backs ShouldDeleteNow and the flag returned by ObjectMetaGenerator.
	shouldDeleteF func() bool
	// requiresDeletionF backs RequiresManualDeletion.
	requiresDeletionF func(obj ResourceType) bool
	// output is the object returned by Get and filled in by Set.
	output ResourceType
	// canBePausedF backs CanBePaused.
	canBePausedF func() bool
	// Hooks
	beforeReconcileF func(ctx ContextType) error
	afterReconcileF  func(ctx ContextType, resource ResourceType) error
	onCreateF        func(ctx ContextType, resource ResourceType) error
	onUpdateF        func(ctx ContextType, resource ResourceType) error
	onDeleteF        func(ctx ContextType, resource ResourceType) error
	onFinalizeF      func(ctx ContextType, resource ResourceType) error
}
// Kind returns the name of the type that the output's type points to. It
// relies on reflection over the output field's dynamic type, which must be a
// pointer-like type for Elem to succeed.
func (c *Resource[CustomResource, ContextType, ResourceType]) Kind() string {
	outputType := reflect.TypeOf(c.output)
	return outputType.Elem().Name()
}
// ObjectMetaGenerator returns the output object with its name and namespace
// set from the key function, allocating the output first when it is nil.
//
// skip is true when a delete condition is configured and currently holds.
// err is non-nil when no key function has been configured (neither WithKey nor
// WithKeyFunc was called on the builder); previously that case panicked on a
// nil function call.
func (c *Resource[CustomResource, ContextType, ResourceType]) ObjectMetaGenerator() (obj client.Object, skip bool, err error) {
	if c.keyF == nil {
		return nil, false, fmt.Errorf("resource %q has no key function; configure one with WithKey or WithKeyFunc", c.userIdentifier)
	}

	// Lazily allocate the output so the name and namespace have somewhere to go.
	if reflect.ValueOf(c.output).IsNil() {
		c.output = reflect.New(reflect.TypeOf(c.output).Elem()).Interface().(ResourceType)
	}

	key := c.keyF()
	c.output.SetName(key.Name)
	c.output.SetNamespace(key.Namespace)

	return c.output, c.shouldDeleteF != nil && c.shouldDeleteF(), nil
}
// ID returns the user-supplied identifier when one is set; otherwise it
// returns the resource kind and the key produced by the key function, joined
// with a comma.
func (c *Resource[CustomResource, ContextType, ResourceType]) ID() string {
	if custom := c.userIdentifier; custom != "" {
		return custom
	}

	namespacedName := c.keyF()
	kind := c.Kind()
	return fmt.Sprintf("%v,%v", kind, namespacedName)
}
// Set copies the value obj points to into the output object, allocating the
// output first when it is nil. It does nothing when obj's dynamic type differs
// from the output's type.
func (c *Resource[CustomResource, ContextType, ResourceType]) Set(obj client.Object) {
	outputType := reflect.TypeOf(c.output)
	if outputType != reflect.TypeOf(obj) {
		return
	}

	if reflect.ValueOf(c.output).IsNil() {
		c.output = reflect.New(outputType.Elem()).Interface().(ResourceType)
	}

	// Overwrite the pointed-to value so holders of the output pointer observe it.
	destination := reflect.ValueOf(c.output).Elem()
	destination.Set(reflect.ValueOf(obj).Elem())
}
// Get returns the output object as a client.Object.
func (c *Resource[CustomResource, ContextType, ResourceType]) Get() client.Object {
	var current client.Object = c.output
	return current
}
// IsReady evaluates the readiness function against obj. It returns false when
// no readiness function is configured, or when obj is non-nil but not a
// ResourceType. A nil obj is passed on as the zero ResourceType.
func (c *Resource[CustomResource, ContextType, ResourceType]) IsReady(obj client.Object) bool {
	check := c.isReadyF
	if check == nil {
		return false
	}

	// On a failed assertion typed is the zero ResourceType, which is exactly
	// what a nil obj must be turned into.
	typed, ok := obj.(ResourceType)
	if !ok && obj != nil {
		return false
	}
	return check(typed)
}
// RequiresManualDeletion evaluates the manual-deletion function against obj.
// It returns false when no such function is configured, or when obj is
// non-nil but not a ResourceType. A nil obj is passed on as the zero
// ResourceType.
func (c *Resource[CustomResource, ContextType, ResourceType]) RequiresManualDeletion(obj client.Object) bool {
	check := c.requiresDeletionF
	if check == nil {
		return false
	}

	// On a failed assertion typed is the zero ResourceType, which is exactly
	// what a nil obj must be turned into.
	typed, ok := obj.(ResourceType)
	if !ok && obj != nil {
		return false
	}
	return check(typed)
}
// ShouldDeleteNow reports the result of the delete condition, or false when
// no delete condition is configured.
func (c *Resource[CustomResource, ContextType, ResourceType]) ShouldDeleteNow() bool {
	return c.shouldDeleteF != nil && c.shouldDeleteF()
}
// BeforeReconcile runs the before-reconcile hook with ctx and returns its
// error. It is a no-op returning nil when no hook is configured.
func (c *Resource[CustomResource, ContextType, ResourceType]) BeforeReconcile(ctx ContextType) error {
	hook := c.beforeReconcileF
	if hook == nil {
		return nil
	}
	return hook(ctx)
}
// AfterReconcile runs the after-reconcile hook with the typed resource and
// returns its error. It returns nil without calling the hook when no hook is
// configured or when resource is non-nil but not a ResourceType. A nil
// resource is passed on as the zero ResourceType.
func (c *Resource[CustomResource, ContextType, ResourceType]) AfterReconcile(ctx ContextType, resource client.Object) error {
	hook := c.afterReconcileF
	if hook == nil {
		return nil
	}

	// On a failed assertion typed is the zero ResourceType.
	typed, ok := resource.(ResourceType)
	if !ok && resource != nil {
		return nil
	}
	return hook(ctx, typed)
}
// OnCreate runs the on-create hook with the typed resource and returns its
// error. It returns nil without calling the hook when no hook is configured or
// when resource is non-nil but not a ResourceType. A nil resource is passed on
// as the zero ResourceType.
func (c *Resource[CustomResource, ContextType, ResourceType]) OnCreate(ctx ContextType, resource client.Object) error {
	hook := c.onCreateF
	if hook == nil {
		return nil
	}

	// On a failed assertion typed is the zero ResourceType.
	typed, ok := resource.(ResourceType)
	if !ok && resource != nil {
		return nil
	}
	return hook(ctx, typed)
}
// OnUpdate runs the on-update hook with the typed resource and returns its
// error. It returns nil without calling the hook when no hook is configured or
// when resource is non-nil but not a ResourceType. A nil resource is passed on
// as the zero ResourceType.
func (c *Resource[CustomResource, ContextType, ResourceType]) OnUpdate(ctx ContextType, resource client.Object) error {
	hook := c.onUpdateF
	if hook == nil {
		return nil
	}

	// On a failed assertion typed is the zero ResourceType.
	typed, ok := resource.(ResourceType)
	if !ok && resource != nil {
		return nil
	}
	return hook(ctx, typed)
}
// OnDelete runs the on-delete hook with the typed resource and returns its
// error. It returns nil without calling the hook when no hook is configured or
// when resource is non-nil but not a ResourceType. A nil resource is passed on
// as the zero ResourceType.
func (c *Resource[CustomResource, ContextType, ResourceType]) OnDelete(ctx ContextType, resource client.Object) error {
	hook := c.onDeleteF
	if hook == nil {
		return nil
	}

	// On a failed assertion typed is the zero ResourceType.
	typed, ok := resource.(ResourceType)
	if !ok && resource != nil {
		return nil
	}
	return hook(ctx, typed)
}
// OnFinalize runs the on-finalize hook with the typed resource and returns its
// error. It returns nil without calling the hook when no hook is configured or
// when resource is non-nil but not a ResourceType. A nil resource is passed on
// as the zero ResourceType.
func (c *Resource[CustomResource, ContextType, ResourceType]) OnFinalize(ctx ContextType, resource client.Object) error {
	hook := c.onFinalizeF
	if hook == nil {
		return nil
	}

	// On a failed assertion typed is the zero ResourceType.
	typed, ok := resource.(ResourceType)
	if !ok && resource != nil {
		return nil
	}
	return hook(ctx, typed)
}
// GetMutator returns a closure that applies the configured mutator to obj.
// The mutator is looked up when the closure runs, not when it is created. The
// closure returns nil without mutating when no mutator is configured or when
// obj is non-nil but not a ResourceType; a nil obj is passed on as the zero
// ResourceType.
func (c *Resource[CustomResource, ContextType, ResourceType]) GetMutator(obj client.Object) func() error {
	apply := func() error {
		mutate := c.mutateF
		if mutate == nil {
			return nil
		}

		// On a failed assertion typed is the zero ResourceType.
		typed, ok := obj.(ResourceType)
		if !ok && obj != nil {
			return nil
		}
		return mutate(typed)
	}
	return apply
}
// CanBePaused reports the result of the pause function, or false when no
// pause function is configured.
func (c *Resource[CustomResource, ContextType, ResourceType]) CanBePaused() bool {
	return c.canBePausedF != nil && c.canBePausedF()
}
package ctrlfwk
import (
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// ResourceBuilder provides a fluent builder pattern for creating Resource instances that your
// custom resource controller manages during reconciliation.
//
// Resources represent Kubernetes objects that your controller creates, updates, and manages
// as part of achieving the desired state for your custom resource. Unlike dependencies,
// resources are owned by your custom resource and are created/managed by your controller.
//
// Type parameters:
// - CustomResource: The custom resource that owns and manages this resource
// - ContextType: The context type containing the custom resource and additional data
// - ResourceType: The Kubernetes resource type this builder creates (e.g., Deployment, Service)
//
// Common use cases:
// - Creating Deployments for your application custom resource
// - Managing Services, ConfigMaps, and Secrets
// - Setting up RBAC resources (Roles, RoleBindings)
// - Creating PersistentVolumeClaims for stateful applications
//
// Example:
//
// // Create a Deployment resource for your custom resource
// deployment := NewResourceBuilder(ctx, &appsv1.Deployment{}).
// WithKeyFunc(func() types.NamespacedName {
// return types.NamespacedName{
// Name: ctx.GetCustomResource().Name + "-deployment",
// Namespace: ctx.GetCustomResource().Namespace,
// }
// }).
// WithMutator(func(deployment *appsv1.Deployment) error {
// // Configure deployment spec based on custom resource
// deployment.Spec.Replicas = ctx.GetCustomResource().Spec.Replicas
// return controllerutil.SetOwnerReference(ctx.GetCustomResource(), deployment, scheme)
// }).
// WithReadinessCondition(func(deployment *appsv1.Deployment) bool {
// return deployment.Status.ReadyReplicas == *deployment.Spec.Replicas
// }).
// Build()
type ResourceBuilder[CustomResource client.Object, ContextType Context[CustomResource], ResourceType client.Object] struct {
	// resource is the Resource being configured; each With* method sets one
	// of its fields and returns the builder for chaining.
	resource *Resource[CustomResource, ContextType, ResourceType]
}
// NewResourceBuilder creates a new ResourceBuilder for constructing managed Kubernetes resources.
//
// Resources are Kubernetes objects that your controller creates and manages to implement
// the desired state defined by your custom resource. They are typically owned by your
// custom resource and have owner references set for garbage collection.
//
// Parameters:
// - ctx: The context containing the custom resource and additional data
// - _: A zero-value instance used for type inference (e.g., &appsv1.Deployment{})
//
// The resource will be reconciled when used with ReconcileResourcesStep or
// ReconcileResourceStep during the reconciliation process.
//
// Key differences from dependencies:
// - Resources are CREATED and MANAGED by your controller
// - Dependencies are CONSUMED by your controller (external resources)
// - Resources typically have owner references to your custom resource
// - Resources are deleted when your custom resource is deleted
//
// Common resource patterns:
// - Application deployments and services
// - Configuration resources (ConfigMaps, Secrets)
// - RBAC resources for your application
// - Storage resources (PVCs) for stateful applications
//
// Example:
//
// // Create a Service resource for your web application
// service := NewResourceBuilder(ctx, &corev1.Service{}).
// WithKeyFunc(func() types.NamespacedName {
// return types.NamespacedName{
// Name: ctx.GetCustomResource().Name + "-service",
// Namespace: ctx.GetCustomResource().Namespace,
// }
// }).
// WithMutator(func(svc *corev1.Service) error {
// svc.Spec.Selector = map[string]string{"app": ctx.GetCustomResource().Name}
// svc.Spec.Ports = []corev1.ServicePort{{
// Port: 80,
// TargetPort: intstr.FromInt(8080),
// Protocol: corev1.ProtocolTCP,
// }}
// return controllerutil.SetOwnerReference(ctx.GetCustomResource(), svc, scheme)
// }).
// Build()
func NewResourceBuilder[CustomResource client.Object, ContextType Context[CustomResource], ResourceType client.Object](ctx ContextType, _ ResourceType) *ResourceBuilder[CustomResource, ContextType, ResourceType] {
	// Neither argument is stored: both exist so the type parameters can be
	// inferred at the call site. The builder starts from an empty Resource.
	builder := new(ResourceBuilder[CustomResource, ContextType, ResourceType])
	builder.resource = new(Resource[CustomResource, ContextType, ResourceType])
	return builder
}
// WithKey specifies a static NamespacedName for the resource.
//
// This is useful when the resource name and namespace are known at build time
// and don't need to be computed dynamically based on the custom resource state.
//
// For dynamic naming based on custom resource properties, use WithKeyFunc instead.
//
// Example:
//
// .WithKey(types.NamespacedName{
// Name: "my-app-service",
// Namespace: "default",
// }) // Static name for the service
func (b *ResourceBuilder[CustomResource, ContextType, ResourceType]) WithKey(name types.NamespacedName) *ResourceBuilder[CustomResource, ContextType, ResourceType] {
	// A static key is just a key function that always yields the same value.
	constant := func() types.NamespacedName { return name }
	return b.WithKeyFunc(constant)
}
// WithKeyFunc specifies a function that dynamically determines the resource's NamespacedName.
//
// This function is called during reconciliation to determine where the resource should
// be created or found. It's evaluated each time the resource is reconciled, allowing
// for dynamic naming based on the current state of the custom resource.
//
// The function typically derives the name from the custom resource's metadata or spec,
// and should return a consistent result for the same custom resource state.
//
// Common patterns:
// - Prefixing with custom resource name: ctx.GetCustomResource().Name + "-suffix"
// - Using custom resource namespace: ctx.GetCustomResource().Namespace
// - Conditional naming based on custom resource spec
// - When the name is stored in the spec, you may want to refer to the status instead, in case the spec field is updated or removed
//
// Example:
//
// .WithKeyFunc(func() types.NamespacedName {
// cr := ctx.GetCustomResource()
// return types.NamespacedName{
// Name: cr.Name + "-" + cr.Spec.Component, // Dynamic based on spec
// Namespace: cr.Namespace, // Same namespace as CR
// }
// })
func (b *ResourceBuilder[CustomResource, ContextType, ResourceType]) WithKeyFunc(f func() types.NamespacedName) *ResourceBuilder[CustomResource, ContextType, ResourceType] {
	// The function is stored as-is; the resource calls it later to obtain its key.
	target := b.resource
	target.keyF = f
	return b
}
// WithMutator specifies the function that configures the resource's desired state.
//
// The mutator function is called whenever the resource needs to be created or updated.
// It receives the resource object and should configure all necessary fields to match
// the desired state defined by your custom resource.
//
// The mutator should:
// - Set all required fields on the resource
// - Configure the resource based on custom resource spec
// - Set owner references for garbage collection
// - Apply labels, annotations, and other metadata
// - Return an error if configuration fails
//
// The mutator is called for both CREATE and UPDATE operations, so it should be
// idempotent and handle both scenarios gracefully.
//
// Example:
//
// .WithMutator(func(deployment *appsv1.Deployment) error {
// cr := ctx.GetCustomResource()
//
// // Configure deployment based on custom resource
// deployment.Spec.Replicas = cr.Spec.Replicas
// deployment.Spec.Selector = &metav1.LabelSelector{
// MatchLabels: map[string]string{"app": cr.Name},
// }
// deployment.Spec.Template.Spec.Containers = []corev1.Container{{
// Name: "app",
// Image: cr.Spec.Image,
// }}
//
// // Set owner reference for garbage collection
// return controllerutil.SetOwnerReference(cr, deployment, ctx.GetScheme())
// })
func (b *ResourceBuilder[CustomResource, ContextType, ResourceType]) WithMutator(f Mutator[ResourceType]) *ResourceBuilder[CustomResource, ContextType, ResourceType] {
	// Replaces any previously configured mutator.
	target := b.resource
	target.mutateF = f
	return b
}
// WithOutput specifies where to store the reconciled resource after successful operations.
//
// The provided object will be populated with the resource's current state from the
// cluster after reconciliation completes. This allows other parts of your controller
// logic to access the resource's runtime state, such as generated fields, status
// information, or cluster-assigned values.
//
// The output object should be a field in your context's data structure to ensure
// it's accessible throughout the reconciliation process.
//
// Common use cases:
// - Accessing service ClusterIP after creation
// - Reading generated secret data
// - Getting deployment status for custom resource status updates
// - Obtaining persistent volume claim details
//
// Example:
//
// type MyContextData struct {
// AppService *corev1.Service
// }
//
// service := NewResourceBuilder(ctx, &corev1.Service{}).
// // ... other configuration ...
// WithOutput(ctx.Data.AppService). // Store reconciled service here
// Build()
func (b *ResourceBuilder[CustomResource, ContextType, ResourceType]) WithOutput(obj ResourceType) *ResourceBuilder[CustomResource, ContextType, ResourceType] {
	// obj itself (not a copy) becomes the resource's output object.
	target := b.resource
	target.output = obj
	return b
}
// WithReadinessCondition defines custom logic to determine when the resource is ready.
//
// The provided function is called with the current resource state and should return
// true if the resource has reached the desired operational state. This affects when
// the overall custom resource is considered ready and can influence reconciliation flow.
//
// If no readiness condition is provided, the resource is considered ready as soon
// as it exists and has been successfully created or updated.
//
// Common readiness patterns:
// - Deployments: Check that ReadyReplicas == DesiredReplicas
// - Services: Verify endpoints are populated
// - Jobs: Check for successful completion
// - Custom resources: Examine status conditions
//
// Example:
//
// .WithReadinessCondition(func(deployment *appsv1.Deployment) bool {
// // Deployment is ready when all replicas are ready
// if deployment.Spec.Replicas == nil {
// return deployment.Status.ReadyReplicas > 0
// }
// return deployment.Status.ReadyReplicas == *deployment.Spec.Replicas &&
// deployment.Status.UpdatedReplicas == *deployment.Spec.Replicas
// })
func (b *ResourceBuilder[CustomResource, ContextType, ResourceType]) WithReadinessCondition(f func(obj ResourceType) bool) *ResourceBuilder[CustomResource, ContextType, ResourceType] {
	// Replaces any previously configured readiness function.
	target := b.resource
	target.isReadyF = f
	return b
}
// WithSkipAndDeleteOnCondition specifies when to skip creating or delete an existing resource.
//
// The provided function is evaluated during reconciliation. When it returns true:
// - If the resource doesn't exist, it will be skipped (not created)
// - If the resource exists, it will be deleted
//
// This is useful for conditional resource management based on custom resource configuration,
// feature flags, or environmental conditions.
//
// The condition function is called each reconciliation cycle, so resources can be
// dynamically created or removed based on changing conditions.
//
// Common use cases:
// - Feature toggles that enable/disable optional components
// - Environment-specific resources (dev vs prod)
// - Conditional scaling or resource allocation
// - Migration scenarios where resources are phased out
//
// Example:
//
// .WithSkipAndDeleteOnCondition(func() bool {
// cr := ctx.GetCustomResource()
// // Only create monitoring service if monitoring is enabled
// return !cr.Spec.Monitoring.Enabled
// })
//
// // Another example: conditional based on environment
// .WithSkipAndDeleteOnCondition(func() bool {
// // Skip expensive resources in development environment
// return ctx.GetCustomResource().Spec.Environment == "development"
// })
func (b *ResourceBuilder[CustomResource, ContextType, ResourceType]) WithSkipAndDeleteOnCondition(f func() bool) *ResourceBuilder[CustomResource, ContextType, ResourceType] {
	// Replaces any previously configured skip/delete condition.
	target := b.resource
	target.shouldDeleteF = f
	return b
}
// WithRequireManualDeletionForFinalize specifies when a resource requires manual cleanup
// during custom resource finalization.
//
// When the custom resource is being deleted, this function is called to determine if
// the resource requires special handling before the custom resource can be fully removed.
// If the function returns true, the resource must be manually cleaned up before
// finalization can complete.
//
// This is typically used for resources that:
// - Have external dependencies that need cleanup
// - Store important data that needs backup
// - Have complex deletion procedures
// - Need graceful shutdown processes
//
// The function receives the current resource state to make decisions based on the
// resource's current condition or configuration.
//
// Example:
//
// .WithRequireManualDeletionForFinalize(func(pvc *corev1.PersistentVolumeClaim) bool {
// // Require manual deletion for PVCs that contain important data
// if important, exists := pvc.Annotations["data.important"]; exists {
// return important == "true"
// }
// return false
// })
//
// // Another example: based on resource state
// .WithRequireManualDeletionForFinalize(func(deployment *appsv1.Deployment) bool {
// // Require manual deletion if deployment is still running
// return deployment.Status.Replicas > 0
// })
func (b *ResourceBuilder[CustomResource, ContextType, ResourceType]) WithRequireManualDeletionForFinalize(f func(obj ResourceType) bool) *ResourceBuilder[CustomResource, ContextType, ResourceType] {
	// Replaces any previously configured manual-deletion function.
	target := b.resource
	target.requiresDeletionF = f
	return b
}
// WithBeforeReconcile registers a hook function to execute before resource reconciliation.
//
// This function is called before any resource operations (create, update, or delete)
// are performed. It can be used for setup tasks, validation, or preparation work.
// If the function returns an error, resource reconciliation will be aborted.
//
// Common use cases:
// - Validating preconditions for resource creation
// - Setting up external dependencies
// - Performing cleanup of old resources
// - Logging reconciliation attempts
// - Initializing shared state
//
// Example:
//
// .WithBeforeReconcile(func(ctx MyContext) error {
// logger := ctx.GetLogger()
// cr := ctx.GetCustomResource()
//
// logger.Info("Reconciling application deployment", "version", cr.Spec.Version)
//
// // Validate configuration before proceeding
// if cr.Spec.Replicas < 0 {
// return fmt.Errorf("replica count cannot be negative: %d", cr.Spec.Replicas)
// }
//
// return nil
// })
func (b *ResourceBuilder[CustomResource, ContextType, ResourceType]) WithBeforeReconcile(f func(ctx ContextType) error) *ResourceBuilder[CustomResource, ContextType, ResourceType] {
	// Replaces any previously registered before-reconcile hook.
	target := b.resource
	target.beforeReconcileF = f
	return b
}
// WithAfterReconcile registers a hook function to execute after successful resource reconciliation.
//
// This function is called after the resource has been successfully created, updated, or
// verified to exist with the correct configuration. It receives the current resource
// state from the cluster and can be used for post-processing, status updates, or
// triggering additional operations.
//
// If the function returns an error, the reconciliation will fail even though the
// resource operation itself was successful.
//
// Common use cases:
// - Updating custom resource status with resource information
// - Triggering external system notifications
// - Caching resource data for future use
// - Logging successful operations
// - Initiating dependent operations
//
// Example:
//
// .WithAfterReconcile(func(ctx MyContext, service *corev1.Service) error {
// cr := ctx.GetCustomResource()
//
// // Update custom resource status with service information
// if cr.Status.ServiceStatus == nil {
// cr.Status.ServiceStatus = &ServiceStatus{}
// }
// cr.Status.ServiceStatus.Name = service.Name
// cr.Status.ServiceStatus.ClusterIP = service.Spec.ClusterIP
//
// // Set ready condition
// meta.SetStatusCondition(&cr.Status.Conditions, metav1.Condition{
// Type: "ServiceReady",
// Status: metav1.ConditionTrue,
// Reason: "ServiceCreated",
// })
//
// return ctx.PatchStatus()
// })
func (b *ResourceBuilder[CustomResource, ContextType, ResourceType]) WithAfterReconcile(f func(ctx ContextType, resource ResourceType) error) *ResourceBuilder[CustomResource, ContextType, ResourceType] {
	// Replaces any previously registered after-reconcile hook.
	target := b.resource
	target.afterReconcileF = f
	return b
}
// WithAfterCreate registers a hook function that executes only when a resource is newly created.
//
// This function is called specifically when a resource is created for the first time,
// not when it's updated. It's useful for one-time initialization tasks, logging
// creation events, or triggering operations that should only happen on resource creation.
//
// The function receives the newly created resource in its current state from the cluster,
// including any fields that were populated by Kubernetes (like UID, creation timestamp).
//
// Common use cases:
// - Logging resource creation events
// - One-time initialization tasks
// - Sending creation notifications to external systems
// - Recording metrics for new resource creation
// - Triggering initial configuration workflows
//
// Example:
//
// .WithAfterCreate(func(ctx MyContext, deployment *appsv1.Deployment) error {
// logger := ctx.GetLogger()
// logger.Info("Deployment created successfully",
// "name", deployment.Name,
// "namespace", deployment.Namespace,
// "uid", deployment.UID,
// "replicas", *deployment.Spec.Replicas)
//
// // Send notification to monitoring system
// return ctx.NotifyExternalSystem("deployment_created", deployment.Name)
// })
func (b *ResourceBuilder[CustomResource, ContextType, ResourceType]) WithAfterCreate(f func(ctx ContextType, resource ResourceType) error) *ResourceBuilder[CustomResource, ContextType, ResourceType] {
	// Replaces any previously registered create hook.
	target := b.resource
	target.onCreateF = f
	return b
}
// WithAfterUpdate registers a hook function that executes only when a resource is updated.
//
// This function is called specifically when an existing resource is modified, not when
// it's initially created. It's useful for tracking changes, logging update events, or
// triggering operations that should only happen when configuration changes.
//
// The function receives the updated resource in its current state from the cluster
// after the update operation has completed successfully.
//
// Common use cases:
// - Logging configuration changes
// - Triggering rolling updates or restarts
// - Notifying external systems of changes
// - Recording metrics for resource modifications
// - Validating update results
//
// Example:
//
// .WithAfterUpdate(func(ctx MyContext, deployment *appsv1.Deployment) error {
// logger := ctx.GetLogger()
// cr := ctx.GetCustomResource()
//
// logger.Info("Deployment updated successfully",
// "name", deployment.Name,
// "generation", deployment.Generation,
// "replicas", *deployment.Spec.Replicas)
//
// // Update custom resource status with latest generation
// cr.Status.DeploymentGeneration = deployment.Generation
// return ctx.PatchStatus()
// })
func (b *ResourceBuilder[CustomResource, ContextType, ResourceType]) WithAfterUpdate(f func(ctx ContextType, resource ResourceType) error) *ResourceBuilder[CustomResource, ContextType, ResourceType] {
	// Replaces any previously registered update hook.
	target := b.resource
	target.onUpdateF = f
	return b
}
// WithAfterDelete registers a hook function that executes after a resource is deleted.
//
// This function is called when a resource has been successfully deleted from the cluster,
// either due to a delete condition being met or during custom resource finalization.
// It can be used for cleanup tasks, logging deletion events, or notifying external systems.
//
// The function receives the resource object as it existed just before deletion.
// At this point, the resource no longer exists in the cluster.
//
// Common use cases:
// - Logging resource deletion events
// - Cleaning up external resources or dependencies
// - Notifying monitoring or auditing systems
// - Recording metrics for resource deletion
// - Updating custom resource status to reflect deletion
//
// Example:
//
// .WithAfterDelete(func(ctx MyContext, pvc *corev1.PersistentVolumeClaim) error {
// logger := ctx.GetLogger()
// logger.Info("PersistentVolumeClaim deleted",
// "name", pvc.Name,
// "namespace", pvc.Namespace,
// "storageClass", *pvc.Spec.StorageClassName)
//
// // Clean up any backup data associated with this PVC
// return ctx.CleanupBackupData(pvc.Name)
// })
func (b *ResourceBuilder[CustomResource, ContextType, ResourceType]) WithAfterDelete(f func(ctx ContextType, resource ResourceType) error) *ResourceBuilder[CustomResource, ContextType, ResourceType] {
	// Replaces any previously registered delete hook.
	target := b.resource
	target.onDeleteF = f
	return b
}
// WithAfterFinalize registers a hook function that executes during custom resource finalization.
//
// This function is called when the custom resource is being deleted and the resource
// needs to be cleaned up as part of the finalization process. It's used for graceful
// shutdown procedures and cleanup of resources that require special handling.
//
// The function should perform any necessary cleanup operations and ensure that
// external dependencies are properly handled before the custom resource is fully removed.
//
// Common use cases:
// - Graceful shutdown of applications
// - Backup of important data before deletion
// - Cleanup of external resources or registrations
// - Notifying external systems of resource removal
// - Removing finalizers from dependent resources
//
// Example:
//
// .WithAfterFinalize(func(ctx MyContext, deployment *appsv1.Deployment) error {
// logger := ctx.GetLogger()
//
// // Gracefully scale down before deletion
// if deployment.Spec.Replicas != nil && *deployment.Spec.Replicas > 0 {
// logger.Info("Scaling down deployment before finalization")
// zero := int32(0)
// deployment.Spec.Replicas = &zero
// return ctx.Update(deployment)
// }
//
// // Perform final cleanup
// return ctx.CleanupExternalResources(deployment.Name)
// })
func (b *ResourceBuilder[CustomResource, ContextType, ResourceType]) WithAfterFinalize(f func(ctx ContextType, resource ResourceType) error) *ResourceBuilder[CustomResource, ContextType, ResourceType] {
	// Record the finalization hook on the resource being built and hand the
	// builder back for chaining.
	target := b.resource
	target.onFinalizeF = f
	return b
}
// WithUserIdentifier assigns a custom identifier for this resource.
//
// This identifier is used for logging, debugging, and distinguishing between multiple
// resources of the same type within your controller. If not provided, a default
// identifier will be generated based on the resource type.
//
// The identifier appears in logs and error messages, making it easier to track
// specific resources during debugging and troubleshooting.
//
// Useful for:
// - Distinguishing between multiple deployments (e.g., "frontend", "backend")
// - Providing meaningful names for different services
// - Creating clear audit trails in logs
// - Simplifying debugging of complex resource hierarchies
//
// Example:
//
// .WithUserIdentifier("frontend-deployment") // Clear name for logs
// .WithUserIdentifier("database-service") // Easy to identify in debugging
func (b *ResourceBuilder[CustomResource, ContextType, ResourceType]) WithUserIdentifier(identifier string) *ResourceBuilder[CustomResource, ContextType, ResourceType] {
	// Remember the caller-chosen identifier on the resource being built.
	target := b.resource
	target.userIdentifier = identifier
	return b
}
// WithCanBePaused specifies whether this resource supports pausing reconciliation.
//
// When set to true, the resource will respect the paused state of the custom resource.
// If the custom resource is marked as paused (e.g., via a label), reconciliation
// for this resource will be skipped until the pause is lifted.
//
// This is useful for scenarios where you want to temporarily halt changes to
// certain resources without deleting them or affecting other parts of the system.
//
// Common use cases:
// - Temporarily halting updates during maintenance windows
// - Pausing non-critical resources while troubleshooting issues
// - Allowing manual intervention before resuming automated management
//
// Example:
//
// .WithCanBePaused(true) // Enable pausing for this resource
func (b *ResourceBuilder[CustomResource, ContextType, ResourceType]) WithCanBePaused(canBePaused bool) *ResourceBuilder[CustomResource, ContextType, ResourceType] {
	// The resource stores a predicate rather than a plain bool, so the fixed
	// answer is wrapped in a closure that always reports canBePaused.
	constant := func() bool { return canBePaused }
	b.resource.canBePausedF = constant
	return b
}
// WithCanBePausedFunc specifies a function to determine if this resource supports pausing reconciliation.
//
// The provided function is called during reconciliation to check if the resource
// should respect the paused state of the custom resource. If it returns true,
// reconciliation for this resource will be skipped when the custom resource is paused.
//
// This allows for dynamic control over which resources can be paused based on
// the current state or configuration of the custom resource.
//
// Example:
//
// .WithCanBePausedFunc(func() bool {
// // Custom logic to determine if the resource can be paused
// return someCondition
// })
func (b *ResourceBuilder[CustomResource, ContextType, ResourceType]) WithCanBePausedFunc(f func() bool) *ResourceBuilder[CustomResource, ContextType, ResourceType] {
	// Store the predicate as-is; it replaces any value set earlier through
	// WithCanBePaused, which writes to the same field.
	target := b.resource
	target.canBePausedF = f
	return b
}
// Build constructs and returns the final Resource instance with all configured options.
//
// This method finalizes the builder pattern and creates a resource that can be used
// in reconciliation steps. The returned resource contains all the configuration
// specified through the builder methods.
//
// The resource must be used with appropriate reconciliation steps (such as
// ReconcileResourcesStep) to actually perform the resource management operations.
//
// Validation:
// - Build itself performs no checks; it returns the resource exactly as configured
// - At least one of WithKey or WithKeyFunc must be called before Build()
// - WithMutator is typically required for meaningful resource management
//
// Returns a configured Resource instance ready for use in reconciliation.
func (b *ResourceBuilder[CustomResource, ContextType, ResourceType]) Build() *Resource[CustomResource, ContextType, ResourceType] {
	// The builder mutates a single Resource in place, so building is just
	// handing that pointer out; no copy is made and nothing is validated here.
	built := b.resource
	return built
}
package ctrlfwk
import (
"fmt"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// UntypedResource is a Resource whose managed object is an
// *unstructured.Unstructured identified by a GroupVersionKind.
//
// It embeds the typed Resource and defines its own Kind and
// ObjectMetaGenerator so that the stored GVK is reported and stamped onto the
// objects it returns.
type UntypedResource[CustomResource client.Object, ContextType Context[CustomResource]] struct {
	// Resource is the embedded typed resource, specialised for unstructured objects.
	*Resource[CustomResource, ContextType, *unstructured.Unstructured]
	// gvk is the GroupVersionKind applied to objects returned by ObjectMetaGenerator.
	gvk schema.GroupVersionKind
}
// Compile-time check that *UntypedResource satisfies GenericResource.
var _ GenericResource[client.Object, Context[client.Object]] = (*UntypedResource[client.Object, Context[client.Object]])(nil)
// Kind returns the Kind of the configured GroupVersionKind prefixed with
// "Untyped", e.g. "UntypedServiceMonitor".
func (c *UntypedResource[CustomResource, ContextType]) Kind() string {
	const prefix = "Untyped"
	return prefix + c.gvk.Kind
}
// ObjectMetaGenerator calls the embedded Resource's ObjectMetaGenerator and
// makes sure the returned object carries this resource's GroupVersionKind.
//
// When the embedded generator fails or asks to skip, the object it produced is
// discarded: a fresh, otherwise empty *unstructured.Unstructured stamped with
// the GVK is returned together with the original skip and err values.
// NOTE(review): on the skip path this drops whatever metadata the embedded
// generator may have set (such as name/namespace) — confirm callers do not
// rely on it.
//
// On the normal path the generated object must be an
// *unstructured.Unstructured; anything else is reported as an error.
func (c *UntypedResource[CustomResource, ContextType]) ObjectMetaGenerator() (obj client.Object, skip bool, err error) {
	obj, skip, err = c.Resource.ObjectMetaGenerator()
	if skip || err != nil {
		placeholder := &unstructured.Unstructured{}
		placeholder.SetGroupVersionKind(c.gvk)
		return placeholder, skip, err
	}

	if generated, ok := obj.(*unstructured.Unstructured); ok {
		generated.SetGroupVersionKind(c.gvk)
		return generated, false, nil
	}
	return nil, false, fmt.Errorf("expected *unstructured.Unstructured, got %T", obj)
}
package ctrlfwk
import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// UntypedResourceBuilder provides a fluent builder pattern for creating managed resources
// that are not known at compile time or don't have Go type definitions.
//
// This builder is useful when your custom resource needs to manage:
// - Custom Resource Definitions (CRDs) not included in your Go code
// - Third-party resources from other operators
// - Resources from different API groups or versions
// - Dynamic resource types determined at runtime
//
// Type parameters:
// - CustomResource: The custom resource that owns and manages this untyped resource
// - ContextType: The context type containing the custom resource and additional data
//
// The builder works with unstructured.Unstructured objects, which can represent
// any Kubernetes resource dynamically.
//
// Unlike untyped dependencies (which are external resources you consume), untyped
// resources are resources that your controller creates and manages as part of
// implementing your custom resource's desired state.
//
// Common use cases:
// - Managing CRDs defined in YAML but not in Go
// - Creating third-party operator resources (e.g., Prometheus ServiceMonitors)
// - Managing resources from different API versions
// - Implementing operators that work with dynamic resource types
//
// Example:
//
// // Create a Prometheus ServiceMonitor for your application
// gvk := schema.GroupVersionKind{
// Group: "monitoring.coreos.com",
// Version: "v1",
// Kind: "ServiceMonitor",
// }
// serviceMonitor := NewUntypedResourceBuilder(ctx, gvk).
// WithKeyFunc(func() types.NamespacedName {
// return types.NamespacedName{
// Name: ctx.GetCustomResource().Name + "-metrics",
// Namespace: ctx.GetCustomResource().Namespace,
// }
// }).
// WithMutator(func(obj *unstructured.Unstructured) error {
// // Configure ServiceMonitor using unstructured helpers
// return unstructured.SetNestedField(obj.Object, "app", "spec", "selector", "matchLabels", "app")
// }).
// WithReadinessCondition(func(obj *unstructured.Unstructured) bool {
// // Check if ServiceMonitor is being scraped
// status, found, _ := unstructured.NestedString(obj.Object, "status", "phase")
// return found && status == "Active"
// }).
// Build()
type UntypedResourceBuilder[CustomResource client.Object, ContextType Context[CustomResource]] struct {
	// inner is the typed builder, specialised for *unstructured.Unstructured,
	// that every With* method delegates to.
	inner *ResourceBuilder[CustomResource, ContextType, *unstructured.Unstructured]
	// gvk is the GroupVersionKind handed to the UntypedResource created by Build.
	gvk schema.GroupVersionKind
}
// NewUntypedResourceBuilder creates a new UntypedResourceBuilder for constructing
// managed Kubernetes resources that don't have compile-time Go types.
//
// This is particularly useful for:
// - Managing Custom Resource Definitions (CRDs) defined in YAML but not in Go
// - Creating third-party resources from operators you don't control
// - Working with resources from different API versions or groups
// - Implementing dynamic resource management based on configuration
//
// Parameters:
// - ctx: The context containing the custom resource and additional data
// - gvk: GroupVersionKind specifying the exact resource type to manage
//
// The GroupVersionKind must exactly match the target resource's type information.
// The resource will be managed as an unstructured.Unstructured object with
// owner references to your custom resource for proper garbage collection.
//
// Key differences from NewUntypedDependencyBuilder:
// - Resources are CREATED and MANAGED by your controller
// - Dependencies are CONSUMED by your controller (external resources)
// - Resources have owner references to your custom resource
// - Resources are deleted when your custom resource is deleted
//
// Example:
//
// // Manage a Grafana Dashboard resource
// gvk := schema.GroupVersionKind{
// Group: "grafana.integreatly.org",
// Version: "v1beta1",
// Kind: "GrafanaDashboard",
// }
// dashboard := NewUntypedResourceBuilder(ctx, gvk).
// WithKeyFunc(func() types.NamespacedName {
// return types.NamespacedName{
// Name: ctx.GetCustomResource().Name + "-dashboard",
// Namespace: ctx.GetCustomResource().Namespace,
// }
// }).
// WithMutator(func(obj *unstructured.Unstructured) error {
// // Configure dashboard JSON content
// dashboardJSON := generateDashboard(ctx.GetCustomResource())
// return unstructured.SetNestedField(obj.Object, dashboardJSON, "spec", "json")
// }).
// Build()
func NewUntypedResourceBuilder[CustomResource client.Object, ContextType Context[CustomResource]](ctx ContextType, gvk schema.GroupVersionKind) *UntypedResourceBuilder[CustomResource, ContextType] {
	builder := new(UntypedResourceBuilder[CustomResource, ContextType])
	// The typed builder is seeded with an empty unstructured object; the GVK
	// is kept separately and passed on to the UntypedResource in Build.
	builder.inner = NewResourceBuilder(ctx, &unstructured.Unstructured{})
	builder.gvk = gvk
	return builder
}
// Build constructs and returns the final UntypedResource instance with all configured options.
//
// This method finalizes the builder pattern and creates an untyped resource that can be
// used in reconciliation steps. The returned resource contains all the configuration
// specified through the builder methods and will work with unstructured.Unstructured objects.
//
// The resource must be used with appropriate reconciliation steps (such as
// ReconcileResourcesStep) to actually perform the resource management operations.
//
// Validation:
// - Build itself performs no checks; it wraps the inner resource exactly as configured
// - At least one of WithKey or WithKeyFunc must be called before Build()
// - WithMutator is typically required for meaningful resource management
//
// Returns a configured UntypedResource instance ready for use in reconciliation.
func (b *UntypedResourceBuilder[CustomResource, ContextType]) Build() *UntypedResource[CustomResource, ContextType] {
	// Wrap the typed resource produced by the inner builder and attach the
	// GroupVersionKind this builder was created with.
	built := new(UntypedResource[CustomResource, ContextType])
	built.Resource = b.inner.Build()
	built.gvk = b.gvk
	return built
}
// WithAfterCreate registers a hook function that executes only when an untyped resource is newly created.
//
// This function is called specifically when a resource is created for the first time,
// not when it's updated. It's useful for one-time initialization tasks specific to
// untyped resources, such as setting up external integrations or logging creation events.
//
// The function receives the newly created unstructured resource, including any fields
// populated by Kubernetes during creation.
//
// Working with unstructured objects requires using helper functions to access nested fields.
//
// Example:
//
// .WithAfterCreate(func(ctx MyContext, obj *unstructured.Unstructured) error {
// // Log creation of custom resource
// logger := ctx.GetLogger()
// name, _, _ := unstructured.NestedString(obj.Object, "metadata", "name")
// logger.Info("Custom resource created", "name", name, "gvk", gvk)
//
// // Register with external monitoring system
// return ctx.RegisterWithExternalSystem(obj)
// })
func (b *UntypedResourceBuilder[CustomResource, ContextType]) WithAfterCreate(f func(ctx ContextType, resource *unstructured.Unstructured) error) *UntypedResourceBuilder[CustomResource, ContextType] {
	// Forward the hook to the wrapped typed builder and keep the builder it returns.
	configured := b.inner.WithAfterCreate(f)
	b.inner = configured
	return b
}
// WithAfterDelete registers a hook function that executes after an untyped resource is deleted.
//
// This function is called when a resource has been successfully deleted from the cluster,
// either due to a delete condition being met or during custom resource finalization.
// It can be used for cleanup tasks specific to untyped resources.
//
// The function receives the resource object as it existed just before deletion.
// Working with unstructured objects requires using helper functions to access nested fields.
//
// Example:
//
// .WithAfterDelete(func(ctx MyContext, obj *unstructured.Unstructured) error {
// // Clean up external references
// name, _, _ := unstructured.NestedString(obj.Object, "metadata", "name")
// logger := ctx.GetLogger()
// logger.Info("Cleaning up external resources", "resource", name)
//
// // Remove from external monitoring system
// return ctx.UnregisterFromExternalSystem(name)
// })
func (b *UntypedResourceBuilder[CustomResource, ContextType]) WithAfterDelete(f func(ctx ContextType, resource *unstructured.Unstructured) error) *UntypedResourceBuilder[CustomResource, ContextType] {
	// Forward the hook to the wrapped typed builder and keep the builder it returns.
	configured := b.inner.WithAfterDelete(f)
	b.inner = configured
	return b
}
// WithAfterFinalize registers a hook function that executes during custom resource finalization
// for untyped resources that require special cleanup handling.
//
// This function is called when the custom resource is being deleted and the untyped resource
// needs to be cleaned up as part of the finalization process. It's particularly important
// for untyped resources that may have external dependencies or special deletion requirements.
//
// Working with unstructured objects requires using helper functions to access nested fields.
//
// Example:
//
// .WithAfterFinalize(func(ctx MyContext, obj *unstructured.Unstructured) error {
// // Graceful cleanup for third-party resources
// logger := ctx.GetLogger()
//
// // Check if resource has special cleanup requirements
// cleanupMode, found, _ := unstructured.NestedString(obj.Object, "spec", "cleanupMode")
// if found && cleanupMode == "preserve" {
// logger.Info("Preserving resource during finalization")
// return nil
// }
//
// // Perform graceful shutdown
// return ctx.GracefullyShutdownExternalResource(obj)
// })
func (b *UntypedResourceBuilder[CustomResource, ContextType]) WithAfterFinalize(f func(ctx ContextType, resource *unstructured.Unstructured) error) *UntypedResourceBuilder[CustomResource, ContextType] {
	// Forward the hook to the wrapped typed builder and keep the builder it returns.
	configured := b.inner.WithAfterFinalize(f)
	b.inner = configured
	return b
}
// WithAfterReconcile registers a hook function to execute after successful untyped resource reconciliation.
//
// This function is called after the untyped resource has been successfully created, updated,
// or verified. It receives the current resource state as an unstructured.Unstructured object
// and can be used for post-processing, status updates, or triggering additional operations.
//
// Working with unstructured objects requires using helper functions to access nested fields.
//
// Example:
//
// .WithAfterReconcile(func(ctx MyContext, obj *unstructured.Unstructured) error {
// cr := ctx.GetCustomResource()
//
// // Extract status information from the untyped resource
// status, found, err := unstructured.NestedString(obj.Object, "status", "phase")
// if err != nil {
// return err
// }
//
// // Update custom resource status
// if found {
// cr.Status.ExternalResourcePhase = status
// meta.SetStatusCondition(&cr.Status.Conditions, metav1.Condition{
// Type: "ExternalResourceReady",
// Status: metav1.ConditionTrue,
// Reason: "ResourceReconciled",
// })
// }
//
// return ctx.PatchStatus()
// })
func (b *UntypedResourceBuilder[CustomResource, ContextType]) WithAfterReconcile(f func(ctx ContextType, resource *unstructured.Unstructured) error) *UntypedResourceBuilder[CustomResource, ContextType] {
	// Forward the hook to the wrapped typed builder and keep the builder it returns.
	configured := b.inner.WithAfterReconcile(f)
	b.inner = configured
	return b
}
// WithAfterUpdate registers a hook function that executes only when an untyped resource is updated.
//
// This function is called specifically when an existing untyped resource is modified,
// not when it's initially created. It's useful for tracking changes to third-party
// resources and responding to configuration updates.
//
// Working with unstructured objects requires using helper functions to access nested fields.
//
// Example:
//
// .WithAfterUpdate(func(ctx MyContext, obj *unstructured.Unstructured) error {
// logger := ctx.GetLogger()
//
// // Log the update with resource details
// name, _, _ := unstructured.NestedString(obj.Object, "metadata", "name")
// generation, _, _ := unstructured.NestedInt64(obj.Object, "metadata", "generation")
//
// logger.Info("External resource updated",
// "name", name,
// "generation", generation,
// "gvk", gvk)
//
// // Trigger external system update notification
// return ctx.NotifyExternalSystemOfUpdate(obj)
// })
func (b *UntypedResourceBuilder[CustomResource, ContextType]) WithAfterUpdate(f func(ctx ContextType, resource *unstructured.Unstructured) error) *UntypedResourceBuilder[CustomResource, ContextType] {
	// Forward the hook to the wrapped typed builder and keep the builder it returns.
	configured := b.inner.WithAfterUpdate(f)
	b.inner = configured
	return b
}
// WithBeforeReconcile registers a hook function to execute before untyped resource reconciliation.
//
// This function is called before any resource operations (create, update, or delete)
// are performed on the untyped resource. It's particularly useful for validating
// that the target CRD is installed and for preparing the environment.
//
// Common use cases for untyped resources:
// - Validating that the target CRD exists in the cluster
// - Checking operator availability (e.g., Prometheus, Grafana operators)
// - Setting up authentication for third-party APIs
// - Performing environment-specific preparations
//
// Example:
//
// .WithBeforeReconcile(func(ctx MyContext) error {
// logger := ctx.GetLogger()
// client := ctx.GetClient()
//
// // Verify that the target CRD exists
// crdName := fmt.Sprintf("%ss.%s", strings.ToLower(gvk.Kind), gvk.Group)
// crd := &apiextensionsv1.CustomResourceDefinition{}
// err := client.Get(ctx, types.NamespacedName{Name: crdName}, crd)
// if err != nil {
// return fmt.Errorf("required CRD %s not found: %w", crdName, err)
// }
//
// logger.Info("Proceeding with untyped resource reconciliation", "gvk", gvk)
// return nil
// })
func (b *UntypedResourceBuilder[CustomResource, ContextType]) WithBeforeReconcile(f func(ctx ContextType) error) *UntypedResourceBuilder[CustomResource, ContextType] {
	// Forward the hook to the wrapped typed builder and keep the builder it returns.
	configured := b.inner.WithBeforeReconcile(f)
	b.inner = configured
	return b
}
// WithKey specifies a static NamespacedName for the untyped resource.
//
// This is useful when the resource name and namespace are known at build time
// and don't need to be computed dynamically based on the custom resource state.
//
// For dynamic naming based on custom resource properties, use WithKeyFunc instead.
//
// Example:
//
// .WithKey(types.NamespacedName{
// Name: "monitoring-config",
// Namespace: "monitoring",
// }) // Static name for the monitoring resource
func (b *UntypedResourceBuilder[CustomResource, ContextType]) WithKey(name types.NamespacedName) *UntypedResourceBuilder[CustomResource, ContextType] {
	// Pass the static key to the wrapped typed builder and keep the builder it returns.
	configured := b.inner.WithKey(name)
	b.inner = configured
	return b
}
// WithKeyFunc specifies a function that dynamically determines the untyped resource's NamespacedName.
//
// This function is called during reconciliation to determine where the resource should
// be created or found. For untyped resources, this is particularly important as the
// naming often needs to be coordinated with external operators or systems.
//
// Example:
//
// .WithKeyFunc(func() types.NamespacedName {
// cr := ctx.GetCustomResource()
// return types.NamespacedName{
// // Name follows third-party operator conventions
// Name: fmt.Sprintf("%s-%s-monitor", cr.Name, cr.Spec.Component),
// Namespace: cr.Namespace,
// }
// })
func (b *UntypedResourceBuilder[CustomResource, ContextType]) WithKeyFunc(f func() types.NamespacedName) *UntypedResourceBuilder[CustomResource, ContextType] {
	// Pass the key function to the wrapped typed builder and keep the builder it returns.
	configured := b.inner.WithKeyFunc(f)
	b.inner = configured
	return b
}
// WithMutator specifies the function that configures the untyped resource's desired state.
//
// The mutator function receives an unstructured.Unstructured object and should configure
// all necessary fields using the unstructured helper functions. This is where you define
// how your custom resource's spec translates into the target third-party resource configuration.
//
// Working with unstructured objects requires using helper functions like:
// - unstructured.SetNestedField() to set individual values
// - unstructured.SetNestedSlice() to set arrays
// - unstructured.SetNestedMap() to set objects
//
// The mutator should be idempotent and handle both CREATE and UPDATE operations.
//
// Example:
//
// .WithMutator(func(obj *unstructured.Unstructured) error {
// cr := ctx.GetCustomResource()
//
// // Set basic metadata
// obj.SetName(cr.Name + "-servicemonitor")
// obj.SetNamespace(cr.Namespace)
//
// // Configure ServiceMonitor spec using unstructured helpers
// err := unstructured.SetNestedMap(obj.Object, map[string]any{
// "app": cr.Name,
// }, "spec", "selector", "matchLabels")
// if err != nil {
// return err
// }
//
// // Set endpoint configuration
// endpoints := []any{
// map[string]any{
// "port": "metrics",
// "path": "/metrics",
// },
// }
// err = unstructured.SetNestedSlice(obj.Object, endpoints, "spec", "endpoints")
// if err != nil {
// return err
// }
//
// // Set owner reference for garbage collection
// return controllerutil.SetOwnerReference(cr, obj, ctx.GetScheme())
// })
func (b *UntypedResourceBuilder[CustomResource, ContextType]) WithMutator(f Mutator[*unstructured.Unstructured]) *UntypedResourceBuilder[CustomResource, ContextType] {
	// Pass the mutator to the wrapped typed builder and keep the builder it returns.
	configured := b.inner.WithMutator(f)
	b.inner = configured
	return b
}
// WithOutput specifies where to store the reconciled untyped resource after successful operations.
//
// The provided unstructured.Unstructured object will be populated with the resource's
// current state from the cluster after reconciliation completes. This is particularly
// useful for untyped resources where you need to extract status information or other
// runtime values generated by third-party operators.
//
// Example:
//
// type MyContextData struct {
// ServiceMonitor *unstructured.Unstructured
// }
//
// resource := NewUntypedResourceBuilder(ctx, gvk).
// // ... other configuration ...
// WithOutput(ctx.Data.ServiceMonitor). // Store reconciled resource here
// Build()
//
// // Later, access the reconciled resource
// if ctx.Data.ServiceMonitor != nil {
// status, found, _ := unstructured.NestedString(
// ctx.Data.ServiceMonitor.Object, "status", "phase")
// }
func (b *UntypedResourceBuilder[CustomResource, ContextType]) WithOutput(obj *unstructured.Unstructured) *UntypedResourceBuilder[CustomResource, ContextType] {
	// Pass the output target to the wrapped typed builder and keep the builder it returns.
	configured := b.inner.WithOutput(obj)
	b.inner = configured
	return b
}
// WithReadinessCondition defines custom logic to determine when the untyped resource is ready.
//
// The provided function is called with the current unstructured resource state and should
// return true if the resource has reached the desired operational state. This is particularly
// important for untyped resources that may have complex initialization processes managed
// by third-party operators.
//
// Working with unstructured objects requires using helper functions to access nested fields.
//
// Example:
//
// .WithReadinessCondition(func(obj *unstructured.Unstructured) bool {
// // Check that the ServiceMonitor reports status conditions
// _, found, _ := unstructured.NestedSlice(obj.Object, "status", "conditions")
// if !found {
// return false
// }
//
// // More complex readiness check
// lastScrape, found, _ := unstructured.NestedString(obj.Object, "status", "lastScrapeTime")
// if !found {
// return false
// }
//
// // Consider ready if scraped within last 5 minutes
// scrapeTime, err := time.Parse(time.RFC3339, lastScrape)
// return err == nil && time.Since(scrapeTime) < 5*time.Minute
// })
func (b *UntypedResourceBuilder[CustomResource, ContextType]) WithReadinessCondition(f func(obj *unstructured.Unstructured) bool) *UntypedResourceBuilder[CustomResource, ContextType] {
	// Pass the readiness predicate to the wrapped typed builder and keep the builder it returns.
	configured := b.inner.WithReadinessCondition(f)
	b.inner = configured
	return b
}
// WithRequireManualDeletionForFinalize specifies when an untyped resource requires manual cleanup
// during custom resource finalization.
//
// This is particularly important for untyped resources managed by third-party operators,
// as they may have complex deletion procedures or external dependencies that need special handling.
//
// Working with unstructured objects requires using helper functions to access nested fields.
//
// Example:
//
// .WithRequireManualDeletionForFinalize(func(obj *unstructured.Unstructured) bool {
// // Check if resource has special deletion requirements
// deletionPolicy, found, _ := unstructured.NestedString(
// obj.Object, "spec", "deletionPolicy")
// if found && deletionPolicy == "Retain" {
// return true // Requires manual cleanup
// }
//
// // Check for external dependencies
// externalDeps, found, _ := unstructured.NestedSlice(
// obj.Object, "status", "externalDependencies")
// return found && len(externalDeps) > 0
// })
func (b *UntypedResourceBuilder[CustomResource, ContextType]) WithRequireManualDeletionForFinalize(f func(obj *unstructured.Unstructured) bool) *UntypedResourceBuilder[CustomResource, ContextType] {
	// Pass the predicate to the wrapped typed builder and keep the builder it returns.
	configured := b.inner.WithRequireManualDeletionForFinalize(f)
	b.inner = configured
	return b
}
// WithSkipAndDeleteOnCondition specifies when to skip creating or delete an existing untyped resource.
//
// This is particularly useful for untyped resources that depend on optional third-party
// operators or should only be created under certain conditions. The function is evaluated
// during each reconciliation cycle.
//
// Common use cases for untyped resources:
// - Optional monitoring resources (when Prometheus operator might not be installed)
// - Feature flag controlled third-party integrations
// - Environment-specific operator resources
// - Conditional third-party service configurations
//
// Example:
//
// .WithSkipAndDeleteOnCondition(func() bool {
// cr := ctx.GetCustomResource()
//
// // Only create monitoring resources if monitoring is enabled
// if !cr.Spec.Monitoring.Enabled {
// return true
// }
//
// // Check if the required operator is available
// client := ctx.GetClient()
// prometheusOperator := &appsv1.Deployment{}
// err := client.Get(ctx, types.NamespacedName{
// Name: "prometheus-operator",
// Namespace: "monitoring",
// }, prometheusOperator)
//
// // Skip if operator not found or not ready
// return err != nil || prometheusOperator.Status.ReadyReplicas == 0
// })
func (b *UntypedResourceBuilder[CustomResource, ContextType]) WithSkipAndDeleteOnCondition(f func() bool) *UntypedResourceBuilder[CustomResource, ContextType] {
	// Pass the condition to the wrapped typed builder and keep the builder it returns.
	configured := b.inner.WithSkipAndDeleteOnCondition(f)
	b.inner = configured
	return b
}
// WithUserIdentifier assigns a custom identifier for this untyped resource.
//
// This identifier is used for logging, debugging, and distinguishing between multiple
// untyped resources. It's especially important for untyped resources since the resource
// types may not be immediately obvious from logs, and you might be managing multiple
// third-party resources of different types.
//
// The identifier should be descriptive and include both the resource purpose and type.
//
// Example:
//
// .WithUserIdentifier("prometheus-servicemonitor") // Clear purpose and type
// .WithUserIdentifier("grafana-dashboard-app") // Identifies both operator and purpose
// .WithUserIdentifier("istio-virtualservice") // Service mesh resource identifier
func (b *UntypedResourceBuilder[CustomResource, ContextType]) WithUserIdentifier(identifier string) *UntypedResourceBuilder[CustomResource, ContextType] {
	// Pass the identifier to the wrapped typed builder and keep the builder it returns.
	configured := b.inner.WithUserIdentifier(identifier)
	b.inner = configured
	return b
}
// WithCanBePaused specifies whether this resource supports pausing reconciliation.
//
// When set to true, the resource will respect the paused state of the custom resource.
// If the custom resource is marked as paused (e.g., via a label), reconciliation
// for this resource will be skipped until the pause is lifted.
//
// This is useful for scenarios where you want to temporarily halt changes to
// certain resources without deleting them or affecting other parts of the system.
//
// Common use cases:
// - Temporarily halting updates during maintenance windows
// - Pausing non-critical resources while troubleshooting issues
// - Allowing manual intervention before resuming automated management
//
// Example:
//
// .WithCanBePaused(true) // Enable pausing for this resource
func (b *UntypedResourceBuilder[CustomResource, ContextType]) WithCanBePaused(canBePaused bool) *UntypedResourceBuilder[CustomResource, ContextType] {
	// Pass the flag to the wrapped typed builder and keep the builder it returns.
	configured := b.inner.WithCanBePaused(canBePaused)
	b.inner = configured
	return b
}
// WithCanBePausedFunc specifies a function to determine if this resource supports pausing reconciliation.
//
// The provided function is called during reconciliation to check if the resource
// should respect the paused state of the custom resource. If it returns true,
// reconciliation for this resource will be skipped when the custom resource is paused.
//
// This allows for dynamic control over which resources can be paused based on
// the current state or configuration of the custom resource.
//
// Example:
//
// .WithCanBePausedFunc(func() bool {
// // Custom logic to determine if the resource can be paused
// return someCondition
// })
func (b *UntypedResourceBuilder[CustomResource, ContextType]) WithCanBePausedFunc(f func() bool) *UntypedResourceBuilder[CustomResource, ContextType] {
	// Pass the predicate to the wrapped typed builder and keep the builder it returns.
	configured := b.inner.WithCanBePausedFunc(f)
	b.inner = configured
	return b
}
package ctrlfwk
import (
"fmt"
"reflect"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// SetReadyCondition returns a function that marks a controller resource as Ready.
//
// The returned function uses reflection to locate obj.Status.Conditions and
// upserts a "Ready" condition (status True, reason "Reconciled") through
// meta.SetStatusCondition. It returns true when the conditions were modified
// and false when the stored condition was already up to date.
//
// Your api MUST have a field like so:
//
//	type MyCustomResourceStatus struct {
//		Conditions []metav1.Condition `json:"conditions,omitempty"`
//		...
//	}
//
// Any other layout yields an error instead of a reflection panic: a nil
// resource, a resource that is not a struct (or pointer to one), a missing or
// differently named Status/Conditions field, a nil Status pointer, a
// Conditions slice whose type is not []metav1.Condition, or a Conditions field
// that cannot be written because the resource was not passed as a pointer.
//
// The reconciler argument is not used by the body; it is only part of the
// signature.
func SetReadyCondition[ControllerResourceType client.Object](_ Reconciler[ControllerResourceType]) func(obj ControllerResourceType) (bool, error) {
	return func(obj ControllerResourceType) (bool, error) {
		objValue := reflect.ValueOf(obj)
		if objValue.Kind() == reflect.Pointer {
			// Elem() on a nil pointer yields the zero Value, on which
			// FieldByName would panic.
			if objValue.IsNil() {
				return false, fmt.Errorf("controller resource is nil")
			}
			objValue = objValue.Elem()
		}
		if objValue.Kind() != reflect.Struct {
			return false, fmt.Errorf("controller resource must be a struct or a pointer to a struct, got %T", obj)
		}

		statusField := objValue.FieldByName("Status")
		if !statusField.IsValid() {
			return false, fmt.Errorf("status field not found on controller resource")
		}
		if statusField.Kind() == reflect.Pointer {
			if statusField.IsNil() {
				return false, fmt.Errorf("status field on controller resource is a nil pointer")
			}
			statusField = statusField.Elem()
		}
		if statusField.Kind() != reflect.Struct {
			return false, fmt.Errorf("status field on controller resource is not a struct")
		}

		conditionsField := statusField.FieldByName("Conditions")
		if !conditionsField.IsValid() || conditionsField.Kind() != reflect.Slice {
			return false, fmt.Errorf("conditions field not found or is not a slice on status")
		}
		// A slice of some other condition type passes the Kind check above, so
		// the assertion must be checked to avoid a panic.
		conditions, ok := conditionsField.Interface().([]metav1.Condition)
		if !ok {
			return false, fmt.Errorf("conditions field has type %s, expected []metav1.Condition", conditionsField.Type())
		}

		readyCondition := metav1.Condition{
			Type:               "Ready",
			Status:             metav1.ConditionTrue,
			Reason:             "Reconciled",
			Message:            "The resource is ready",
			LastTransitionTime: metav1.Now(),
			ObservedGeneration: obj.GetGeneration(),
		}
		if !meta.SetStatusCondition(&conditions, readyCondition) {
			return false, nil
		}

		// SetStatusCondition may have appended to the local slice header, so
		// the result has to be written back; that requires a settable field.
		if !conditionsField.CanSet() {
			return false, fmt.Errorf("conditions field cannot be set; the controller resource must be passed as a pointer")
		}
		conditionsField.Set(reflect.ValueOf(conditions))
		return true, nil
	}
}
// PatchCustomResourceStatus sends a merge patch for the status subresource of
// the custom resource held by ctx. The patch is computed between the object
// returned by ctx.GetCleanCustomResource (the merge base) and the one returned
// by ctx.GetCustomResource (the modified object).
//
// The context must already hold the custom resource; the step built by
// NewFindControllerCustomResourceStep stores it there.
//
// On success the patched object is written back with ctx.SetCustomResource.
// On failure the error from the status client is returned unchanged.
func PatchCustomResourceStatus[CustomResourceType client.Object](ctx Context[CustomResourceType], reconciler Reconciler[CustomResourceType]) error {
	base := ctx.GetCleanCustomResource()
	current := ctx.GetCustomResource()

	if err := reconciler.Status().Patch(ctx, current, client.MergeFrom(base)); err != nil {
		return err
	}

	ctx.SetCustomResource(current)
	return nil
}
package ctrlfwk
import (
"fmt"
"github.com/go-logr/logr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)
// NewAddFinalizerStep builds a step that makes sure finalizerName is present on
// the custom resource held by the context.
//
// The step does nothing when the resource is being deleted or when the
// finalizer is already set. Otherwise it patches the resource, using the clean
// copy from the context as the merge base. The first argument is only used to
// infer ContextType.
func NewAddFinalizerStep[
	ControllerResourceType ControllerCustomResource,
	ContextType Context[ControllerResourceType],
](
	_ ContextType,
	reconciler Reconciler[ControllerResourceType],
	finalizerName string,
) Step[ControllerResourceType, ContextType] {
	addFinalizer := func(ctx ContextType, logger logr.Logger, req ctrl.Request) StepResult {
		resource := ctx.GetCustomResource()

		// Nothing to persist when the resource is going away or the
		// finalizer was already there.
		if IsFinalizing(resource) || !controllerutil.AddFinalizer(resource, finalizerName) {
			return ResultSuccess()
		}

		mergeBase := client.MergeFrom(ctx.GetCleanCustomResource())
		if err := reconciler.Patch(ctx, resource, mergeBase); err != nil {
			return ResultInError(err)
		}
		return ResultSuccess()
	}

	return Step[ControllerResourceType, ContextType]{
		Name: fmt.Sprintf(StepAddFinalizer, finalizerName),
		Step: addFinalizer,
	}
}
package ctrlfwk
import (
"github.com/go-logr/logr"
"github.com/pkg/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
ctrl "sigs.k8s.io/controller-runtime"
)
// NewResolveDynamicDependenciesStep builds a step that resolves every
// dependency returned by reconciler.GetDependencies.
//
// The step proceeds as follows:
//  1. it adds the FinalizerDependenciesManagedBy finalizer (via NewAddFinalizerStep);
//  2. it runs NewResolveDependencyStep for each dependency, collecting every
//     result that asks to return instead of stopping at the first one;
//  3. it returns the first collected error, except NotFound errors while the
//     custom resource is being finalized;
//  4. it runs NewExecuteFinalizerStep with NilFinalizerFunc, which removes the
//     finalizer only when the resource is being finalized;
//  5. it returns the first collected result, or success when there is none.
func NewResolveDynamicDependenciesStep[
	ControllerResourceType ControllerCustomResource,
	ContextType Context[ControllerResourceType],
](
	_ ContextType,
	reconciler ReconcilerWithDependencies[ControllerResourceType, ContextType],
) Step[ControllerResourceType, ContextType] {
	resolveAll := func(ctx ContextType, logger logr.Logger, req ctrl.Request) StepResult {
		deps, err := reconciler.GetDependencies(ctx, req)
		if err != nil {
			return ResultInError(errors.Wrap(err, "failed to get dependencies"))
		}

		// The finalizer lets us clean up "managed by" references in the
		// dependencies when the custom resource is deleted.
		addFinalizer := NewAddFinalizerStep(ctx, reconciler, FinalizerDependenciesManagedBy)
		if res := addFinalizer.Step(ctx, logger, req); res.ShouldReturn() {
			return res.FromSubStep()
		}

		// Every dependency is attempted; results that ask to return are kept
		// in encounter order.
		var pending []StepResult
		for _, dep := range deps {
			depLogger := logger.WithValues("dependency", dep.ID())
			res := NewResolveDependencyStep(ctx, reconciler, dep).Step(ctx, depLogger, req)
			if !res.ShouldReturn() {
				depLogger.Info("Resolved dependency successfully")
				continue
			}
			depLogger.Info("Dependency resolution resulted in early return or error")
			pending = append(pending, res)
		}

		// Errors win over requeues and early returns.
		for _, res := range pending {
			if res.err == nil {
				continue
			}
			if IsFinalizing(ctx.GetCustomResource()) && apierrors.IsNotFound(res.err) {
				continue
			}
			return res
		}

		// Only acts while finalizing: the sub-step is a no-op otherwise.
		removeFinalizer := NewExecuteFinalizerStep(ctx, reconciler, FinalizerDependenciesManagedBy, NilFinalizerFunc)
		if res := removeFinalizer.Step(ctx, logger, req); res.ShouldReturn() {
			return res.FromSubStep()
		}

		// Every collected result satisfies ShouldReturn, so the first one is
		// the one to hand back.
		if len(pending) > 0 {
			return pending[0]
		}
		return ResultSuccess()
	}

	return Step[ControllerResourceType, ContextType]{
		Name: StepResolveDependencies,
		Step: resolveAll,
	}
}
package ctrlfwk
import (
"fmt"
"time"
"github.com/go-logr/logr"
"github.com/pkg/errors"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// NewResolveDependencyStep builds a step that resolves a single dependency of
// the custom resource.
//
// The step runs the BeforeReconcile hook, fetches the dependency by its key and
// hands it to dependency.Set. Depending on the state of the custom resource it
// then:
//   - while finalizing: removes the "managed by" reference from the dependency
//     (patching it if it changed) and succeeds; a missing dependency is also a
//     success;
//   - otherwise: optionally sets up a watch and adds the "managed by"
//     reference, then requeues after 30 seconds when the dependency is missing
//     or is required to be ready and is not.
//
// The AfterReconcile hook always runs afterwards with whatever object was
// fetched (nil when BeforeReconcile failed); an error from it replaces the
// result computed so far.
func NewResolveDependencyStep[
	ControllerResourceType ControllerCustomResource,
	ContextType Context[ControllerResourceType],
](
	_ ContextType,
	reconciler Reconciler[ControllerResourceType],
	dependency GenericDependency[ControllerResourceType, ContextType],
) Step[ControllerResourceType, ContextType] {
	return Step[ControllerResourceType, ContextType]{
		Name: fmt.Sprintf(StepResolveDependency, dependency.Kind()),
		Step: func(ctx ContextType, logger logr.Logger, req ctrl.Request) StepResult {
			const retryDelay = 30 * time.Second

			// Shared with resolve so that AfterReconcile sees the fetched object.
			var fetched client.Object

			resolve := func() StepResult {
				if err := dependency.BeforeReconcile(ctx); err != nil {
					return ResultInError(errors.Wrap(err, "failed to run BeforeReconcile hook"))
				}

				owner := ctx.GetCustomResource()
				key := dependency.Key()
				fetched = dependency.New()
				if err := reconciler.Get(ctx, key, fetched); err != nil {
					switch {
					case client.IgnoreNotFound(err) != nil:
						return ResultInError(errors.Wrap(err, "failed to get dependency resource"))
					case IsFinalizing(owner):
						// Nothing left to clean up on a missing dependency.
						return ResultSuccess()
					default:
						return ResultRequeueIn(retryDelay)
					}
				}

				// Snapshot taken before any mutation, used as the merge base.
				pristine := fetched.DeepCopyObject().(client.Object)
				dependency.Set(fetched)

				if IsFinalizing(owner) {
					changed, err := RemoveManagedBy(fetched, owner, reconciler.Scheme())
					if client.IgnoreNotFound(err) != nil {
						return ResultInError(err)
					}
					if !changed {
						return ResultSuccess()
					}
					if err := reconciler.Patch(ctx, fetched, client.MergeFrom(pristine)); err != nil {
						return ResultInError(err)
					}
					return ResultSuccess()
				}

				if dependency.ShouldAddManagedByAnnotation() {
					// A watch can only be registered when the reconciler supports it.
					if watcher, ok := reconciler.(ReconcilerWithWatcher[ControllerResourceType]); ok {
						if watchResult := SetupWatch(watcher, fetched, true)(ctx, req); watchResult.ShouldReturn() {
							return watchResult.FromSubStep()
						}
					}

					changed, err := AddManagedBy(fetched, owner, reconciler.Scheme())
					if err != nil {
						return ResultInError(err)
					}
					if changed {
						if err := reconciler.Patch(ctx, fetched, client.MergeFrom(pristine)); err != nil {
							return ResultInError(err)
						}
					}
				}

				if dependency.ShouldWaitForReady() && !dependency.IsReady() {
					return ResultRequeueIn(retryDelay)
				}
				return ResultSuccess()
			}

			outcome := resolve()
			if err := dependency.AfterReconcile(ctx, fetched); err != nil {
				return ResultInError(errors.Wrap(err, "failed to run AfterReconcile hook"))
			}
			return outcome
		},
	}
}
package ctrlfwk
import (
"github.com/go-logr/logr"
"github.com/pkg/errors"
ctrl "sigs.k8s.io/controller-runtime"
)
// NewEndStep builds the closing step of a reconciliation.
//
// When setReadyCondF is non-nil it is applied to the custom resource held by
// the context; if it reports a change, the status is persisted with
// PatchCustomResourceStatus. A nil setReadyCondF makes the step a no-op.
func NewEndStep[
	ControllerResourceType ControllerCustomResource,
	ContextType Context[ControllerResourceType],
](
	_ ContextType,
	reconciler Reconciler[ControllerResourceType],
	setReadyCondF func(ControllerResourceType) (bool, error),
) Step[ControllerResourceType, ContextType] {
	finish := func(ctx ContextType, logger logr.Logger, req ctrl.Request) StepResult {
		resource := ctx.GetCustomResource()
		if setReadyCondF == nil {
			return ResultSuccess()
		}

		changed, err := setReadyCondF(resource)
		switch {
		case err != nil:
			return ResultInError(errors.Wrap(err, "failed to set ready condition"))
		case !changed:
			// Condition already up to date: nothing to persist.
			return ResultSuccess()
		}

		if err := PatchCustomResourceStatus(ctx, reconciler); err != nil {
			return ResultInError(errors.Wrap(err, "failed to update controller resource"))
		}
		return ResultSuccess()
	}

	return Step[ControllerResourceType, ContextType]{
		Name: StepEndReconciliation,
		Step: finish,
	}
}
package ctrlfwk
import (
"context"
"fmt"
"github.com/go-logr/logr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)
// FinalizingFunc is the cleanup callback run by the step built with
// NewExecuteFinalizerStep while the custom resource is being finalized.
// Returning done == true tells that step to remove the finalizer; a non-nil
// err is reported as the step's error.
type FinalizingFunc func(ctx context.Context, logger logr.Logger, req ctrl.Request) (done bool, err error)
// NilFinalizerFunc is a FinalizingFunc that performs no cleanup: it ignores its
// arguments and always reports done with a nil error.
func NilFinalizerFunc(ctx context.Context, logger logr.Logger, req ctrl.Request) (done bool, err error) {
return true, nil
}
// NewExecuteFinalizerStep builds a step that runs finalizerFunc while the
// custom resource is being finalized and, once it reports done, removes
// finalizerName from the resource.
//
// The step is a no-op when the resource is not being finalized. An error from
// finalizerFunc or from the patch is returned as the step error. When
// finalizerFunc reports that it is not done yet, the step succeeds without
// touching the finalizer.
func NewExecuteFinalizerStep[
	ControllerResourceType ControllerCustomResource,
	ContextType Context[ControllerResourceType],
](
	_ ContextType,
	reconciler Reconciler[ControllerResourceType],
	finalizerName string,
	finalizerFunc FinalizingFunc,
) Step[ControllerResourceType, ContextType] {
	runFinalizer := func(ctx ContextType, logger logr.Logger, req ctrl.Request) StepResult {
		resource := ctx.GetCustomResource()
		if !IsFinalizing(resource) {
			return ResultSuccess()
		}

		finished, err := finalizerFunc(ctx, logger, req)
		if err != nil {
			return ResultInError(err)
		}
		if !finished {
			return ResultSuccess()
		}

		// Only patch when the finalizer was actually present.
		if !controllerutil.RemoveFinalizer(resource, finalizerName) {
			return ResultSuccess()
		}
		mergeBase := client.MergeFrom(ctx.GetCleanCustomResource())
		if err := reconciler.Patch(ctx, resource, mergeBase); err != nil {
			return ResultInError(err)
		}
		return ResultSuccess()
	}

	return Step[ControllerResourceType, ContextType]{
		Name: fmt.Sprintf(StepExecuteFinalizer, finalizerName),
		Step: runFinalizer,
	}
}
package ctrlfwk
import (
"github.com/go-logr/logr"
"github.com/pkg/errors"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// NewFindControllerCustomResourceStep builds the step that loads the custom
// resource named by the request into the object held by the context.
//
// Outcomes:
//   - the resource does not exist: early return, without error;
//   - any other read error: the error is returned, wrapped;
//   - the resource carries the LabelReconciliationPaused label (whatever its
//     value): early return, after logging;
//   - otherwise: the loaded resource is stored with ctx.SetCustomResource.
func NewFindControllerCustomResourceStep[
	ControllerResourceType ControllerCustomResource,
	ContextType Context[ControllerResourceType],
](
	_ ContextType,
	reconciler Reconciler[ControllerResourceType],
) Step[ControllerResourceType, ContextType] {
	find := func(ctx ContextType, logger logr.Logger, req ctrl.Request) StepResult {
		resource := ctx.GetCustomResource()

		if err := reconciler.Get(ctx, req.NamespacedName, resource); err != nil {
			if client.IgnoreNotFound(err) == nil {
				// The resource is gone: there is nothing to reconcile.
				return ResultEarlyReturn()
			}
			return ResultInError(errors.Wrap(err, "failed to get controller resource"))
		}

		// Indexing a nil label map is safe and simply reports "absent".
		if _, paused := resource.GetLabels()[LabelReconciliationPaused]; paused {
			logger.Info("Reconciliation is paused for this resource, skipping further steps")
			return ResultEarlyReturn()
		}

		ctx.SetCustomResource(resource)
		return ResultSuccess()
	}

	return Step[ControllerResourceType, ContextType]{
		Name: StepFindControllerCustomResource,
		Step: find,
	}
}
package ctrlfwk
import (
"fmt"
"github.com/go-logr/logr"
"github.com/pkg/errors"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)
// NewReconcileResourceStep builds a step that reconciles a single managed
// resource of the custom resource.
//
// While the custom resource is being finalized, the step either just runs the
// OnFinalize hook (resource not requiring manual deletion) or deletes the
// resource and then runs OnFinalize. Otherwise it runs BeforeReconcile,
// computes the desired object, optionally sets up a watch, creates or patches
// the object, runs the OnCreate/OnUpdate hook matching the operation, and
// returns early when the resource is not ready yet.
//
// The AfterReconcile hook always runs last with the desired object (nil when it
// was never computed); an error from it replaces the result computed so far.
func NewReconcileResourceStep[
ControllerResourceType ControllerCustomResource,
ContextType Context[ControllerResourceType],
](
_ ContextType,
reconciler Reconciler[ControllerResourceType],
resource GenericResource[ControllerResourceType, ContextType],
) Step[ControllerResourceType, ContextType] {
return Step[ControllerResourceType, ContextType]{
Name: fmt.Sprintf(StepReconcileResource, resource.Kind()),
Step: func(ctx ContextType, logger logr.Logger, req ctrl.Request) StepResult {
// desired and result are declared here so the closure below can fill them
// and AfterReconcile can still see desired afterwards.
var desired client.Object
var result StepResult
funcResult := func() StepResult {
cr := ctx.GetCustomResource()
if IsFinalizing(cr) {
// If the resource does not require deletion, we can just finish here, it's gonna get garbage collected
if !resource.RequiresManualDeletion(resource.Get()) {
// NOTE(review): desired has not been assigned yet at this point, so the
// hook receives a nil object — confirm this is intended.
if err := resource.OnFinalize(ctx, desired); err != nil {
return ResultInError(errors.Wrap(err, "failed to run OnFinalize hook"))
}
return ResultSuccess()
}
}
if err := resource.BeforeReconcile(ctx); err != nil {
return ResultInError(errors.Wrap(err, "failed to run BeforeReconcile hook"))
}
// getDesiredObject may itself delete the resource and ask for an early return.
desired, result = getDesiredObject(reconciler, resource)(ctx, req)
if result.ShouldReturn() {
return result.FromSubStep()
}
// The pause label is only honoured for resources that opt in via CanBePaused.
if resource.CanBePaused() {
labels := cr.GetLabels()
if labels != nil {
if _, ok := labels[LabelReconciliationPaused]; ok {
logger.Info("Reconciliation is paused for this resource, skipping reconciliation step")
return ResultSuccess()
}
}
}
// Manual deletion path: delete the resource (a missing one is fine), then run the hook.
if IsFinalizing(cr) {
if err := reconciler.Delete(ctx, desired); client.IgnoreNotFound(err) != nil {
return ResultInError(errors.Wrap(err, "failed to delete resource"))
}
if err := resource.OnFinalize(ctx, desired); err != nil {
return ResultInError(errors.Wrap(err, "failed to run OnFinalize hook"))
}
return ResultSuccess()
}
// Setup watch if we can
reconcilerWithWatcher, ok := reconciler.(ReconcilerWithWatcher[ControllerResourceType])
if ok {
result = SetupWatch(reconcilerWithWatcher, desired, false)(ctx, req)
if result.ShouldReturn() {
return result.FromSubStep()
}
}
patchResult, err := controllerutil.CreateOrPatch(ctx, reconciler, desired, resource.GetMutator(desired))
if err != nil {
return ResultInError(errors.Wrap(err, "failed to create or patch resource"))
}
resource.Set(desired)
// Run the hook matching what CreateOrPatch did; other outcomes have no hook.
switch patchResult {
case controllerutil.OperationResultCreated:
if err := resource.OnCreate(ctx, desired); err != nil {
return ResultInError(errors.Wrap(err, "failed to run OnCreate hook"))
}
case controllerutil.OperationResultUpdated:
if err := resource.OnUpdate(ctx, desired); err != nil {
return ResultInError(errors.Wrap(err, "failed to run OnUpdate hook"))
}
}
if !resource.IsReady(desired) {
return ResultEarlyReturn()
}
return ResultSuccess()
}()
// AfterReconcile runs whatever the outcome above; its error takes precedence.
if err := resource.AfterReconcile(ctx, desired); err != nil {
return ResultInError(errors.Wrap(err, "failed to run AfterReconcile hook"))
}
return funcResult
},
}
}
// getDesiredObject returns a function that asks the resource for its desired
// object through ObjectMetaGenerator.
//
// When the generator requests deletion, the returned function deletes the
// object if it is non-nil and has a name, runs the OnDelete hook only when the
// delete call succeeded, and asks for an early return. The generator's error is
// not looked at on that path. Otherwise a generator error is returned wrapped,
// and on success the desired object is returned.
func getDesiredObject[
	ControllerResourceType ControllerCustomResource,
	ContextType Context[ControllerResourceType],
](
	reconciler Reconciler[ControllerResourceType],
	resource GenericResource[ControllerResourceType, ContextType],
) func(ctx ContextType, req ctrl.Request) (client.Object, StepResult) {
	return func(ctx ContextType, req ctrl.Request) (client.Object, StepResult) {
		obj, shouldDelete, genErr := resource.ObjectMetaGenerator()

		if !shouldDelete {
			if genErr != nil {
				return nil, ResultInError(errors.Wrap(genErr, "failed to generate resource"))
			}
			return obj, ResultSuccess()
		}

		// Without a named object there is nothing to delete.
		if obj == nil || obj.GetName() == "" {
			return nil, ResultEarlyReturn()
		}

		deleteErr := reconciler.Delete(ctx, obj)
		switch {
		case client.IgnoreNotFound(deleteErr) != nil:
			return nil, ResultInError(errors.Wrap(deleteErr, "failed to delete resource"))
		case deleteErr == nil:
			// The hook only fires when something was actually deleted.
			if hookErr := resource.OnDelete(ctx, obj); hookErr != nil {
				return nil, ResultInError(errors.Wrap(hookErr, "failed to run OnDelete hook"))
			}
		}
		return nil, ResultEarlyReturn()
	}
}
package ctrlfwk
import (
"github.com/go-logr/logr"
"github.com/pkg/errors"
ctrl "sigs.k8s.io/controller-runtime"
)
// NewReconcileResourcesStep builds a step that reconciles every resource
// returned by reconciler.GetResources.
//
// All resources are attempted, even when an earlier one asks to return. Once
// they have all run, the first collected error is returned; failing that, the
// first collected requeue or early return; failing that, success.
func NewReconcileResourcesStep[
	ControllerResourceType ControllerCustomResource,
	ContextType Context[ControllerResourceType],
](
	_ ContextType,
	reconciler ReconcilerWithResources[ControllerResourceType, ContextType],
) Step[ControllerResourceType, ContextType] {
	reconcileAll := func(ctx ContextType, logger logr.Logger, req ctrl.Request) StepResult {
		managed, err := reconciler.GetResources(ctx, req)
		if err != nil {
			return ResultInError(errors.Wrap(err, "failed to get resources"))
		}

		// Results that ask to return, in encounter order.
		var pending []StepResult
		for _, item := range managed {
			itemLogger := logger.WithValues("resource", item.ID())
			res := NewReconcileResourceStep(ctx, reconciler, item).Step(ctx, itemLogger, req)
			if !res.ShouldReturn() {
				itemLogger.Info("Reconciled resource successfully")
				continue
			}
			itemLogger.Info("Resource reconciliation resulted in early return or error")
			pending = append(pending, res)
		}

		// Errors win over requeues and early returns.
		for _, res := range pending {
			if res.err != nil {
				return res
			}
		}

		// Every collected result satisfies ShouldReturn, so the first one is
		// the one to hand back.
		if len(pending) > 0 {
			return pending[0]
		}
		return ResultSuccess()
	}

	return Step[ControllerResourceType, ContextType]{
		Name: StepReconcileResources,
		Step: reconcileAll,
	}
}
package ctrlfwk
import (
"time"
"github.com/go-logr/logr"
apierrors "k8s.io/apimachinery/pkg/api/errors"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// Stepper is a utility to execute a series of steps in a controller.
// It allows for easy chaining of steps and handling of errors and requeues.
// Each step is a function that returns a StepResult, which indicates
// whether to continue, requeue, return early, or return an error.
// The Stepper can be used in a controller's Reconcile function to manage
// the execution of multiple steps in a clean and organized manner.
//
// A Stepper is obtained from a StepperBuilder (see NewStepperFor) and run with
// Execute.
type Stepper[K client.Object, C Context[K]] struct {
// logger is handed to every step and used to report step outcomes.
logger logr.Logger
// steps are executed in order by Execute.
steps []Step[K, C]
}
// StepperBuilder accumulates the logger and the ordered list of steps used to
// create a Stepper with Build.
type StepperBuilder[K client.Object, C Context[K]] struct {
// logger is passed on to the Stepper created by Build.
logger logr.Logger
// steps holds the steps in the order they were added with WithStep.
steps []Step[K, C]
}
// NewStepperFor starts a StepperBuilder that uses logger and has no steps yet.
// The ctx argument is not read; it only lets the compiler infer K and C.
func NewStepperFor[K client.Object, C Context[K]](ctx C, logger logr.Logger) *StepperBuilder[K, C] {
	builder := new(StepperBuilder[K, C])
	builder.logger = logger
	builder.steps = make([]Step[K, C], 0)
	return builder
}
// WithStep appends step to the list of steps to execute and returns the
// builder so that calls can be chained. Steps run in the order they are added.
func (s *StepperBuilder[K, C]) WithStep(step Step[K, C]) *StepperBuilder[K, C] {
s.steps = append(s.steps, step)
return s
}
// Build creates a Stepper using the builder's logger and steps.
// The step slice is handed over as is, not copied.
func (s *StepperBuilder[K, C]) Build() *Stepper[K, C] {
return &Stepper[K, C]{
logger: s.logger,
steps: s.steps,
}
}
// StepResult is the outcome of a step. Its zero value means "success, carry on
// with the next step".
type StepResult struct {
	// earlyReturn asks to stop without error and without requeue.
	earlyReturn bool
	// err, when non-nil, is the error the step failed with.
	err error
	// requeueAfter, when positive, asks for a requeue after that delay.
	requeueAfter time.Duration
}

// ShouldReturn reports whether the result interrupts the normal flow: it
// carries an error, a positive requeue delay, or an early-return request.
func (result StepResult) ShouldReturn() bool {
	switch {
	case result.err != nil, result.requeueAfter > 0, result.earlyReturn:
		return true
	default:
		return false
	}
}

// FromSubStep returns a copy of the result with the early-return request
// cleared, keeping the error and the requeue delay. It is meant for results
// coming from a nested step.
func (result StepResult) FromSubStep() StepResult {
	return StepResult{
		err:          result.err,
		requeueAfter: result.requeueAfter,
	}
}
// Normal converts the result into the (ctrl.Result, error) pair expected from a
// Reconcile function. An error takes precedence over a requeue delay; an early
// return or a success both map to an empty ctrl.Result with a nil error.
func (result StepResult) Normal() (ctrl.Result, error) {
	switch {
	case result.err != nil:
		return ctrl.Result{}, result.err
	case result.requeueAfter > 0:
		return ctrl.Result{RequeueAfter: result.requeueAfter}, nil
	default:
		return ctrl.Result{}, nil
	}
}
// ResultInError returns a StepResult carrying err.
func ResultInError(err error) StepResult {
	var failed StepResult
	failed.err = err
	return failed
}
// ResultRequeueIn returns a StepResult asking for a requeue after the given
// delay.
func ResultRequeueIn(result time.Duration) StepResult {
	var requeue StepResult
	requeue.requeueAfter = result
	return requeue
}
// ResultEarlyReturn returns a StepResult asking to stop without error and
// without requeue.
func ResultEarlyReturn() StepResult {
	var stop StepResult
	stop.earlyReturn = true
	return stop
}
// ResultSuccess returns the zero StepResult, which lets execution continue.
func ResultSuccess() StepResult {
	var ok StepResult
	return ok
}
// Step is a named unit of work run by a Stepper.
// K is the type of the custom resource and C the context type handed to the
// step function.
type Step[K client.Object, C Context[K]] struct {
// Name is the name of the step
Name string
// Step is the function to execute
Step func(ctx C, logger logr.Logger, req ctrl.Request) StepResult
}
// NewStep bundles a name and a step function into a Step.
func NewStep[K client.Object, C Context[K]](name string, step func(ctx C, logger logr.Logger, req ctrl.Request) StepResult) Step[K, C] {
	var created Step[K, C]
	created.Name = name
	created.Step = step
	return created
}
// Execute runs the steps in order and converts the outcome into the
// (ctrl.Result, error) pair expected from a Reconcile function.
//
// Execution stops at the first step whose result asks to return:
//   - a NotFound error while the custom resource is being finalized is logged
//     and turned into a requeue after one second;
//   - any other error is logged and returned;
//   - a requeue or an early return is logged and returned.
//
// When every step succeeds, an empty result and a nil error are returned.
func (stepper *Stepper[K, C]) Execute(ctx C, req ctrl.Request) (ctrl.Result, error) {
	logger := stepper.logger
	runStartedAt := time.Now()
	logger.Info("Inserting line return for lisibility\n\n")
	logger.Info("Starting stepper execution")

	for _, current := range stepper.steps {
		began := time.Now()
		outcome := current.Step(ctx, logger, req)
		elapsed := time.Since(began)

		if !outcome.ShouldReturn() {
			logger.Info("Executed step", "step", current.Name, "stepDuration", elapsed)
			continue
		}

		switch {
		case outcome.err != nil && IsFinalizing(ctx.GetCustomResource()) && apierrors.IsNotFound(outcome.err):
			logger.Info("Resource not found during finalization, ignoring error", "step", current.Name, "stepDuration", elapsed)
			return ResultRequeueIn(1 * time.Second).Normal()
		case outcome.err != nil:
			logger.Error(outcome.err, "Error in step", "step", current.Name, "stepDuration", elapsed)
		case outcome.requeueAfter > 0:
			logger.Info("Requeueing after step", "step", current.Name, "after", outcome.requeueAfter, "stepDuration", elapsed)
		default:
			logger.Info("Early return after step", "step", current.Name, "stepDuration", elapsed)
		}
		return outcome.Normal()
	}

	logger.Info("All steps executed successfully", "duration", time.Since(runStartedAt))
	return ctrl.Result{}, nil
}
package ctrlfwk
import "sigs.k8s.io/controller-runtime/pkg/client"
// IsFinalizing reports whether obj has a non-zero deletion timestamp, i.e.
// whether it is being deleted.
func IsFinalizing(obj client.Object) bool {
	deletionTimestamp := obj.GetDeletionTimestamp()
	return !deletionTimestamp.IsZero()
}
package ctrlfwk
import (
"github.com/pkg/errors"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/source"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
)
// SetupWatch returns a function that makes sure the reconciler's controller
// watches the kind of the given object, registering the watch at most once per
// GroupVersionKind.
//
// The watch is a metadata-only watch (PartialObjectMetadata) filtered by
// ResourceVersionChangedPredicate. Events are mapped to reconcile requests:
//   - isDependency == false: for the owner of the object, using the custom
//     resource held by the context as the owner type;
//   - isDependency == true: through the map function returned by
//     GetManagedByReconcileRequests.
//
// The GroupVersionKind is read from the object itself when its Kind is set, and
// resolved from the reconciler's scheme otherwise.
//
// NOTE(review): the watch key is built with CacheTypeEnqueueForOwner in both
// modes, so a kind first watched in one mode is not registered again for the
// other mode — confirm this is intended.
func SetupWatch[
	ControllerResourceType ControllerCustomResource,
](
	reconciler ReconcilerWithWatcher[ControllerResourceType],
	object client.Object,
	isDependency bool,
) func(ctx Context[ControllerResourceType], req ctrl.Request) StepResult {
	return func(ctx Context[ControllerResourceType], req ctrl.Request) StepResult {
		gvk := object.GetObjectKind().GroupVersionKind()
		if gvk.Kind == "" {
			// Typed objects usually carry no TypeMeta: ask the scheme instead.
			resolved, err := apiutil.GVKForObject(object, reconciler.Scheme())
			if err != nil {
				return ResultInError(errors.Wrap(err, "failed to get GVK for object"))
			}
			gvk = resolved
		}

		watchSource := NewWatchKey(gvk, CacheTypeEnqueueForOwner)
		if reconciler.IsWatchingSource(watchSource) {
			return ResultSuccess()
		}

		// Build only the handler that is actually going to be used.
		var requestHandler handler.EventHandler
		if isDependency {
			managedByHandler, err := GetManagedByReconcileRequests(ctx.GetCustomResource(), reconciler.GetScheme())
			if err != nil {
				return ResultInError(errors.Wrap(err, "failed to build managed-by request handler"))
			}
			requestHandler = handler.EnqueueRequestsFromMapFunc(managedByHandler)
		} else {
			requestHandler = handler.EnqueueRequestForOwner(reconciler.GetScheme(), reconciler.GetRESTMapper(), ctx.GetCustomResource())
		}

		// Watching metadata only keeps the cache small.
		var partialObject metav1.PartialObjectMetadata
		partialObject.SetGroupVersionKind(gvk)
		var partialObjectInterface client.Object = &partialObject

		err := reconciler.GetController().Watch(
			source.Kind(
				reconciler.GetCache(),
				partialObjectInterface,
				requestHandler,
				ResourceVersionChangedPredicate{},
			),
		)
		if err != nil {
			return ResultInError(errors.Wrap(err, "failed to add watch source"))
		}

		// Remember the registration so the watch is not added twice.
		reconciler.AddWatchSource(watchSource)
		return ResultSuccess()
	}
}
// ResourceVersionChangedPredicate filters the events of a watch:
//   - update events pass only when the resource version changed;
//   - create events are dropped;
//   - delete and generic events always pass.
type ResourceVersionChangedPredicate struct {
	predicate.Funcs
}

// Update reports whether the old and new objects have different resource
// versions. Events missing either object are dropped instead of causing a nil
// dereference.
func (ResourceVersionChangedPredicate) Update(e event.UpdateEvent) bool {
	if e.ObjectOld == nil || e.ObjectNew == nil {
		return false
	}
	return e.ObjectOld.GetResourceVersion() != e.ObjectNew.GetResourceVersion()
}

// Create always returns false: create events are ignored.
func (ResourceVersionChangedPredicate) Create(e event.CreateEvent) bool {
	return false
}

// Delete always returns true: delete events are never filtered out.
func (ResourceVersionChangedPredicate) Delete(e event.DeleteEvent) bool {
	return true
}

// Generic always returns true: generic events are never filtered out.
func (ResourceVersionChangedPredicate) Generic(e event.GenericEvent) bool {
	return true
}
package ctrlfwk
import (
"k8s.io/apimachinery/pkg/runtime/schema"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
// WatchCacheKey identifies a registered watch source. Keys are built with
// NewWatchKey from a GroupVersionKind and a WatchCacheType.
type WatchCacheKey string
// WatchCacheType names the kind of watch a WatchCacheKey refers to.
type WatchCacheType string
const (
// CacheTypeEnqueueForOwner is the watch type used by SetupWatch when building
// its watch keys.
CacheTypeEnqueueForOwner WatchCacheType = "enqueueForOwner"
)
// Watcher is a controller-runtime manager that also keeps track of the watch
// sources already registered and gives access to a controller.
type Watcher interface {
ctrl.Manager
// AddWatchSource records key as a registered watch source
AddWatchSource(key WatchCacheKey)
// IsWatchingSource reports whether key has been recorded as a watch source
IsWatchingSource(key WatchCacheKey) bool
// GetController returns the controller for the watch cache
GetController() controller.TypedController[reconcile.Request]
}
// WatchCache embeds a controller-runtime manager and remembers which watch
// sources have been added, together with the controller set through
// SetController. Its methods use pointer receivers.
type WatchCache struct {
// cache holds the keys of the watch sources added so far.
cache map[WatchCacheKey]bool
// controller is the value stored by SetController and returned by GetController.
controller controller.TypedController[reconcile.Request]
ctrl.Manager
}
// NewWatchCache returns a WatchCache wrapping mgr, with an empty set of watch
// sources and no controller set.
func NewWatchCache(mgr ctrl.Manager) WatchCache {
	var watchCache WatchCache
	watchCache.Manager = mgr
	watchCache.cache = make(map[WatchCacheKey]bool)
	return watchCache
}
// NewWatchKey builds the key identifying a watch on gvk of the given type, in
// the form "<gvk.String()>/<watchType>".
func NewWatchKey(gvk schema.GroupVersionKind, watchType WatchCacheType) WatchCacheKey {
	key := gvk.String() + "/" + string(watchType)
	return WatchCacheKey(key)
}
// AddWatchSource records key as a registered watch source, creating the
// underlying map on first use.
func (w *WatchCache) AddWatchSource(key WatchCacheKey) {
	if w.cache != nil {
		w.cache[key] = true
		return
	}
	w.cache = map[WatchCacheKey]bool{key: true}
}
// IsWatchingSource reports whether key has been recorded with AddWatchSource.
func (w *WatchCache) IsWatchingSource(key WatchCacheKey) bool {
	// Looking up a nil map is safe and reports the key as absent.
	_, watching := w.cache[key]
	return watching
}
// GetController returns the controller previously stored with SetController,
// or the zero value when none was set.
func (w *WatchCache) GetController() controller.TypedController[reconcile.Request] {
return w.controller
}
// SetController stores ctrler as the controller returned by GetController.
func (w *WatchCache) SetController(ctrler controller.TypedController[reconcile.Request]) {
w.controller = ctrler
}