Skip to content

Periodic Emitter Component

A complete example of a time-based component with start/stop controls.

Overview

This component emits messages at configurable intervals. It demonstrates:

  • Background goroutine management
  • Start/Stop control interface
  • Leader-only execution
  • Graceful shutdown

Complete Implementation

go
package ticker

import (
    "context"
    "fmt"
    "sync"
    "time"

    "github.com/tiny-systems/module/api/v1alpha1"
    "github.com/tiny-systems/module/pkg/module"
    "github.com/tiny-systems/module/pkg/utils"
)

// ComponentName is the identifier this component reports as its Name in GetInfo.
const ComponentName = "periodic_emitter"

// Component is the periodic emitter. It holds the latest settings together
// with the bookkeeping for the background emission loop.
type Component struct {
    // settings is the configuration most recently delivered on the settings port.
    settings Settings

    // mu guards the run-state fields declared below.
    mu sync.RWMutex

    // running reports whether a run loop has been started and not yet stopped.
    running bool

    // stopCh is created by start and closed by stop to end the run loop.
    stopCh chan struct{}

    // tickCount is the emission counter; start resets it to zero.
    tickCount int64
}

// Settings is the configuration accepted on the settings port.
//
// Every struct tag is deliberately written on a single line. reflect.StructTag
// only skips spaces between key:"value" pairs, so a line break inside a tag
// stops parsing and hides every key that follows it (here: description).
type Settings struct {
    // Interval is the time between emissions, in time.ParseDuration syntax.
    Interval string `json:"interval" title:"Interval" default:"1m" required:"true" description:"Time between emissions (e.g., 30s, 1m, 1h)"`

    // EmitOnStart requests one emission right when the loop starts, before
    // the first interval has elapsed.
    EmitOnStart bool `json:"emitOnStart" title:"Emit on Start" default:"true" description:"Emit immediately when started"`

    // MaxEmissions caps the number of emissions; 0 means unlimited.
    MaxEmissions int `json:"maxEmissions,omitempty" title:"Max Emissions" minimum:"0" description:"Maximum number of emissions (0 = unlimited)"`

    // Payload is copied verbatim into every TickOutput.
    Payload any `json:"payload,omitempty" title:"Payload" configurable:"true" description:"Custom payload to include in each emission"`
}

// ControlState is the data shown on the control port: four read-only display
// fields followed by the Start and Stop buttons.
type ControlState struct {
    // Status is "running" or "stopped".
    Status string `json:"status" readonly:"true" title:"Status"`

    // TickCount is the number of emissions reported for the current or last run.
    TickCount int64 `json:"tickCount" readonly:"true" title:"Emissions"`

    // LastEmit is a display string for the most recent emission; omitted when empty.
    LastEmit string `json:"lastEmit,omitempty" readonly:"true" title:"Last Emission"`

    // NextEmit is a display string for the upcoming emission; omitted when empty.
    NextEmit string `json:"nextEmit,omitempty" readonly:"true" title:"Next Emission"`

    // Start is rendered as the "Start" button.
    Start bool `json:"start" format:"button" title:"Start" colSpan:"col-span-6"`

    // Stop is rendered as the "Stop" button.
    Stop bool `json:"stop" format:"button" title:"Stop" colSpan:"col-span-6"`
}

// TickOutput is the message sent on the "tick" port for every emission.
type TickOutput struct {
    // Timestamp is the emission time as Unix nanoseconds.
    Timestamp int64 `json:"timestamp" title:"Timestamp (ns)"`

    // Formatted is the same instant rendered as RFC 3339 text.
    Formatted string `json:"formatted" title:"Formatted Time"`

    // TickNumber is the sequence number of this emission within the current run.
    TickNumber int64 `json:"tickNumber" title:"Tick Number"`

    // Payload is the custom payload taken from the settings; omitted when unset.
    Payload any `json:"payload,omitempty" title:"Payload"`
}

// Compile-time checks that *Component satisfies both module.Component and
// module.ControlHandler.
var (
    _ module.Component      = (*Component)(nil)
    _ module.ControlHandler = (*Component)(nil)
)

// GetInfo returns the static metadata for this component: its name, display
// title, description, category and search tags.
func (c *Component) GetInfo() module.ComponentInfo {
    var info module.ComponentInfo

    info.Name = ComponentName
    info.Title = "Periodic Emitter"
    info.Description = "Emits messages at configurable time intervals"
    info.Category = "Triggers"
    info.Tags = []string{"timer", "cron", "scheduler", "trigger"}

    return info
}

