Leader-Reader Pattern
The leader-reader pattern is how TinySystems modules coordinate across multiple replicas. Understanding this pattern is essential for building scalable components.
Overview
┌─────────────────────────────────────────────────────────────────────────────┐
│ LEADER-READER PATTERN │
└─────────────────────────────────────────────────────────────────────────────┘
┌──────────────────────────────────────────────────────────────────────────────┐
│ KUBERNETES LEASE │
│ │
│ Only one pod holds the lease at a time │
│ Lease holder = LEADER │
│ Other pods = READERS │
└──────────────────────────────────────────────────────────────────────────────┘
│
┌─────────────────────────┼─────────────────────────┐
│ │ │
▼ ▼ ▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ LEADER │ │ READER │ │ READER │
│ │ │ │ │ │
│ ┌─────────┐ │ │ ┌─────────┐ │ │ ┌─────────┐ │
│ │ Write │ │ │ │ Watch │ │ │ │ Watch │ │
│ │ CRs │ │ │ │ Only │ │ │ │ Only │ │
│ └─────────┘ │ │ └─────────┘ │ │ └─────────┘ │
│ │ │ │ │ │
│ ┌─────────┐ │ │ ┌─────────┐ │ │ ┌─────────┐ │
│ │ Process │ │ │ │ Handle │ │ │ │ Handle │ │
│ │ Signals │ │ │ │Messages │ │ │ │Messages │ │
│ └─────────┘ │ │ └─────────┘ │ │ └─────────┘ │
│ │ │ │ │ │
│ ┌─────────┐ │ │ ┌─────────┐ │ │ ┌─────────┐ │
│ │ Update │ │ │ │ Local │ │ │ │ Local │ │
│ │ Status │ │ │ │Reconcile│ │ │ │Reconcile│ │
│ └─────────┘ │ │ └─────────┘ │ │ └─────────┘ │
└─────────────┘   └─────────────┘   └─────────────┘

Responsibilities
Leader Pod
| Responsibility | Description |
|---|---|
| Update TinyModule | Publish gRPC address and component list |
| Update TinyNode | Update port schemas and metadata |
| Process TinySignals | Execute each signal exactly once, avoiding duplicate processing across replicas |
| Expose Ingress | Configure external access |
| Write Metadata | Shared state updates |
Reader Pods
| Responsibility | Description |
|---|---|
| Watch CRs | Stay informed of changes |
| Handle Messages | Process incoming gRPC calls |
| Local Reconcile | Maintain component state |
| Read Metadata | Use shared configuration |
Checking Leadership
go
import "github.com/tiny-systems/module/pkg/utils"
// Handle demonstrates the leadership check: utils.IsLeader reports whether
// this replica currently holds the lease, based on the incoming context.
func (c *Component) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
if utils.IsLeader(ctx) {
// Leader-only code
} else {
// Reader code (or skip)
}
return nil
}

Pattern Implementation
Basic Pattern
go
// Handle shows the standard port dispatch: settings and input are processed
// on every replica, while control actions run on the leader only.
func (c *Component) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
	switch port {
	case v1alpha1.SettingsPort:
		// ALL pods: store settings.
		// Comma-ok guards the assertion so a malformed payload cannot panic the pod.
		settings, ok := msg.(Settings)
		if !ok {
			return nil
		}
		c.settings = settings
		return nil
	case v1alpha1.ControlPort:
		// LEADER only: process control actions
		if !utils.IsLeader(ctx) {
			return nil
		}
		return c.handleControl(ctx, output, msg)
	case v1alpha1.ReconcilePort:
		// ALL pods: read shared state
		// LEADER: also writes state
		return c.handleReconcile(ctx, output, msg)
	case "input":
		// ALL pods: handle incoming messages
		return c.handleInput(ctx, output, msg)
	}
	return nil
}

