Skip to content

Stateful Component Example

A complete example of a component with state propagation across replicas.

Overview

This component maintains state that must be shared across multiple replicas. It demonstrates:

  • Leader-reader pattern
  • State propagation via TinyNode CR
  • Graceful state recovery
  • Concurrent access handling

Complete Implementation

go
package counter

import (
    "context"
    "fmt"
    "sync"
    "time"

    "github.com/tiny-systems/module/api/v1alpha1"
    "github.com/tiny-systems/module/pkg/module"
    "github.com/tiny-systems/module/pkg/utils"
)

const ComponentName = "stateful_counter"

type Component struct {
    settings Settings

    mu         sync.RWMutex
    count      int64
    lastUpdate time.Time
    history    []CountEvent
}

// CountEvent records a counting event
type CountEvent struct {
    Timestamp int64  `json:"timestamp"`
    Value     int64  `json:"value"`
    Operation string `json:"operation"`
    Source    string `json:"source"`
}

// Settings configuration
type Settings struct {
    InitialValue int64  `json:"initialValue" title:"Initial Value" default:"0"
        description:"Starting count value"`
    MaxHistory   int    `json:"maxHistory" title:"Max History" default:"100"
        description:"Maximum history entries to keep"`
    Persistent   bool   `json:"persistent" title:"Persist State" default:"true"
        description:"Save state to Kubernetes CR"`
}

// IncrementInput for increasing the counter
type IncrementInput struct {
    Amount int64  `json:"amount" title:"Amount" default:"1" configurable:"true"
        description:"Amount to increment by"`
    Source string `json:"source,omitempty" title:"Source" configurable:"true"
        description:"Source of the increment operation"`
}

// DecrementInput for decreasing the counter
type DecrementInput struct {
    Amount int64  `json:"amount" title:"Amount" default:"1" configurable:"true"
        description:"Amount to decrement by"`
    Source string `json:"source,omitempty" title:"Source" configurable:"true"
        description:"Source of the decrement operation"`
}

// SetInput for setting the counter directly
type SetInput struct {
    Value  int64  `json:"value" title:"Value" required:"true" configurable:"true"
        description:"Value to set"`
    Source string `json:"source,omitempty" title:"Source" configurable:"true"`
}

// CountOutput emitted after each operation
type CountOutput struct {
    Value      int64  `json:"value" title:"Current Value"`
    Previous   int64  `json:"previous" title:"Previous Value"`
    Operation  string `json:"operation" title:"Operation"`
    Amount     int64  `json:"amount,omitempty" title:"Amount Changed"`
    Source     string `json:"source,omitempty" title:"Source"`
    Timestamp  string `json:"timestamp" title:"Timestamp"`
}

// ControlState for viewing current state
type ControlState struct {
    Value      int64  `json:"value" readonly:"true" title:"Current Value"`
    LastUpdate string `json:"lastUpdate" readonly:"true" title:"Last Update"`
    EventCount int    `json:"eventCount" readonly:"true" title:"Events Recorded"`
    Reset      bool   `json:"reset" format:"button" title:"Reset Counter"`
}

var _ module.Component = (*Component)(nil)
var _ module.ControlHandler = (*Component)(nil)

func (c *Component) GetInfo() module.ComponentInfo {
    return module.ComponentInfo{
        Name:        ComponentName,
        Title:       "Stateful Counter",
        Description: "Counter with state propagation across replicas",
        Category:    "State",
        Tags:        []string{"counter", "state", "stateful"},
    }
}

