Message Flow
Understanding how messages flow through the system is essential for building reliable components. This page explains the complete execution path from trigger to completion.
Execution Overview
┌─────────────────────────────────────────────────────────────────────────────┐
│                           MESSAGE EXECUTION FLOW                            │
└─────────────────────────────────────────────────────────────────────────────┘

     TRIGGER                  KUBERNETES                   COMPONENT
     ───────                  ──────────                   ─────────
┌───────────────┐         ┌─────────────────┐         ┌─────────────────┐
│ HTTP Request  │         │                 │         │                 │
│ Timer/Cron    │────────▶│  TinySignal CR  │────────▶│     Signal      │
│ Manual Button │         │                 │         │   Controller    │
│ Edge Output   │         └─────────────────┘         │  (Leader only)  │
└───────────────┘                                     └────────┬────────┘
                                                               │
                                                               ▼
                                                      ┌─────────────────┐
                                                      │                 │
                                                      │    Scheduler    │
                                                      │    .Handle()    │
                                                      │                 │
                                                      └────────┬────────┘
                                                               │
                                                               ▼
                                                      ┌─────────────────┐
                                                      │                 │
                                                      │     Runner      │
                                                      │  .MsgHandler()  │
                                                      │                 │
                                                      └────────┬────────┘
                                                               │
                                                               ▼
                                                      ┌─────────────────┐
                                                      │                 │
                                                      │    Component    │
                                                      │    .Handle()    │
                                                      │                 │
                                                      └────────┬────────┘
                                                               │
                                                               ▼
                                                      ┌─────────────────┐
                                                      │   output(ctx,   │
                                                      │   port, data)   │
                                                      └────────┬────────┘
                                                               │
                                          ┌────────────────────┼────────────────────┐
                                          │                    │                    │
                                          ▼                    ▼                    ▼
                                 ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐
                                 │     No edge     │  │   Same module   │  │    Different    │
                                 │   (terminal)    │  │  (Go channel)   │  │  module (gRPC)  │
                                 └─────────────────┘  └────────┬────────┘  └────────┬────────┘
                                                               │                    │
                                                               └─────────┬──────────┘
                                                                         │
                                                                         ▼
                                                                ┌─────────────────┐
                                                                │ Edge Expression │
                                                                │   Evaluation    │
                                                                └────────┬────────┘
                                                                         │
                                                                         ▼
                                                                ┌─────────────────┐
                                                                │ Next Component  │
                                                                │    .Handle()    │
                                                                └─────────────────┘
Detailed Execution Steps
Step 1: Trigger Creation
Messages can be triggered by:
- HTTP Request: HTTP Server component receives a request
- Timer: Ticker component fires at intervals
- Manual: User clicks a button in the UI
- Edge: Previous node sends output
Step 2: TinySignal Creation
For external triggers, a TinySignal CR is created:
yaml
apiVersion: operator.tinysystems.io/v1alpha1
kind: TinySignal
metadata:
  name: signal-abc123
  namespace: default
  annotations:
    tinysystems.io/signal-nonce: "unique-id-123"
spec:
  node: my-node-name
  port: input
  data:
    message: "Hello World"
Step 3: Signal Controller Processing
The Signal Controller (running on the leader pod only) processes the signal:
go
// tinysignal_controller.go
func (r *TinySignalReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	// Only the leader processes signals
	if !r.IsLeader.Load() {
		return ctrl.Result{}, nil
	}

	signal := &v1alpha1.TinySignal{}
	if err := r.Get(ctx, req.NamespacedName, signal); err != nil {
		// The signal may already be gone; a missing object is not an error
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	// Send to scheduler
	r.Scheduler.Handle(ctx, runner.Msg{
		To:   signal.Spec.Node + "." + signal.Spec.Port,
		Data: signal.Spec.Data,
	})

	// Delete processed signal
	return ctrl.Result{}, r.Delete(ctx, signal)
}
Step 4: Scheduler Routing
The Scheduler finds the appropriate runner:
go
// scheduler.go
func (s *Scheduler) Handle(ctx context.Context, msg runner.Msg) error {
	// Parse destination: "node-name.port-name"
	nodeName, portName, ok := strings.Cut(msg.To, ".")
	if !ok {
		// Without this check a destination with no "." would panic
		return fmt.Errorf("invalid destination %q: expected node.port", msg.To)
	}

	// Find runner instance
	r, exists := s.instancesMap.Get(nodeName)
	if !exists {
		// Check if node belongs to different module
		return s.routeToRemoteModule(ctx, msg)
	}

	// Send to local runner
	return r.MsgHandler(ctx, runner.Msg{
		To:   portName,
		From: msg.From,
		Data: msg.Data,
	})
}
Step 5: Runner Processing
The Runner wraps the component and handles:
- Message caching
- Schema generation
- OpenTelemetry tracing
- Error handling
go
// runner.go
func (r *Runner) MsgHandler(ctx context.Context, msg runner.Msg) error {
	// Create trace span
	ctx, span := r.tracer.Start(ctx, "handle-message")
	defer span.End()

	// Evaluate expressions and deserialize message
	data := r.evaluateAndDeserialize(msg.Data, msg.To)

	// Call component's Handle method
	result := r.component.Handle(ctx, r.outputHandler, msg.To, data)

	// Handle result (error or success)
	return r.processResult(result)
}
Step 6: Component Handle
Your component receives the message:
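go
func (c *MyComponent) Handle(
	ctx context.Context,
	output module.Handler,
	port string,
	msg any,
) any {
	// Checked assertion: an unexpected type becomes an error instead of a panic
	input, ok := msg.(InputType)
	if !ok {
		return fmt.Errorf("unexpected message type %T on port %q", msg, port)
	}

	// Process...
	result := process(input)

	// Send to output port
	return output(ctx, "output", result)
}
Step 7: Output Handler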
The output handler routes the result:
go
// runner.go - output handler
func (r *Runner) outputHandler(ctx context.Context, port string, data any) any {
	// Find edges from this port; with no edges the message ends here
	edges := r.getEdgesFromPort(port)

	for _, edge := range edges {
		// Evaluate edge expressions
		transformedData := evaluator.Evaluate(edge.Configuration, data)

		// Route to next node
		var err error
		if r.isSameModule(edge.To) {
			// Same module: in-process delivery (fast, no serialization)
			err = r.scheduler.Handle(ctx, runner.Msg{
				To:   edge.To + "." + edge.ToPort,
				Data: transformedData,
			})
		} else {
			// Different module: send over gRPC
			// (assumes Send returns an error, like Scheduler.Handle)
			err = r.grpcClient.Send(ctx, edge.To, edge.ToPort, transformedData)
		}
		if err != nil {
			// Return the failure so it reaches the calling component
			return err
		}
	}
	return nil
}
Step 8: Edge Expression Evaluation
Data is transformed using expressions:
go
// Edge configuration
{
  "userId": "{{$.user.id}}",
  "fullName": "{{'Hello ' + $.user.firstName + ' ' + $.user.lastName}}",
  "count": "{{$.items.length}}"
}

// Source data
{
  "user": {"id": "123", "firstName": "John", "lastName": "Doe"},
  "items": [1, 2, 3]
}

// Result after evaluation
{
  "userId": "123",
  "fullName": "Hello John Doe",
  "count": 3
}
Step 9: Next Node
The process repeats for the next node in the flow.
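To make the expression step from Step 8 more concrete, here is a minimal sketch of resolving `{{$.path}}` values against source data. It is a simplified stand-in, not the platform's evaluator: `resolvePath` and `applyEdge` are hypothetical names, and only plain object keys are handled (no operators, string concatenation, or `.length`).

```go
package main

import (
	"fmt"
	"strings"
)

// resolvePath looks up a dotted path such as "$.user.id" in decoded JSON data.
// Hypothetical helper: only plain object keys are supported.
func resolvePath(data map[string]any, path string) (any, bool) {
	path = strings.TrimPrefix(path, "$.")
	var cur any = data
	for _, key := range strings.Split(path, ".") {
		obj, ok := cur.(map[string]any)
		if !ok {
			return nil, false
		}
		if cur, ok = obj[key]; !ok {
			return nil, false
		}
	}
	return cur, true
}

// applyEdge evaluates every "{{...}}" value of an edge configuration against
// the source data; values without the markers are copied unchanged.
func applyEdge(config map[string]string, source map[string]any) (map[string]any, error) {
	out := make(map[string]any, len(config))
	for field, expr := range config {
		if !strings.HasPrefix(expr, "{{") || !strings.HasSuffix(expr, "}}") {
			out[field] = expr
			continue
		}
		path := strings.TrimSpace(expr[2 : len(expr)-2])
		val, ok := resolvePath(source, path)
		if !ok {
			return nil, fmt.Errorf("field %q: cannot resolve %q", field, path)
		}
		out[field] = val
	}
	return out, nil
}

func main() {
	source := map[string]any{
		"user": map[string]any{"id": "123", "firstName": "John"},
	}
	config := map[string]string{
		"userId": "{{$.user.id}}",
		"name":   "{{$.user.firstName}}",
		"kind":   "greeting",
	}
	result, err := applyEdge(config, source)
	fmt.Println(result, err)
}
```

Returning an error for an unresolvable path is a choice made for this sketch; the real evaluator may behave differently, for example by producing an empty value.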
Same Module vs Cross-Module
Same Module Communication
┌─────────────────────────────────────────────────────────────┐
│                         SAME MODULE                         │
│                                                             │
│    Component A          Go Channel           Component B    │
│   ┌───────────┐     ─────────────────▶      ┌───────────┐   │
│   │ output()  │                             │ Handle()  │   │
│   └───────────┘      No serialization       └───────────┘   │
│                        Fast, direct                         │
└─────────────────────────────────────────────────────────────┘
- Fast: Direct Go function call
- No serialization: Data passed as-is
- Same process: Shared memory
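One consequence of passing data as-is is worth illustrating. If in-process delivery really hands the same value to the next component, then pointers, maps, and slices are shared, and a change made downstream is visible to the sender. The sketch below assumes exactly that; `Payload`, `downstream`, and `clonePayload` are hypothetical names, not part of the SDK.

```go
package main

import "fmt"

// Payload is a hypothetical message type used only for this illustration.
type Payload struct {
	Tags []string
	Meta map[string]string
}

// downstream stands in for a component that receives the message as-is
// and modifies it in place.
func downstream(msg any) {
	p, ok := msg.(*Payload)
	if !ok {
		return
	}
	p.Meta["seen"] = "true"
}

// clonePayload makes a deep copy so the receiver cannot alter the sender's data.
func clonePayload(p *Payload) *Payload {
	c := &Payload{
		Tags: append([]string(nil), p.Tags...),
		Meta: make(map[string]string, len(p.Meta)),
	}
	for k, v := range p.Meta {
		c.Meta[k] = v
	}
	return c
}

func main() {
	// Shared: the sender observes the receiver's write.
	shared := &Payload{Meta: map[string]string{}}
	downstream(shared)
	fmt.Println("shared seen:", shared.Meta["seen"])

	// Isolated: the receiver only touches the copy.
	isolated := &Payload{Meta: map[string]string{}}
	downstream(clonePayload(isolated))
	fmt.Println("isolated entries:", len(isolated.Meta))
}
```

Copying before sending trades some of the speed advantage for isolation, so it is only worth doing when a component keeps using the value after calling `output()`.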
Cross-Module Communication
┌──────────────────────┐           gRPC          ┌──────────────────────┐
│       MODULE A       │ ◀─────────────────────▶ │       MODULE B       │
│                      │                         │                      │
│  Component A         │   1. Serialize (JSON)   │  Component C         │
│  ┌───────────┐       │   2. gRPC Send          │  ┌───────────┐       │
│  │ output()  │───────│─────────────────────────│──│ Handle()  │       │
│  └───────────┘       │   3. Deserialize        │  └───────────┘       │
│                      │                         │                      │
└──────────────────────┘                         └──────────────────────┘
- Network call: gRPC over TCP
- Serialization: JSON marshaling/unmarshaling
- Discovery: Via TinyModule CRD
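The JSON round trip has a practical effect on types. The sketch below uses only the standard `encoding/json` package to show what a value looks like after marshaling and untyped decoding: a struct arrives as `map[string]any` and numbers arrive as `float64`. Whether the runner decodes into your typed input before calling `Handle()` is not shown here; `Order`, `roundTrip`, and `decodeInto` are hypothetical names for this illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Order is a hypothetical message type used only for this illustration.
type Order struct {
	ID    int    `json:"id"`
	Label string `json:"label"`
}

// roundTrip marshals a value to JSON and decodes it into an untyped value,
// which is what a receiver sees if it does not decode into a concrete struct.
func roundTrip(v any) (any, error) {
	raw, err := json.Marshal(v)
	if err != nil {
		return nil, err
	}
	var out any
	if err := json.Unmarshal(raw, &out); err != nil {
		return nil, err
	}
	return out, nil
}

// decodeInto converts an untyped JSON value back into a concrete type by
// re-encoding it, avoiding a failing type assertion on the receiving side.
func decodeInto[T any](v any) (T, error) {
	var out T
	raw, err := json.Marshal(v)
	if err != nil {
		return out, err
	}
	err = json.Unmarshal(raw, &out)
	return out, err
}

func main() {
	received, err := roundTrip(Order{ID: 7, Label: "books"})
	if err != nil {
		fmt.Println("round trip failed:", err)
		return
	}

	// The struct type is lost in transit, so this assertion fails.
	_, isOrder := received.(Order)
	fmt.Printf("type after round trip: %T, is Order: %v\n", received, isOrder)

	order, err := decodeInto[Order](received)
	fmt.Println(order, err)
}
```

A component that may sit on either side of a module boundary is safer when it does not assume a concrete Go type survives the trip.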
Blocking Behavior
Critical: The default execution is blocking.
When you call output(ctx, "output", data):
- The message is sent to the next node
- The next node's Handle() is called
- All downstream processing completes
- Only then does output() return
go
func (c *Component) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
	fmt.Println("Before output")

	// This BLOCKS until downstream completes
	err := output(ctx, "output", msg)

	fmt.Println("After output") // Runs after ALL downstream is done
	return err
}
See Blocking vs Async for details.
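The blocking order can be reproduced outside the platform with plain function calls. In this sketch, `Handler` is a local stand-in with the same shape as the output callback used on this page, and `node`, `chain`, and `runChain` are hypothetical helpers; real routing goes through the scheduler rather than direct closures. It records the order of events for a two-node chain and shows that the value returned at the end of the chain surfaces as the return value of the first `output()` call.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Handler mirrors the shape of the output callback used on this page.
// It is a local stand-in, not the SDK's module.Handler type.
type Handler func(ctx context.Context, port string, data any) any

// node is a hypothetical component: it logs, forwards downstream, logs again.
type node struct {
	name string
	log  *[]string
}

func (n node) Handle(ctx context.Context, output Handler, port string, msg any) any {
	*n.log = append(*n.log, n.name+" before output")
	res := output(ctx, "output", msg) // blocks until everything downstream returns
	*n.log = append(*n.log, n.name+" after output")
	return res
}

// chain wires nodes so that each node's output calls the next node's Handle
// directly; the last output call is handed to terminal.
func chain(nodes []node, terminal Handler) Handler {
	next := terminal
	for i := len(nodes) - 1; i >= 0; i-- {
		n, downstream := nodes[i], next
		next = func(ctx context.Context, port string, data any) any {
			return n.Handle(ctx, downstream, port, data)
		}
	}
	return next
}

// runChain sends one message through A -> B and returns the call order
// together with whatever the chain returned.
func runChain() ([]string, any) {
	var log []string
	terminal := func(ctx context.Context, port string, data any) any {
		log = append(log, "terminal reached")
		return errors.New("downstream failed")
	}
	entry := chain([]node{{"A", &log}, {"B", &log}}, terminal)
	res := entry(context.Background(), "input", "hello")
	return log, res
}

func main() {
	log, res := runChain()
	for _, line := range log {
		fmt.Println(line)
	}
	fmt.Println("result:", res)
}
```

Both "after output" entries are recorded only once the terminal step has returned, innermost node first, which is the nesting you would expect from ordinary synchronous calls.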
Error Propagation
Errors propagate back through the call chain:
Node A                   Node B                    Node C
  │                         │                         │
  │ output() ───────────────│─────────────────────────│──▶ returns error
  │                         │                         │
  │ ◀───────────────────────│◀────────────────────────│─── error propagates
  │                         │                         │
  │ Handle() returns error  │                         │
  │                         │                         │
go
func (c *ComponentB) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
	result := process(msg)
	if result.Error != nil {
		return result.Error // Error goes back to Node A
	}
	return output(ctx, "output", result)
}
Next Steps
- Blocking vs Async - Understand execution modes
- Error Handling - Handle errors properly
- Internal Routing - Deep dive into routing