Skip to content

Leader-Reader Pattern

The leader-reader pattern is how TinySystems modules coordinate across multiple replicas. Understanding this pattern is essential for building scalable components.

Overview

┌─────────────────────────────────────────────────────────────────────────────┐
│                       LEADER-READER PATTERN                                  │
└─────────────────────────────────────────────────────────────────────────────┘

┌──────────────────────────────────────────────────────────────────────────────┐
│                         KUBERNETES LEASE                                      │
│                                                                               │
│   Only one pod holds the lease at a time                                     │
│   Lease holder = LEADER                                                       │
│   Other pods = READERS                                                        │
└──────────────────────────────────────────────────────────────────────────────┘

          ┌─────────────────────────┼─────────────────────────┐
          │                         │                         │
          ▼                         ▼                         ▼
   ┌─────────────┐           ┌─────────────┐           ┌─────────────┐
   │   LEADER    │           │   READER    │           │   READER    │
   │             │           │             │           │             │
   │ ┌─────────┐ │           │ ┌─────────┐ │           │ ┌─────────┐ │
   │ │  Write  │ │           │ │  Watch  │ │           │ │  Watch  │ │
   │ │ CRs     │ │           │ │  Only   │ │           │ │  Only   │ │
   │ └─────────┘ │           │ └─────────┘ │           │ └─────────┘ │
   │             │           │             │           │             │
   │ ┌─────────┐ │           │ ┌─────────┐ │           │ ┌─────────┐ │
   │ │ Process │ │           │ │ Handle  │ │           │ │ Handle  │ │
   │ │ Signals │ │           │ │Messages │ │           │ │Messages │ │
   │ └─────────┘ │           │ └─────────┘ │           │ └─────────┘ │
   │             │           │             │           │             │
   │ ┌─────────┐ │           │ ┌─────────┐ │           │ ┌─────────┐ │
   │ │ Update  │ │           │ │  Local  │ │           │ │  Local  │ │
   │ │ Status  │ │           │ │Reconcile│ │           │ │Reconcile│ │
   │ └─────────┘ │           │ └─────────┘ │           │ └─────────┘ │
   └─────────────┘           └─────────────┘           └─────────────┘

Responsibilities

Leader Pod

| Responsibility | Description |
| --- | --- |
| Update TinyModule | Publish gRPC address and component list |
| Update TinyNode | Update port schemas and metadata |
| Process TinySignals | Execute signals to avoid duplicates |
| Expose Ingress | Configure external access |
| Write Metadata | Shared state updates |

Reader Pods

| Responsibility | Description |
| --- | --- |
| Watch CRs | Stay informed of changes |
| Handle Messages | Process incoming gRPC calls |
| Local Reconcile | Maintain component state |
| Read Metadata | Use shared configuration |

Checking Leadership

go
import "github.com/tiny-systems/module/pkg/utils"

// Handle shows where leader-only and reader code go inside a component
// handler. It always returns nil.
func (c *Component) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
    if !utils.IsLeader(ctx) {
        // Reader code (or skip)
        return nil
    }

    // Leader-only code
    return nil
}

Pattern Implementation

Basic Pattern

go
// Handle dispatches a message by port. Settings, reconcile and "input"
// messages are handled on every pod; control messages are handled only when
// utils.IsLeader reports true. Unknown ports yield nil.
func (c *Component) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
    switch port {
    case v1alpha1.SettingsPort:
        // ALL pods: keep a local copy of the settings.
        c.settings = msg.(Settings)

    case v1alpha1.ControlPort:
        // LEADER only: readers fall through to the nil return below.
        if utils.IsLeader(ctx) {
            return c.handleControl(ctx, output, msg)
        }

    case v1alpha1.ReconcilePort:
        // ALL pods: read shared state
        // LEADER: also writes state
        return c.handleReconcile(ctx, output, msg)

    case "input":
        // ALL pods: handle incoming messages
        return c.handleInput(ctx, output, msg)
    }
    return nil
}

