Periodic Emitter Component
A complete example of a time-based component with start/stop controls.
Overview
This component emits messages at configurable intervals. It demonstrates:
- Background goroutine management
- Start/Stop control interface
- Leader-only execution
- Graceful shutdown
Complete Implementation
go
package ticker
import (
"context"
"fmt"
"sync"
"time"
"github.com/tiny-systems/module/api/v1alpha1"
"github.com/tiny-systems/module/pkg/module"
"github.com/tiny-systems/module/pkg/utils"
)
// ComponentName is the identifier the component reports as its Name in GetInfo.
const ComponentName = "periodic_emitter"
// Component is the periodic emitter. It keeps the most recently received
// settings together with the bookkeeping for the background emission loop.
type Component struct {
	// settings is the configuration last delivered on the settings port.
	settings Settings

	// mu is held by start, stop, isRunning, getTickCount and emit while they
	// touch the run-state fields below.
	mu sync.RWMutex
	// running reports whether an emission loop has been started and not yet stopped.
	running bool
	// stopCh is created by start and closed by stop to end the loop.
	stopCh chan struct{}
	// tickCount is the emission counter of the current run; start resets it to 0.
	tickCount int64
}
// Settings is the configuration accepted on the settings port.
//
// Each struct tag is kept on a single line on purpose: reflect.StructTag
// treats only spaces as separators between key:"value" pairs, so a tag that
// continues on a new line hides every key after the line break (previously
// the description of every field) from tag lookups.
type Settings struct {
	// Interval is the delay between emissions, in time.ParseDuration syntax.
	Interval string `json:"interval" title:"Interval" default:"1m" required:"true" description:"Time between emissions (e.g., 30s, 1m, 1h)"`
	// EmitOnStart requests one emission immediately when the loop starts.
	EmitOnStart bool `json:"emitOnStart" title:"Emit on Start" default:"true" description:"Emit immediately when started"`
	// MaxEmissions caps the number of emissions per run; 0 means unlimited.
	MaxEmissions int `json:"maxEmissions,omitempty" title:"Max Emissions" minimum:"0" description:"Maximum number of emissions (0 = unlimited)"`
	// Payload is copied unchanged into every emitted TickOutput.
	Payload any `json:"payload,omitempty" title:"Payload" configurable:"true" description:"Custom payload to include in each emission"`
}
// ControlState is the value exchanged on the control port: four fields tagged
// read-only that describe the emitter, followed by the two fields tagged as
// buttons (Start and Stop).
type ControlState struct {
	// Status is "running" or "stopped".
	Status string `json:"status" readonly:"true" title:"Status"`
	// TickCount is the number of emissions reported for the current run.
	TickCount int64 `json:"tickCount" readonly:"true" title:"Emissions"`
	// LastEmit is the time of the previous emission; omitted from JSON when empty.
	LastEmit string `json:"lastEmit,omitempty" readonly:"true" title:"Last Emission"`
	// NextEmit is the expected time of the next emission; omitted from JSON when empty.
	NextEmit string `json:"nextEmit,omitempty" readonly:"true" title:"Next Emission"`

	// Start is the field tagged as the "Start" button.
	Start bool `json:"start" format:"button" title:"Start" colSpan:"col-span-6"`
	// Stop is the field tagged as the "Stop" button.
	Stop bool `json:"stop" format:"button" title:"Stop" colSpan:"col-span-6"`
}
// TickOutput is the message sent on the "tick" port for every emission.
type TickOutput struct {
	// Timestamp is the emission time in nanoseconds since the Unix epoch.
	Timestamp int64 `json:"timestamp" title:"Timestamp (ns)"`
	// Formatted is the same instant rendered as RFC 3339 text.
	Formatted string `json:"formatted" title:"Formatted Time"`
	// TickNumber is the emission counter value assigned to this tick.
	TickNumber int64 `json:"tickNumber" title:"Tick Number"`
	// Payload carries the payload configured in Settings; omitted from JSON when nil.
	Payload any `json:"payload,omitempty" title:"Payload"`
}
// Compile-time assertions that *Component satisfies both module.Component and
// module.ControlHandler; the build fails here if a required method is missing.
var _ module.Component = (*Component)(nil)
var _ module.ControlHandler = (*Component)(nil)
// GetInfo returns the static metadata describing this component: its name,
// display title, description, category and search tags.
func (c *Component) GetInfo() module.ComponentInfo {
	var info module.ComponentInfo

	info.Name = ComponentName
	info.Title = "Periodic Emitter"
	info.Description = "Emits messages at configurable time intervals"
	info.Category = "Triggers"
	info.Tags = []string{"timer", "cron", "scheduler", "trigger"}

	return info
}
// Ports declares the component's three ports, in order: the settings port and
// the control port (both at the top, both flagged Source), and the "tick"
// port on the right on which TickOutput messages are sent.
func (c *Component) Ports() []module.Port {
	settingsPort := module.Port{
		Name:          v1alpha1.SettingsPort,
		Label:         "Settings",
		Position:      module.PositionTop,
		Source:        true,
		Configuration: Settings{},
	}

	// The control port is declared with a "stopped" initial state.
	controlPort := module.Port{
		Name:          v1alpha1.ControlPort,
		Label:         "Control",
		Position:      module.PositionTop,
		Source:        true,
		Configuration: ControlState{Status: "stopped"},
	}

	tickPort := module.Port{
		Name:          "tick",
		Label:         "Tick",
		Position:      module.PositionRight,
		Source:        false,
		Configuration: TickOutput{},
	}

	return []module.Port{settingsPort, controlPort, tickPort}
}
// Handle processes a message delivered to one of the component's ports.
//
// Only the settings port is accepted: its message must be a Settings value,
// which replaces the stored configuration. The write happens under c.mu
// because the background emission loop may be reading the settings at the
// same time.
//
// It returns an error when the settings message has an unexpected type
// (instead of panicking on a failed type assertion) and when port is not a
// port this method handles.
func (c *Component) Handle(
	ctx context.Context,
	output module.Handler,
	port string,
	msg any,
) error {
	switch port {
	case v1alpha1.SettingsPort:
		settings, ok := msg.(Settings)
		if !ok {
			return fmt.Errorf("invalid settings message: expected %T, got %T", Settings{}, msg)
		}

		c.mu.Lock()
		c.settings = settings
		c.mu.Unlock()
		return nil
	default:
		return fmt.Errorf("unknown port: %s", port)
	}
}
// HandleControl implements the ControlHandler interface and reacts to the
// "start" and "stop" control ports.
//
// state is expected to be the previous *ControlState; anything else (or a nil
// pointer) is treated as a fresh "stopped" state. The method returns the
// control state to display next. The previous state is returned unchanged when
// the port is neither "start" nor "stop", when this instance is not the
// leader, or when "start" is requested while already running.
//
// An error is returned only when "start" is requested and the context carries
// no output handler.
//
// NOTE(review): ctx is handed to the background loop, which keeps using it
// after this call returns — confirm the framework passes a context that
// outlives the request.
func (c *Component) HandleControl(
	ctx context.Context,
	state any,
	port string,
	msg any,
) (any, error) {
	current, ok := state.(*ControlState)
	if !ok || current == nil {
		current = &ControlState{Status: "stopped"}
	}

	// Ports other than the two buttons leave the state untouched.
	if port != "start" && port != "stop" {
		return current, nil
	}

	// Both buttons act only on the leader instance.
	if !utils.IsLeader(ctx) {
		return current, nil
	}

	if port == "stop" {
		c.stop()

		stopped := &ControlState{Status: "stopped"}
		stopped.TickCount = c.getTickCount()
		stopped.LastEmit = current.LastEmit
		return stopped, nil
	}

	// port == "start"
	if c.isRunning() {
		return current, nil
	}

	// The emission loop needs the output handler carried by the context.
	output := utils.GetOutputHandler(ctx)
	if output == nil {
		return current, fmt.Errorf("no output handler available")
	}

	c.start(ctx, output)

	started := &ControlState{
		Status:    "running",
		TickCount: 0,
		NextEmit:  c.calculateNextEmit(),
	}
	return started, nil
}
// start launches the background emission loop unless one is already running.
// Under the write lock it creates a fresh stop channel, resets the tick
// counter, marks the component as running and spawns runLoop.
func (c *Component) start(ctx context.Context, output module.Handler) {
	c.mu.Lock()
	defer c.mu.Unlock()

	if !c.running {
		c.stopCh = make(chan struct{})
		c.tickCount = 0
		c.running = true

		go c.runLoop(ctx, output)
	}
}
// stop marks the component as not running and closes the stop channel so the
// emission loop exits. Calling it while already stopped does nothing, which
// also keeps the channel from being closed twice.
func (c *Component) stop() {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.running {
		c.running = false
		close(c.stopCh)
	}
}
// isRunning reports, under the read lock, whether the emission loop is
// currently marked as running.
func (c *Component) isRunning() bool {
	c.mu.RLock()
	active := c.running
	c.mu.RUnlock()

	return active
}
// getTickCount returns, under the read lock, the tick counter of the current
// (or most recent) run.
func (c *Component) getTickCount() int64 {
	c.mu.RLock()
	count := c.tickCount
	c.mu.RUnlock()

	return count
}
// runLoop is the body of the background goroutine spawned by start. It emits
// a tick every configured interval until the run is stopped, ctx is
// cancelled, or emit reports that the emission limit has been reached.
//
// The interval and the emit-on-start flag are read once, when the loop
// begins; an interval that does not parse, or that is zero or negative,
// falls back to one minute.
func (c *Component) runLoop(ctx context.Context, output module.Handler) {
	// Snapshot this run's stop channel and settings under the lock. Keeping a
	// local copy of stopCh means a later restart, which replaces c.stopCh,
	// cannot make this loop wait on the new run's channel.
	c.mu.RLock()
	stopCh := c.stopCh
	rawInterval := c.settings.Interval
	emitOnStart := c.settings.EmitOnStart
	c.mu.RUnlock()

	interval, err := time.ParseDuration(rawInterval)
	if err != nil || interval <= 0 {
		// time.NewTicker panics on a non-positive duration, and values such
		// as "0s" or "-5s" parse without error, so they need the fallback too.
		interval = time.Minute
	}

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	// Emit on start if configured.
	if emitOnStart {
		c.emit(ctx, output)
	}

	for {
		select {
		case <-stopCh:
			return
		case <-ctx.Done():
			// The loop is ending without an explicit stop: clear the running
			// flag so the component can be started again. Skip that when this
			// run was already stopped, so a newer run is left alone.
			select {
			case <-stopCh:
			default:
				c.stop()
			}
			return
		case <-ticker.C:
			if !c.emit(ctx, output) {
				return // Max emissions reached
			}
		}
	}
}
// emit records one tick and dispatches it on the "tick" port.
//
// It returns true while the loop should keep ticking and false once the
// MaxEmissions limit has been reached, in which case the component is also
// stopped. A limit of 0 means unlimited.
//
// The counter advances only for ticks that are actually dispatched, so
// getTickCount never reports more emissions than were sent, and the component
// stops right after the last allowed emission instead of idling for one more
// interval.
func (c *Component) emit(ctx context.Context, output module.Handler) bool {
	c.mu.Lock()
	limit := int64(c.settings.MaxEmissions)
	payload := c.settings.Payload
	if limit > 0 && c.tickCount >= limit {
		// The limit is already exhausted (for example it was lowered while
		// running): stop without counting or sending anything. The lock is
		// released first because stop acquires it itself.
		c.mu.Unlock()
		c.stop()
		return false
	}
	c.tickCount++
	tickNum := c.tickCount
	c.mu.Unlock()

	now := time.Now()
	tickOutput := TickOutput{
		Timestamp:  now.UnixNano(),
		Formatted:  now.Format(time.RFC3339),
		TickNumber: tickNum,
		Payload:    payload,
	}

	// Non-blocking emit to avoid stalling the ticker. The handler's error is
	// dropped deliberately: delivery is best effort and nothing here could
	// act on it.
	go func() {
		_ = output(ctx, "tick", tickOutput)
	}()

	if limit > 0 && tickNum >= limit {
		// That was the final allowed emission.
		c.stop()
		return false
	}
	return true
}
// calculateNextEmit returns the current time plus the configured interval,
// formatted as RFC 3339, or "unknown" when the interval setting does not
// parse as a duration.
func (c *Component) calculateNextEmit() string {
	if step, err := time.ParseDuration(c.settings.Interval); err == nil {
		return time.Now().Add(step).Format(time.RFC3339)
	}
	return "unknown"
}
// Instance returns a pointer to a new, zero-valued Component; nothing is
// copied from the receiver.
func (c *Component) Instance() module.Component {
return &Component{}
}Usage Example
Settings Configuration
yaml
edges:
- port: _settings
data:
interval: "30s"
emitOnStart: true
maxEmissions: 0
payload:
source: "scheduled_task"
taskId: "cleanup_job"
Control Panel UI
The control panel shows:
┌──────────────────────────────────────┐
│ Status: running │
│ Emissions: 42 │
│ Last Emit: 2024-01-15T10:30:00Z │
│ Next Emit: 2024-01-15T10:30:30Z │
│ │
│ [ Start ] [ Stop ] │
└──────────────────────────────────────┘
Output
Each tick emits:
json
{
"timestamp": 1705312200000000000,
"formatted": "2024-01-15T10:30:00Z",
"tickNumber": 42,
"payload": {
"source": "scheduled_task",
"taskId": "cleanup_job"
}
}
Key Patterns Demonstrated
1. ControlHandler Interface
Separate interface for button handling:
go
type ControlHandler interface {
HandleControl(ctx context.Context, state any, port string, msg any) (any, error)
}
2. Leader-Only Execution
Only the leader instance runs the timer:
go
if !utils.IsLeader(ctx) {
return controlState, nil
}
3. Graceful Shutdown
Use channels for clean goroutine termination:
go
c.stopCh = make(chan struct{})
// In runLoop
select {
case <-c.stopCh:
return
case <-ctx.Done():
return
case <-ticker.C:
c.emit(ctx, output)
}
4. Thread-Safe State
Protect shared state with mutex:
go
c.mu.Lock()
c.tickCount++
c.mu.Unlock()
5. Non-Blocking Emit
Don't block the ticker on slow downstream:
go
go func() {
_ = output(ctx, "tick", tickOutput)
}()
Control State Management
State Updates
Control state is returned from HandleControl:
go
return &ControlState{
Status: "running",
TickCount: 0,
NextEmit: c.calculateNextEmit(),
}, nil
Read-Only Display Fields
go
Status string `json:"status" readonly:"true" title:"Status"`
TickCount int64 `json:"tickCount" readonly:"true" title:"Emissions"`
Button Fields
go
Start bool `json:"start" format:"button" title:"Start" colSpan:"col-span-6"`
Stop bool `json:"stop" format:"button" title:"Stop" colSpan:"col-span-6"`
Visual Flow
┌─────────────────────────────────────────────────────────┐
│ Periodic Emitter │
│ │
│ Settings: │
│ interval: 30s │
│ emitOnStart: true │
│ │
│ ┌─────────────────────────────────────────────────┐ │
│ │ Background Goroutine │ │
│ │ │ │
│ │ ┌─────────┐ ┌─────────┐ │ │
│ │ │ Wait │────►│ Emit │────► Tick Port │ │
│ │ │ 30s │ │ Tick │ │ │
│ │ └─────────┘ └─────────┘ │ │
│ │ ▲ │ │ │
│ │ └───────────────┘ │ │
│ └─────────────────────────────────────────────────┘ │
│ │
│ Control: [Start] [Stop] │
└─────────────────────────────────────────────────────────┘
Common Use Cases
1. Scheduled Data Sync
yaml
interval: "1h"
payload:
task: "sync_users"
endpoint: "https://api.example.com/users"
2. Health Check Pings
yaml
interval: "30s"
emitOnStart: false
payload:
type: "health_check"
3. Metrics Collection
yaml
interval: "1m"
payload:
action: "collect_metrics"
targets: ["cpu", "memory", "disk"]
4. Limited Batch Processing
yaml
interval: "5s"
maxEmissions: 10
payload:
batchId: "{{$.batchId}}"
Extension Ideas
- Cron Expressions: Support cron-style scheduling
- Jitter: Add random jitter to prevent thundering herd
- Timezone Support: Configure timezone for scheduling
- Calendar Integration: Skip weekends/holidays
- Dynamic Interval: Adjust interval based on conditions