// Ports declares the component's three ports, in order: the settings port and
// the control port (both at the top, both with Source set) and the "tick"
// port on the right, whose messages are TickOutput values.
func (c *Component) Ports() []module.Port {
    settingsPort := module.Port{
        Name:          v1alpha1.SettingsPort,
        Label:         "Settings",
        Position:      module.PositionTop,
        Source:        true,
        Configuration: Settings{},
    }

    // The control panel starts out showing the "stopped" status.
    controlPort := module.Port{
        Name:          v1alpha1.ControlPort,
        Label:         "Control",
        Position:      module.PositionTop,
        Source:        true,
        Configuration: ControlState{Status: "stopped"},
    }

    tickPort := module.Port{
        Name:          "tick",
        Label:         "Tick",
        Position:      module.PositionRight,
        Source:        false,
        Configuration: TickOutput{},
    }

    return []module.Port{settingsPort, controlPort, tickPort}
}

// Handle processes a message delivered on one of the component's ports.
//
// Only the settings port is handled: msg must be a Settings value, which is
// stored as the component's configuration. The ctx and output arguments are
// not used.
//
// It returns an error for any other port, and also (instead of panicking)
// when a settings message does not hold a Settings value.
func (c *Component) Handle(
    ctx context.Context,
    output module.Handler,
    port string,
    msg any,
) error {
    if port != v1alpha1.SettingsPort {
        return fmt.Errorf("unknown port: %s", port)
    }

    settings, ok := msg.(Settings)
    if !ok {
        return fmt.Errorf("invalid settings message: expected %T, got %T", Settings{}, msg)
    }

    // The background loop runs in its own goroutine, so publish the new
    // settings under the same mutex that protects the rest of the state.
    c.mu.Lock()
    c.settings = settings
    c.mu.Unlock()

    return nil
}

// HandleControl implements the ControlHandler interface. It reacts to the
// control-panel buttons and returns the control state to show afterwards.
//
// state is expected to be the previous *ControlState; if it is anything else
// (or nil) a fresh "stopped" state is used in its place. port selects the
// action:
//   - "start" launches the background loop unless it is already running and
//     reports a "running" state with a zero tick count and the projected
//     next emission time.
//   - "stop" halts the loop and reports a "stopped" state with the current
//     tick count, carrying over LastEmit from the previous state.
//
// Both actions are skipped when utils.IsLeader(ctx) is false. In that case,
// and for every other port, the previous state is returned unchanged. msg is
// not used. An error is returned only when a start is requested and ctx
// carries no output handler.
func (c *Component) HandleControl(
    ctx context.Context,
    state any,
    port string,
    msg any,
) (any, error) {
    prev, _ := state.(*ControlState)
    if prev == nil {
        prev = &ControlState{Status: "stopped"}
    }

    wantStart, wantStop := port == "start", port == "stop"
    if !wantStart && !wantStop {
        return prev, nil
    }

    // Only the leader may start or stop the loop.
    if !utils.IsLeader(ctx) {
        return prev, nil
    }

    if wantStop {
        c.stop()

        stopped := &ControlState{Status: "stopped"}
        stopped.TickCount = c.getTickCount()
        stopped.LastEmit = prev.LastEmit
        return stopped, nil
    }

    // From here on the request is a start.
    if c.isRunning() {
        return prev, nil
    }

    // The output handler travels in the context.
    emitter := utils.GetOutputHandler(ctx)
    if emitter == nil {
        return prev, fmt.Errorf("no output handler available")
    }

    c.start(ctx, emitter)

    started := &ControlState{Status: "running", TickCount: 0}
    started.NextEmit = c.calculateNextEmit()
    return started, nil
}

// start launches the background emission loop unless one is already active.
// Under the mutex it resets the tick counter, creates a fresh stop channel,
// marks the component as running and spawns runLoop with ctx and output.
func (c *Component) start(ctx context.Context, output module.Handler) {
    c.mu.Lock()
    defer c.mu.Unlock()

    if !c.running {
        c.tickCount = 0
        c.stopCh = make(chan struct{})
        c.running = true

        go c.runLoop(ctx, output)
    }
}