Signal Component Example

go
// signal.go - Manual flow trigger
// Handle processes control messages for the manual flow trigger. Messages on
// any other port are ignored and nil is returned.
//
// On the control port, pods for which utils.IsLeader reports false return nil
// without acting. Otherwise a Send starts a flow on OutPort under a fresh
// cancellable context, and a Reset cancels the context stored by an earlier
// Send, if there is one.
func (t *Signal) Handle(ctx context.Context, handler module.Handler, port string, msg any) any {
    switch port {
    case v1alpha1.ControlPort:
        // Only leader processes button clicks
        if !utils.IsLeader(ctx) {
            return nil
        }

        // Unchecked type assertion: this panics if msg is not a Control value.
        control := msg.(Control)
        if control.Send {
            // Start the flow
            // Under t.mu: cancel the context stored by a previous Send, then
            // overwrite both the local ctx and t.cancelFunc with a new
            // cancellable pair derived from the incoming ctx.
            t.mu.Lock()
            if t.cancelFunc != nil {
                t.cancelFunc()
            }
            ctx, t.cancelFunc = context.WithCancel(ctx)
            t.mu.Unlock()

            // Update UI state
            // Uses a background context, so this call is not tied to the
            // flow's cancellation. Its result is discarded.
            handler(context.Background(), v1alpha1.ReconcilePort, nil)

            // Send to output
            // ctx here is the newly derived, cancellable context.
            return handler(ctx, OutPort, control.Context)
        }

        if control.Reset {
            // Under t.mu: cancel and clear the stored cancel function.
            t.mu.Lock()
            if t.cancelFunc != nil {
                t.cancelFunc()
                t.cancelFunc = nil
            }
            t.mu.Unlock()

            handler(context.Background(), v1alpha1.ReconcilePort, nil)
            // ctx is still the context passed to this call (it is only
            // reassigned in the Send branch), so this reports that context's
            // error, if any.
            return ctx.Err()
        }
    }
    return nil
}

Ticker Component Example

go
// ticker.go - Periodic emission
// Handle reacts to control messages by starting or stopping emission. Control
// messages are ignored unless utils.IsLeader reports true; every other path
// returns nil.
func (t *Ticker) Handle(ctx context.Context, handler module.Handler, port string, msg any) any {
    switch port {
    case v1alpha1.ControlPort:
        // Only leader starts/stops the ticker
        if !utils.IsLeader(ctx) {
            return nil
        }

        // Start takes precedence over Stop when both are set.
        switch control := msg.(Control); {
        case control.Start:
            return t.startEmitting(ctx, handler)
        case control.Stop:
            t.stopEmitting()
        }

    case v1alpha1.ReconcilePort:
        // All pods can read state
        // Leader can update state
    }
    return nil
}

// startEmitting runs the emit loop on the calling goroutine and blocks until
// the run is cancelled, returning the run context's error.
//
// A cancellable context is derived from ctx and its cancel function is stored
// in t.cancelFunc under t.mu. A cancel function already stored there is
// invoked first, so a repeated start replaces the previous loop instead of
// leaving it running with no remaining way to cancel it.
//
// After each delay of t.settings.Delay milliseconds the handler is called on
// OutPort with t.settings.Context. The delay is re-read on every iteration.
func (t *Ticker) startEmitting(ctx context.Context, handler module.Handler) error {
    runCtx, cancel := context.WithCancel(ctx)
    // Release the derived context on every exit path, including the case
    // where the parent ctx (not the stored cancel function) ended the run.
    defer cancel()

    t.mu.Lock()
    if t.cancelFunc != nil {
        // Stop the run started by an earlier call before taking its place.
        t.cancelFunc()
    }
    t.cancelFunc = cancel
    t.mu.Unlock()

    // Update UI
    handler(context.Background(), v1alpha1.ReconcilePort, nil)

    // Emit loop (blocking)
    timer := time.NewTimer(time.Duration(t.settings.Delay) * time.Millisecond)
    defer timer.Stop()
    for {
        select {
        case <-timer.C:
            handler(runCtx, OutPort, t.settings.Context)
            // The timer has just fired and its channel was drained above, so
            // Reset is safe without a preceding Stop.
            timer.Reset(time.Duration(t.settings.Delay) * time.Millisecond)
        case <-runCtx.Done():
            return runCtx.Err()
        }
    }
}

