Handler Interface Reference
Complete reference for the message handler function used in TinySystems components.
Handler Definition
type Handler func(ctx context.Context, port string, message any) errorThe Handler is a callback function provided to the component's Handle method for emitting output messages.
Parameters
ctx (context.Context)
The context carries request-scoped values, cancellation signals, and tracing information.
Must always pass through the context:
// Correct
output(ctx, "output", result)
// Incorrect - loses tracing
output(context.Background(), "output", result)Context Values:
| Function | Description |
|---|---|
utils.IsLeader(ctx) | Check if current instance is leader |
trace.SpanFromContext(ctx) | Get current tracing span |
ctx.Done() | Channel closed on cancellation |
ctx.Err() | Error if context cancelled/expired |
port (string)
The name of the output port to emit the message on.
Requirements:
- Must match a port defined in
Ports()withSource: false - Case-sensitive
Examples:
output(ctx, "output", data)
output(ctx, "out_success", result)
output(ctx, "error", errInfo)message (any)
The data to emit. Should match the port's schema.
Type Requirements:
- Must be JSON-serializable
- Should match the Configuration struct for the port
- Can be a struct, map, slice, or primitive
Examples:
// Struct
output(ctx, "output", Output{
Result: "processed",
Count: 42,
})
// Map
output(ctx, "output", map[string]any{
"result": "processed",
"count": 42,
})
// Primitive (if schema allows)
output(ctx, "count", 42)Return Value
error
| Return | Meaning |
|---|---|
nil | Message delivered successfully |
error | Delivery failed |
Error Handling:
err := output(ctx, "output", result)
if err != nil {
// Handle delivery failure
return err
}Blocking Behavior
Default: Blocking
By default, the handler blocks until all downstream processing completes:
func (c *Component) Handle(ctx context.Context, output Handler, port string, msg any) error {
result := process(msg)
// This blocks until downstream nodes finish
err := output(ctx, "output", result)
// Execution continues only after downstream completes
return err
}Flow:
output() called
│
├─▶ Message delivered to next node
│ │
│ ├─▶ Next node processes
│ │ │
│ │ └─▶ (continues downstream)
│ │
│ └─▶ Returns when complete
│
└─▶ output() returnsBenefits of Blocking
- Backpressure: Automatically limits processing rate
- Resource Management: Prevents unbounded queue growth
- Error Propagation: Errors return to source
- Ordering: Maintains message order
Non-Blocking (Async) Usage
For fire-and-forget operations, use a goroutine:
func (c *Component) Handle(ctx context.Context, output Handler, port string, msg any) error {
// Capture span context for tracing
spanCtx := trace.SpanContextFromContext(ctx)
go func() {
// Create new context with span context
newCtx := trace.ContextWithSpanContext(context.Background(), spanCtx)
result := process(msg)
output(newCtx, "output", result)
}()
// Returns immediately
return nil
}Important: When using async:
- Preserve span context for tracing
- Handle errors within the goroutine
- Be aware of goroutine leaks
- Consider using the Async component instead
Multiple Outputs
Sequential Outputs
func (c *Component) Handle(ctx context.Context, output Handler, port string, msg any) error {
// First output
if err := output(ctx, "processed", result1); err != nil {
return err
}
// Second output (after first completes)
if err := output(ctx, "logged", result2); err != nil {
return err
}
return nil
}Parallel Outputs
func (c *Component) Handle(ctx context.Context, output Handler, port string, msg any) error {
var wg sync.WaitGroup
var mu sync.Mutex
var firstErr error
outputs := []struct {
port string
data any
}{
{"notify_email", emailData},
{"notify_slack", slackData},
}
for _, out := range outputs {
wg.Add(1)
go func(port string, data any) {
defer wg.Done()
if err := output(ctx, port, data); err != nil {
mu.Lock()
if firstErr == nil {
firstErr = err
}
mu.Unlock()
}
}(out.port, out.data)
}
wg.Wait()
return firstErr
}Conditional Output
Route Based on Condition
func (c *Component) Handle(ctx context.Context, output Handler, port string, msg any) error {
input := msg.(Input)
if input.Valid {
return output(ctx, "out_success", input)
}
return output(ctx, "out_error", ErrorInfo{
Message: "Invalid input",
})
}Optional Output
func (c *Component) Handle(ctx context.Context, output Handler, port string, msg any) error {
input := msg.(Input)
// Always process
result := process(input)
// Conditionally notify
if c.settings.EnableNotifications {
if err := output(ctx, "notification", result); err != nil {
// Log but don't fail
log.Warn("notification failed", "error", err)
}
}
return output(ctx, "output", result)
}Special Handler Patterns
Reconcile Port Handler
For _reconcile port, the handler updates the TinyNode:
func (c *Component) Handle(ctx context.Context, output Handler, port string, msg any) error {
if port == v1alpha1.ReconcilePort {
node := msg.(*v1alpha1.TinyNode)
// Update node metadata
return output(ctx, v1alpha1.ReconcilePort, func(n *v1alpha1.TinyNode) {
if n.Status.Metadata == nil {
n.Status.Metadata = make(map[string]string)
}
n.Status.Metadata["my-key"] = "my-value"
})
}
// ...
}Control Port Handler
Control responses don't use the standard handler:
func (c *Component) HandleControl(ctx context.Context, state any, port string, msg any) (any, error) {
// Return new state, not via output handler
return &ControlState{
Status: "running",
}, nil
}Error Handling
Propagate Errors
err := output(ctx, "output", result)
if err != nil {
return fmt.Errorf("failed to deliver output: %w", err)
}Handle Partial Failures
var errors []error
for _, item := range items {
if err := output(ctx, "output", item); err != nil {
errors = append(errors, err)
}
}
if len(errors) > 0 {
return fmt.Errorf("failed to deliver %d items", len(errors))
}
return nilTimeout Handling
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
err := output(ctx, "output", result)
if errors.Is(err, context.DeadlineExceeded) {
return errors.PermanentError(fmt.Errorf("delivery timeout"))
}Best Practices
1. Always Check Errors
// Good
if err := output(ctx, "output", result); err != nil {
return err
}
// Risky
output(ctx, "output", result)
return nil2. Pass Context Through
// Good
output(ctx, "output", result)
// Bad - loses tracing/cancellation
output(context.Background(), "output", result)3. Match Schema Types
// Good - matches port schema
output(ctx, "output", Output{
Result: value,
})
// Risky - may not match schema
output(ctx, "output", map[string]any{
"result": value,
})4. Use Descriptive Port Names
// Good
output(ctx, "out_success", result)
output(ctx, "out_validation_error", err)
// Less clear
output(ctx, "out1", result)
output(ctx, "out2", err)5. Document Output Behavior
// Emits to "output" port with processed result
// Blocks until downstream processing completes
// Returns error if delivery fails
return output(ctx, "output", result)