Skip to content

Multi-Replica Coordination

This page provides complete examples of coordinating multiple pod replicas in TinySystems modules.

HTTP Server: Complete Example

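The http-module demonstrates multi-replica coordination for HTTP servers. The leader replica binds a port and publishes it in the TinyNode status metadata. Every replica then listens on that same port behind a Kubernetes Service. The diagram uses 8080 as an example; the implementation below lets the OS choose a free port.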

The Challenge

Multiple pods need to:

  1. Listen on the same port
  2. Share traffic via load balancing
  3. Coordinate port assignment
  4. Handle leader failover

Solution Architecture

┌─────────────────────────────────────────────────────────────────────────────┐
│                    HTTP SERVER MULTI-REPLICA ARCHITECTURE                    │
└─────────────────────────────────────────────────────────────────────────────┘

                    ┌───────────────────────────────────────┐
                    │             TinyNode CR               │
                    │                                       │
                    │  status:                              │
                    │    metadata:                          │
                    │      http-server-port: "8080"         │
                    └───────────────────────────────────────┘
                           ▲                    │
                           │ Patch              │ Watch
                           │                    ▼
         ┌─────────────────┴─────────────────────────────────────┐
         │                                                        │
    ┌────┴────┐         ┌─────────┐         ┌─────────┐         │
    │ LEADER  │         │ READER  │         │ READER  │         │
    │         │         │         │         │         │         │
    │ 1. Start│         │ Wait for│         │ Wait for│         │
    │    :8080│         │ port... │         │ port... │         │
    │         │         │         │         │         │         │
    │ 2. Write│         │         │         │         │         │
    │   "8080"│─────────│─────────│─────────│         │         │
    │         │         │         │         │         │         │
    │         │         │ 3. Read │         │ 3. Read │         │
    │         │         │   "8080"│         │   "8080"│         │
    │         │         │         │         │         │         │
    │ :8080   │         │ :8080   │         │ :8080   │         │
    └────┬────┘         └────┬────┘         └────┬────┘         │
         │                   │                   │               │
         └───────────────────┴───────────────────┘               │
                             │                                   │
                    Kubernetes Service                           │
                    (load balancing)                             │
                             │                                   │
                    External Traffic                             │

└────────────────────────────────────────────────────────────────┘

Implementation

go
// server.go
package server

import (
    "context"
    "net"
    "net/http"
    "strconv"
    "sync"

    "github.com/tiny-systems/module/api/v1alpha1"
    "github.com/tiny-systems/module/module"
    "github.com/tiny-systems/module/pkg/utils"
)

// PortMetadata is the TinyNode status metadata key under which the leader
// replica publishes the port its HTTP server listens on. Reader replicas
// read it back and bind the same port.
const PortMetadata = "http-server-port"

// Server is an HTTP server component whose listening port is coordinated
// across replicas through the TinyNode status metadata (key PortMetadata).
//
// NOTE(review): startAndPublish calls s.resourceManager.ExposePort, but no
// resourceManager field is declared here, so this example does not compile
// as shown. Add the field (its type is not visible in this example).
type Server struct {
    settings     Settings
    // nodeName is the name of the TinyNode last seen in a reconcile.
    nodeName     string
    // currentPort is the port the local server listens on; 0 means stopped.
    currentPort  int
    listener     net.Listener
    httpServer   *http.Server
    // startStopMu serializes starting and stopping the server during reconcile.
    startStopMu  sync.Mutex
}

// Handle routes an incoming message according to the port it arrived on.
// Settings messages replace the stored settings. Reconcile messages drive
// port coordination (handleReconcile). Request messages are delegated to
// handleRequest. Messages on any other port are ignored.
func (s *Server) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
    switch port {
    case v1alpha1.SettingsPort:
        newSettings := msg.(Settings)
        s.settings = newSettings
    case v1alpha1.ReconcilePort:
        return s.handleReconcile(ctx, output, msg)
    case RequestPort:
        return s.handleRequest(ctx, output, msg)
    }
    // Settings updates and unknown ports produce no result.
    return nil
}

// handleReconcile brings the local HTTP server in line with the port recorded
// in the TinyNode status metadata.
//
// Cases:
//   - The server already listens on the published port: nothing to do.
//   - No valid port is published: the leader starts a server and publishes
//     its port, while other replicas wait for a later reconcile.
//   - A port is published: this replica (leader included) (re)starts on it.
//
// msg is expected to be a v1alpha1.TinyNode; anything else is ignored.
func (s *Server) handleReconcile(ctx context.Context, output module.Handler, msg any) any {
    node, ok := msg.(v1alpha1.TinyNode)
    if !ok {
        return nil
    }

    s.nodeName = node.Name

    // 0 means "no usable port published yet" (missing, malformed or out of range).
    configuredPort := publishedPort(node.Status.Metadata)

    s.startStopMu.Lock()
    defer s.startStopMu.Unlock()

    // Already running on the published port? currentPort 0 means stopped,
    // so the > 0 guard keeps "both zero" from counting as converged.
    if configuredPort > 0 && configuredPort == s.currentPort {
        return nil
    }

    if configuredPort == 0 {
        if utils.IsLeader(ctx) {
            // Leader: start (or reuse) a server and publish its port.
            return s.startAndPublish(ctx, output)
        }
        // Reader: wait until the leader publishes a port.
        return nil
    }

    // Port assigned: start on that port.
    return s.startOnPort(ctx, output, configuredPort)
}

// publishedPort returns the TCP port stored under PortMetadata. It returns 0
// when the key is missing, is not an integer, or lies outside 1-65535.
// Reading from a nil map is safe.
func publishedPort(metadata map[string]string) int {
    port, err := strconv.Atoi(metadata[PortMetadata])
    if err != nil || port < 1 || port > 65535 {
        return 0
    }
    return port
}

// startAndPublish is the leader-only path of handleReconcile. It:
//   - ensures a local HTTP server is listening;
//   - publishes the server's port under PortMetadata in the TinyNode status,
//     so reader replicas can bind the same port;
//   - asks the resource manager to expose that port for the configured
//     hostnames.
//
// A server may already be running, e.g. when an earlier reconcile started it
// but its metadata patch has not been observed yet. In that case its port is
// re-published instead of opening a second listener, which would leak the
// first one. handleReconcile calls this with s.startStopMu held.
func (s *Server) startAndPublish(ctx context.Context, output module.Handler) error {
    if s.listener == nil {
        // Let the OS choose a free port.
        listener, err := net.Listen("tcp", ":0")
        if err != nil {
            return err
        }

        s.listener = listener
        s.currentPort = listener.Addr().(*net.TCPAddr).Port

        s.httpServer = &http.Server{Handler: s.handler()}
        // Serve returns http.ErrServerClosed once stop() closes the server.
        go s.httpServer.Serve(listener)
    }
    actualPort := s.currentPort

    // Publish the port to metadata so readers can bind it too.
    // NOTE(review): the results of output and ExposePort are discarded;
    // confirm whether failures should be surfaced to the caller.
    output(ctx, v1alpha1.ReconcilePort, func(node *v1alpha1.TinyNode) {
        if node.Status.Metadata == nil {
            node.Status.Metadata = make(map[string]string)
        }
        node.Status.Metadata[PortMetadata] = strconv.Itoa(actualPort)
    })

    // Expose via Ingress
    s.resourceManager.ExposePort(ctx, ExposePortRequest{
        Port:      actualPort,
        Hostnames: s.settings.Hostnames,
    })

    return nil
}

// startOnPort (re)starts the HTTP server on the given port. Any server that
// is already running is stopped first. handleReconcile uses it once a port
// has been published in the node metadata, and calls it with s.startStopMu
// held. output is unused here.
//
// If the listen fails, the previous server stays stopped (currentPort is 0),
// so a later reconcile will retry.
func (s *Server) startOnPort(ctx context.Context, output module.Handler, port int) error {
    // Stop existing server if any
    s.stop()

    // Builds ":<port>" (all interfaces) without fmt, which server.go does not import.
    listener, err := net.Listen("tcp", net.JoinHostPort("", strconv.Itoa(port)))
    if err != nil {
        return err
    }

    s.listener = listener
    s.currentPort = port

    s.httpServer = &http.Server{Handler: s.handler()}
    // Serve returns http.ErrServerClosed once stop() closes the server.
    go s.httpServer.Serve(listener)

    return nil
}

// stop tears down the running HTTP server and its listener, if any. It then
// resets currentPort to 0, which the rest of Server treats as "not running".
// Close errors are ignored; the listener is usually already closed by
// httpServer.Close.
func (s *Server) stop() {
    srv, ln := s.httpServer, s.listener
    s.httpServer, s.listener, s.currentPort = nil, nil, 0

    if srv != nil {
        srv.Close()
    }
    if ln != nil {
        ln.Close()
    }
}

Failover Scenario

Timeline of leader failover:

T=0:   Leader (Pod A) running on :8080, published to metadata
       Readers (Pod B, C) running on :8080

T=5:   Pod A crashes

T=5-20: Pod A's lease expires and Pod B acquires it
       Pods B and C keep serving on :8080 the whole time

T=20:  Pod B becomes leader
       Metadata still has "8080" - no change needed
       All pods continue serving on :8080

T=20+: Traffic continues flowing to Pod B and C
       No downtime, apart from requests routed to Pod A before Kubernetes removes it from the Service endpoints

Ticker: Periodic Emission

The ticker component demonstrates leader-only periodic operations.

go
// ticker.go
// Ticker emits settings.Context on OutPort every settings.Delay milliseconds
// while running. Only the leader replica acts on Start/Stop control messages.
type Ticker struct {
    settings   Settings
    // cancelFunc cancels the running emit loop; set on Start, called on Stop.
    cancelFunc context.CancelFunc
    // mu guards isRunning and cancelFunc.
    mu         sync.Mutex
    // isRunning reports whether an emit loop is active (drives the Start/Stop button).
    isRunning  bool
}

// Handle processes messages for the ticker component.
//
// ControlPort (leader replica only):
//   - Start launches the emit loop and blocks until ctx is cancelled.
//   - Stop cancels a running loop.
// Both refresh the UI via ReconcilePort. ReconcilePort itself and other
// ports are no-ops.
//
// t.mu is released before any blocking call. Holding it for the lifetime of
// emit (as a deferred Unlock would) deadlocks in two ways: emit re-locks t.mu
// when its context is cancelled, and a Stop message could never acquire the
// lock to cancel it in the first place.
func (t *Ticker) Handle(ctx context.Context, handler module.Handler, port string, msg any) any {
    switch port {
    case v1alpha1.ControlPort:
        // Only leader handles start/stop
        if !utils.IsLeader(ctx) {
            return nil
        }

        control := msg.(Control)

        t.mu.Lock()

        if control.Start && !t.isRunning {
            ctx, t.cancelFunc = context.WithCancel(ctx)
            t.isRunning = true
            t.mu.Unlock()

            // Update UI
            handler(context.Background(), v1alpha1.ReconcilePort, nil)

            // Start emitting (blocks until ctx is cancelled, e.g. by Stop)
            return t.emit(ctx, handler)
        }

        if control.Stop && t.isRunning {
            if t.cancelFunc != nil {
                t.cancelFunc()
            }
            t.isRunning = false
            t.mu.Unlock()

            // Update UI
            handler(context.Background(), v1alpha1.ReconcilePort, nil)
            return nil
        }

        t.mu.Unlock()
        return nil

    case v1alpha1.ReconcilePort:
        // All pods receive reconcile
        // Leader tracks if running for UI state
        return nil
    }
    return nil
}

// emit sends settings.Context to OutPort, waiting settings.Delay milliseconds
// before each send. The delay is re-read every cycle. Each send blocks until
// downstream completes. When ctx is cancelled the loop marks the ticker as
// stopped and returns ctx.Err().
func (t *Ticker) emit(ctx context.Context, handler module.Handler) error {
    interval := func() time.Duration {
        return time.Duration(t.settings.Delay) * time.Millisecond
    }

    timer := time.NewTimer(interval())
    defer timer.Stop()

    for {
        select {
        case <-ctx.Done():
            t.mu.Lock()
            t.isRunning = false
            t.mu.Unlock()
            return ctx.Err()

        case <-timer.C:
            handler(ctx, OutPort, t.settings.Context)
            timer.Reset(interval())
        }
    }
}

// getControl snapshots the running state for the UI. The UI shows a Start
// button when the ticker is stopped and a Stop button when it is running.
func (t *Ticker) getControl() Control {
    t.mu.Lock()
    running := t.isRunning
    t.mu.Unlock()

    return Control{IsRunning: running}
}

Signal: Manual Trigger

The signal component shows button-based leader-only actions.

go
// signal.go
// Signal delivers a user-supplied Context to OutPort when its Send button is
// pressed. Only the leader replica processes button clicks.
type Signal struct {
    settings   Settings
    // cancelFunc cancels the in-flight Send, if any; guarded by mu.
    cancelFunc context.CancelFunc
    mu         sync.Mutex
    // flowID counts Sends so that a finishing flow clears cancelFunc only if
    // no newer Send has replaced it; guarded by mu.
    flowID     uint64
}

// Handle processes control (button) messages; other ports are ignored.
//
// On ControlPort (leader only), any in-flight flow is cancelled first. Then:
//   - Reset refreshes the UI and blocks until ctx is done.
//   - Send refreshes the UI, delivers control.Context to OutPort (blocking
//     until downstream completes), refreshes the UI again and returns the
//     handler's result.
func (s *Signal) Handle(ctx context.Context, handler module.Handler, port string, msg any) any {
    switch port {
    case v1alpha1.ControlPort:
        // Only leader processes button clicks
        if !utils.IsLeader(ctx) {
            return nil
        }

        control := msg.(Control)

        s.mu.Lock()
        // Cancel any previous flow
        if s.cancelFunc != nil {
            s.cancelFunc()
            s.cancelFunc = nil
        }

        if control.Reset {
            s.mu.Unlock()
            handler(context.Background(), v1alpha1.ReconcilePort, nil)
            <-ctx.Done()
            return ctx.Err()
        }

        if control.Send {
            flowCtx, cancel := context.WithCancel(ctx)
            // Always release the flow context once this Send returns.
            defer cancel()
            s.cancelFunc = cancel
            s.flowID++
            thisFlow := s.flowID
            s.mu.Unlock()

            // Update UI
            handler(context.Background(), v1alpha1.ReconcilePort, nil)

            // Send to output (blocks until complete)
            err := handler(flowCtx, OutPort, control.Context)

            // Flow complete. Clear cancelFunc only if a newer Send has not
            // replaced it; otherwise Reset could no longer cancel that flow.
            s.mu.Lock()
            if s.flowID == thisFlow {
                s.cancelFunc = nil
            }
            s.mu.Unlock()

            handler(context.Background(), v1alpha1.ReconcilePort, nil)
            return err
        }
        s.mu.Unlock()
    }
    return nil
}

Coordination Patterns Summary

Pattern 1: Port Assignment

go
// The leader binds first and publishes its port. Once a port is published,
// every replica binds it. Non-leaders with no published port do nothing and
// wait for the next reconcile.
if utils.IsLeader(ctx) && portNotAssigned {
    port := startOnRandomPort()
    publishPortToMetadata(port)
} else if portAssigned {
    // Readers (and the leader on later reconciles) bind the same port.
    startOnAssignedPort()
}

Pattern 2: Singleton Operation

go
// Singleton work: the IsLeader check ensures only one replica runs it.
// NOTE: after failover a new leader may run it again, so keep it idempotent.
if utils.IsLeader(ctx) {
    // Only one pod does this
    doExpensiveOperation()
}

Pattern 3: State Machine

go
// The leader drives the state machine and publishes the result.
// Readers only follow the published state.
if utils.IsLeader(ctx) {
    switch currentState {
    case StateIdle:
        transitionTo(StateRunning)
    case StateRunning:
        doWork()
    }
    // Persist the state so readers (and a new leader after failover) can see it.
    publishStateToMetadata()
} else {
    state := readStateFromMetadata()
    actOnState(state)
}

Pattern 4: Resource Ownership

go
// Only the leader creates shared cluster resources, so they are created once.
if utils.IsLeader(ctx) {
    createIngressRule()
    createServicePort()
}
// All pods use the created resources

Best Practices

1. Guard Shared State with a Mutex (and Release It Before Blocking Calls)

go
// Component holds state shared between concurrent Handle calls.
type Component struct {
    // mu guards the fields below. Unlock it before blocking calls.
    mu         sync.Mutex
    // cancelFunc cancels the in-flight operation, if any.
    cancelFunc context.CancelFunc
    // isRunning drives UI state, e.g. a Start/Stop button.
    isRunning  bool
}

2. Update UI After State Changes

go
// Send to ReconcilePort so the UI refreshes from the component's current
// state. context.Background() keeps the update independent of the (possibly
// cancelled) request context.
handler(context.Background(), v1alpha1.ReconcilePort, nil)

3. Handle Context Cancellation

go
// Wait for whichever comes first, cancellation or completion, so a Stop or
// Reset that cancels ctx takes effect promptly.
select {
case <-ctx.Done():
    return ctx.Err()
case result := <-workDone:
    return result
}

4. Idempotent Operations

go
// currentPort 0 means "not running", so matching ports count as the desired
// state only when a server is actually up. Without the > 0 guard, a leader
// with no published port (both values 0) would never start.
if s.currentPort == configuredPort && s.currentPort > 0 {
    return nil  // Already in desired state
}

Next Steps

Build flow-based applications on Kubernetes