func (c *Component) Ports() []module.Port {
    return []module.Port{
        {
            Name:          v1alpha1.SettingsPort,
            Label:         "Settings",
            Position:      module.PositionTop,
            Source:        true,
            Configuration: Settings{},
        },
        {
            Name:          v1alpha1.ControlPort,
            Label:         "Control",
            Position:      module.PositionTop,
            Source:        true,
            Configuration: ControlState{},
        },
        {
            Name:          v1alpha1.ReconcilePort,
            Label:         "Reconcile",
            Position:      module.PositionTop,
            Source:        true,
            Configuration: v1alpha1.TinyNode{},
        },
        {
            Name:          "increment",
            Label:         "Increment",
            Position:      module.PositionLeft,
            Source:        true,
            Configuration: IncrementInput{},
        },
        {
            Name:          "decrement",
            Label:         "Decrement",
            Position:      module.PositionLeft,
            Source:        true,
            Configuration: DecrementInput{},
        },
        {
            Name:          "set",
            Label:         "Set",
            Position:      module.PositionLeft,
            Source:        true,
            Configuration: SetInput{},
        },
        {
            Name:          "value",
            Label:         "Value",
            Position:      module.PositionRight,
            Source:        false,
            Configuration: CountOutput{},
        },
    }
}

func (c *Component) Handle(
    ctx context.Context,
    output module.Handler,
    port string,
    msg any,
) error {
    switch port {
    case v1alpha1.SettingsPort:
        c.settings = msg.(Settings)
        c.mu.Lock()
        if c.count == 0 {
            c.count = c.settings.InitialValue
        }
        c.mu.Unlock()
        return nil

    case v1alpha1.ReconcilePort:
        return c.handleReconcile(ctx, output, msg.(*v1alpha1.TinyNode))

    case "increment":
        return c.handleIncrement(ctx, output, msg.(IncrementInput))

    case "decrement":
        return c.handleDecrement(ctx, output, msg.(DecrementInput))

    case "set":
        return c.handleSet(ctx, output, msg.(SetInput))

    default:
        return fmt.Errorf("unknown port: %s", port)
    }
}

func (c *Component) handleReconcile(
    ctx context.Context,
    output module.Handler,
    node *v1alpha1.TinyNode,
) error {
    c.mu.Lock()
    defer c.mu.Unlock()

    // Read state from CR (all replicas)
    if node.Status.Metadata != nil {
        if countStr, ok := node.Status.Metadata["counter-value"]; ok {
            var savedCount int64
            fmt.Sscanf(countStr, "%d", &savedCount)

            // Only update if saved value is newer (based on timestamp)
            if timeStr, ok := node.Status.Metadata["counter-updated"]; ok {
                savedTime, _ := time.Parse(time.RFC3339, timeStr)
                if savedTime.After(c.lastUpdate) {
                    c.count = savedCount
                    c.lastUpdate = savedTime
                }
            }
        }
    }

    // Leader writes current state
    if utils.IsLeader(ctx) && c.settings.Persistent {
        return output(ctx, v1alpha1.ReconcilePort, func(n *v1alpha1.TinyNode) {
            if n.Status.Metadata == nil {
                n.Status.Metadata = make(map[string]string)
            }
            n.Status.Metadata["counter-value"] = fmt.Sprintf("%d", c.count)
            n.Status.Metadata["counter-updated"] = c.lastUpdate.Format(time.RFC3339)
        })
    }

    return nil
}

func (c *Component) handleIncrement(
    ctx context.Context,
    output module.Handler,
    input IncrementInput,
) error {
    // Only leader can modify state
    if !utils.IsLeader(ctx) {
        return fmt.Errorf("only leader can modify state")
    }

    c.mu.Lock()
    previous := c.count
    amount := input.Amount
    if amount == 0 {
        amount = 1
    }
    c.count += amount
    c.lastUpdate = time.Now()

    // Record history
    c.addHistory(CountEvent{
        Timestamp: c.lastUpdate.UnixNano(),
        Value:     c.count,
        Operation: "increment",
        Source:    input.Source,
    })
    c.mu.Unlock()

    // Persist state
    if c.settings.Persistent {
        c.persistState(ctx, output)
    }

    return output(ctx, "value", CountOutput{
        Value:     c.count,
        Previous:  previous,
        Operation: "increment",
        Amount:    amount,
        Source:    input.Source,
        Timestamp: c.lastUpdate.Format(time.RFC3339),
    })
}

