Leader-Reader Pattern
The leader-reader pattern is how TinySystems modules coordinate across multiple replicas. Understanding this pattern is essential for building scalable components.
Overview
+-----------------------------------------------------------------------+
| LEADER-READER PATTERN |
+-----------------------------------------------------------------------+
+-----------------------------------------------------------------------+
| KUBERNETES LEASE |
| |
| Only one pod holds the lease at a time |
| Lease holder = LEADER |
| Other pods = READERS |
+-----------------------------------------------------------------------+
|
+-------------------------+-------------------------+
| | |
v v v
+-------------+ +-------------+ +-------------+
| LEADER | | READER | | READER |
| | | | | |
| +---------+ | | +---------+ | | +---------+ |
| | Write | | | | Watch | | | | Watch | |
| | CRs | | | | Only | | | | Only | |
| +---------+ | | +---------+ | | +---------+ |
| | | | | |
| +---------+ | | +---------+ | | +---------+ |
| | Process | | | | Handle | | | | Handle | |
| | Signals | | | |Messages | | | |Messages | |
| +---------+ | | +---------+ | | +---------+ |
| | | | | |
| +---------+ | | +---------+ | | +---------+ |
| | Update | | | | Local | | | | Local | |
| | Status | | | |Reconcile| | | |Reconcile| |
| +---------+ | | +---------+ | | +---------+ |
+-------------+ +-------------+ +-------------+
Responsibilities
Leader Pod
| Responsibility | Description |
|---|---|
| Update TinyModule | Publish gRPC address and component list |
| Update TinyNode | Update port schemas and metadata |
| Process TinySignals | Execute each signal on one pod only, so it is not processed more than once |
| Expose Ingress | Configure external access |
| Write Metadata | Shared state updates |
Reader Pods
| Responsibility | Description |
|---|---|
| Watch CRs | Stay informed of changes |
| Handle Messages | Process incoming gRPC calls |
| Local Reconcile | Maintain component state |
| Read Metadata | Use shared configuration |
Checking Leadership
go
import "github.com/tiny-systems/module/pkg/utils"
func (c *Component) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
if utils.IsLeader(ctx) {
// Leader-only code
} else {
// Reader code (or skip)
}
return nil
}Pattern Implementation
Basic Pattern
go
func (c *Component) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
switch port {
case v1alpha1.SettingsPort:
// ALL pods: store settings
c.settings = msg.(Settings)
return nil
case v1alpha1.ControlPort:
// LEADER only: process control actions
if !utils.IsLeader(ctx) {
return nil
}
return c.handleControl(ctx, output, msg)
case v1alpha1.ReconcilePort:
// ALL pods: read shared state
// LEADER: also writes state
return c.handleReconcile(ctx, output, msg)
case "input":
// ALL pods: handle incoming messages
return c.handleInput(ctx, output, msg)
}
return nil
}Signal Component Example
go
// signal.go - Manual flow trigger
func (t *Signal) Handle(ctx context.Context, handler module.Handler, port string, msg any) any {
switch port {
case v1alpha1.ControlPort:
// Only leader processes button clicks
if !utils.IsLeader(ctx) {
return nil
}
control := msg.(Control)
if control.Send {
// Start the flow
t.mu.Lock()
if t.cancelFunc != nil {
t.cancelFunc()
}
ctx, t.cancelFunc = context.WithCancel(ctx)
t.mu.Unlock()
// Update UI state
handler(context.Background(), v1alpha1.ReconcilePort, nil)
// Send to output
return handler(ctx, OutPort, control.Context)
}
if control.Reset {
t.mu.Lock()
if t.cancelFunc != nil {
t.cancelFunc()
t.cancelFunc = nil
}
t.mu.Unlock()
handler(context.Background(), v1alpha1.ReconcilePort, nil)
return ctx.Err()
}
}
return nil
}Ticker Component Example
go
// ticker.go - Periodic emission