// stop ends the current run, if any: it clears the running flag and closes
// the stop channel so the run loop returns. Calling it while not running is
// a no-op.
func (c *Component) stop() {
    c.mu.Lock()
    defer c.mu.Unlock()

    if c.running {
        c.running = false
        close(c.stopCh)
    }
}

// isRunning reports, under the read lock, whether the running flag is set.
func (c *Component) isRunning() bool {
    c.mu.RLock()
    active := c.running
    c.mu.RUnlock()

    return active
}

// getTickCount returns the current value of the tick counter, read under the
// read lock.
func (c *Component) getTickCount() int64 {
    c.mu.RLock()
    count := c.tickCount
    c.mu.RUnlock()

    return count
}

// runLoop is the body of the background goroutine spawned by start. It emits
// on every tick of the configured interval (and once up front when
// EmitOnStart is set) until the run's stop channel is closed, ctx is
// cancelled, or emit reports that the emission limit has been reached.
//
// The interval, the EmitOnStart flag and the stop channel are captured once
// at loop start, so settings changes take effect on the next start.
func (c *Component) runLoop(ctx context.Context, output module.Handler) {
    // Snapshot what this run needs under the lock. Holding on to our own
    // stop channel also keeps this loop from latching onto the channel of a
    // later run if the component is stopped and started again.
    c.mu.RLock()
    stopCh := c.stopCh
    rawInterval := c.settings.Interval
    emitOnStart := c.settings.EmitOnStart
    c.mu.RUnlock()

    interval, err := time.ParseDuration(rawInterval)
    if err != nil || interval <= 0 {
        // Fall back to one minute for unparsable input, and also for zero or
        // negative durations: time.NewTicker panics on a non-positive value.
        interval = time.Minute
    }

    ticker := time.NewTicker(interval)
    defer ticker.Stop()

    // Emit on start if configured.
    if emitOnStart && !c.emit(ctx, output) {
        return
    }

    for {
        select {
        case <-stopCh:
            return
        case <-ctx.Done():
            // Nobody called stop, so clear the running flag here; otherwise
            // start would keep refusing to launch a new loop. Leave the
            // state alone if it already belongs to a newer run.
            c.mu.Lock()
            if c.running && c.stopCh == stopCh {
                c.running = false
                close(stopCh)
            }
            c.mu.Unlock()
            return
        case <-ticker.C:
            if !c.emit(ctx, output) {
                return // Max emissions reached
            }
        }
    }
}

// emit sends one TickOutput on the "tick" port and reports whether the loop
// should keep going.
//
// When MaxEmissions is positive and that many emissions have already been
// made, nothing is sent: the component is stopped and false is returned. The
// tick counter is only advanced for emissions that are actually sent, so it
// never exceeds MaxEmissions.
func (c *Component) emit(ctx context.Context, output module.Handler) bool {
    // Read the settings and update the counter in one critical section.
    c.mu.Lock()
    limit := int64(c.settings.MaxEmissions)
    limitReached := limit > 0 && c.tickCount >= limit
    if !limitReached {
        c.tickCount++
    }
    tickNum := c.tickCount
    payload := c.settings.Payload
    c.mu.Unlock()

    if limitReached {
        // stop takes the mutex itself, so call it only after unlocking.
        c.stop()
        return false
    }

    now := time.Now()
    tickOutput := TickOutput{
        Timestamp:  now.UnixNano(),
        Formatted:  now.Format(time.RFC3339),
        TickNumber: tickNum,
        Payload:    payload,
    }

    // Non-blocking emit to avoid stalling the ticker.
    go func() {
        // The delivery error is dropped on purpose: this detached goroutine
        // has no caller to hand it to.
        _ = output(ctx, "tick", tickOutput)
    }()

    return true
}

// calculateNextEmit returns the current time plus the configured interval,
// formatted as RFC 3339, or "unknown" when the interval setting cannot be
// parsed as a duration.
func (c *Component) calculateNextEmit() string {
    every, parseErr := time.ParseDuration(c.settings.Interval)
    if parseErr == nil {
        next := time.Now().Add(every)
        return next.Format(time.RFC3339)
    }

    return "unknown"
}

// Instance returns a new, zero-valued Component; nothing is copied from the
// receiver.
func (c *Component) Instance() module.Component {
    return new(Component)
}

Usage Example

