CR-Based State Propagation

TinySystems uses TinyNode Custom Resources as a distributed state store, enabling pods to share state without direct communication. This is a key pattern for building scalable components.

The Problem

How do multiple pods share state?

┌─────────────────────────────────────────────────────────────────────────────┐
│                         TRADITIONAL APPROACHES                               │
└─────────────────────────────────────────────────────────────────────────────┘

Option 1: Shared Database
  - Extra infrastructure
  - Latency overhead
  - Connection management

Option 2: Pod-to-Pod Communication
  - Service discovery needed
  - Connection management
  - Failure handling

Option 3: External Cache (Redis)
  - Extra infrastructure
  - Network latency
  - Consistency issues

TinySystems uses TinyNode CRs as the shared state store:

┌─────────────────────────────────────────────────────────────────────────────┐
│                    CR-BASED STATE PROPAGATION                                │
└─────────────────────────────────────────────────────────────────────────────┘

                    ┌───────────────────────────────────────┐
                    │          TinyNode CR (etcd)           │
                    │                                       │
                    │  status:                              │
                    │    metadata:                          │
                    │      http-server-port: "8080"         │
                    │      last-health-check: "2024-01..."  │
                    │      connection-count: "42"           │
                    └───────────────────────────────────────┘
                           ▲                    │
                           │ Patch              │ Watch
                           │ (Leader)           │ (All pods)
          ┌────────────────┘                    ▼
     ┌────┴────┐         ┌─────────┐         ┌─────────┐
     │ LEADER  │         │ READER  │         │ READER  │
     │  Pod    │         │  Pod    │         │  Pod    │
     │         │         │         │         │         │
     │ Writes  │         │ Watches │         │ Watches │
     │ state   │         │ & reads │         │ & reads │
     └────┬────┘         └────┬────┘         └────┬────┘
          │                   │                   │
          └───────────────────┴───────────────────┘
                              │
             All pods have consistent state

How It Works

1. TinyNode Status Metadata

The TinyNode.Status.Metadata field is a key-value map:

yaml
apiVersion: operator.tinysystems.io/v1alpha1
kind: TinyNode
metadata:
  name: http-server-abc123
status:
  metadata:
    http-server-port: "8080"
    started-at: "2024-01-15T10:30:00Z"
    request-count: "12345"
    custom-key: "custom-value"
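
In Go terms, this status field is a plain string-to-string map. A minimal sketch of the types, assuming a typical kubebuilder-style layout (everything except Status.Metadata is illustrative):

go
import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

// Sketch only: Status.Metadata is documented above;
// the surrounding fields are assumptions.
type TinyNode struct {
    metav1.TypeMeta   `json:",inline"`
    metav1.ObjectMeta `json:"metadata,omitempty"`

    Status TinyNodeStatus `json:"status,omitempty"`
}

type TinyNodeStatus struct {
    // Metadata is a free-form key-value map shared across pods.
    Metadata map[string]string `json:"metadata,omitempty"`
}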

2. Leader Writes State

Only the leader updates the CR:

go
func (c *Component) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
    // Leader writes state
    if utils.IsLeader(ctx) {
        output(ctx, v1alpha1.ReconcilePort, func(node *v1alpha1.TinyNode) {
            if node.Status.Metadata == nil {
                node.Status.Metadata = make(map[string]string)
            }
            node.Status.Metadata["http-server-port"] = strconv.Itoa(c.actualPort)
            node.Status.Metadata["started-at"] = time.Now().Format(time.RFC3339)
        })
    }
    return nil
}

3. All Pods Receive Updates

Via the reconcile port, all pods receive the updated TinyNode:

go
func (c *Component) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
    if port == v1alpha1.ReconcilePort {
        node, ok := msg.(v1alpha1.TinyNode)
        if !ok {
            return nil
        }

        // All pods read shared state
        if portStr, ok := node.Status.Metadata["http-server-port"]; ok {
            // Use a new name to avoid shadowing the `port string` parameter,
            // and check the conversion error
            if p, err := strconv.Atoi(portStr); err == nil {
                c.configuredPort = p
            }
        }
    }
    return nil
}

HTTP Server Example

The http-module demonstrates this pattern:

go
// server.go
func (h *Server) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
    switch port {
    case v1alpha1.ReconcilePort:
        node, ok := msg.(v1alpha1.TinyNode)
        if !ok {
            return nil
        }
        h.nodeName = node.Name

        // Read configured port from metadata
        listenPort, _ := strconv.Atoi(node.Status.Metadata[PortMetadata])

        // Check if we need to (re)start
        if listenPort == h.currentPort {
            return nil  // Already running on correct port
        }

        if listenPort == 0 {
            // No port assigned yet
            if utils.IsLeader(ctx) {
                // Leader: start server and publish port
                h.startAndPublishPort(ctx, output)
            }
            // Non-leader: wait for port assignment
            return nil
        }

        // Port assigned - all pods start on that port
        h.startOnPort(ctx, listenPort)
        return nil
    }
    return nil
}

func (h *Server) startAndPublishPort(ctx context.Context, output module.Handler) {
    // Listen on an OS-assigned ephemeral port
    listener, err := net.Listen("tcp", ":0")
    if err != nil {
        return
    }
    actualPort := listener.Addr().(*net.TCPAddr).Port

    // Publish port to metadata
    output(ctx, v1alpha1.ReconcilePort, func(node *v1alpha1.TinyNode) {
        if node.Status.Metadata == nil {
            node.Status.Metadata = make(map[string]string)
        }
        node.Status.Metadata[PortMetadata] = strconv.Itoa(actualPort)
    })

    // Start serving
    go http.Serve(listener, h.handler)
    h.currentPort = actualPort
}
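
The startOnPort helper is not shown in the excerpt above; a minimal sketch of what it might look like (shutdown of any previously running listener is elided):

go
// Hypothetical sketch; the real http-module implementation may differ.
func (h *Server) startOnPort(ctx context.Context, port int) {
    listener, err := net.Listen("tcp", ":"+strconv.Itoa(port))
    if err != nil {
        return // port busy or not yet released; a later reconcile retries
    }
    go http.Serve(listener, h.handler)
    h.currentPort = port
}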

State Propagation Timeline

Time ──────────────────────────────────────────────────────────────▶

      Leader Pod              Kubernetes              Reader Pods
          │                       │                        │
          │ Start server on :8080 │                        │
          │                       │                        │
          │ Patch TinyNode ──────▶│                        │
          │ metadata["port"]=8080 │                        │
          │                       │                        │
          │                       │ Watch event ──────────▶│
          │                       │                        │
          │                       │                        │ Read metadata
          │                       │                        │ Start on :8080
          │                       │                        │
          ▼                       ▼                        ▼
      All pods now listening on :8080

Updating Metadata

Callback Pattern

The recommended way to update metadata:

go
output(ctx, v1alpha1.ReconcilePort, func(node *v1alpha1.TinyNode) {
    // Modify node in place
    node.Status.Metadata["key"] = "value"
})

The runner handles:

  1. Getting current TinyNode
  2. Applying your callback
  3. Patching the status
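
Conceptually this is a get-mutate-patch loop. A simplified sketch of the runner side using controller-runtime's client (patchStatus is illustrative, not the actual runner API):

go
import (
    "context"

    "k8s.io/client-go/util/retry"
    "sigs.k8s.io/controller-runtime/pkg/client"
)

func patchStatus(ctx context.Context, c client.Client, key client.ObjectKey,
    mutate func(*v1alpha1.TinyNode)) error {
    return retry.RetryOnConflict(retry.DefaultRetry, func() error {
        node := &v1alpha1.TinyNode{}
        if err := c.Get(ctx, key, node); err != nil { // 1. get the current TinyNode
            return err
        }
        base := node.DeepCopy()
        mutate(node) // 2. apply the caller's callback
        // 3. patch only the status subresource with the resulting diff
        return c.Status().Patch(ctx, node, client.MergeFrom(base))
    })
}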

Atomic Updates

Updates are atomic at the field level:

go
// Safe: Only updates "my-key"
output(ctx, v1alpha1.ReconcilePort, func(node *v1alpha1.TinyNode) {
    node.Status.Metadata["my-key"] = "my-value"
})

// Other fields unchanged

Common Patterns

Pattern 1: Port Assignment

go
const PortMetadata = "server-port"

func (c *Server) publishPort(ctx context.Context, output module.Handler, port int) {
    output(ctx, v1alpha1.ReconcilePort, func(node *v1alpha1.TinyNode) {
        if node.Status.Metadata == nil {
            node.Status.Metadata = make(map[string]string)
        }
        node.Status.Metadata[PortMetadata] = strconv.Itoa(port)
    })
}

func (c *Server) readPort(node *v1alpha1.TinyNode) int {
    if portStr, ok := node.Status.Metadata[PortMetadata]; ok {
        port, _ := strconv.Atoi(portStr)
        return port
    }
    return 0
}

Pattern 2: Timestamp Tracking

go
func (c *Component) updateLastRun(ctx context.Context, output module.Handler) {
    if utils.IsLeader(ctx) {
        output(ctx, v1alpha1.ReconcilePort, func(node *v1alpha1.TinyNode) {
            node.Status.Metadata["last-run"] = time.Now().Format(time.RFC3339)
        })
    }
}

func (c *Component) getLastRun(node *v1alpha1.TinyNode) time.Time {
    if ts, ok := node.Status.Metadata["last-run"]; ok {
        t, _ := time.Parse(time.RFC3339, ts)
        return t
    }
    return time.Time{}
}

Pattern 3: State Machine

go
type State string

const (
    StateIdle    State = "idle"
    StateRunning State = "running"
    StateStopped State = "stopped"
)

func (c *Component) setState(ctx context.Context, output module.Handler, state State) {
    if utils.IsLeader(ctx) {
        output(ctx, v1alpha1.ReconcilePort, func(node *v1alpha1.TinyNode) {
            node.Status.Metadata["state"] = string(state)
        })
    }
}

func (c *Component) getState(node *v1alpha1.TinyNode) State {
    if s, ok := node.Status.Metadata["state"]; ok {
        return State(s)
    }
    return StateIdle
}

Pattern 4: Counter

go
func (c *Component) incrementCounter(ctx context.Context, output module.Handler) {
    if utils.IsLeader(ctx) {
        output(ctx, v1alpha1.ReconcilePort, func(node *v1alpha1.TinyNode) {
            if node.Status.Metadata == nil {
                node.Status.Metadata = make(map[string]string)
            }
            count := 0
            if countStr, ok := node.Status.Metadata["count"]; ok {
                count, _ = strconv.Atoi(countStr)
            }
            count++
            node.Status.Metadata["count"] = strconv.Itoa(count)
        })
    }
}
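
This read-modify-write is safe only because a single leader writes and the runner re-reads the current TinyNode before applying the callback; with multiple writers an increment could still be lost between the read and the patch.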

Limitations

String Values Only

Metadata values are strings:

go
// Must convert to string
node.Status.Metadata["port"] = strconv.Itoa(8080)
node.Status.Metadata["enabled"] = strconv.FormatBool(true)
node.Status.Metadata["config"] = string(jsonBytes)
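
For structured values, one option is to round-trip a small struct through encoding/json; the ServerConfig type here is purely illustrative:

go
type ServerConfig struct {
    Port int  `json:"port"`
    TLS  bool `json:"tls"`
}

// Write side (leader): encode the struct into a metadata string
if raw, err := json.Marshal(ServerConfig{Port: 8080, TLS: true}); err == nil {
    node.Status.Metadata["config"] = string(raw)
}

// Read side (any pod): decode it back
var cfg ServerConfig
if s, ok := node.Status.Metadata["config"]; ok {
    _ = json.Unmarshal([]byte(s), &cfg)
}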

Size Limits

Kubernetes objects are capped by etcd's request size limit (about 1.5 MiB by default). Keep metadata small:

go
// Good: Small values
node.Status.Metadata["port"] = "8080"

// Bad: Large values
node.Status.Metadata["all-data"] = hugeJSONString  // Don't do this

Update Latency

Watch propagation has latency (~100ms to seconds):

go
// Don't rely on immediate propagation
output(ctx, v1alpha1.ReconcilePort, func(node *v1alpha1.TinyNode) {
    node.Status.Metadata["key"] = "value"
})
// Other pods won't see this immediately
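
Design handlers to be level-triggered instead: on every reconcile event, compare the published state with local state and converge, as the HTTP server example does with listenPort versus h.currentPort.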

Best Practices

1. Leader-Only Writes

go
if utils.IsLeader(ctx) {
    output(ctx, v1alpha1.ReconcilePort, func(node *v1alpha1.TinyNode) {
        // Update metadata
    })
}
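
Restricting writes to the leader keeps a single writer per TinyNode, so replicas cannot publish conflicting values for the same key.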

2. Initialize Metadata Map

go
output(ctx, v1alpha1.ReconcilePort, func(node *v1alpha1.TinyNode) {
    if node.Status.Metadata == nil {
        node.Status.Metadata = make(map[string]string)
    }
    node.Status.Metadata["key"] = "value"
})

3. Use Constants for Keys

go
const (
    MetaKeyPort      = "http-server-port"
    MetaKeyState     = "component-state"
    MetaKeyLastRun   = "last-run-time"
)

4. Handle Missing Values

go
func (c *Component) readPort(node *v1alpha1.TinyNode) int {
    if node.Status.Metadata == nil {
        return 0
    }
    if portStr, ok := node.Status.Metadata[MetaKeyPort]; ok {
        port, err := strconv.Atoi(portStr)
        if err != nil {
            return 0
        }
        return port
    }
    return 0
}

Next Steps

Build flow-based applications on Kubernetes