// Handle starts or stops the ticker in response to control-port messages.
// Every other port, including the reconcile port, yields nil, as does a
// control message received while utils.IsLeader(ctx) is false.
//
// The payload is type-asserted to Control without a check, so any other type
// panics. Start takes precedence over Stop; the Start path returns whatever
// startEmitting returns, all other paths return nil.
func (t *Ticker) Handle(ctx context.Context, handler module.Handler, port string, msg any) any {
	// The reconcile port has no work to do here: all pods can read state and
	// the leader can update it elsewhere.
	if port != v1alpha1.ControlPort {
		return nil
	}
	// Only leader starts/stops the ticker
	if !utils.IsLeader(ctx) {
		return nil
	}

	control := msg.(Control)
	switch {
	case control.Start:
		return t.startEmitting(ctx, handler)
	case control.Stop:
		t.stopEmitting()
	}
	return nil
}
func (t *Ticker) startEmitting(ctx context.Context, handler module.Handler) error {
t.mu.Lock()
runCtx, cancel := context.WithCancel(ctx)
t.cancelFunc = cancel
t.mu.Unlock()
// Update UI
handler(context.Background(), v1alpha1.ReconcilePort, nil)
// Emit loop (blocking)
timer := time.NewTimer(time.Duration(t.settings.Delay) * time.Millisecond)
for {
select {
case <-timer.C:
handler(runCtx, OutPort, t.settings.Context)
timer.Reset(time.Duration(t.settings.Delay) * time.Millisecond)
case <-runCtx.Done():
return runCtx.Err()
}
}
}Controller-Level Pattern
Controllers also implement the pattern:
go
// tinynode_controller.go
func (r *TinyNodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
node := &v1alpha1.TinyNode{}
r.Get(ctx, req.NamespacedName, node)
// ALL pods: update local scheduler
r.Scheduler.Update(ctx, node)
// LEADER only: update CR status
if !r.IsLeader.Load() {
// Requeue to stay in sync
return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
}
// Leader: build and update status
node.Status = r.buildStatus(node)
r.Status().Update(ctx, node)
return ctrl.Result{RequeueAfter: 5 * time.Minute}, nil
}go
// tinysignal_controller.go
func (r *TinySignalReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// LEADER only: process signals
if !r.IsLeader.Load() {
return ctrl.Result{}, nil
}
signal := &v1alpha1.TinySignal{}
r.Get(ctx, req.NamespacedName, signal)
// Route to scheduler
r.Scheduler.Handle(ctx, runner.Msg{
To: fmt.Sprintf("%s.%s", signal.Spec.Node, signal.Spec.Port),
Data: signal.Spec.Data,
})
// Delete processed signal
r.Delete(ctx, signal)
return ctrl.Result{}, nil
}Failover Handling
When the leader fails:
Time -------------------------------------------------------------->
Leader Pod A Reader Pod B Reader Pod C
| | |
| <-- Lease held | |
| | |
X (Pod A dies) | |
| |
| <-- Lease expires | |
(15 seconds) | |
| |
| Acquires lease |
| --------------> |
| |
| isLeader = true |
| |
| Continues operations |
| |
v v
Code for Failover
go
// Component is the example component used to illustrate failover handling.
// It remembers whether this replica was the leader the last time the
// reconcile port was handled, so that transitions can be detected.
type Component struct {
// cancelFunc, when non-nil, is called and then cleared by onLostLeadership.
cancelFunc context.CancelFunc
// mu is held by Handle while it compares and updates wasLeader and while it
// runs the transition callbacks.
mu sync.Mutex
// wasLeader is the leadership state recorded by the previous reconcile.
wasLeader bool
}
// Handle detects leadership transitions on the reconcile port. It compares
// the current answer of utils.IsLeader(ctx) with the state recorded on the
// previous reconcile and, while holding c.mu, runs onBecameLeader or
// onLostLeadership when the two differ, then records the new state.
//
// Messages on any other port are ignored. The method always returns nil.
func (c *Component) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
	if port != v1alpha1.ReconcilePort {
		return nil
	}
	leading := utils.IsLeader(ctx)

	c.mu.Lock()
	defer c.mu.Unlock()

	switch {
	case leading && !c.wasLeader:
		// Just became leader - take over
		c.onBecameLeader(ctx, output)
		c.wasLeader = true
	case !leading && c.wasLeader:
		// Lost leadership - stop leader-only work
		c.onLostLeadership()
		c.wasLeader = false
	}
	return nil
}
// onBecameLeader is called by Handle when this replica changes from reader
// to leader. It logs the transition and then calls startLeaderOnlyWork with
// the context and output handler it was given.
//
// Handle calls it while holding c.mu.
// NOTE(review): if startLeaderOnlyWork blocks, the mutex stays held for that
// long — confirm it returns promptly.
func (c *Component) onBecameLeader(ctx context.Context, output module.Handler) {
// Initialize leader-only resources
log.Info("became leader, taking over")
// Resume any paused operations
c.startLeaderOnlyWork(ctx, output)
}
func (c *Component) onLostLeadership() {
// Stop leader-only work
log.Info("lost leadership, stopping leader-only work")
if c.cancelFunc != nil {
c.cancelFunc()
c.cancelFunc = nil
}
}Best Practices
1. Don't Assume Continuous Leadership
go
// Bad: Assumes leader forever
//
// Anti-example, kept as written. Leadership is checked once, before the
// goroutine starts; the loop inside has no exit condition and never looks at
// ctx or at leadership again, so it keeps calling doWork indefinitely.
// The "..." in the signature is an elision, not valid Go.
func (c *Component) Handle(ctx context.Context, ...) {
if utils.IsLeader(ctx) {
go func() {
for {
// Might not be leader anymore!
c.doWork()
}
}()
}
}
// Good: Check leadership in loop
//
// When this replica is the leader, a goroutine is started that keeps calling
// doWork. Before every call it stops if ctx has been cancelled or if
// utils.IsLeader(ctx) no longer reports true. Non-leaders start nothing.
// The "..." in the signature is an elision, not valid Go.
func (c *Component) Handle(ctx context.Context, ...) {
	if !utils.IsLeader(ctx) {
		return
	}
	go func() {
		// ctx.Err() becomes non-nil exactly when ctx.Done() is closed.
		for ctx.Err() == nil {
			if !utils.IsLeader(ctx) {
				return // Lost leadership
			}
			c.doWork()
		}
	}()
}
2. Idempotent State Updates
go
// Good: Check before updating
//
// The callback initializes at most once: it does nothing when the node's
// status metadata already carries initialized == "true". Otherwise it sets
// the flag and then runs performInitialization.
//
// Reading a nil map is safe but writing to one panics, so the metadata map
// is created first when it is missing.
// NOTE(review): assumes Status.Metadata is a map[string]string — confirm
// against the TinyNode type.
output(ctx, v1alpha1.ReconcilePort, func(node *v1alpha1.TinyNode) {
	if node.Status.Metadata["initialized"] == "true" {
		return
	}
	if node.Status.Metadata == nil {
		node.Status.Metadata = make(map[string]string)
	}
	// Only initialize once
	node.Status.Metadata["initialized"] = "true"
	c.performInitialization()
})
3. Graceful Degradation
go
func (c *Component) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
// All pods can handle messages
if port == "input" {
return c.processInput(ctx, output, msg)
}
// But only leader does status updates
if port == v1alpha1.ReconcilePort && utils.IsLeader(ctx) {
c.updateStatus(ctx, output)
}
return nil
}Next Steps
- Multi-Replica Coordination - Complete examples
- Horizontal Scaling - Scaling best practices
- Leader Election - Election mechanics