Controller-Level Pattern

Controllers also implement the pattern:

go
// tinynode_controller.go
// Reconcile syncs a TinyNode into the local scheduler on every pod and, on the
// leader only, rebuilds and persists the node's status.
//
// Non-leader pods requeue after 30 seconds. The leader requeues after 5
// minutes once the status update has succeeded. Errors from fetching the node
// or writing its status are returned to the caller rather than dropped, so a
// node that failed to load is never passed to the scheduler or status writer.
func (r *TinyNodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    node := &v1alpha1.TinyNode{}
    if err := r.Get(ctx, req.NamespacedName, node); err != nil {
        // NOTE(review): a not-found error here most likely means the object
        // was deleted; consider treating that case as success instead of
        // returning it — confirm against the controller's delete handling.
        return ctrl.Result{}, err
    }

    // ALL pods: update local scheduler
    r.Scheduler.Update(ctx, node)

    // LEADER only: update CR status
    if !r.IsLeader.Load() {
        // Requeue to stay in sync
        return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
    }

    // Leader: build and update status
    node.Status = r.buildStatus(node)
    if err := r.Status().Update(ctx, node); err != nil {
        // Surface the failure so the request is retried instead of waiting
        // for the regular 5-minute requeue.
        return ctrl.Result{}, err
    }

    return ctrl.Result{RequeueAfter: 5 * time.Minute}, nil
}
go
// tinysignal_controller.go
// Reconcile forwards a TinySignal to the scheduler and then deletes it. Pods
// whose IsLeader flag is false return immediately without reading the signal.
// Every path returns an empty result and a nil error.
func (r *TinySignalReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    // LEADER only: process signals
    if !r.IsLeader.Load() {
        return ctrl.Result{}, nil
    }

    signal := &v1alpha1.TinySignal{}
    // NOTE(review): the result of Get is discarded. If the fetch fails,
    // signal presumably stays zero-valued and the code below would route to
    // "." with empty data — confirm and handle the failure explicitly.
    r.Get(ctx, req.NamespacedName, signal)

    // Route to scheduler
    // The destination is "<node>.<port>", built from the signal's spec.
    r.Scheduler.Handle(ctx, runner.Msg{
        To:   fmt.Sprintf("%s.%s", signal.Spec.Node, signal.Spec.Port),
        Data: signal.Spec.Data,
    })

    // Delete processed signal
    // NOTE(review): the results of Scheduler.Handle and Delete are discarded
    // too — verify whether a failed delete should be surfaced, and what that
    // means for re-processing the same signal.
    r.Delete(ctx, signal)

    return ctrl.Result{}, nil
}

Failover Handling

When the leader fails:

Time ──────────────────────────────────────────────────────────────▶

    Leader Pod A              Reader Pod B              Reader Pod C
         │                         │                         │
         │ ◀── Lease held          │                         │
         │                         │                         │
         X (Pod A dies)            │                         │
                                   │                         │
         │ ◀── Lease expires       │                         │
              (15 seconds)         │                         │
                                   │                         │
                                   │ Acquires lease          │
                                   │ ──────────────▶         │
                                   │                         │
                                   │ isLeader = true         │
                                   │                         │
                                   │ Continues operations    │
                                   │                         │
                                   ▼                         ▼

Code for Failover

go
// Component carries the state used to react to leadership changes.
type Component struct {
    // mu is held by Handle while it compares and updates wasLeader.
    mu sync.Mutex

    // wasLeader records the leadership value observed on the previous
    // reconcile.
    wasLeader bool

    // cancelFunc, when non-nil, is invoked and cleared by onLostLeadership.
    cancelFunc context.CancelFunc
}