func (c *Component) handleDecrement(
    ctx context.Context,
    output module.Handler,
    input DecrementInput,
) error {
    if !utils.IsLeader(ctx) {
        return fmt.Errorf("only leader can modify state")
    }

    c.mu.Lock()
    previous := c.count
    amount := input.Amount
    if amount == 0 {
        amount = 1
    }
    c.count -= amount
    c.lastUpdate = time.Now()

    c.addHistory(CountEvent{
        Timestamp: c.lastUpdate.UnixNano(),
        Value:     c.count,
        Operation: "decrement",
        Source:    input.Source,
    })
    c.mu.Unlock()

    if c.settings.Persistent {
        c.persistState(ctx, output)
    }

    return output(ctx, "value", CountOutput{
        Value:     c.count,
        Previous:  previous,
        Operation: "decrement",
        Amount:    amount,
        Source:    input.Source,
        Timestamp: c.lastUpdate.Format(time.RFC3339),
    })
}

func (c *Component) handleSet(
    ctx context.Context,
    output module.Handler,
    input SetInput,
) error {
    if !utils.IsLeader(ctx) {
        return fmt.Errorf("only leader can modify state")
    }

    c.mu.Lock()
    previous := c.count
    c.count = input.Value
    c.lastUpdate = time.Now()

    c.addHistory(CountEvent{
        Timestamp: c.lastUpdate.UnixNano(),
        Value:     c.count,
        Operation: "set",
        Source:    input.Source,
    })
    c.mu.Unlock()

    if c.settings.Persistent {
        c.persistState(ctx, output)
    }

    return output(ctx, "value", CountOutput{
        Value:     c.count,
        Previous:  previous,
        Operation: "set",
        Amount:    c.count - previous,
        Source:    input.Source,
        Timestamp: c.lastUpdate.Format(time.RFC3339),
    })
}

func (c *Component) addHistory(event CountEvent) {
    c.history = append(c.history, event)

    // Trim history
    if len(c.history) > c.settings.MaxHistory {
        c.history = c.history[len(c.history)-c.settings.MaxHistory:]
    }
}

func (c *Component) persistState(ctx context.Context, output module.Handler) {
    c.mu.RLock()
    count := c.count
    lastUpdate := c.lastUpdate
    c.mu.RUnlock()

    output(ctx, v1alpha1.ReconcilePort, func(n *v1alpha1.TinyNode) {
        if n.Status.Metadata == nil {
            n.Status.Metadata = make(map[string]string)
        }
        n.Status.Metadata["counter-value"] = fmt.Sprintf("%d", count)
        n.Status.Metadata["counter-updated"] = lastUpdate.Format(time.RFC3339)
    })
}

func (c *Component) HandleControl(
    ctx context.Context,
    state any,
    port string,
    msg any,
) (any, error) {
    switch port {
    case "reset":
        if !utils.IsLeader(ctx) {
            return state, fmt.Errorf("only leader can reset")
        }

        c.mu.Lock()
        c.count = c.settings.InitialValue
        c.lastUpdate = time.Now()
        c.history = nil
        c.mu.Unlock()

        output := utils.GetOutputHandler(ctx)
        if output != nil && c.settings.Persistent {
            c.persistState(ctx, output)
        }
    }

    c.mu.RLock()
    defer c.mu.RUnlock()

    return &ControlState{
        Value:      c.count,
        LastUpdate: c.lastUpdate.Format(time.RFC3339),
        EventCount: len(c.history),
    }, nil
}

func (c *Component) Instance() module.Component {
    return &Component{}
}

State Propagation Flow