Settings Configuration

yaml
edges:
  - port: _settings
    data:
      interval: "30s"
      emitOnStart: true
      maxEmissions: 0
      payload:
        source: "scheduled_task"
        taskId: "cleanup_job"

Control Panel UI

The control panel shows:

┌──────────────────────────────────────┐
│ Status:        running               │
│ Emissions:     42                    │
│ Last Emission: 2024-01-15T10:30:00Z  │
│ Next Emission: 2024-01-15T10:30:30Z  │
│                                      │
│  [  Start  ]     [  Stop  ]          │
└──────────────────────────────────────┘

Output

Each tick emits:

json
{
  "timestamp": 1705312200000000000,
  "formatted": "2024-01-15T10:30:00Z",
  "tickNumber": 42,
  "payload": {
    "source": "scheduled_task",
    "taskId": "cleanup_job"
  }
}

Key Patterns Demonstrated

1. ControlHandler Interface

Separate interface for button handling:

go
// ControlHandler is the interface a component implements to handle its
// control buttons. HandleControl receives the current control state together
// with the port and message of the request and returns the control state to
// use from then on.
type ControlHandler interface {
    HandleControl(
        ctx context.Context,
        state any,
        port string,
        msg any,
    ) (any, error)
}

2. Leader-Only Execution

Only the leader instance runs the timer:

go
if !utils.IsLeader(ctx) {
    return controlState, nil
}

3. Graceful Shutdown

Use channels for clean goroutine termination:

go
c.stopCh = make(chan struct{})

// In runLoop
select {
case <-c.stopCh:
    return
case <-ctx.Done():
    return
case <-ticker.C:
    c.emit(ctx, output)
}

4. Thread-Safe State

Protect shared state with mutex:

go
c.mu.Lock()
c.tickCount++
c.mu.Unlock()

5. Non-Blocking Emit

Don't let a slow downstream consumer block the ticker:

go
go func() {
    _ = output(ctx, "tick", tickOutput)
}()

Control State Management

State Updates

Control state is returned from HandleControl:

go
return &ControlState{
    Status:    "running",
    TickCount: 0,
    NextEmit:  c.calculateNextEmit(),
}, nil

Read-Only Display Fields

go
Status    string `json:"status" readonly:"true" title:"Status"`
TickCount int64  `json:"tickCount" readonly:"true" title:"Emissions"`

Button Fields

go
Start bool `json:"start" format:"button" title:"Start" colSpan:"col-span-6"`
Stop  bool `json:"stop" format:"button" title:"Stop" colSpan:"col-span-6"`

Visual Flow

┌─────────────────────────────────────────────────────────┐
│                   Periodic Emitter                       │
│                                                          │
│  Settings:                                               │
│    interval: 30s                                         │
│    emitOnStart: true                                     │
│                                                          │
│  ┌─────────────────────────────────────────────────┐    │
│  │         Background Goroutine                     │    │
│  │                                                  │    │
│  │   ┌─────────┐     ┌─────────┐                   │    │
│  │   │  Wait   │────►│  Emit   │────► Tick Port    │    │
│  │   │  30s    │     │  Tick   │                   │    │
│  │   └─────────┘     └─────────┘                   │    │
│  │        ▲               │                        │    │
│  │        └───────────────┘                        │    │
│  └─────────────────────────────────────────────────┘    │
│                                                          │
│  Control: [Start] [Stop]                                │
└─────────────────────────────────────────────────────────┘

Common Use Cases

1. Scheduled Data Sync

yaml
interval: "1h"
payload:
  task: "sync_users"
  endpoint: "https://api.example.com/users"

2. Health Check Pings

yaml
interval: "30s"
emitOnStart: false
payload:
  type: "health_check"

3. Metrics Collection

yaml
interval: "1m"
payload:
  action: "collect_metrics"
  targets: ["cpu", "memory", "disk"]

4. Limited Batch Processing

yaml
interval: "5s"
maxEmissions: 10
payload:
  batchId: "{{$.batchId}}"

Extension Ideas

  1. Cron Expressions: Support cron-style scheduling
  2. Jitter: Add random jitter to prevent thundering herd
  3. Timezone Support: Configure timezone for scheduling
  4. Calendar Integration: Skip weekends/holidays
  5. Dynamic Interval: Adjust interval based on conditions

Build flow-based applications on Kubernetes