Skip to content

Handling Messages

The Handle() method is where your component processes incoming messages. Understanding message handling patterns is essential for building effective components.

Basic Message Handling

go
// Handle processes a message delivered on the "input" port and forwards the
// processed result on the "output" port. Messages arriving on any other port
// are ignored and nil is returned.
//
// The payload is converted with a single-value type assertion, so a message
// that is not an InputMessage panics.
func (c *MyComponent) Handle(
    ctx context.Context,
    output module.Handler,
    port string,
    msg any,
) error {
    if port != "input" {
        return nil
    }

    in := msg.(InputMessage)
    return output(ctx, "output", c.process(in))
}

The Handler Signature

go
// Handler sends msg to the output port named port. ctx carries cancellation
// and values; the returned error is non-nil if sending failed.
type Handler func(ctx context.Context, port string, msg any) error
| Parameter | Description |
| --- | --- |
| ctx | Context for cancellation and values |
| port | Output port name to send to |
| msg | Data to send |
| Returns | Error if sending failed |

Port Routing

Route messages based on the incoming port:

go
// Handle dispatches an incoming message to the handler that matches the port
// it arrived on. Ports without a matching case are ignored.
func (c *Processor) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
    switch port {
    case "input":
        return c.handleInput(ctx, output, msg)
    case v1alpha1.SettingsPort: // "_settings"
        return c.handleSettings(msg)
    case v1alpha1.ControlPort: // "_control"
        return c.handleControl(ctx, output, msg)
    case v1alpha1.ReconcilePort: // "_reconcile"
        return c.handleReconcile(ctx, output, msg)
    }

    // No case matched: unknown port, nothing to do.
    return nil
}

Type Assertions

Messages arrive as any and must be type-asserted:

go
// handleInput converts msg to an InputMessage and passes it to c.process.
//
// The single-value form `msg.(InputMessage)` panics when msg holds any other
// type. The two-value ("comma ok") form used here reports the mismatch through
// ok instead, so an unexpected payload is returned as an error rather than
// crashing the component.
func (c *Component) handleInput(ctx context.Context, output module.Handler, msg any) error {
    input, ok := msg.(InputMessage)
    if !ok {
        return fmt.Errorf("unexpected message type: %T", msg)
    }

    return c.process(ctx, output, input)
}

Sending Output

Single Output

go
// Handle transforms the incoming Input and sends the result on "output".
// The type assertion panics if msg is not an Input.
func (c *Transformer) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
    return output(ctx, "output", transform(msg.(Input)))
}

Multiple Outputs

go
// Handle splits the incoming Input across two ports: PartA goes to
// "output_a", then PartB goes to "output_b". The second send is skipped if the
// first one fails, and the first error encountered is returned.
func (c *Splitter) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
    in := msg.(Input)

    err := output(ctx, "output_a", in.PartA)
    if err == nil {
        err = output(ctx, "output_b", in.PartB)
    }
    return err
}

Conditional Output

go
// Handle forwards the message to a port chosen from its Type field:
// "urgent" goes to "priority", "normal" goes to "standard", and every other
// type goes to "other".
func (c *Router) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
    m := msg.(Message)

    // Start from the fallback port and override it for the known types.
    target := "other"
    switch m.Type {
    case "urgent":
        target = "priority"
    case "normal":
        target = "standard"
    }

    return output(ctx, target, m)
}

No Output

Some components don't produce output:

go
// Handle logs the incoming LogMessage and emits nothing: output is never
// called and the method always returns nil.
func (c *Logger) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
    entry := msg.(LogMessage)

    log.Info(entry.Message, "level", entry.Level)

    // Sink component: no output port is written.
    return nil
}

Blocking vs Non-Blocking

Default: Blocking

By default, output() blocks until the downstream flow completes:

go
// Handle forwards msg on "output" and returns whatever the send returns,
// logging the error first when the send fails.
func (c *Component) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
    // output blocks until the entire downstream flow finishes, so the code
    // below only runs after downstream has completed.
    if err := output(ctx, "output", msg); err != nil {
        log.Error("downstream failed", "error", err)
        return err
    }

    return nil
}

Understanding the Block

Time ──────────────────────────────────────────────────────────────▶

Component A          Component B          Component C
    │                    │                    │
    │ output(B)          │                    │
    │ ═══════════════════╪══════════════════╗ │
    │                    │ Handle()         ║ │
    │                    │ ═════════════════╪═╪════════╗
    │                    │                  │ │        ║
    │                    │                  │ │ Handle()
    │                    │                  │ │ return
    │                    │ return           │ │════════╝
    │ return             │═════════════════╝│ │
    │ ═══════════════════╝                  │ │
    │                    │                    │
    ▼                    ▼                    ▼

Breaking the Chain

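For fire-and-forget patterns, call output() from a goroutine. Use this sparingly: Handle() returns without waiting for the downstream flow, so a downstream failure can no longer be returned to the caller and can only be logged: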

go
// Handle forwards msg on "async_output" from a separate goroutine and returns
// nil immediately, without waiting for the send to finish.
//
// CAUTION: this breaks the blocking chain. The send uses context.Background()
// rather than ctx, and a failed send is only logged; it is never returned to
// the caller.
func (c *AsyncEmitter) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
    emit := func() {
        err := output(context.Background(), "async_output", msg)
        if err != nil {
            log.Error("async send failed", "error", err)
        }
    }

    // Runs independently of this call.
    go emit()

    return nil
}

Error Handling

Return Errors

go
// Handle processes the incoming Input and sends the result on "output".
// A processing failure is wrapped and returned, which propagates the error
// back through the flow; nothing is sent in that case.
func (c *Component) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
    result, procErr := c.process(msg.(Input))
    if procErr != nil {
        return fmt.Errorf("processing failed: %w", procErr)
    }

    return output(ctx, "output", result)
}

Error Port Pattern

go
// ErrorOutput is the payload sent on the "error" port when processing fails.
type ErrorOutput struct {
    // Error is the text of the processing error (err.Error()).
    Error   string `json:"error"`
    // Message is the input message whose processing failed.
    Message any    `json:"message"`
}

// Handle processes the incoming Input and reports the outcome on one of two
// ports: the result goes to "success", while a processing failure is packaged
// as an ErrorOutput and sent to "error" instead of being returned. The only
// error returned is the one from the send itself.
func (c *Component) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
    in := msg.(Input)

    result, procErr := c.process(in)
    if procErr == nil {
        return output(ctx, "success", result)
    }

    // Route to the error port instead of failing the flow.
    failure := ErrorOutput{
        Error:   procErr.Error(),
        Message: in,
    }
    return output(ctx, "error", failure)
}

Retry Pattern

go
// Handle processes the incoming Input, retrying up to three times with a
// linearly growing pause (1s, then 2s) between attempts. The first successful
// result is sent on "output" and the send's error is returned as-is; a failed
// send is not retried.
//
// The pause between attempts is interruptible: if ctx is cancelled while
// waiting, Handle stops retrying and returns an error wrapping ctx.Err().
// No pause is taken after the final attempt, so the failure is reported
// immediately. When every attempt fails, the last processing error is
// returned wrapped.
func (c *RetryableProcessor) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
    const maxAttempts = 3

    input := msg.(Input)

    var lastErr error
    for attempt := 1; attempt <= maxAttempts; attempt++ {
        result, err := c.process(input)
        if err == nil {
            return output(ctx, "output", result)
        }
        lastErr = err

        // Nothing left to wait for once the last attempt has failed.
        if attempt == maxAttempts {
            break
        }

        // Back off before the next attempt, but give up early on cancellation
        // instead of blocking the flow in an uninterruptible sleep.
        backoff := time.NewTimer(time.Duration(attempt) * time.Second)
        select {
        case <-ctx.Done():
            backoff.Stop()
            return fmt.Errorf("retry cancelled after %d attempts (last error: %v): %w", attempt, lastErr, ctx.Err())
        case <-backoff.C:
        }
    }

    return fmt.Errorf("after %d attempts: %w", maxAttempts, lastErr)
}

Context Usage

Cancellation

go
// Handle processes a []Item one element at a time. Before each element it
// checks whether ctx has been cancelled; if so, it stops and returns
// ctx.Err(), leaving the remaining items unprocessed. It returns nil once
// every item has been processed.
func (c *LongProcessor) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
    batch := msg.([]Item)

    for i := range batch {
        // Non-blocking cancellation check before each item.
        select {
        case <-ctx.Done():
            return ctx.Err() // Graceful shutdown
        default:
        }

        c.processItem(batch[i])
    }

    return nil
}

Leader Check

go
// Handle acts only on control-port messages, and only when utils.IsLeader
// reports true for ctx. Every other combination is a no-op that returns nil.
func (c *LeaderOnly) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
    // Short-circuit keeps the leader check from running for non-control ports.
    if port != v1alpha1.ControlPort || !utils.IsLeader(ctx) {
        return nil
    }

    return c.handleControl(ctx, output, msg)
}

Timeout

go
// Handle processes msg under a 30-second deadline and sends the result on
// "output".
//
// If processing fails and the derived context's deadline has expired, the
// returned error says so and wraps the underlying processing error, so the
// original cause stays available to errors.Is / errors.As and to logs. Any
// other processing error is returned unchanged.
//
// NOTE(review): the send on "output" uses the same derived context, so the
// downstream flow shares whatever remains of the 30-second budget. Confirm
// that is intended.
func (c *TimedProcessor) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
    const timeout = 30 * time.Second

    // Add timeout to context
    ctx, cancel := context.WithTimeout(ctx, timeout)
    defer cancel()

    result, err := c.processWithContext(ctx, msg)
    if err != nil {
        if ctx.Err() == context.DeadlineExceeded {
            return fmt.Errorf("processing timed out after %s: %w", timeout, err)
        }
        return err
    }

    return output(ctx, "output", result)
}

State Management

Storing Settings

go
// Processor keeps the most recently received Settings and applies them to
// messages arriving on the "input" port.
type Processor struct {
    mu       sync.Mutex // guards settings
    settings Settings
}

// Handle stores Settings delivered on the settings port and uses the stored
// value when processing messages from the "input" port. Other ports are
// ignored.
//
// Settings are written on one port and read on another, so access goes
// through mu in case Handle is invoked concurrently. The lock is held only
// long enough to copy the value, never across processing or downstream sends.
func (p *Processor) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
    switch port {
    case v1alpha1.SettingsPort:
        // Assert before locking so a wrong-typed message cannot panic while
        // the mutex is held.
        next := msg.(Settings)

        p.mu.Lock()
        p.settings = next
        p.mu.Unlock()
        return nil

    case "input":
        // Snapshot the stored settings, then process without the lock.
        p.mu.Lock()
        current := p.settings
        p.mu.Unlock()

        return p.processWithSettings(ctx, output, msg, current)
    }
    return nil
}

Accumulating State

go
// Aggregator collects Items until it is told to flush them as a single Batch.
type Aggregator struct {
    items []Item
    mu    sync.Mutex // guards items
}

// Handle accumulates or flushes depending on the port:
//
//   - "item": appends the incoming Item to the pending list and emits nothing.
//     A payload that is not an Item is rejected with an error.
//   - "flush": takes the pending list, resets it, and sends the taken items as
//     a Batch on the "batch" port.
//
// Messages on any other port are ignored.
func (a *Aggregator) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
    switch port {
    case "item":
        // Validate before taking the lock: a failing assertion inside the
        // critical section would panic with the mutex still held and block
        // every later call.
        item, ok := msg.(Item)
        if !ok {
            return fmt.Errorf("unexpected message type: %T", msg)
        }

        a.mu.Lock()
        a.items = append(a.items, item)
        a.mu.Unlock()
        return nil

    case "flush":
        // Swap the pending list out under the lock, then send without it so
        // the downstream flow never runs inside the critical section.
        a.mu.Lock()
        items := a.items
        a.items = nil
        a.mu.Unlock()

        return output(ctx, "batch", Batch{Items: items})
    }
    return nil
}

Common Patterns

Pass-Through with Modification

go
// Handle stamps the incoming Message with the current time and this
// component's name, then passes the modified copy on via "output".
func (c *Enricher) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
    enriched := msg.(Message)

    enriched.Timestamp = time.Now()
    enriched.ProcessedBy = c.name

    return output(ctx, "output", enriched)
}

Fan-Out

go
// Handle sends the same message to "out1", "out2" and "out3", in that order.
// It stops at the first failed send and returns that error, so later ports do
// not receive the message.
func (c *Broadcaster) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
    targets := [...]string{"out1", "out2", "out3"}

    for i := 0; i < len(targets); i++ {
        err := output(ctx, targets[i], msg)
        if err != nil {
            return err
        }
    }
    return nil
}

Fan-In (via multiple input ports)

go
// Handle funnels messages from the "input_a", "input_b" and "input_c" ports
// into the single "merged" output port, unchanged. Messages from any other
// port are dropped.
func (c *Merger) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
    if port == "input_a" || port == "input_b" || port == "input_c" {
        return output(ctx, "merged", msg)
    }
    return nil
}

Next Steps

Build flow-based applications on Kubernetes