Handling Messages
The Handle() method is where your component processes incoming messages. Understanding message handling patterns is essential for building effective components.
Basic Message Handling
go
func (c *MyComponent) Handle(
ctx context.Context,
output module.Handler,
port string,
msg any,
) error {
switch port {
case "input":
input := msg.(InputMessage)
result := c.process(input)
return output(ctx, "output", result)
}
return nil
}The Handler Signature
go
type Handler func(ctx context.Context, port string, msg any) error

| Parameter | Description |
|---|---|
| ctx | Context for cancellation and values |
| port | Output port name to send to |
| msg | Data to send |
| Returns | Error if sending failed |
Port Routing
Route messages based on the incoming port:
go
func (c *Processor) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
switch port {
case "input":
return c.handleInput(ctx, output, msg)
case v1alpha1.SettingsPort: // "_settings"
return c.handleSettings(msg)
case v1alpha1.ControlPort: // "_control"
return c.handleControl(ctx, output, msg)
case v1alpha1.ReconcilePort: // "_reconcile"
return c.handleReconcile(ctx, output, msg)
default:
// Unknown port - ignore
return nil
}
}Type Assertions
Messages arrive as `any` and must be type-asserted before use:
go
func (c *Component) handleInput(ctx context.Context, output module.Handler, msg any) error {
// Simple assertion (panics if wrong type)
input := msg.(InputMessage)
// Safe assertion
input, ok := msg.(InputMessage)
if !ok {
return fmt.Errorf("unexpected message type: %T", msg)
}
return c.process(ctx, output, input)
}Sending Output
Single Output
go
func (c *Transformer) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
input := msg.(Input)
result := transform(input)
return output(ctx, "output", result)
}Multiple Outputs
go
func (c *Splitter) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
input := msg.(Input)
// Send to multiple ports
if err := output(ctx, "output_a", input.PartA); err != nil {
return err
}
if err := output(ctx, "output_b", input.PartB); err != nil {
return err
}
return nil
}Conditional Output
go
func (c *Router) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
input := msg.(Message)
switch input.Type {
case "urgent":
return output(ctx, "priority", input)
case "normal":
return output(ctx, "standard", input)
default:
return output(ctx, "other", input)
}
}No Output
Some components don't produce output:
go
func (c *Logger) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
input := msg.(LogMessage)
log.Info(input.Message, "level", input.Level)
return nil // No output
}Blocking vs Non-Blocking
Default: Blocking
By default, output() blocks until the downstream flow completes:
go
func (c *Component) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
// This blocks until the entire downstream flow finishes
err := output(ctx, "output", msg)
// Only continues after downstream completes
if err != nil {
log.Error("downstream failed", "error", err)
}
return err
}Understanding the Block
Time ──────────────────────────────────────────────────────────────▶
Component A Component B Component C
│ │ │
│ output(B) │ │
│ ═══════════════════╪══════════════════╗ │
│ │ Handle() ║ │
│ │ ═════════════════╪═╪════════╗
│ │ │ │ ║
│ │ │ │ Handle()
│ │ │ │ return
│ │ return │ │════════╝
│ return │═════════════════╝│ │
│ ═══════════════════╝ │ │
│ │ │
▼ ▼ ▼
Breaking the Chain
For fire-and-forget patterns, use goroutines carefully:
go
func (c *AsyncEmitter) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
// CAUTION: This breaks the blocking chain
go func() {
// Runs independently
if err := output(context.Background(), "async_output", msg); err != nil {
log.Error("async send failed", "error", err)
}
}()
// Returns immediately
return nil
}Error Handling
Return Errors
go
func (c *Component) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
input := msg.(Input)
result, err := c.process(input)
if err != nil {
// Error propagates back through the flow
return fmt.Errorf("processing failed: %w", err)
}
return output(ctx, "output", result)
}Error Port Pattern
go
// ErrorOutput pairs an error description with the message it relates to.
// Both fields carry JSON tags.
type ErrorOutput struct {
// Error is the error text.
Error string `json:"error"`
// Message is the value associated with the error; it may be of any type.
Message any `json:"message"`
}
func (c *Component) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
input := msg.(Input)
result, err := c.process(input)
if err != nil {
// Route to error port instead of failing
return output(ctx, "error", ErrorOutput{
Error: err.Error(),
Message: input,
})
}
return output(ctx, "success", result)
}Retry Pattern
go
func (c *RetryableProcessor) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
input := msg.(Input)
var lastErr error
for attempt := 0; attempt < 3; attempt++ {
result, err := c.process(input)
if err == nil {
return output(ctx, "output", result)
}
lastErr = err
time.Sleep(time.Duration(attempt+1) * time.Second)
}
return fmt.Errorf("after 3 attempts: %w", lastErr)
}Context Usage
Cancellation
go
func (c *LongProcessor) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
items := msg.([]Item)
for _, item := range items {
select {
case <-ctx.Done():
return ctx.Err() // Graceful shutdown
default:
c.processItem(item)
}
}
return nil
}Leader Check
go
func (c *LeaderOnly) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
if port == v1alpha1.ControlPort {
// Only leader processes control messages
if !utils.IsLeader(ctx) {
return nil
}
return c.handleControl(ctx, output, msg)
}
return nil
}Timeout
go
func (c *TimedProcessor) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
// Add timeout to context
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
result, err := c.processWithContext(ctx, msg)
if err != nil {
if ctx.Err() == context.DeadlineExceeded {
return fmt.Errorf("processing timed out")
}
return err
}
return output(ctx, "output", result)
}State Management
Storing Settings
go
// Processor is a component that keeps the most recently received settings.
type Processor struct {
// settings is stored by Handle when a message arrives on the settings port
// and read when a message arrives on "input".
settings Settings
}
func (p *Processor) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
switch port {
case v1alpha1.SettingsPort:
p.settings = msg.(Settings)
return nil
case "input":
// Use stored settings
return p.processWithSettings(ctx, output, msg, p.settings)
}
return nil
}Accumulating State
go
// Aggregator collects items until they are flushed as a batch.
type Aggregator struct {
// items holds the items received since the last flush.
items []Item
// mu guards items; Handle locks it around every access.
mu sync.Mutex
}
func (a *Aggregator) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
switch port {
case "item":
a.mu.Lock()
a.items = append(a.items, msg.(Item))
a.mu.Unlock()
return nil
case "flush":
a.mu.Lock()
items := a.items
a.items = nil
a.mu.Unlock()
return output(ctx, "batch", Batch{Items: items})
}
return nil
}Common Patterns
Pass-Through with Modification
go
func (c *Enricher) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
input := msg.(Message)
input.Timestamp = time.Now()
input.ProcessedBy = c.name
return output(ctx, "output", input)
}Fan-Out
go
func (c *Broadcaster) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
// Send same message to multiple outputs
for _, outPort := range []string{"out1", "out2", "out3"} {
if err := output(ctx, outPort, msg); err != nil {
return err
}
}
return nil
}Fan-In (via multiple input ports)
go
func (c *Merger) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
// Multiple input ports feed into one output
switch port {
case "input_a", "input_b", "input_c":
return output(ctx, "merged", msg)
}
return nil
}Next Steps
- Settings and Configuration - Configure components
- Control Ports - UI interaction
- Error Handling - Error patterns