// Handle detects leadership transitions on the reconcile port. It compares the
// current result of utils.IsLeader with the value remembered from the previous
// call and invokes onBecameLeader or onLostLeadership when they differ. Other
// ports are ignored. It always returns nil.
func (c *Component) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
    if port != v1alpha1.ReconcilePort {
        return nil
    }

    // Sample leadership before taking the lock, as the comparison below only
    // needs the value.
    leaderNow := utils.IsLeader(ctx)

    c.mu.Lock()
    defer c.mu.Unlock()

    switch {
    case leaderNow && !c.wasLeader:
        // Just became leader - take over
        c.onBecameLeader(ctx, output)
        c.wasLeader = true
    case !leaderNow && c.wasLeader:
        // Lost leadership - stop leader-only work
        c.onLostLeadership()
        c.wasLeader = false
    }
    return nil
}

// onBecameLeader logs the takeover and starts the leader-only work. In the
// code shown here it is called from Handle while c.mu is held.
func (c *Component) onBecameLeader(ctx context.Context, output module.Handler) {
    // Initialize leader-only resources
    log.Info("became leader, taking over")

    // Resume any paused operations
    // NOTE(review): Handle holds c.mu across this call, so
    // startLeaderOnlyWork presumably must return promptly and must not lock
    // c.mu itself — confirm.
    c.startLeaderOnlyWork(ctx, output)
}

// onLostLeadership logs the demotion and, if a cancel function is stored,
// invokes it once and clears it.
func (c *Component) onLostLeadership() {
    // Stop leader-only work
    log.Info("lost leadership, stopping leader-only work")

    if c.cancelFunc == nil {
        return
    }
    c.cancelFunc()
    c.cancelFunc = nil
}

Best Practices

1. Don't Assume Continuous Leadership

go
// Bad: Assumes leader forever
// Handle (anti-pattern) checks leadership once and then starts a goroutine
// that calls doWork in an endless loop. The loop never re-checks leadership
// and has no exit condition, so it keeps running after leadership is lost.
func (c *Component) Handle(ctx context.Context, ...) {
    if utils.IsLeader(ctx) {
        go func() {
            for {
                // Might not be leader anymore!
                c.doWork()
            }
        }()
    }
}

// Good: Check leadership in loop
// Handle starts background work only when this pod is the leader. Before each
// unit of work the goroutine checks for cancellation of ctx and then
// re-checks leadership, exiting as soon as either says to stop.
func (c *Component) Handle(ctx context.Context, ...) {
    if !utils.IsLeader(ctx) {
        return
    }

    go func() {
        for {
            // Non-blocking cancellation poll.
            select {
            case <-ctx.Done():
                return
            default:
            }

            if !utils.IsLeader(ctx) {
                return // Lost leadership
            }
            c.doWork()
        }
    }()
}

2. Idempotent State Updates

go
// Good: Check before updating
// Good: skip the work when the node is already marked as initialized
output(ctx, v1alpha1.ReconcilePort, func(node *v1alpha1.TinyNode) {
    if node.Status.Metadata["initialized"] == "true" {
        return
    }

    // Only initialize once: the marker is written before the work runs.
    node.Status.Metadata["initialized"] = "true"
    c.performInitialization()
})

3. Graceful Degradation

go
// Handle processes "input" messages on every pod and performs status updates
// on the reconcile port only when utils.IsLeader reports true. Everything
// other than "input" returns nil.
func (c *Component) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
    switch {
    case port == "input":
        // All pods can handle messages
        return c.processInput(ctx, output, msg)

    case port == v1alpha1.ReconcilePort && utils.IsLeader(ctx):
        // But only leader does status updates
        c.updateStatus(ctx, output)
    }
    return nil
}

Next Steps

Build flow-based applications on Kubernetes