┌─────────────────────────────────────────────────────────────────────────┐
│                         Kubernetes Cluster                               │
│                                                                          │
│  ┌──────────────────────────────────────────────────────────────────┐   │
│  │                        TinyNode CR                                │   │
│  │                                                                   │   │
│  │  status:                                                          │   │
│  │    metadata:                                                      │   │
│  │      counter-value: "42"                                         │   │
│  │      counter-updated: "2024-01-15T10:30:00Z"                     │   │
│  │                                                                   │   │
│  └───────────────────────────┬──────────────────────────────────────┘   │
│                              │                                           │
│              ┌───────────────┼───────────────┐                          │
│              │               │               │                          │
│              ▼               ▼               ▼                          │
│  ┌───────────────┐  ┌───────────────┐  ┌───────────────┐               │
│  │   Pod 1       │  │   Pod 2       │  │   Pod 3       │               │
│  │   (Leader)    │  │   (Reader)    │  │   (Reader)    │               │
│  │               │  │               │  │               │               │
│  │  count: 42    │  │  count: 42    │  │  count: 42    │               │
│  │  ┌─────────┐  │  │  ┌─────────┐  │  │  ┌─────────┐  │               │
│  │  │ Write   │──┼──┼─►│  Read   │  │  │  │  Read   │  │               │
│  │  │ State   │  │  │  │  State  │  │  │  │  State  │  │               │
│  │  └─────────┘  │  │  └─────────┘  │  │  └─────────┘  │               │
│  └───────────────┘  └───────────────┘  └───────────────┘               │
│                                                                          │
└─────────────────────────────────────────────────────────────────────────┘

Key Patterns Demonstrated

1. Leader-Only Writes

Only the leader modifies state:

go
if !utils.IsLeader(ctx) {
    return fmt.Errorf("only leader can modify state")
}

2. State Persistence via CR

Write state to TinyNode metadata:

go
output(ctx, v1alpha1.ReconcilePort, func(n *v1alpha1.TinyNode) {
    n.Status.Metadata["counter-value"] = fmt.Sprintf("%d", c.count)
    n.Status.Metadata["counter-updated"] = lastUpdate.Format(time.RFC3339)
})

3. State Recovery on Reconcile

Read state from CR:

go
if countStr, ok := node.Status.Metadata["counter-value"]; ok {
    fmt.Sscanf(countStr, "%d", &savedCount)
    if savedTime.After(c.lastUpdate) {
        c.count = savedCount
    }
}

4. Timestamp-Based Conflict Resolution

Use timestamps to resolve conflicts:

go
if savedTime.After(c.lastUpdate) {
    c.count = savedCount
    c.lastUpdate = savedTime
}

5. Thread-Safe Access

Protect state with mutex:

go
c.mu.Lock()
previous := c.count
c.count += amount
c.mu.Unlock()

Usage Example

Settings

yaml
edges:
  - port: _settings
    data:
      initialValue: 0
      maxHistory: 100
      persistent: true

Increment Trigger

yaml
edges:
  - port: increment
    data:
      amount: 1
      source: "api_request"

State Lifecycle

  1. Initialization: Component starts with initialValue
  2. Reconciliation: Reads existing state from CR
  3. Operation: Leader processes increment/decrement/set
  4. Persistence: Leader writes new state to CR
  5. Propagation: Readers receive state on next reconcile
  6. Recovery: After pod restart, state is restored from CR

Best Practices

1. Idempotent Operations

Design operations to be safely retriable:

go
// Include unique operation ID
type IncrementInput struct {
    OperationID string `json:"operationId"`
    Amount      int64  `json:"amount"`
}

// Check if already processed
if c.processedOperations[input.OperationID] {
    return nil // Already processed
}

2. Versioned State

Add version numbers for complex state:

go
n.Status.Metadata["state-version"] = fmt.Sprintf("%d", c.stateVersion)

3. State Validation

Validate state on read:

go
if savedCount < 0 && !c.settings.AllowNegative {
    savedCount = 0 // Correct invalid state
}

Extension Ideas

  1. Distributed Locking: Use Kubernetes leases for stronger guarantees
  2. Event Sourcing: Store all events, rebuild state
  3. Snapshot Checkpoints: Periodic full state snapshots
  4. Conflict Detection: Alert on concurrent modifications
  5. State Encryption: Encrypt sensitive state in CR

Build flow-based applications on Kubernetes