Signal Component Example
go
// signal.go - Manual flow trigger
// signal.go - Manual flow trigger.
// Handle lets only the leader react to UI button clicks (Send / Reset),
// so a flow is triggered exactly once across all replicas.
func (t *Signal) Handle(ctx context.Context, handler module.Handler, port string, msg any) any {
	switch port {
	case v1alpha1.ControlPort:
		// Only leader processes button clicks
		if !utils.IsLeader(ctx) {
			return nil
		}
		// Comma-ok guards the assertion so a malformed payload cannot panic the pod.
		control, ok := msg.(Control)
		if !ok {
			return nil
		}
		if control.Send {
			// Start the flow: cancel any previous run before starting a new one.
			t.mu.Lock()
			if t.cancelFunc != nil {
				t.cancelFunc()
			}
			ctx, t.cancelFunc = context.WithCancel(ctx)
			t.mu.Unlock()
			// Update UI state (fresh context so the update survives cancellation)
			handler(context.Background(), v1alpha1.ReconcilePort, nil)
			// Send to output
			return handler(ctx, OutPort, control.Context)
		}
		if control.Reset {
			t.mu.Lock()
			if t.cancelFunc != nil {
				t.cancelFunc()
				t.cancelFunc = nil
			}
			t.mu.Unlock()
			handler(context.Background(), v1alpha1.ReconcilePort, nil)
			return ctx.Err()
		}
	}
	return nil
}

Ticker Component Example
go
// ticker.go - Periodic emission
// ticker.go - Periodic emission.
// Handle lets only the leader start or stop the ticker, so ticks are
// emitted once per flow rather than once per replica.
func (t *Ticker) Handle(ctx context.Context, handler module.Handler, port string, msg any) any {
	switch port {
	case v1alpha1.ControlPort:
		// Only leader starts/stops the ticker
		if !utils.IsLeader(ctx) {
			return nil
		}
		// Comma-ok guards the assertion so a malformed payload cannot panic the pod.
		control, ok := msg.(Control)
		if !ok {
			return nil
		}
		if control.Start {
			return t.startEmitting(ctx, handler)
		}
		if control.Stop {
			t.stopEmitting()
		}
		return nil
	case v1alpha1.ReconcilePort:
		// All pods can read state
		// Leader can update state
		return nil
	}
	return nil
}
// startEmitting blocks and emits t.settings.Context on OutPort every
// t.settings.Delay milliseconds until runCtx is cancelled.
func (t *Ticker) startEmitting(ctx context.Context, handler module.Handler) error {
	t.mu.Lock()
	// Cancel any run already in flight so two emit loops never race on the
	// same output (mirrors the Signal component's Send handling).
	if t.cancelFunc != nil {
		t.cancelFunc()
	}
	runCtx, cancel := context.WithCancel(ctx)
	t.cancelFunc = cancel
	t.mu.Unlock()
	// Update UI
	handler(context.Background(), v1alpha1.ReconcilePort, nil)
	// Emit loop (blocking). One reused timer avoids a per-tick allocation.
	timer := time.NewTimer(time.Duration(t.settings.Delay) * time.Millisecond)
	defer timer.Stop() // release the timer when the loop exits
	for {
		select {
		case <-timer.C:
			handler(runCtx, OutPort, t.settings.Context)
			timer.Reset(time.Duration(t.settings.Delay) * time.Millisecond)
		case <-runCtx.Done():
			return runCtx.Err()
		}
	}
}

Controller-Level Pattern
Controllers also implement the pattern:
go
// tinynode_controller.go
// tinynode_controller.go
// Reconcile keeps every replica's local scheduler in sync with the TinyNode
// CR; only the leader writes the CR status back.
func (r *TinyNodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	node := &v1alpha1.TinyNode{}
	if err := r.Get(ctx, req.NamespacedName, node); err != nil {
		// CR may already be gone; in real code use client.IgnoreNotFound(err).
		return ctrl.Result{}, err
	}
	// ALL pods: update local scheduler
	r.Scheduler.Update(ctx, node)
	// LEADER only: update CR status
	if !r.IsLeader.Load() {
		// Requeue to stay in sync
		return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
	}
	// Leader: build and update status
	node.Status = r.buildStatus(node)
	if err := r.Status().Update(ctx, node); err != nil {
		return ctrl.Result{}, err // let controller-runtime retry with backoff
	}
	return ctrl.Result{RequeueAfter: 5 * time.Minute}, nil
}

