Handler Interface Reference

Complete reference for the message handler function used in TinySystems components.

Handler Definition

go
type Handler func(ctx context.Context, port string, message any) error

Handler is the callback passed to a component's Handle method. The component calls it to emit output messages; the examples on this page name that parameter output.
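As a minimal sketch of how a component might call the handler, the program below redeclares the Handler type and wires it to a stand-in for the runtime. The names `recorder`, `emission`, `double`, and `runDouble` are hypothetical and exist only for illustration; they are not part of the TinySystems API.

```go
package main

import (
	"context"
	"fmt"
)

// Handler mirrors the signature documented above.
type Handler func(ctx context.Context, port string, message any) error

// emission is a hypothetical record of one handler call.
type emission struct {
	Port    string
	Message any
}

// recorder is a hypothetical stand-in for the runtime: it stores every
// emitted message instead of delivering it downstream.
type recorder struct {
	emitted []emission
}

func (r *recorder) handler() Handler {
	return func(ctx context.Context, port string, message any) error {
		r.emitted = append(r.emitted, emission{Port: port, Message: message})
		return nil
	}
}

// double plays the role of a component's Handle method: it processes the
// input and emits the result on the "output" port.
func double(ctx context.Context, output Handler, n int) error {
	return output(ctx, "output", n*2)
}

// runDouble wires double to a recorder and returns what was emitted.
func runDouble(n int) ([]emission, error) {
	r := &recorder{}
	err := double(context.Background(), r.handler(), n)
	return r.emitted, err
}

func main() {
	got, err := runDouble(21)
	if err != nil {
		fmt.Println("delivery failed:", err)
		return
	}
	fmt.Printf("%s: %v\n", got[0].Port, got[0].Message)
}
```

A recorder like this can also be handy in unit tests, where the component under test receives the fake handler and the test inspects what was emitted.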

Parameters

ctx (context.Context)

The context carries request-scoped values, cancellation signals, and tracing information.

Always pass the incoming context through to the handler:

go
// Correct
output(ctx, "output", result)

// Incorrect - loses tracing
output(context.Background(), "output", result)

Context Values:

| Function | Description |
| --- | --- |
| utils.IsLeader(ctx) | Check if current instance is leader |
| trace.SpanFromContext(ctx) | Get current tracing span |
| ctx.Done() | Channel closed on cancellation |
| ctx.Err() | Error if context cancelled/expired |
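The cancellation helpers listed above can be used to stop emitting once the context is done. The following is a sketch using only the standard library; `emitEach` and `runCancelled` are hypothetical helpers, and the inline handler merely counts deliveries.

```go
package main

import (
	"context"
	"fmt"
)

type Handler func(ctx context.Context, port string, message any) error

// emitEach sends items one by one and stops as soon as the context is
// cancelled, returning ctx.Err() so the caller can see why it stopped.
func emitEach(ctx context.Context, output Handler, items []string) error {
	for _, item := range items {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
		if err := output(ctx, "output", item); err != nil {
			return err
		}
	}
	return nil
}

// runCancelled cancels the context after the first delivery and reports
// how many items were delivered and the resulting error.
func runCancelled() (int, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	delivered := 0
	output := func(ctx context.Context, port string, message any) error {
		delivered++
		cancel() // simulate a shutdown signal arriving mid-batch
		return nil
	}
	err := emitEach(ctx, output, []string{"a", "b", "c"})
	return delivered, err
}

func main() {
	n, err := runCancelled()
	fmt.Println(n, err) // 1 context canceled
}
```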

port (string)

The name of the output port to emit the message on.

Requirements:

  • Must match a port defined in Ports() with Source: false
  • Case-sensitive

Examples:

go
output(ctx, "output", data)
output(ctx, "out_success", result)
output(ctx, "error", errInfo)
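Because port names are matched exactly, a typo or a different letter case is easy to miss. One way to catch it early, for example in tests, is to wrap the handler with a check against the declared port names. This is a sketch under the assumption that rejecting unknown ports is the desired behavior; `withPortCheck` and `tryPort` are hypothetical and say nothing about how the runtime itself treats an unknown port.

```go
package main

import (
	"context"
	"fmt"
)

type Handler func(ctx context.Context, port string, message any) error

// withPortCheck wraps a handler and rejects any port name that is not in
// the allowed set. Lookup is an exact string match, so it is case-sensitive.
func withPortCheck(next Handler, allowed ...string) Handler {
	set := make(map[string]struct{}, len(allowed))
	for _, p := range allowed {
		set[p] = struct{}{}
	}
	return func(ctx context.Context, port string, message any) error {
		if _, ok := set[port]; !ok {
			return fmt.Errorf("unknown output port %q", port)
		}
		return next(ctx, port, message)
	}
}

// tryPort reports whether emitting on the given port passes the check.
func tryPort(port string) bool {
	noop := func(ctx context.Context, port string, message any) error { return nil }
	output := withPortCheck(noop, "output", "out_success", "error")
	return output(context.Background(), port, "payload") == nil
}

func main() {
	fmt.Println(tryPort("output"), tryPort("Output")) // true false
}
```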

message (any)

The data to emit. It should match the schema of the target port.

Type Requirements:

  • Must be JSON-serializable
  • Should match the Configuration struct for the port
  • Can be a struct, map, slice, or primitive

Examples:

go
// Struct
output(ctx, "output", Output{
    Result: "processed",
    Count:  42,
})

// Map
output(ctx, "output", map[string]any{
    "result": "processed",
    "count":  42,
})

// Primitive (if schema allows)
output(ctx, "count", 42)
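To check the JSON-serializable requirement before emitting, a value can be run through `json.Marshal`. The sketch below assumes the runtime encodes messages with the standard `encoding/json` package, which this page does not confirm; `serializable` is a hypothetical helper and the `Output` struct with its field tags is illustrative.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Output is a hypothetical port payload with JSON field tags.
type Output struct {
	Result string `json:"result"`
	Count  int    `json:"count"`
}

// serializable reports whether encoding/json can marshal the value.
func serializable(v any) bool {
	_, err := json.Marshal(v)
	return err == nil
}

func main() {
	fmt.Println(serializable(Output{Result: "processed", Count: 42})) // true
	fmt.Println(serializable(map[string]any{"count": 42}))            // true
	fmt.Println(serializable(42))                                     // true
	fmt.Println(serializable(make(chan int)))                         // false: channels cannot be marshalled
}
```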

Return Value

error

| Return | Meaning |
| --- | --- |
| nil | Message delivered successfully |
| error | Delivery failed |

Error Handling:

go
err := output(ctx, "output", result)
if err != nil {
    // Handle delivery failure
    return err
}

Blocking Behavior

Default: Blocking

By default, the handler blocks until all downstream processing completes:

go
func (c *Component) Handle(ctx context.Context, output Handler, port string, msg any) error {
    result := process(msg)

    // This blocks until downstream nodes finish
    err := output(ctx, "output", result)

    // Execution continues only after downstream completes
    return err
}

Flow:

output() called

    ├─▶ Message delivered to next node
    │       │
    │       ├─▶ Next node processes
    │       │       │
    │       │       └─▶ (continues downstream)
    │       │
    │       └─▶ Returns when complete

    └─▶ output() returns

Benefits of Blocking

  1. Backpressure: Automatically limits processing rate
  2. Resource Management: Prevents unbounded queue growth
  3. Error Propagation: Errors return to source
  4. Ordering: Maintains message order
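The ordering and error-propagation points can be illustrated with a small simulation. This is not the runtime's implementation: `stage` and `runChain` are hypothetical, and each "node" is just a function that calls the next handler synchronously.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

type Handler func(ctx context.Context, port string, message any) error

// stage builds a hypothetical node: it records its name, calls the next
// handler synchronously, and returns whatever that handler returns.
func stage(name string, events *[]string, next Handler) Handler {
	return func(ctx context.Context, port string, message any) error {
		*events = append(*events, name+" start")
		err := next(ctx, port, message)
		*events = append(*events, name+" done")
		return err
	}
}

// runChain emits one message through a two-stage chain whose sink fails.
func runChain() ([]string, error) {
	var events []string
	sink := func(ctx context.Context, port string, message any) error {
		events = append(events, "sink")
		return errors.New("sink unavailable")
	}
	output := stage("A", &events, stage("B", &events, sink))
	err := output(context.Background(), "output", "msg")
	return events, err
}

func main() {
	events, err := runChain()
	fmt.Println(events, err)
	// [A start B start sink B done A done] sink unavailable
}
```

Each stage finishes only after everything downstream of it has finished, and the sink's error travels back to the original caller unchanged.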

Non-Blocking (Async) Usage

For fire-and-forget operations, use a goroutine:

go
func (c *Component) Handle(ctx context.Context, output Handler, port string, msg any) error {
    // Capture span context for tracing
    spanCtx := trace.SpanContextFromContext(ctx)

    go func() {
        // Create new context with span context
        newCtx := trace.ContextWithSpanContext(context.Background(), spanCtx)

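        result := process(msg)
        if err := output(newCtx, "output", result); err != nil {
            // Handle has already returned, so the error must be dealt with here
            log.Warn("async output failed", "error", err)
        }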
    }()

    // Returns immediately
    return nil
}

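Important: when emitting asynchronously:

  • Preserve the span context so traces stay connected
  • Handle errors inside the goroutine, since Handle has already returned
  • Watch for goroutine leaks: context.Background() is never cancelled
  • Consider using the Async component instead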
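One way to address the first three points with the standard library alone is sketched below. It swaps in a different technique from copying the span context by hand: `context.WithoutCancel` (Go 1.21+) keeps the parent's values, which is where tracing libraries typically store the span, while detaching from its cancellation, and a timeout bounds the goroutine's lifetime. `emitAsync`, `runAsync`, and `ctxKey` are hypothetical names, and whether the runtime cancels the Handle context after return is an assumption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

type Handler func(ctx context.Context, port string, message any) error

// ctxKey is a hypothetical context key standing in for tracing data.
type ctxKey struct{}

// emitAsync runs output in a goroutine and returns a channel that receives
// the delivery result exactly once.
func emitAsync(parent context.Context, output Handler, port string, msg any, timeout time.Duration) <-chan error {
	done := make(chan error, 1) // buffered: the goroutine never blocks on send
	// Keep the parent's values but not its cancellation.
	detached := context.WithoutCancel(parent)
	go func() {
		// The timeout bounds the goroutine if the handler honours cancellation.
		ctx, cancel := context.WithTimeout(detached, timeout)
		defer cancel()
		done <- output(ctx, port, msg)
	}()
	return done
}

// runAsync emits with an already-cancelled parent to a handler that fails,
// and returns the error reported by the goroutine.
func runAsync() error {
	parent, cancel := context.WithCancel(
		context.WithValue(context.Background(), ctxKey{}, "trace-123"))
	cancel() // assumed to resemble the state after Handle has returned

	output := func(ctx context.Context, port string, message any) error {
		if ctx.Err() != nil {
			return ctx.Err() // would mean cancellation leaked through
		}
		if ctx.Value(ctxKey{}) != "trace-123" {
			return errors.New("context value lost")
		}
		return errors.New("downstream rejected message")
	}
	return <-emitAsync(parent, output, "output", "payload", time.Second)
}

func main() {
	fmt.Println(runAsync()) // downstream rejected message
}
```

Returning a channel leaves the caller free to ignore the result (fire-and-forget) or to log it from another goroutine.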

Multiple Outputs

Sequential Outputs

go
func (c *Component) Handle(ctx context.Context, output Handler, port string, msg any) error {
    // First output
    if err := output(ctx, "processed", result1); err != nil {
        return err
    }

    // Second output (after first completes)
    if err := output(ctx, "logged", result2); err != nil {
        return err
    }

    return nil
}

Parallel Outputs

go
func (c *Component) Handle(ctx context.Context, output Handler, port string, msg any) error {
    var wg sync.WaitGroup
    var mu sync.Mutex
    var firstErr error

    outputs := []struct {
        port string
        data any
    }{
        {"notify_email", emailData},
        {"notify_slack", slackData},
    }

    for _, out := range outputs {
        wg.Add(1)
        go func(port string, data any) {
            defer wg.Done()
            if err := output(ctx, port, data); err != nil {
                mu.Lock()
                if firstErr == nil {
                    firstErr = err
                }
                mu.Unlock()
            }
        }(out.port, out.data)
    }

    wg.Wait()
    return firstErr
}
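The version above reports only the first failure. If every failure matters, a variant can collect them all with `errors.Join` (Go 1.20+). This is a sketch rather than a recommended pattern; `emitAll`, `runEmitAll`, and `errSlack` are hypothetical.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

type Handler func(ctx context.Context, port string, message any) error

// emitAll sends every payload concurrently, waits for all deliveries, and
// joins every failure into one error instead of keeping only the first.
func emitAll(ctx context.Context, output Handler, payloads map[string]any) error {
	var (
		wg   sync.WaitGroup
		mu   sync.Mutex
		errs []error
	)
	for port, data := range payloads {
		wg.Add(1)
		go func(port string, data any) {
			defer wg.Done()
			if err := output(ctx, port, data); err != nil {
				mu.Lock()
				errs = append(errs, fmt.Errorf("%s: %w", port, err))
				mu.Unlock()
			}
		}(port, data)
	}
	wg.Wait()
	return errors.Join(errs...) // nil when errs is empty
}

var errSlack = errors.New("slack unavailable")

// runEmitAll emits to two ports, one of which fails.
func runEmitAll() error {
	output := func(ctx context.Context, port string, message any) error {
		if port == "notify_slack" {
			return errSlack
		}
		return nil
	}
	return emitAll(context.Background(), output, map[string]any{
		"notify_email": "hello",
		"notify_slack": "hello",
	})
}

func main() {
	err := runEmitAll()
	fmt.Println(err, errors.Is(err, errSlack))
}
```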

Conditional Output

Route Based on Condition

go
func (c *Component) Handle(ctx context.Context, output Handler, port string, msg any) error {
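    // Checked assertion: a bare msg.(Input) panics on an unexpected type
    input, ok := msg.(Input)
    if !ok {
        return fmt.Errorf("unexpected message type %T", msg)
    }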

    if input.Valid {
        return output(ctx, "out_success", input)
    }
    return output(ctx, "out_error", ErrorInfo{
        Message: "Invalid input",
    })
}

Optional Output

go
func (c *Component) Handle(ctx context.Context, output Handler, port string, msg any) error {
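    // Checked assertion: a bare msg.(Input) panics on an unexpected type
    input, ok := msg.(Input)
    if !ok {
        return fmt.Errorf("unexpected message type %T", msg)
    }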

    // Always process
    result := process(input)

    // Conditionally notify
    if c.settings.EnableNotifications {
        if err := output(ctx, "notification", result); err != nil {
            // Log but don't fail
            log.Warn("notification failed", "error", err)
        }
    }

    return output(ctx, "output", result)
}

Special Handler Patterns

Reconcile Port Handler

On the _reconcile port, the component passes the handler a function that updates the TinyNode:

go
func (c *Component) Handle(ctx context.Context, output Handler, port string, msg any) error {
    if port == v1alpha1.ReconcilePort {
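        // Check the type without binding the node to a variable:
        // an unused variable does not compile in Go
        if _, ok := msg.(*v1alpha1.TinyNode); !ok {
            return fmt.Errorf("unexpected message type %T on reconcile port", msg)
        }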

        // Update node metadata
        return output(ctx, v1alpha1.ReconcilePort, func(n *v1alpha1.TinyNode) {
            if n.Status.Metadata == nil {
                n.Status.Metadata = make(map[string]string)
            }
            n.Status.Metadata["my-key"] = "my-value"
        })
    }
    // ...
}

Control Port Handler

Control responses don't use the standard handler:

go
func (c *Component) HandleControl(ctx context.Context, state any, port string, msg any) (any, error) {
    // Return new state, not via output handler
    return &ControlState{
        Status: "running",
    }, nil
}

Error Handling

Propagate Errors

go
err := output(ctx, "output", result)
if err != nil {
    return fmt.Errorf("failed to deliver output: %w", err)
}

Handle Partial Failures

go
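// Named errs so it does not shadow the standard errors package
var errs []error

for _, item := range items {
    if err := output(ctx, "output", item); err != nil {
        errs = append(errs, err)
    }
}

if len(errs) > 0 {
    // errors.Join (Go 1.20+) keeps the underlying causes inspectable
    return fmt.Errorf("failed to deliver %d items: %w", len(errs), errors.Join(errs...))
}
return nil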

Timeout Handling

go
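ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()

err := output(ctx, "output", result)
if errors.Is(err, context.DeadlineExceeded) {
    // errors.PermanentError is not part of the standard library; it is
    // assumed to come from an SDK errors package imported under this name
    return errors.PermanentError(fmt.Errorf("delivery timeout: %w", err))
}
return err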
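The timeout only takes effect if the delivery path honours context cancellation. The following standard-library sketch simulates that with a hypothetical `slowOutput` handler; `emitWithTimeout` and `runTimeouts` are likewise illustrative names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

type Handler func(ctx context.Context, port string, message any) error

// slowOutput is a hypothetical handler that takes delay to deliver but
// gives up early when the context expires.
func slowOutput(delay time.Duration) Handler {
	return func(ctx context.Context, port string, message any) error {
		select {
		case <-time.After(delay):
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// emitWithTimeout bounds a single delivery and reports whether it timed out.
func emitWithTimeout(ctx context.Context, output Handler, timeout time.Duration) (bool, error) {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	err := output(ctx, "output", "payload")
	return errors.Is(err, context.DeadlineExceeded), err
}

// runTimeouts tries a slow and a fast delivery and reports which timed out.
func runTimeouts() (slow, fast bool) {
	ctx := context.Background()
	slow, _ = emitWithTimeout(ctx, slowOutput(time.Second), 10*time.Millisecond)
	fast, _ = emitWithTimeout(ctx, slowOutput(time.Millisecond), time.Second)
	return slow, fast
}

func main() {
	slow, fast := runTimeouts()
	fmt.Println(slow, fast) // true false
}
```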

Best Practices

1. Always Check Errors

go
// Good
if err := output(ctx, "output", result); err != nil {
    return err
}

// Risky
output(ctx, "output", result)
return nil

2. Pass Context Through

go
// Good
output(ctx, "output", result)

// Bad - loses tracing/cancellation
output(context.Background(), "output", result)

3. Match Schema Types

go
// Good - matches port schema
output(ctx, "output", Output{
    Result: value,
})

// Risky - may not match schema
output(ctx, "output", map[string]any{
    "result": value,
})

4. Use Descriptive Port Names

go
// Good
output(ctx, "out_success", result)
output(ctx, "out_validation_error", errInfo)

// Less clear
output(ctx, "out1", result)
output(ctx, "out2", errInfo)

5. Document Output Behavior

go
// Emits to "output" port with processed result
// Blocks until downstream processing completes
// Returns error if delivery fails
return output(ctx, "output", result)
