Controller Reconciliation

Reconciliation is the core pattern in Kubernetes operators. Understanding how TinySystems controllers reconcile helps you build components that behave correctly across retries, restarts, and multiple replicas.

Reconciliation Pattern

┌─────────────────────────────────────────────────────────────────────────────┐
│                      RECONCILIATION PATTERN                                  │
└─────────────────────────────────────────────────────────────────────────────┘

Desired State (Spec)         Actual State (Status/Runtime)
       │                              │
       │                              │
       ▼                              ▼
┌─────────────┐              ┌─────────────┐
│   TinyNode  │              │   Runner    │
│   .Spec     │──────────────│   Instance  │
│             │  Reconcile   │             │
│ - module    │─────────────▶│ - component │
│ - component │              │ - ports     │
│ - edges     │              │ - state     │
└─────────────┘              └─────────────┘
       │                              │
       │                              │
       └──────────────┬───────────────┘

                      │
                      ▼
              Update Status
              (ports, metadata, error)

TinyNode Reconciliation

The TinyNodeReconciler handles node lifecycle:

go
// tinynode_controller.go
func (r *TinyNodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    log := log.FromContext(ctx)

    // 1. Fetch the TinyNode
    node := &v1alpha1.TinyNode{}
    if err := r.Get(ctx, req.NamespacedName, node); err != nil {
        if errors.IsNotFound(err) {
            // Node deleted - cleanup
            r.Scheduler.Destroy(req.Name)
            return ctrl.Result{}, nil
        }
        return ctrl.Result{}, err
    }

    // 2. Check if this node belongs to our module
    if node.Spec.Module != r.Module.Name {
        return ctrl.Result{}, nil  // Not our node
    }

    // 3. Update or create runner instance
    if err := r.Scheduler.Update(ctx, node); err != nil {
        log.Error(err, "failed to update scheduler")
        return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
    }

    // 4. Only leader updates status
    if !r.IsLeader.Load() {
        // Non-leader: requeue to stay in sync
        return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
    }

    // 5. Check if status update needed
    if node.Status.ObservedGeneration >= node.Generation {
        // Already up to date, periodic requeue
        return ctrl.Result{RequeueAfter: 5 * time.Minute}, nil
    }

    // 6. Build and update status
    node.Status = r.buildStatus(node)
    node.Status.ObservedGeneration = node.Generation

    if err := r.Status().Update(ctx, node); err != nil {
        log.Error(err, "failed to update status")
        return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
    }

    return ctrl.Result{RequeueAfter: 5 * time.Minute}, nil
}

Reconciliation Triggers

Controllers reconcile on these events:

| Event    | Trigger         | Action                       |
|----------|-----------------|------------------------------|
| Create   | New TinyNode CR | Create runner instance       |
| Update   | Spec changes    | Update runner, rebuild ports |
| Delete   | CR deleted      | Destroy runner instance      |
| Periodic | Timer (5 min)   | Refresh state, sync status   |
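
All of these arrive at Reconcile as the same ctrl.Request; the event type itself is not passed through, and the Periodic row is driven by RequeueAfter rather than a watch event. If you need to observe which trigger fired (for debugging, say), an event filter can log events on their way into the queue. A small sketch using controller-runtime's predicate.Funcs:

go
import (
    "fmt"

    "sigs.k8s.io/controller-runtime/pkg/event"
    "sigs.k8s.io/controller-runtime/pkg/predicate"
)

// debugTriggers logs each watch event before it is enqueued; returning
// true keeps the default behavior of reconciling on every event.
var debugTriggers = predicate.Funcs{
    CreateFunc: func(e event.CreateEvent) bool {
        fmt.Println("create:", e.Object.GetName())
        return true
    },
    UpdateFunc: func(e event.UpdateEvent) bool {
        fmt.Println("update:", e.ObjectNew.GetName())
        return true
    },
    DeleteFunc: func(e event.DeleteEvent) bool {
        fmt.Println("delete:", e.Object.GetName())
        return true
    },
}

Register it with WithEventFilter, as in the watch configuration shown later.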

Idempotent Reconciliation

Reconciliation must be idempotent: running it multiple times over the same state produces the same result:

go
func (r *TinyNodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    node := &v1alpha1.TinyNode{}
    if err := r.Get(ctx, req.NamespacedName, node); err != nil {
        if errors.IsNotFound(err) {
            return ctrl.Result{}, nil
        }
        return ctrl.Result{}, err
    }

    // Idempotent: check if this generation was already processed
    if node.Status.ObservedGeneration >= node.Generation {
        // No changes, skip processing
        return ctrl.Result{RequeueAfter: 5 * time.Minute}, nil
    }

    // Process changes...
    node.Status.ObservedGeneration = node.Generation
    if err := r.Status().Update(ctx, node); err != nil {
        return ctrl.Result{}, err
    }

    return ctrl.Result{}, nil
}

Scheduler Update

The scheduler creates or updates runner instances:

go
// scheduler.go
func (s *Scheduler) Update(ctx context.Context, node *v1alpha1.TinyNode) error {
    // Upsert: Create if not exists, update if exists
    s.instancesMap.Upsert(node.Name, nil, func(exist bool, old, new *runner.Runner) *runner.Runner {
        if exist && old != nil {
            // Update existing runner
            old.UpdateNode(node)
            return old
        }

        // Create new runner
        component := s.getComponent(node.Spec.Component)
        if component == nil {
            return nil
        }

        r := runner.New(runner.Config{
            Node:      node,
            Component: component.Instance(),
            Scheduler: s,
        })

        // Initialize with settings
        go r.Initialize(ctx)

        return r
    })

    return nil
}
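
instancesMap is presumably a concurrent map whose Upsert runs the callback atomically for a given key. A minimal sketch of the same semantics with a plain mutex (illustrative, generic value type):

go
import "sync"

// upsertMap mimics the Upsert used above: the callback observes whether
// a value already exists and returns the value to store, under one lock.
type upsertMap[V any] struct {
    mu sync.Mutex
    m  map[string]V
}

func newUpsertMap[V any]() *upsertMap[V] {
    return &upsertMap[V]{m: make(map[string]V)}
}

func (u *upsertMap[V]) Upsert(key string, fn func(exist bool, old V) V) {
    u.mu.Lock()
    defer u.mu.Unlock()
    old, exist := u.m[key]
    u.m[key] = fn(exist, old)
}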

Reconcile Port

Components receive periodic reconciliation via the _reconcile port:

go
func (c *Component) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
    switch port {
    case v1alpha1.ReconcilePort:
        // Periodic reconciliation (every 5 minutes)
        node, ok := msg.(v1alpha1.TinyNode)
        if !ok {
            return nil
        }

        // Read shared metadata
        if val, ok := node.Status.Metadata["key"]; ok {
            c.cachedValue = val
        }

        // Cleanup stale resources
        c.cleanupOldConnections()

        // Update shared state (leader only)
        if utils.IsLeader(ctx) {
            output(ctx, v1alpha1.ReconcilePort, func(n *v1alpha1.TinyNode) {
                n.Status.Metadata["last-reconcile"] = time.Now().Format(time.RFC3339)
            })
        }

        return nil
    }
    return nil
}

Requeue Strategies

Controllers use different requeue strategies:

go
// Immediate requeue (error recovery)
return ctrl.Result{Requeue: true}, nil

// Delayed requeue (rate limiting)
return ctrl.Result{RequeueAfter: 10 * time.Second}, nil

// Periodic requeue (refresh)
return ctrl.Result{RequeueAfter: 5 * time.Minute}, nil

// No requeue (wait for next event)
return ctrl.Result{}, nil

// Error requeue (exponential backoff)
return ctrl.Result{}, err
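
Avoid combining the last two: if you return a non-zero Result together with a non-nil error, controller-runtime takes the error path (rate-limited backoff) and logs a warning that the Result is ignored.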

Status Building

Controllers build status from component state:

go
func (r *TinyNodeReconciler) buildStatus(node *v1alpha1.TinyNode) v1alpha1.TinyNodeStatus {
    runner, exists := r.Scheduler.GetInstance(node.Name)
    if !exists {
        return v1alpha1.TinyNodeStatus{
            Error: "runner not found",
        }
    }

    // Build port status from component
    ports := make([]v1alpha1.PortStatus, 0)
    for _, port := range runner.Component.Ports() {
        // Avoid shadowing the schema package with the result variable
        portSchema, _ := schema.Generate(port.Configuration)
        ports = append(ports, v1alpha1.PortStatus{
            Name:          port.Name,
            Label:         port.Label,
            Position:      int(port.Position),
            Source:        port.Source,
            Schema:        string(portSchema),
            Configuration: runner.GetPortConfig(port.Name),
        })
    }

    return v1alpha1.TinyNodeStatus{
        ModuleName: r.Module.Name,
        Component:  node.Spec.Component,
        Ports:      ports,
        Metadata:   runner.GetMetadata(),
        Error:      runner.GetError(),
    }
}

Watch Configuration

Controllers configure watches for efficient reconciliation:

go
func (r *TinyNodeReconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&v1alpha1.TinyNode{}).
        WithEventFilter(predicate.GenerationChangedPredicate{}).  // Only spec changes
        WithOptions(controller.Options{
            MaxConcurrentReconciles: 10,  // Parallel reconciliation
        }).
        Complete(r)
}
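
Note that MaxConcurrentReconciles parallelizes across objects only: the underlying workqueue deduplicates keys, so two workers never reconcile the same TinyNode at the same time.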

Generation Changed Predicate

The GenerationChangedPredicate filters to only spec changes:

go
// Only reconcile when spec changes (generation increments)
// Ignores status-only updates
predicate.GenerationChangedPredicate{}

This prevents infinite loops when updating status.
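
Label edits also leave the generation untouched, so they would be filtered out too. If a controller needs to react to them as well, predicates can be combined; a sketch using controller-runtime's predicate.Or (LabelChangedPredicate comes from the same package):

go
func (r *TinyNodeReconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&v1alpha1.TinyNode{}).
        // Reconcile on spec or label changes; still ignore status-only writes
        WithEventFilter(predicate.Or(
            predicate.GenerationChangedPredicate{},
            predicate.LabelChangedPredicate{},
        )).
        Complete(r)
}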

Error Handling in Reconciliation

Distinguish transient failures from permanent ones, and choose the retry mechanism accordingly:

go
func (r *TinyNodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    log := log.FromContext(ctx)

    node := &v1alpha1.TinyNode{}
    if err := r.Get(ctx, req.NamespacedName, node); err != nil {
        if errors.IsNotFound(err) {
            return ctrl.Result{}, nil
        }
        return ctrl.Result{}, err
    }

    // Transient errors: return the error for exponential backoff
    if err := r.Scheduler.Update(ctx, node); err != nil {
        if isTransient(err) {
            return ctrl.Result{}, err // will retry with backoff
        }
        // Permanent error: log and don't requeue
        log.Error(err, "permanent error")
        return ctrl.Result{}, nil
    }

    // Conflict errors (stale object version): explicit short requeue
    if err := r.Status().Update(ctx, node); err != nil {
        if errors.IsConflict(err) {
            return ctrl.Result{RequeueAfter: 1 * time.Second}, nil
        }
        return ctrl.Result{}, err
    }

    return ctrl.Result{RequeueAfter: 5 * time.Minute}, nil
}
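
For conflicts there is an in-process alternative to requeueing: client-go's conflict retry helper, which refetches the latest revision before each attempt. A sketch, assuming k8s.io/client-go/util/retry and the buildStatus helper shown above:

go
import "k8s.io/client-go/util/retry"

err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
    // A conflict means our copy was stale; refetch before retrying
    latest := &v1alpha1.TinyNode{}
    if err := r.Get(ctx, req.NamespacedName, latest); err != nil {
        return err
    }
    latest.Status = r.buildStatus(latest)
    latest.Status.ObservedGeneration = latest.Generation
    return r.Status().Update(ctx, latest)
})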

Best Practices

1. Check Generation

go
if node.Status.ObservedGeneration >= node.Generation {
    return ctrl.Result{RequeueAfter: 5 * time.Minute}, nil
}
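
This works because the API server increments metadata.generation only when the spec changes; status updates leave it untouched, making the comparison a reliable no-op check.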

2. Leader-Only Writes

go
if !r.IsLeader.Load() {
    return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
}
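
One way the IsLeader flag can be wired, assuming the manager runs with leader election enabled (the election ID below is illustrative, and reconciler stands in for your TinyNodeReconciler instance). Elected() returns a channel that closes once this replica wins:

go
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
    LeaderElection:   true,
    LeaderElectionID: "tinysystems-module-lock", // illustrative ID
})
if err != nil {
    panic(err)
}

go func() {
    <-mgr.Elected() // closes when this replica becomes leader
    reconciler.IsLeader.Store(true)
}()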

3. Handle Not Found

go
if errors.IsNotFound(err) {
    r.Scheduler.Destroy(req.Name)
    return ctrl.Result{}, nil
}
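
NotFound-based cleanup is best-effort: if the controller is down when the CR is deleted, the delete event is gone by the time it restarts. When cleanup must be guaranteed, a finalizer blocks deletion until Destroy has run. A sketch using controller-runtime's controllerutil helpers (the finalizer name is hypothetical):

go
import "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

const cleanupFinalizer = "tinysystems.io/runner-cleanup" // hypothetical name

if node.DeletionTimestamp.IsZero() {
    // Live object: make sure the finalizer is attached
    if controllerutil.AddFinalizer(node, cleanupFinalizer) {
        if err := r.Update(ctx, node); err != nil {
            return ctrl.Result{}, err
        }
    }
} else if controllerutil.ContainsFinalizer(node, cleanupFinalizer) {
    // Being deleted: clean up, then release the finalizer
    r.Scheduler.Destroy(node.Name)
    controllerutil.RemoveFinalizer(node, cleanupFinalizer)
    if err := r.Update(ctx, node); err != nil {
        return ctrl.Result{}, err
    }
    return ctrl.Result{}, nil
}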

4. Use Appropriate Requeue

go
// Error: exponential backoff
return ctrl.Result{}, err

// Conflict: short delay
return ctrl.Result{RequeueAfter: 1 * time.Second}, nil

// Periodic: long delay
return ctrl.Result{RequeueAfter: 5 * time.Minute}, nil

Next Steps

Build flow-based applications on Kubernetes