Message Flow
Understanding how messages flow through the system is essential for building reliable components. This page explains the complete execution path from trigger to completion.
Execution Overview
+-----------------------------------------------------------------------------+
|                           MESSAGE EXECUTION FLOW                            |
+-----------------------------------------------------------------------------+

     TRIGGER                  KUBERNETES                   COMPONENT
     -------                  ----------                   ---------
+---------------+         +-----------------+         +-----------------+
| HTTP Request  |         |                 |         |                 |
| Timer/Cron    |-------->|  TinySignal CR  |-------->|     Signal      |
| Manual Button |         |                 |         |   Controller    |
| Edge Output   |         +-----------------+         |  (Leader only)  |
+---------------+                                     +--------+--------+
                                                               |
                                                               v
                                                      +-----------------+
                                                      |                 |
                                                      |    Scheduler    |
                                                      |    .Handle()    |
                                                      |                 |
                                                      +--------+--------+
                                                               |
                                                               v
                                                      +-----------------+
                                                      |                 |
                                                      |     Runner      |
                                                      |  .MsgHandler()  |
                                                      |                 |
                                                      +--------+--------+
                                                               |
                                                               v
                                                      +-----------------+
                                                      |                 |
                                                      |    Component    |
                                                      |    .Handle()    |
                                                      |                 |
                                                      +--------+--------+
                                                               |
                                                               v
                                                      +-----------------+
                                                      |   output(ctx,   |
                                                      |   port, data)   |
                                                      +--------+--------+
                                                               |
         +--------------------------+--------------------------+
         |                          |                          |
         v                          v                          v
+-----------------+        +-----------------+        +-----------------+
|     No edge     |        |   Same module   |        |    Different    |
|   (terminal)    |        |  (Go channel)   |        |  module (gRPC)  |
+-----------------+        +--------+--------+        +--------+--------+
                                    |                          |
                                    +------------+-------------+
                                                 |
                                                 v
                                        +-----------------+
                                        | Edge Expression |
                                        |   Evaluation    |
                                        +--------+--------+
                                                 |
                                                 v
                                        +-----------------+
                                        | Next Component  |
                                        |    .Handle()    |
                                        +-----------------+
Detailed Execution Steps
Step 1: Trigger Creation
Messages can be triggered by:
- HTTP Request: HTTP Server component receives a request
- Timer: Ticker component fires at intervals
- Manual: User clicks a button in the UI
- Edge: Previous node sends output
Step 2: TinySignal Creation
For external triggers, a TinySignal CR is created:
yaml
apiVersion: operator.tinysystems.io/v1alpha1
kind: TinySignal
metadata:
  name: signal-abc123
  namespace: default
spec:
  node: my-node-name
  port: input
  data:
    message: "Hello World"
Step 3: Signal Controller Processing
The Signal Controller (running on the leader pod only) processes the signal:
go
// tinysignal_controller.go
func (r *TinySignalReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	// Only leader processes signals
	if !r.IsLeader.Load() {
		return ctrl.Result{}, nil
	}
	signal := &v1alpha1.TinySignal{}
	if err := r.Get(ctx, req.NamespacedName, signal); err != nil {
		// The signal may already be gone; ignore not-found errors
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}
	// Send to scheduler
	r.Scheduler.Handle(ctx, runner.Msg{
		To:   signal.Spec.Node + "." + signal.Spec.Port,
		Data: signal.Spec.Data,
	})
	// Delete processed signal
	return ctrl.Result{}, r.Delete(ctx, signal)
}
Step 4: Scheduler Routing
The Scheduler finds the appropriate runner:
go
// scheduler.go
func (s *Scheduler) Handle(ctx context.Context, msg runner.Msg) error {
	// Parse destination: "node-name.port-name"
	parts := strings.SplitN(msg.To, ".", 2)
	if len(parts) != 2 {
		// Guard against destinations without a port (avoids an index panic)
		return fmt.Errorf("invalid destination %q, expected node-name.port-name", msg.To)
	}
	nodeName, portName := parts[0], parts[1]
	// Find runner instance
	r, exists := s.instancesMap.Get(nodeName)
	if !exists {
		// Check if node belongs to different module
		return s.routeToRemoteModule(ctx, msg)
	}
	// Send to local runner
	return r.MsgHandler(ctx, runner.Msg{
		To:   portName,
		From: msg.From,
		Data: msg.Data,
	})
}
Step 5: Runner Processing
The Runner wraps the component and handles:
- Message caching
- Schema generation
- OpenTelemetry tracing
- Error handling
go
// runner.go
func (r *Runner) MsgHandler(ctx context.Context, msg runner.Msg) error {
	// Create trace span
	ctx, span := r.tracer.Start(ctx, "handle-message")
	defer span.End()
	// Evaluate expressions and deserialize message
	data := r.evaluateAndDeserialize(msg.Data, msg.To)
	// Call component's Handle method
	result := r.component.Handle(ctx, r.outputHandler, msg.To, data)
	// Handle result (error or success)
	return r.processResult(result)
}
Step 6: Component Handle
Your component receives the message:
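go
func (c *MyComponent) Handle(
	ctx context.Context,
	output module.Handler,
	port string,
	msg any,
) any {
	// Use the comma-ok form so an unexpected message type does not panic
	input, ok := msg.(InputType)
	if !ok {
		return fmt.Errorf("unexpected message type %T on port %q", msg, port)
	}
	// Process...
	result := process(input)
	// Send to output port
	return output(ctx, "output", result)
}
Step 7: Output Handler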
The output handler routes the result:
go
// runner.go - output handler
func (r *Runner) outputHandler(ctx context.Context, port string, data any) any {
	// Find edges from this port
	edges := r.getEdgesFromPort(port)
	for _, edge := range edges {
		// Evaluate edge expressions
		transformedData := evaluator.Evaluate(edge.Configuration, data)
		// Route to next node
		if r.isSameModule(edge.To) {
			// Use Go channel (fast, no serialization)
			if err := r.scheduler.Handle(ctx, runner.Msg{
				To:   edge.To + "." + edge.ToPort,
				Data: transformedData,
			}); err != nil {
				// Propagate downstream errors back to the caller
				return err
			}
		} else {
			// Use gRPC (cross-module)
			r.grpcClient.Send(ctx, edge.To, edge.ToPort, transformedData)
		}
	}
	return nil
}
Step 8: Edge Expression Evaluation
Data is transformed using expressions:
go
// Edge configuration
{
  "userId": "{{$.user.id}}",
  "fullName": "{{'Hello ' + $.user.firstName + ' ' + $.user.lastName}}",
  "count": "{{$.items.length}}"
}
// Source data
{
  "user": {"id": "123", "firstName": "John", "lastName": "Doe"},
  "items": [1, 2, 3]
}
// Result after evaluation
{
  "userId": "123",
  "fullName": "Hello John Doe",
  "count": 3
}
Step 9: Next Node
The process repeats for the next node in the flow.
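To make the edge expression step (Step 8) more concrete, here is a minimal sketch that resolves simple `{{$.path}}` placeholders against nested source data. It is an illustration only and not the platform's evaluator: `lookup` and `evaluate` are hypothetical helpers, and string concatenation or `.length` expressions like the ones shown in Step 8 are deliberately not handled.

```go
package main

import (
	"fmt"
	"strings"
)

// lookup walks a dotted path such as "user.id" through nested maps.
func lookup(data map[string]any, path string) (any, bool) {
	var current any = data
	for _, key := range strings.Split(path, ".") {
		m, ok := current.(map[string]any)
		if !ok {
			return nil, false
		}
		current, ok = m[key]
		if !ok {
			return nil, false
		}
	}
	return current, true
}

// evaluate resolves every value of the form "{{$.path}}" in config against
// data. Values that are not wrapped in that form are copied unchanged, and
// paths that cannot be resolved yield nil.
func evaluate(config map[string]string, data map[string]any) map[string]any {
	out := make(map[string]any, len(config))
	for field, expr := range config {
		if !strings.HasPrefix(expr, "{{$.") || !strings.HasSuffix(expr, "}}") {
			out[field] = expr
			continue
		}
		path := strings.TrimSuffix(strings.TrimPrefix(expr, "{{$."), "}}")
		value, _ := lookup(data, path) // value stays nil when not found
		out[field] = value
	}
	return out
}

func main() {
	config := map[string]string{"userId": "{{$.user.id}}", "source": "static"}
	data := map[string]any{
		"user": map[string]any{"id": "123", "firstName": "John"},
	}
	result := evaluate(config, data)
	fmt.Println(result["userId"], result["source"]) // prints "123 static"
}
```

Because the resolved value keeps its original type, a placeholder that points at a number or a nested object yields that value rather than a string.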
Same Module vs Cross-Module
Same Module Communication
+-------------------------------------------------------------+
|                         SAME MODULE                         |
|                                                             |
|   Component A          Go Channel           Component B     |
|  +-----------+     ----------------->      +-----------+    |
|  | output()  |                             | Handle()  |    |
|  +-----------+      No serialization       +-----------+    |
|                       Fast, direct                          |
+-------------------------------------------------------------+
- Fast: Direct Go function call
- No serialization: Data passed as-is
- Same process: Shared memory
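The "no serialization" and "shared memory" points can be illustrated with a small sketch. It assumes in-process delivery is a plain function call that passes the value as-is; `Order`, `handler`, and `deliverLocal` are hypothetical names used only for this example.

```go
package main

import "fmt"

// Order is a hypothetical message payload.
type Order struct {
	ID    string
	Items []string
}

// handler mirrors the shape of a component entry point for this sketch only.
type handler func(msg any) any

// deliverLocal hands msg to next in the same process. Nothing is copied or
// encoded, so the receiver sees the exact value the sender passed.
func deliverLocal(next handler, msg any) any {
	return next(msg)
}

func main() {
	order := &Order{ID: "A-1", Items: []string{"book"}}

	var received *Order
	receiver := func(msg any) any {
		received = msg.(*Order)
		// Mutating the payload is visible to the sender: memory is shared.
		received.Items = append(received.Items, "pen")
		return nil
	}

	deliverLocal(receiver, order)
	fmt.Println(received == order, order.Items) // prints "true [book pen]"
}
```

A practical consequence is that a downstream component which mutates a pointer or slice payload also changes what the upstream component sees, so treating received messages as read-only is the safer habit.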
Cross-Module Communication
+----------------------+           gRPC           +----------------------+
|       MODULE A       | <----------------------> |       MODULE B       |
|                      |                          |                      |
|  Component A         |   1. Serialize (JSON)    |  Component C         |
|  +-----------+       |   2. gRPC Send           |  +-----------+       |
|  | output()  |-------|--------------------------|->| Handle()  |       |
|  +-----------+       |   3. Deserialize         |  +-----------+       |
|                      |                          |                      |
+----------------------+                          +----------------------+
- Network call: gRPC over TCP
- Serialization: JSON marshaling/unmarshaling
- Discovery: Via TinyModule CRD
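One visible effect of the JSON step is that values can change type between modules. The sketch below uses only Go's standard `encoding/json` package and does not touch the platform's gRPC layer; `roundTrip` is a hypothetical helper that stands in for "serialize, send, deserialize" when the receiver decodes into a generic value.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// roundTrip encodes v to JSON and decodes it into a generic value, which is
// what a receiver sees when it has no concrete type to decode into.
func roundTrip(v any) (any, error) {
	raw, err := json.Marshal(v)
	if err != nil {
		return nil, err
	}
	var out any
	if err := json.Unmarshal(raw, &out); err != nil {
		return nil, err
	}
	return out, nil
}

func main() {
	sent := map[string]any{"count": 3, "name": "John"}

	got, err := roundTrip(sent)
	if err != nil {
		panic(err)
	}
	received := got.(map[string]any)

	// JSON numbers decode to float64 when the target is an interface value.
	fmt.Printf("%T -> %T\n", sent["count"], received["count"]) // prints "int -> float64"
}
```

Decoding into a concrete struct type on the receiving side avoids this drift, which is one reason typed port messages are easier to work with across module boundaries.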
Blocking Behavior
Critical: The default execution is blocking.
When you call output(ctx, "output", data):
- The message is sent to the next node
- The next node's Handle() is called
- All downstream processing completes
- Only then does output() return
go
func (c *Component) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
	fmt.Println("Before output")
	// This BLOCKS until downstream completes
	err := output(ctx, "output", msg)
	fmt.Println("After output") // Runs after ALL downstream is done
	return err
}
See Blocking vs Async for details.
Error Propagation
Errors propagate back through the call chain:
Node A                    Node B                    Node C
  |                         |                         |
  | output() ---------------|-------------------------|--> returns error
  |                         |                         |
  | <-----------------------|<------------------------|--- error propagates
  |                         |                         |
  | Handle() returns error  |                         |
  |                         |                         |
go
func (c *ComponentB) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
	result := process(msg)
	if result.Error != nil {
		return result.Error // Error goes back to Node A
	}
	return output(ctx, "output", result)
}
Next Steps
- Blocking vs Async - Understand execution modes
- Error Handling - Handle errors properly
- Internal Routing - Deep dive into routing