Blocking vs Async Execution

Understanding the execution model is crucial for building reliable components. TinySystems uses a blocking-by-default model with explicit async patterns when needed.

Default Behavior: Blocking

When you call output(), execution blocks until all downstream processing completes:

go
func (c *Component) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
    fmt.Println("1. Starting")

    // This blocks until downstream chain completes
    err := output(ctx, "output", msg)

    fmt.Println("2. Downstream complete")
    return err
}

Execution Timeline

Time ──────────────────────────────────────────────────────────────────▶

Node A          Node B          Node C          Node D
  │               │               │               │
  │ Handle()      │               │               │
  │   │           │               │               │
  │   ▼           │               │               │
  │ output()──────│──▶ Handle()   │               │
  │   │ BLOCKED   │     │         │               │
  │   │           │     ▼         │               │
  │   │           │   output()────│──▶ Handle()   │
  │   │           │     │ BLOCKED │     │         │
  │   │           │     │         │     ▼         │
  │   │           │     │         │   output()────│──▶ Handle()
  │   │           │     │         │     │ BLOCKED │     │
  │   │           │     │         │     │         │     ▼
  │   │           │     │         │     │         │   return nil
  │   │           │     │         │     ◀─────────│─────┘
  │   │           │     │         │   return nil  │
  │   │           │     ◀─────────│─────┘         │
  │   │           │   return nil  │               │
  │   ◀───────────│─────┘         │               │
  │ return nil    │               │               │
  ▼               │               │               │

Why Blocking by Default?

1. Backpressure

Slow consumers naturally slow down producers:

go
// Producer (fast)
func (c *Producer) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
    for _, item := range items {
        // Blocks if downstream is slow
        output(ctx, "output", item)
    }
    return nil
}

// Consumer (slow)
func (c *Consumer) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
    time.Sleep(100 * time.Millisecond)  // Slow processing
    return nil
}

Without blocking, the producer would overwhelm the system with messages.

2. Rate Limiting

The ticker component demonstrates this:

go
// Ticker emits every N milliseconds
func (t *Ticker) emit(ctx context.Context, handler module.Handler) error {
    timer := time.NewTimer(t.settings.Delay)

    for {
        select {
        case <-timer.C:
            // Blocks until downstream completes
            _ = handler(ctx, OutPort, t.settings.Context)

            // Only THEN reset timer
            timer.Reset(t.settings.Delay)
        case <-ctx.Done():
            return ctx.Err()
        }
    }
}

If processing takes longer than the interval, the next tick simply waits: with a 100 ms delay and 250 ms of downstream work, ticks effectively fire every ~350 ms instead of piling up.

3. Error Propagation

Errors flow back to the source:

go
func (c *Component) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
    err := output(ctx, "output", msg)
    if err != nil {
        // We know downstream failed
        log.Error("downstream failed", "error", err)
        return err
    }
    return nil
}

4. Transaction-like Semantics

Know when a unit of work is complete:

go
func (c *BatchProcessor) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
    batch := msg.(Batch)

    for _, item := range batch.Items {
        err := output(ctx, "output", item)
        if err != nil {
            return err  // Stop on first error
        }
    }

    // All items processed successfully
    return output(ctx, "complete", BatchComplete{Count: len(batch.Items)})
}

Async Execution

For fire-and-forget scenarios, use goroutines explicitly:

go
func (c *AsyncComponent) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
    go func() {
        // Preserve trace context
        asyncCtx := trace.ContextWithSpanContext(
            context.Background(),
            trace.SpanContextFromContext(ctx),
        )

        // This runs independently
        _ = output(asyncCtx, "output", msg)
    }()

    return nil  // Returns immediately
}

When to Use Async

Use Case                        Blocking   Async
Sequential processing              ✓
Error handling needed              ✓
Rate limiting                      ✓
Fire-and-forget                               ✓
Parallel fan-out                              ✓
Long-running background tasks                 ✓
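
The last row is worth sketching. A long-running background task should watch ctx.Done() so it shuts down with the flow. A minimal sketch, assuming a long-lived component context; the BackgroundSyncer name and its "status" port are hypothetical, not part of TinySystems:

go
func (c *BackgroundSyncer) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
    go func() {
        ticker := time.NewTicker(time.Minute)
        defer ticker.Stop()

        for {
            select {
            case <-ticker.C:
                // Periodic work; errors are handled here, not propagated upstream
                if err := output(ctx, "status", msg); err != nil {
                    log.Error("background sync failed", "error", err)
                }
            case <-ctx.Done():
                // Shut down with the flow
                return
            }
        }
    }()

    return nil  // Returns immediately
}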

Async Risks

Goroutine leaks: If downstream blocks forever, goroutines accumulate:

go
// DANGER: Potential goroutine leak
func (c *LeakyComponent) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
    go func() {
        // If downstream never responds, this goroutine lives forever
        output(context.Background(), "output", msg)
    }()
    return nil
}

Mitigation: use a context with a timeout:

go
func (c *SafeAsyncComponent) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
    go func() {
        // Timeout after 30 seconds
        asyncCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
        defer cancel()

        // Preserve tracing
        asyncCtx = trace.ContextWithSpanContext(asyncCtx, trace.SpanContextFromContext(ctx))

        _ = output(asyncCtx, "output", msg)
    }()
    return nil
}

Parallel Processing

For parallel fan-out, combine async with error collection:

go
func (c *ParallelProcessor) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
    items := msg.([]Item)

    var wg sync.WaitGroup
    errors := make(chan error, len(items))

    for _, item := range items {
        wg.Add(1)
        // Pass item as an argument so each goroutine gets its own copy
        // (loop variables were shared across iterations before Go 1.22)
        go func(item Item) {
            defer wg.Done()
            if err := output(ctx, "output", item); err != nil {
                errors <- err
            }
        }(item)
    }

    // Wait for all to complete
    wg.Wait()
    close(errors)

    // Check for errors
    for err := range errors {
        return err  // Return first error
    }

    return nil
}

The Split Component Pattern

The split component demonstrates blocking iteration:

go
func (c *Split) Handle(ctx context.Context, handler module.Handler, port string, msg any) any {
    input := msg.(InMessage)

    for _, item := range input.Array {
        // Each item blocks until processed
        if err := handler(ctx, OutPort, OutMessage{
            Context: input.Context,
            Item:    item,
        }); err != nil {
            return err  // Stop on error
        }
    }

    return nil
}

Behavior:

  • Items processed sequentially
  • Error in any item stops iteration (see the sketch below)
  • Downstream controls pace
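
To make the error behavior concrete, here is a sketch of calling Split with a downstream handler that rejects one item. It assumes module.Handler is compatible with func(context.Context, string, any) error; the InPort name and the failing "b" item are illustrative:

go
// Illustrative downstream handler that fails on item "b"
downstream := func(ctx context.Context, port string, msg any) error {
    out := msg.(OutMessage)
    if out.Item == "b" {
        return fmt.Errorf("cannot process %v", out.Item)
    }
    return nil
}

split := &Split{}
err := split.Handle(ctx, downstream, InPort, InMessage{Array: []any{"a", "b", "c"}})
// err is non-nil: "a" was forwarded, "b" failed, and "c" was never sent,
// because iteration stopped as soon as the downstream handler returned an error.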

Context Handling in Async

Always preserve trace context for observability:

go
import "go.opentelemetry.io/otel/trace"

func (c *AsyncComponent) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
    // Extract span context from incoming request
    spanCtx := trace.SpanContextFromContext(ctx)

    go func() {
        // Create new context with span context (but no deadline/cancellation)
        asyncCtx := trace.ContextWithSpanContext(context.Background(), spanCtx)

        // Now traces will be connected
        output(asyncCtx, "output", msg)
    }()

    return nil
}

Comparison Summary

Aspect            Blocking                     Async
Return timing     After downstream completes   Immediately
Error handling    Errors propagate back        Errors lost (unless handled)
Backpressure      Natural                      None
Goroutine count   Bounded                      Can grow unbounded
Tracing           Automatic                    Manual context propagation
Use case          Most components              Fire-and-forget, parallel

Best Practices

  1. Default to blocking - Use async only when necessary
  2. Handle async errors - Log or handle errors in goroutines (see the sketch after this list)
  3. Preserve trace context - Keep observability working
  4. Use timeouts - Prevent goroutine leaks
  5. Consider backpressure - Async can overwhelm downstream
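
A pattern that combines points 2-4, sketched under the same assumptions as the earlier async examples (the CarefulAsyncComponent name is hypothetical):

go
func (c *CarefulAsyncComponent) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
    // Preserve the trace while detaching from the per-message deadline (point 3)
    spanCtx := trace.SpanContextFromContext(ctx)

    go func() {
        // Bound the work so goroutines cannot leak (point 4)
        asyncCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
        defer cancel()
        asyncCtx = trace.ContextWithSpanContext(asyncCtx, spanCtx)

        // Handle the error instead of silently dropping it (point 2)
        if err := output(asyncCtx, "output", msg); err != nil {
            log.Error("async output failed", "error", err)
        }
    }()

    return nil
}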

Next Steps

Build flow-based applications on Kubernetes