go
// tinysignal_controller.go
// tinysignal_controller.go
// Reconcile executes a TinySignal on the leader exactly once, then deletes
// it; reader replicas ignore signals entirely.
func (r *TinySignalReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	// LEADER only: process signals
	if !r.IsLeader.Load() {
		return ctrl.Result{}, nil
	}
	signal := &v1alpha1.TinySignal{}
	if err := r.Get(ctx, req.NamespacedName, signal); err != nil {
		// Already deleted or unreadable: nothing to process.
		return ctrl.Result{}, err
	}
	// Route to scheduler
	r.Scheduler.Handle(ctx, runner.Msg{
		To:   fmt.Sprintf("%s.%s", signal.Spec.Node, signal.Spec.Port),
		Data: signal.Spec.Data,
	})
	// Delete processed signal so it cannot be executed twice.
	if err := r.Delete(ctx, signal); err != nil {
		return ctrl.Result{}, err
	}
	return ctrl.Result{}, nil
}

Failover Handling
When the leader fails:
Time ──────────────────────────────────────────────────────────────▶
Leader Pod A Reader Pod B Reader Pod C
│ │ │
│ ◀── Lease held │ │
│ │ │
X (Pod A dies) │ │
│ │
│ ◀── Lease expires │ │
(15 seconds) │ │
│ │
│ Acquires lease │
│ ──────────────▶ │
│ │
│ isLeader = true │
│ │
│ Continues operations │
│ │
▼                        ▼

Code for Failover
go
// Component tracks leadership transitions so leader-only work can be
// started and stopped exactly once per transition.
type Component struct {
cancelFunc context.CancelFunc // cancels leader-only background work; nil when none is running
mu sync.Mutex // guards cancelFunc and wasLeader
wasLeader bool // leadership status observed on the previous reconcile
}
// Handle detects leadership transitions on the reconcile port and fires the
// matching hook exactly once per transition.
func (c *Component) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
	if port != v1alpha1.ReconcilePort {
		return nil
	}
	leaderNow := utils.IsLeader(ctx)
	c.mu.Lock()
	defer c.mu.Unlock()
	switch {
	case leaderNow && !c.wasLeader:
		// Just became leader - take over
		c.onBecameLeader(ctx, output)
		c.wasLeader = true
	case !leaderNow && c.wasLeader:
		// Lost leadership - stop leader-only work
		c.onLostLeadership()
		c.wasLeader = false
	}
	return nil
}
// onBecameLeader runs once on the reader-to-leader transition: it logs the
// takeover and resumes the work only a leader may perform.
func (c *Component) onBecameLeader(ctx context.Context, output module.Handler) {
	log.Info("became leader, taking over")
	c.startLeaderOnlyWork(ctx, output)
}
// onLostLeadership runs once on the leader-to-reader transition: it cancels
// any leader-only background work and clears the stored cancel func.
func (c *Component) onLostLeadership() {
	log.Info("lost leadership, stopping leader-only work")
	if c.cancelFunc != nil {
		c.cancelFunc()
		c.cancelFunc = nil
	}
}

Best Practices
1. Don't Assume Continuous Leadership
go
// Bad: Assumes leader forever
func (c *Component) Handle(ctx context.Context, ...) {
if utils.IsLeader(ctx) {
go func() {
for {
// Might not be leader anymore!
c.doWork()
}
}()
}
}
// Good: Check leadership in loop
func (c *Component) Handle(ctx context.Context, ...) {
if utils.IsLeader(ctx) {
go func() {
for {
select {
case <-ctx.Done():
return
default:
if !utils.IsLeader(ctx) {
return // Lost leadership
}
c.doWork()
}
}
}()
}
}

2. Idempotent State Updates
go
// Good: Check before updating
output(ctx, v1alpha1.ReconcilePort, func(node *v1alpha1.TinyNode) {
if node.Status.Metadata["initialized"] != "true" {
// Only initialize once
node.Status.Metadata["initialized"] = "true"
c.performInitialization()
}
})

3. Graceful Degradation
go
// Handle degrades gracefully across replicas: message traffic is served by
// every pod, while status updates happen only on the lease holder.
func (c *Component) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
	switch {
	case port == "input":
		// Every replica handles incoming messages.
		return c.processInput(ctx, output, msg)
	case port == v1alpha1.ReconcilePort && utils.IsLeader(ctx):
		// Status writes are reserved for the leader.
		c.updateStatus(ctx, output)
	}
	return nil
}

Next Steps
- Multi-Replica Coordination - Complete examples
- Horizontal Scaling - Scaling best practices
- Leader Election - Election mechanics