# Multi-Replica Coordination
This page provides complete examples of coordinating multiple pod replicas in TinySystems modules.
## HTTP Server: Complete Example
The http-module demonstrates multi-replica coordination for HTTP servers.
### The Challenge
Multiple pods need to:
- Listen on the same port
- Share traffic via load balancing
- Coordinate port assignment
- Handle leader failover
### Solution Architecture
```text
┌──────────────────────────────────────────────────┐
│      HTTP SERVER MULTI-REPLICA ARCHITECTURE      │
└──────────────────────────────────────────────────┘

            ┌───────────────────────────────┐
            │          TinyNode CR          │
            │                               │
            │  status:                      │
            │    metadata:                  │
            │      http-server-port: "8080" │
            └───────────────────────────────┘
                 ▲                   │
                 │ Patch             │ Watch
                 │                   ▼
     ┌───────────┘
     │
┌────┴────┐        ┌─────────┐        ┌─────────┐
│ LEADER  │        │ READER  │        │ READER  │
│         │        │         │        │         │
│ 1. Start│        │ Wait for│        │ Wait for│
│   :8080 │        │ port... │        │ port... │
│         │        │         │        │         │
│ 2. Write│        │ 3. Read │        │ 3. Read │
│  "8080" │        │  "8080" │        │  "8080" │
│         │        │         │        │         │
│  :8080  │        │  :8080  │        │  :8080  │
└────┬────┘        └────┬────┘        └────┬────┘
     │                  │                  │
     └──────────────────┴──────────────────┘
                        │
              Kubernetes Service
               (load balancing)
                        │
               External Traffic
```

### Implementation
```go
// server.go
package server

import (
    "context"
    "fmt"
    "net"
    "net/http"
    "strconv"
    "sync"

    "github.com/tiny-systems/module/api/v1alpha1"
    "github.com/tiny-systems/module/module"
    "github.com/tiny-systems/module/pkg/utils"
)

const (
    PortMetadata = "http-server-port"
)

// Settings, RequestPort, handleRequest, and the request router returned by
// handler() are omitted here for brevity.
type Server struct {
    settings    Settings
    nodeName    string
    currentPort int

    listener   net.Listener
    httpServer *http.Server

    // resourceManager exposes the listening port via Ingress.
    resourceManager ResourceManager

    startStopMu sync.Mutex
}

func (s *Server) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
    switch port {
    case v1alpha1.SettingsPort:
        s.settings = msg.(Settings)
        return nil
    case v1alpha1.ReconcilePort:
        return s.handleReconcile(ctx, output, msg)
    case RequestPort:
        return s.handleRequest(ctx, output, msg)
    }
    return nil
}

func (s *Server) handleReconcile(ctx context.Context, output module.Handler, msg any) any {
    node, ok := msg.(v1alpha1.TinyNode)
    if !ok {
        return nil
    }
    s.nodeName = node.Name

    // Read the configured port from metadata.
    configuredPort := 0
    if portStr, ok := node.Status.Metadata[PortMetadata]; ok {
        configuredPort, _ = strconv.Atoi(portStr)
    }

    s.startStopMu.Lock()
    defer s.startStopMu.Unlock()

    // Already running on the correct port?
    if configuredPort == s.currentPort && s.currentPort > 0 {
        return nil
    }

    // No port assigned yet?
    if configuredPort == 0 {
        if utils.IsLeader(ctx) {
            // Leader: start and publish the port.
            return s.startAndPublish(ctx, output)
        }
        // Reader: wait for the port assignment.
        return nil
    }

    // Port assigned - start on that port.
    return s.startOnPort(ctx, output, configuredPort)
}

func (s *Server) startAndPublish(ctx context.Context, output module.Handler) error {
    // Start on a random available port.
    listener, err := net.Listen("tcp", ":0")
    if err != nil {
        return err
    }
    actualPort := listener.Addr().(*net.TCPAddr).Port
    s.listener = listener
    s.currentPort = actualPort

    // Start the HTTP server.
    s.httpServer = &http.Server{Handler: s.handler()}
    go s.httpServer.Serve(listener)

    // Publish the port to metadata.
    output(ctx, v1alpha1.ReconcilePort, func(node *v1alpha1.TinyNode) {
        if node.Status.Metadata == nil {
            node.Status.Metadata = make(map[string]string)
        }
        node.Status.Metadata[PortMetadata] = strconv.Itoa(actualPort)
    })

    // Expose via Ingress.
    s.resourceManager.ExposePort(ctx, ExposePortRequest{
        Port:      actualPort,
        Hostnames: s.settings.Hostnames,
    })
    return nil
}

func (s *Server) startOnPort(ctx context.Context, output module.Handler, port int) error {
    // Stop the existing server, if any.
    s.stop()

    // Start on the specified port.
    listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
    if err != nil {
        return err
    }
    s.listener = listener
    s.currentPort = port

    s.httpServer = &http.Server{Handler: s.handler()}
    go s.httpServer.Serve(listener)
    return nil
}

func (s *Server) stop() {
    if s.httpServer != nil {
        // Close stops immediately; use Shutdown to drain in-flight requests.
        s.httpServer.Close()
        s.httpServer = nil
    }
    if s.listener != nil {
        s.listener.Close()
        s.listener = nil
    }
    s.currentPort = 0
}
```

### Failover Scenario
Timeline of leader failover:
```text
T=0:    Leader (Pod A) running on :8080, port published to metadata
        Readers (Pod B, C) running on :8080

T=5:    Pod A crashes

T=5-20: Lease expires, Pod B acquires the lease

T=20:   Pod B becomes leader
        Metadata still has "8080" - no change needed
        All pods continue serving on :8080

T=20+:  Traffic continues flowing to Pods B and C
        No downtime (except Pod A's share of traffic)
```
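The reason the new leader has nothing to do is the idempotence check at the top of `handleReconcile`. A minimal sketch of that decision, extracted as a hypothetical standalone helper (not part of the module API):

```go
// needsRestart is a hypothetical extraction of the check in handleReconcile.
// After failover, metadata still reads "8080" and Pod B is already serving
// on 8080, so needsRestart returns false: the listener is never restarted
// and no connections are dropped.
func needsRestart(configuredPort, currentPort int) bool {
    alreadyCorrect := configuredPort == currentPort && currentPort > 0
    return !alreadyCorrect
}
```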
## Ticker: Periodic Emission

The ticker component demonstrates leader-only periodic operations.
```go
// ticker.go
package ticker

import (
    "context"
    "sync"
    "time"

    "github.com/tiny-systems/module/api/v1alpha1"
    "github.com/tiny-systems/module/module"
    "github.com/tiny-systems/module/pkg/utils"
)

type Ticker struct {
    settings   Settings
    cancelFunc context.CancelFunc
    mu         sync.Mutex
    isRunning  bool
}

func (t *Ticker) Handle(ctx context.Context, handler module.Handler, port string, msg any) any {
    switch port {
    case v1alpha1.ControlPort:
        // Only the leader handles start/stop.
        if !utils.IsLeader(ctx) {
            return nil
        }
        control := msg.(Control)

        t.mu.Lock()
        if control.Start && !t.isRunning {
            ctx, t.cancelFunc = context.WithCancel(ctx)
            t.isRunning = true
            // Unlock before emitting; emit blocks, and a later Stop
            // must be able to acquire the mutex.
            t.mu.Unlock()

            // Update UI.
            handler(context.Background(), v1alpha1.ReconcilePort, nil)

            // Start emitting (blocks until cancelled).
            return t.emit(ctx, handler)
        }
        if control.Stop && t.isRunning {
            if t.cancelFunc != nil {
                t.cancelFunc()
            }
            t.isRunning = false
            t.mu.Unlock()
            handler(context.Background(), v1alpha1.ReconcilePort, nil)
            return nil
        }
        t.mu.Unlock()
        return nil

    case v1alpha1.ReconcilePort:
        // All pods receive reconcile.
        // The leader tracks whether it is running for UI state.
        return nil
    }
    return nil
}

func (t *Ticker) emit(ctx context.Context, handler module.Handler) error {
    timer := time.NewTimer(time.Duration(t.settings.Delay) * time.Millisecond)
    defer timer.Stop()
    for {
        select {
        case <-timer.C:
            // Emit a message (blocks until downstream completes).
            handler(ctx, OutPort, t.settings.Context)
            timer.Reset(time.Duration(t.settings.Delay) * time.Millisecond)
        case <-ctx.Done():
            t.mu.Lock()
            t.isRunning = false
            t.mu.Unlock()
            return ctx.Err()
        }
    }
}

func (t *Ticker) getControl() Control {
    t.mu.Lock()
    defer t.mu.Unlock()
    return Control{
        IsRunning: t.isRunning,
        // Show the Start button if not running, Stop if running.
    }
}
```

## Signal: Manual Trigger
The signal component demonstrates button-triggered, leader-only actions.
```go
// signal.go
package signal

import (
    "context"
    "sync"

    "github.com/tiny-systems/module/api/v1alpha1"
    "github.com/tiny-systems/module/module"
    "github.com/tiny-systems/module/pkg/utils"
)

type Signal struct {
    settings   Settings
    cancelFunc context.CancelFunc
    mu         sync.Mutex
}

func (s *Signal) Handle(ctx context.Context, handler module.Handler, port string, msg any) any {
    switch port {
    case v1alpha1.ControlPort:
        // Only the leader processes button clicks.
        if !utils.IsLeader(ctx) {
            return nil
        }
        control := msg.(Control)

        s.mu.Lock()
        // Cancel any previous flow.
        if s.cancelFunc != nil {
            s.cancelFunc()
            s.cancelFunc = nil
        }

        if control.Reset {
            s.mu.Unlock()
            handler(context.Background(), v1alpha1.ReconcilePort, nil)
            // Block until this invocation's context is cancelled.
            <-ctx.Done()
            return ctx.Err()
        }

        if control.Send {
            ctx, s.cancelFunc = context.WithCancel(ctx)
            s.mu.Unlock()

            // Update UI.
            handler(context.Background(), v1alpha1.ReconcilePort, nil)

            // Send to the output (blocks until complete).
            err := handler(ctx, OutPort, control.Context)

            // Flow complete.
            s.mu.Lock()
            s.cancelFunc = nil
            s.mu.Unlock()
            handler(context.Background(), v1alpha1.ReconcilePort, nil)
            return err
        }
        s.mu.Unlock()
    }
    return nil
}
```

## Coordination Patterns Summary
### Pattern 1: Port Assignment
```go
// Pseudocode - see startAndPublish and startOnPort above for the real code.
if utils.IsLeader(ctx) && portNotAssigned {
    port := startOnRandomPort()
    publishPortToMetadata(port)
} else if portAssigned {
    startOnAssignedPort()
}
```

### Pattern 2: Singleton Operation
```go
if utils.IsLeader(ctx) {
    // Only one pod does this
    doExpensiveOperation()
}
```

The ticker component above is a concrete instance of this pattern: only the leader runs the emit loop.

### Pattern 3: State Machine
```go
if utils.IsLeader(ctx) {
    switch currentState {
    case StateIdle:
        transitionTo(StateRunning)
    case StateRunning:
        doWork()
    }
    publishStateToMetadata()
} else {
    state := readStateFromMetadata()
    actOnState(state)
}
```
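A concrete sketch of both halves, reusing the metadata patch API shown in the HTTP server example above. The `StateMetadata` key and the `Worker` type are illustrative, not part of the module API:

```go
const StateMetadata = "worker-state" // illustrative metadata key

// Leader side: publish the current state through a TinyNode status patch,
// exactly as the HTTP server publishes its port.
func (w *Worker) publishState(ctx context.Context, output module.Handler, state string) {
    output(ctx, v1alpha1.ReconcilePort, func(node *v1alpha1.TinyNode) {
        if node.Status.Metadata == nil {
            node.Status.Metadata = make(map[string]string)
        }
        node.Status.Metadata[StateMetadata] = state
    })
}

// Reader side: recover the state during reconcile, with a default for
// the window before the leader has published anything.
func (w *Worker) readState(node v1alpha1.TinyNode) string {
    if state, ok := node.Status.Metadata[StateMetadata]; ok {
        return state
    }
    return "idle"
}
```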
### Pattern 4: Resource Ownership

```go
if utils.IsLeader(ctx) {
    createIngressRule()
    createServicePort()
}
// All pods use the created resources
```
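In the HTTP server above, this pattern is what gates the Ingress call. A minimal sketch, assuming the same `resourceManager.ExposePort` call shown in `startAndPublish`:

```go
// Only the leader creates the shared Ingress/Service resources; readers
// simply serve on the port and rely on the resources already existing.
func (s *Server) exposeIfLeader(ctx context.Context, port int) {
    if !utils.IsLeader(ctx) {
        return
    }
    s.resourceManager.ExposePort(ctx, ExposePortRequest{
        Port:      port,
        Hostnames: s.settings.Hostnames,
    })
}
```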
## Best Practices

### 1. Use a Mutex for State
```go
type Component struct {
    mu         sync.Mutex
    cancelFunc context.CancelFunc
    isRunning  bool
}
```
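A short sketch of using that mutex for a start transition, mirroring the ticker example above; `run` is a hypothetical worker method:

```go
func (c *Component) start(parent context.Context) {
    c.mu.Lock()
    defer c.mu.Unlock()
    if c.isRunning {
        return // idempotent: already started
    }
    var ctx context.Context
    ctx, c.cancelFunc = context.WithCancel(parent)
    c.isRunning = true
    go c.run(ctx) // hypothetical worker goroutine
}
```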
### 2. Update the UI After State Changes

```go
handler(context.Background(), v1alpha1.ReconcilePort, nil)
```

### 3. Handle Context Cancellation
```go
select {
case <-ctx.Done():
    return ctx.Err()
case result := <-workDone:
    return result
}
```

### 4. Idempotent Operations
```go
if s.currentPort == configuredPort {
    return nil // Already in desired state
}
```

## Next Steps
- Horizontal Scaling - Scaling best practices
- Leader Election - Election mechanics
- CR-Based State Propagation - State sharing