Stateful Component Example
A complete example of a component with state propagation across replicas.
Overview
This component maintains state that must be shared across multiple replicas. It demonstrates:
- Leader-reader pattern
- State propagation via TinyNode CR
- Graceful state recovery
- Concurrent access handling
Complete Implementation
go
package counter
import (
"context"
"fmt"
"sync"
"time"
"github.com/tiny-systems/module/api/v1alpha1"
"github.com/tiny-systems/module/pkg/module"
"github.com/tiny-systems/module/pkg/utils"
)
const ComponentName = "stateful_counter"
type Component struct {
settings Settings
mu sync.RWMutex
count int64
lastUpdate time.Time
history []CountEvent
}
// CountEvent records a counting event
type CountEvent struct {
Timestamp int64 `json:"timestamp"`
Value int64 `json:"value"`
Operation string `json:"operation"`
Source string `json:"source"`
}
// Settings configuration
type Settings struct {
InitialValue int64 `json:"initialValue" title:"Initial Value" default:"0"
description:"Starting count value"`
MaxHistory int `json:"maxHistory" title:"Max History" default:"100"
description:"Maximum history entries to keep"`
Persistent bool `json:"persistent" title:"Persist State" default:"true"
description:"Save state to Kubernetes CR"`
}
// IncrementInput for increasing the counter
type IncrementInput struct {
Amount int64 `json:"amount" title:"Amount" default:"1" configurable:"true"
description:"Amount to increment by"`
Source string `json:"source,omitempty" title:"Source" configurable:"true"
description:"Source of the increment operation"`
}
// DecrementInput for decreasing the counter
type DecrementInput struct {
Amount int64 `json:"amount" title:"Amount" default:"1" configurable:"true"
description:"Amount to decrement by"`
Source string `json:"source,omitempty" title:"Source" configurable:"true"
description:"Source of the decrement operation"`
}
// SetInput for setting the counter directly
type SetInput struct {
Value int64 `json:"value" title:"Value" required:"true" configurable:"true"
description:"Value to set"`
Source string `json:"source,omitempty" title:"Source" configurable:"true"`
}
// CountOutput emitted after each operation
type CountOutput struct {
Value int64 `json:"value" title:"Current Value"`
Previous int64 `json:"previous" title:"Previous Value"`
Operation string `json:"operation" title:"Operation"`
Amount int64 `json:"amount,omitempty" title:"Amount Changed"`
Source string `json:"source,omitempty" title:"Source"`
Timestamp string `json:"timestamp" title:"Timestamp"`
}
// ControlState for viewing current state
type ControlState struct {
Value int64 `json:"value" readonly:"true" title:"Current Value"`
LastUpdate string `json:"lastUpdate" readonly:"true" title:"Last Update"`
EventCount int `json:"eventCount" readonly:"true" title:"Events Recorded"`
Reset bool `json:"reset" format:"button" title:"Reset Counter"`
}
var _ module.Component = (*Component)(nil)
var _ module.ControlHandler = (*Component)(nil)
func (c *Component) GetInfo() module.ComponentInfo {
return module.ComponentInfo{
Name: ComponentName,
Title: "Stateful Counter",
Description: "Counter with state propagation across replicas",
Category: "State",
Tags: []string{"counter", "state", "stateful"},
}
}
func (c *Component) Ports() []module.Port {
return []module.Port{
{
Name: v1alpha1.SettingsPort,
Label: "Settings",
Position: module.PositionTop,
Source: true,
Configuration: Settings{},
},
{
Name: v1alpha1.ControlPort,
Label: "Control",
Position: module.PositionTop,
Source: true,
Configuration: ControlState{},
},
{
Name: v1alpha1.ReconcilePort,
Label: "Reconcile",
Position: module.PositionTop,
Source: true,
Configuration: v1alpha1.TinyNode{},
},
{
Name: "increment",
Label: "Increment",
Position: module.PositionLeft,
Source: true,
Configuration: IncrementInput{},
},
{
Name: "decrement",
Label: "Decrement",
Position: module.PositionLeft,
Source: true,
Configuration: DecrementInput{},
},
{
Name: "set",
Label: "Set",
Position: module.PositionLeft,
Source: true,
Configuration: SetInput{},
},
{
Name: "value",
Label: "Value",
Position: module.PositionRight,
Source: false,
Configuration: CountOutput{},
},
}
}
func (c *Component) Handle(
ctx context.Context,
output module.Handler,
port string,
msg any,
) error {
switch port {
case v1alpha1.SettingsPort:
c.settings = msg.(Settings)
c.mu.Lock()
if c.count == 0 {
c.count = c.settings.InitialValue
}
c.mu.Unlock()
return nil
case v1alpha1.ReconcilePort:
return c.handleReconcile(ctx, output, msg.(*v1alpha1.TinyNode))
case "increment":
return c.handleIncrement(ctx, output, msg.(IncrementInput))
case "decrement":
return c.handleDecrement(ctx, output, msg.(DecrementInput))
case "set":
return c.handleSet(ctx, output, msg.(SetInput))
default:
return fmt.Errorf("unknown port: %s", port)
}
}
func (c *Component) handleReconcile(
ctx context.Context,
output module.Handler,
node *v1alpha1.TinyNode,
) error {
c.mu.Lock()
defer c.mu.Unlock()
// Read state from CR (all replicas)
if node.Status.Metadata != nil {
if countStr, ok := node.Status.Metadata["counter-value"]; ok {
var savedCount int64
fmt.Sscanf(countStr, "%d", &savedCount)
// Only update if saved value is newer (based on timestamp)
if timeStr, ok := node.Status.Metadata["counter-updated"]; ok {
savedTime, _ := time.Parse(time.RFC3339, timeStr)
if savedTime.After(c.lastUpdate) {
c.count = savedCount
c.lastUpdate = savedTime
}
}
}
}
// Leader writes current state
if utils.IsLeader(ctx) && c.settings.Persistent {
return output(ctx, v1alpha1.ReconcilePort, func(n *v1alpha1.TinyNode) {
if n.Status.Metadata == nil {
n.Status.Metadata = make(map[string]string)
}
n.Status.Metadata["counter-value"] = fmt.Sprintf("%d", c.count)
n.Status.Metadata["counter-updated"] = c.lastUpdate.Format(time.RFC3339)
})
}
return nil
}
func (c *Component) handleIncrement(
ctx context.Context,
output module.Handler,
input IncrementInput,
) error {
// Only leader can modify state
if !utils.IsLeader(ctx) {
return fmt.Errorf("only leader can modify state")
}
c.mu.Lock()
previous := c.count
amount := input.Amount
if amount == 0 {
amount = 1
}
c.count += amount
c.lastUpdate = time.Now()
// Record history
c.addHistory(CountEvent{
Timestamp: c.lastUpdate.UnixNano(),
Value: c.count,
Operation: "increment",
Source: input.Source,
})
c.mu.Unlock()
// Persist state
if c.settings.Persistent {
c.persistState(ctx, output)
}
return output(ctx, "value", CountOutput{
Value: c.count,
Previous: previous,
Operation: "increment",
Amount: amount,
Source: input.Source,
Timestamp: c.lastUpdate.Format(time.RFC3339),
})
}
func (c *Component) handleDecrement(
ctx context.Context,
output module.Handler,
input DecrementInput,
) error {
if !utils.IsLeader(ctx) {
return fmt.Errorf("only leader can modify state")
}
c.mu.Lock()
previous := c.count
amount := input.Amount
if amount == 0 {
amount = 1
}
c.count -= amount
c.lastUpdate = time.Now()
c.addHistory(CountEvent{
Timestamp: c.lastUpdate.UnixNano(),
Value: c.count,
Operation: "decrement",
Source: input.Source,
})
c.mu.Unlock()
if c.settings.Persistent {
c.persistState(ctx, output)
}
return output(ctx, "value", CountOutput{
Value: c.count,
Previous: previous,
Operation: "decrement",
Amount: amount,
Source: input.Source,
Timestamp: c.lastUpdate.Format(time.RFC3339),
})
}
func (c *Component) handleSet(
ctx context.Context,
output module.Handler,
input SetInput,
) error {
if !utils.IsLeader(ctx) {
return fmt.Errorf("only leader can modify state")
}
c.mu.Lock()
previous := c.count
c.count = input.Value
c.lastUpdate = time.Now()
c.addHistory(CountEvent{
Timestamp: c.lastUpdate.UnixNano(),
Value: c.count,
Operation: "set",
Source: input.Source,
})
c.mu.Unlock()
if c.settings.Persistent {
c.persistState(ctx, output)
}
return output(ctx, "value", CountOutput{
Value: c.count,
Previous: previous,
Operation: "set",
Amount: c.count - previous,
Source: input.Source,
Timestamp: c.lastUpdate.Format(time.RFC3339),
})
}
func (c *Component) addHistory(event CountEvent) {
c.history = append(c.history, event)
// Trim history
if len(c.history) > c.settings.MaxHistory {
c.history = c.history[len(c.history)-c.settings.MaxHistory:]
}
}
func (c *Component) persistState(ctx context.Context, output module.Handler) {
c.mu.RLock()
count := c.count
lastUpdate := c.lastUpdate
c.mu.RUnlock()
output(ctx, v1alpha1.ReconcilePort, func(n *v1alpha1.TinyNode) {
if n.Status.Metadata == nil {
n.Status.Metadata = make(map[string]string)
}
n.Status.Metadata["counter-value"] = fmt.Sprintf("%d", count)
n.Status.Metadata["counter-updated"] = lastUpdate.Format(time.RFC3339)
})
}
func (c *Component) HandleControl(
ctx context.Context,
state any,
port string,
msg any,
) (any, error) {
switch port {
case "reset":
if !utils.IsLeader(ctx) {
return state, fmt.Errorf("only leader can reset")
}
c.mu.Lock()
c.count = c.settings.InitialValue
c.lastUpdate = time.Now()
c.history = nil
c.mu.Unlock()
output := utils.GetOutputHandler(ctx)
if output != nil && c.settings.Persistent {
c.persistState(ctx, output)
}
}
c.mu.RLock()
defer c.mu.RUnlock()
return &ControlState{
Value: c.count,
LastUpdate: c.lastUpdate.Format(time.RFC3339),
EventCount: len(c.history),
}, nil
}
func (c *Component) Instance() module.Component {
return &Component{}
}State Propagation Flow
┌─────────────────────────────────────────────────────────────────────────┐
│ Kubernetes Cluster │
│ │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ TinyNode CR │ │
│ │ │ │
│ │ status: │ │
│ │ metadata: │ │
│ │ counter-value: "42" │ │
│ │ counter-updated: "2024-01-15T10:30:00Z" │ │
│ │ │ │
│ └───────────────────────────┬──────────────────────────────────────┘ │
│ │ │
│ ┌───────────────┼───────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌───────────────┐ ┌───────────────┐ ┌───────────────┐ │
│ │ Pod 1 │ │ Pod 2 │ │ Pod 3 │ │
│ │ (Leader) │ │ (Reader) │ │ (Reader) │ │
│ │ │ │ │ │ │ │
│ │ count: 42 │ │ count: 42 │ │ count: 42 │ │
│ │ ┌─────────┐ │ │ ┌─────────┐ │ │ ┌─────────┐ │ │
│ │ │ Write │──┼──┼─►│ Read │ │ │ │ Read │ │ │
│ │ │ State │ │ │ │ State │ │ │ │ State │ │ │
│ │ └─────────┘ │ │ └─────────┘ │ │ └─────────┘ │ │
│ └───────────────┘ └───────────────┘ └───────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘Key Patterns Demonstrated
1. Leader-Only Writes
Only the leader modifies state:
go
if !utils.IsLeader(ctx) {
return fmt.Errorf("only leader can modify state")
}2. State Persistence via CR
Write state to TinyNode metadata:
go
output(ctx, v1alpha1.ReconcilePort, func(n *v1alpha1.TinyNode) {
n.Status.Metadata["counter-value"] = fmt.Sprintf("%d", c.count)
n.Status.Metadata["counter-updated"] = lastUpdate.Format(time.RFC3339)
})3. State Recovery on Reconcile
Read state from CR:
go
if countStr, ok := node.Status.Metadata["counter-value"]; ok {
fmt.Sscanf(countStr, "%d", &savedCount)
if savedTime.After(c.lastUpdate) {
c.count = savedCount
}
}4. Timestamp-Based Conflict Resolution
Use timestamps to resolve conflicts:
go
if savedTime.After(c.lastUpdate) {
c.count = savedCount
c.lastUpdate = savedTime
}5. Thread-Safe Access
Protect state with mutex:
go
c.mu.Lock()
previous := c.count
c.count += amount
c.mu.Unlock()Usage Example
Settings
yaml
edges:
- port: _settings
data:
initialValue: 0
maxHistory: 100
persistent: trueIncrement Trigger
yaml
edges:
- port: increment
data:
amount: 1
source: "api_request"State Lifecycle
- Initialization: Component starts with
initialValue - Reconciliation: Reads existing state from CR
- Operation: Leader processes increment/decrement/set
- Persistence: Leader writes new state to CR
- Propagation: Readers receive state on next reconcile
- Recovery: After pod restart, state is restored from CR
Best Practices
1. Idempotent Operations
Design operations to be safely retriable:
go
// Include unique operation ID
type IncrementInput struct {
OperationID string `json:"operationId"`
Amount int64 `json:"amount"`
}
// Check if already processed
if c.processedOperations[input.OperationID] {
return nil // Already processed
}2. Versioned State
Add version numbers for complex state:
go
n.Status.Metadata["state-version"] = fmt.Sprintf("%d", c.stateVersion)3. State Validation
Validate state on read:
go
if savedCount < 0 && !c.settings.AllowNegative {
savedCount = 0 // Correct invalid state
}Extension Ideas
- Distributed Locking: Use Kubernetes leases for stronger guarantees
- Event Sourcing: Store all events, rebuild state
- Snapshot Checkpoints: Periodic full state snapshots
- Conflict Detection: Alert on concurrent modifications
- State Encryption: Encrypt sensitive state in CR