# Blocking vs Async Execution
Understanding the execution model is crucial for building reliable components. TinySystems uses a blocking-by-default model with explicit async patterns when needed.
## Default Behavior: Blocking

When you call `output()`, execution blocks until all downstream processing completes:
```go
func (c *Component) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
    fmt.Println("1. Starting")

    // This blocks until the downstream chain completes
    err := output(ctx, "output", msg)

    fmt.Println("2. Downstream complete")
    return err
}
```

### Execution Timeline
```
Time ──────────────────────────────────────────────────────────────────▶

Node A           Node B           Node C           Node D
  │                │                │                │
  │ Handle()       │                │                │
  │   │            │                │                │
  │   ▼            │                │                │
  │ output()───────│──▶ Handle()    │                │
  │   │ BLOCKED    │      │         │                │
  │   │            │      ▼         │                │
  │   │            │ output()───────│──▶ Handle()    │
  │   │            │   │ BLOCKED    │      │         │
  │   │            │   │            │      ▼         │
  │   │            │   │            │ output()───────│──▶ Handle()
  │   │            │   │            │   │ BLOCKED    │      │
  │   │            │   │            │   │            │      ▼
  │   │            │   │            │   │            │ return nil
  │   │            │   │            │   ◀────────────│──────┘
  │   │            │   │            │ return nil     │
  │   │            │   ◀────────────│──────┘         │
  │   │            │ return nil     │                │
  │   ◀────────────│──────┘         │                │
  │ return nil     │                │                │
  ▼                │                │                │
```

## Why Blocking by Default?
### 1. Backpressure
Slow consumers naturally slow down producers:
```go
// Producer (fast)
func (c *Producer) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
    for _, item := range items {
        // Blocks if downstream is slow
        output(ctx, "output", item)
    }
    return nil
}

// Consumer (slow)
func (c *Consumer) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
    time.Sleep(100 * time.Millisecond) // Slow processing
    return nil
}
```

Without blocking, the producer would overwhelm the system with messages.
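To see the pacing effect in isolation, here is a minimal standalone sketch (plain Go, no TinySystems types): the producer loop can only advance as fast as the blocking call returns.

```go
package main

import (
    "fmt"
    "time"
)

// consume stands in for a slow downstream component.
func consume(item int) {
    time.Sleep(100 * time.Millisecond)
}

func main() {
    start := time.Now()
    for i := 0; i < 5; i++ {
        consume(i) // blocks: the producer is paced by the consumer
    }
    // Roughly 500ms: five items at the consumer's pace, not the producer's
    fmt.Println("elapsed:", time.Since(start))
}
```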
### 2. Rate Limiting
The ticker component demonstrates this:
```go
// Ticker emits every N milliseconds
func (t *Ticker) emit(ctx context.Context, handler module.Handler) error {
    timer := time.NewTimer(t.settings.Delay)
    for {
        select {
        case <-timer.C:
            // Blocks until downstream completes
            _ = handler(ctx, OutPort, t.settings.Context)
            // Only THEN reset the timer
            timer.Reset(t.settings.Delay)
        case <-ctx.Done():
            return ctx.Err()
        }
    }
}
```

If processing takes longer than the interval, the next tick waits: with a 100 ms delay and 250 ms of downstream work, ticks fire roughly every 350 ms, because the timer is only reset after the handler returns.
### 3. Error Propagation
Errors flow back to the source:
```go
func (c *Component) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
    err := output(ctx, "output", msg)
    if err != nil {
        // We know downstream failed
        log.Error("downstream failed", "error", err)
        return err
    }
    return nil
}
```

### 4. Transaction-like Semantics
Know when a unit of work is complete:
```go
func (c *BatchProcessor) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
    batch := msg.(Batch)
    for _, item := range batch.Items {
        err := output(ctx, "output", item)
        if err != nil {
            return err // Stop on the first error
        }
    }
    // All items processed successfully
    return output(ctx, "complete", BatchComplete{Count: len(batch.Items)})
}
```

## Async Execution
For fire-and-forget scenarios, use goroutines explicitly:
```go
func (c *AsyncComponent) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
    go func() {
        // Preserve trace context
        asyncCtx := trace.ContextWithSpanContext(
            context.Background(),
            trace.SpanContextFromContext(ctx),
        )
        // This runs independently of the caller
        _ = output(asyncCtx, "output", msg)
    }()
    return nil // Returns immediately
}
```

### When to Use Async
| Use Case | Blocking | Async |
|---|---|---|
| Sequential processing | ✓ | |
| Error handling needed | ✓ | |
| Rate limiting | ✓ | |
| Fire-and-forget | | ✓ |
| Parallel fan-out | | ✓ |
| Long-running background tasks | | ✓ |
### Async Risks

**Goroutine leaks:** If downstream blocks forever, goroutines accumulate:
```go
// DANGER: Potential goroutine leak
func (c *LeakyComponent) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
    go func() {
        // If downstream never responds, this goroutine lives forever
        output(context.Background(), "output", msg)
    }()
    return nil
}
```

**Mitigation:** Use a context with a timeout:
```go
func (c *SafeAsyncComponent) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
    go func() {
        // Time out after 30 seconds
        asyncCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
        defer cancel()

        // Preserve tracing
        asyncCtx = trace.ContextWithSpanContext(asyncCtx, trace.SpanContextFromContext(ctx))

        _ = output(asyncCtx, "output", msg)
    }()
    return nil
}
```

## Parallel Processing
For parallel fan-out, combine async with error collection:
```go
func (c *ParallelProcessor) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
    items := msg.([]Item)

    var wg sync.WaitGroup
    errors := make(chan error, len(items))

    for _, item := range items {
        wg.Add(1)
        go func(item Item) {
            defer wg.Done()
            if err := output(ctx, "output", item); err != nil {
                errors <- err
            }
        }(item)
    }

    // Wait for all goroutines to complete
    wg.Wait()
    close(errors)

    // Return the first error, if any
    for err := range errors {
        return err
    }
    return nil
}
```
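As an alternative sketch, the same fan-out can be written with `golang.org/x/sync/errgroup`, which bundles the WaitGroup, the error channel, and the first-error semantics into one type. This assumes `module.Handler` returns `error`, as the checks above imply; `handleParallel` is a hypothetical helper name:

```go
import (
    "context"

    "golang.org/x/sync/errgroup"
)

func (c *ParallelProcessor) handleParallel(ctx context.Context, output module.Handler, items []Item) error {
    g, gctx := errgroup.WithContext(ctx)
    for _, item := range items {
        item := item // capture the loop variable (required before Go 1.22)
        g.Go(func() error {
            // The first failure cancels gctx for the remaining goroutines
            return output(gctx, "output", item)
        })
    }
    // Wait blocks until all goroutines finish and returns the first error
    return g.Wait()
}
```

## The Split Component Pattern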
The split component demonstrates blocking iteration:
```go
func (c *Split) Handle(ctx context.Context, handler module.Handler, port string, msg any) any {
    input := msg.(InMessage)
    for _, item := range input.Array {
        // Each item blocks until processed
        if err := handler(ctx, OutPort, OutMessage{
            Context: input.Context,
            Item:    item,
        }); err != nil {
            return err // Stop on error
        }
    }
    return nil
}
```

Behavior:
- Items processed sequentially
- Error in any item stops iteration
- Downstream controls pace
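For reference, the message shapes used above are assumed to look roughly like this (hypothetical definitions; the real ones live in the split component's module):

```go
// Hypothetical shapes for the split component's messages
type InMessage struct {
    Context any   // pass-through payload, copied to every output
    Array   []any // the collection being split
}

type OutMessage struct {
    Context any // copied from InMessage.Context
    Item    any // a single element of the input array
}
```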
## Context Handling in Async
Always preserve trace context for observability:
import "go.opentelemetry.io/otel/trace"
func (c *AsyncComponent) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
// Extract span context from incoming request
spanCtx := trace.SpanContextFromContext(ctx)
go func() {
// Create new context with span context (but no deadline/cancellation)
asyncCtx := trace.ContextWithSpanContext(context.Background(), spanCtx)
// Now traces will be connected
output(asyncCtx, "output", msg)
}()
return nil
}Comparison Summary
| Aspect | Blocking | Async |
|---|---|---|
| Return timing | After downstream completes | Immediately |
| Error handling | Errors propagate back | Errors lost (unless handled) |
| Backpressure | Natural | None |
| Goroutine count | Bounded | Can grow unbounded |
| Tracing | Automatic | Manual context propagation |
| Use case | Most components | Fire-and-forget, parallel |
## Best Practices
- **Default to blocking** - Use async only when necessary
- **Handle async errors** - Log or otherwise handle errors inside goroutines (see the sketch after this list)
- **Preserve trace context** - Keep observability working
- **Use timeouts** - Prevent goroutine leaks
- **Consider backpressure** - Async can overwhelm downstream
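As promised above, a minimal sketch of handling errors inside an async goroutine, combining the timeout, trace-propagation, and logging practices (the `log` calls mirror the structured logger used earlier on this page):

```go
go func() {
    // Bound the goroutine's lifetime to avoid leaks
    asyncCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    // Keep the trace connected to the originating request
    asyncCtx = trace.ContextWithSpanContext(asyncCtx, trace.SpanContextFromContext(ctx))

    if err := output(asyncCtx, "output", msg); err != nil {
        // Don't let the error vanish with the goroutine
        log.Error("async output failed", "port", "output", "error", err)
    }
}()
```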
## Next Steps
- Error Handling - Handle errors in both modes
- Component Patterns - See patterns in action
- Observability - Tracing async operations