Skip to content

Message Flow

Understanding how messages flow through the system is essential for building reliable components. This page explains the complete execution path from trigger to completion.

Execution Overview

┌─────────────────────────────────────────────────────────────────────────────┐
│                          MESSAGE EXECUTION FLOW                              │
└─────────────────────────────────────────────────────────────────────────────┘

    TRIGGER                    KUBERNETES                    COMPONENT
    ───────                    ──────────                    ─────────

┌───────────────┐         ┌─────────────────┐         ┌─────────────────┐
│ HTTP Request  │         │                 │         │                 │
│ Timer/Cron    │────────▶│  TinySignal CR  │────────▶│ Signal          │
│ Manual Button │         │                 │         │ Controller      │
│ Edge Output   │         └─────────────────┘         │ (Leader only)   │
└───────────────┘                                     └────────┬────────┘


                                                      ┌─────────────────┐
                                                      │                 │
                                                      │   Scheduler     │
                                                      │   .Handle()     │
                                                      │                 │
                                                      └────────┬────────┘


                                                      ┌─────────────────┐
                                                      │                 │
                                                      │   Runner        │
                                                      │   .MsgHandler() │
                                                      │                 │
                                                      └────────┬────────┘


                                                      ┌─────────────────┐
                                                      │                 │
                                                      │   Component     │
                                                      │   .Handle()     │
                                                      │                 │
                                                      └────────┬────────┘


                                                      ┌─────────────────┐
                                                      │  output(ctx,    │
                                                      │    port, data)  │
                                                      └────────┬────────┘

                          ┌────────────────────────────────────┼────────────────────────────────────┐
                          │                                    │                                    │
                          ▼                                    ▼                                    ▼
                 ┌─────────────────┐                  ┌─────────────────┐                  ┌─────────────────┐
                 │ No edge         │                  │ Same module     │                  │ Different       │
                 │ (terminal)      │                  │ (Go channel)    │                  │ module (gRPC)   │
                 └─────────────────┘                  └────────┬────────┘                  └────────┬────────┘
                                                               │                                    │
                                                               └───────────────┬───────────────────┘


                                                                      ┌─────────────────┐
                                                                      │ Edge Expression │
                                                                      │ Evaluation      │
                                                                      └────────┬────────┘


                                                                      ┌─────────────────┐
                                                                      │ Next Component  │
                                                                      │ .Handle()       │
                                                                      └─────────────────┘

Detailed Execution Steps

Step 1: Trigger Creation

Messages can be triggered by:

  • HTTP Request: HTTP Server component receives a request
  • Timer: Ticker component fires at intervals
  • Manual: User clicks a button in the UI
  • Edge: Previous node sends output

Step 2: TinySignal Creation

For external triggers, a TinySignal CR is created:

yaml
apiVersion: operator.tinysystems.io/v1alpha1
kind: TinySignal
metadata:
  name: signal-abc123
  namespace: default
  annotations:
    tinysystems.io/signal-nonce: "unique-id-123"
spec:
  node: my-node-name
  port: input
  data:
    message: "Hello World"

Step 3: Signal Controller Processing

The Signal Controller (running on the leader pod only) processes the signal:

go
// tinysignal_controller.go
func (r *TinySignalReconciler) Reconcile(ctx context.Context, req ctrl.Request) {
    // Only leader processes signals
    if !r.IsLeader.Load() {
        return ctrl.Result{}, nil
    }

    signal := &v1alpha1.TinySignal{}
    r.Get(ctx, req.NamespacedName, signal)

    // Send to scheduler
    r.Scheduler.Handle(ctx, runner.Msg{
        To:   signal.Spec.Node + "." + signal.Spec.Port,
        Data: signal.Spec.Data,
    })

    // Delete processed signal
    r.Delete(ctx, signal)
}

Step 4: Scheduler Routing

The Scheduler finds the appropriate runner:

go
// scheduler.go
func (s *Scheduler) Handle(ctx context.Context, msg runner.Msg) error {
    // Parse destination: "node-name.port-name"
    parts := strings.SplitN(msg.To, ".", 2)
    nodeName, portName := parts[0], parts[1]

    // Find runner instance
    r, exists := s.instancesMap.Get(nodeName)
    if !exists {
        // Check if node belongs to different module
        return s.routeToRemoteModule(ctx, msg)
    }

    // Send to local runner
    return r.MsgHandler(ctx, runner.Msg{
        To:   portName,
        From: msg.From,
        Data: msg.Data,
    })
}

Step 5: Runner Processing

The Runner wraps the component and handles:

  • Message caching
  • Schema generation
  • OpenTelemetry tracing
  • Error handling
go
// runner.go
func (r *Runner) MsgHandler(ctx context.Context, msg runner.Msg) error {
    // Create trace span
    ctx, span := r.tracer.Start(ctx, "handle-message")
    defer span.End()

    // Evaluate expressions and deserialize message
    data := r.evaluateAndDeserialize(msg.Data, msg.To)

    // Call component's Handle method
    result := r.component.Handle(ctx, r.outputHandler, msg.To, data)

    // Handle result (error or success)
    return r.processResult(result)
}

Step 6: Component Handle

Your component receives the message:

go
func (c *MyComponent) Handle(
    ctx context.Context,
    output module.Handler,
    port string,
    msg any,
) any {
    input := msg.(InputType)

    // Process...
    result := process(input)

    // Send to output port
    return output(ctx, "output", result)
}

Step 7: Output Handler

The output handler routes the result:

go
// runner.go - output handler
func (r *Runner) outputHandler(ctx context.Context, port string, data any) any {
    // Find edges from this port
    edges := r.getEdgesFromPort(port)

    for _, edge := range edges {
        // Evaluate edge expressions
        transformedData := evaluator.Evaluate(edge.Configuration, data)

        // Route to next node
        if r.isSameModule(edge.To) {
            // Use Go channel (fast, no serialization)
            r.scheduler.Handle(ctx, runner.Msg{
                To:   edge.To + "." + edge.ToPort,
                Data: transformedData,
            })
        } else {
            // Use gRPC (cross-module)
            r.grpcClient.Send(ctx, edge.To, edge.ToPort, transformedData)
        }
    }

    return nil
}

Step 8: Edge Expression Evaluation

Data is transformed using expressions:

go
// Edge configuration
{
    "userId": "{{$.user.id}}",
    "fullName": "{{'Hello ' + $.user.firstName + ' ' + $.user.lastName}}",
    "count": "{{$.items.length}}"
}

// Source data
{
    "user": {"id": "123", "firstName": "John", "lastName": "Doe"},
    "items": [1, 2, 3]
}

// Result after evaluation
{
    "userId": "123",
    "fullName": "Hello John Doe",
    "count": 3
}

Step 9: Next Node

The process repeats for the next node in the flow.

Same Module vs Cross-Module

Same Module Communication

┌─────────────────────────────────────────────────────────────┐
│                    SAME MODULE                               │
│                                                              │
│    Component A          Go Channel         Component B       │
│   ┌───────────┐     ─────────────────▶    ┌───────────┐     │
│   │  output() │                           │  Handle() │     │
│   └───────────┘     No serialization      └───────────┘     │
│                     Fast, direct                             │
└─────────────────────────────────────────────────────────────┘
  • Fast: Direct Go function call
  • No serialization: Data passed as-is
  • Same process: Shared memory

Cross-Module Communication

┌──────────────────────┐          gRPC           ┌──────────────────────┐
│     MODULE A         │ ◀─────────────────────▶ │     MODULE B         │
│                      │                          │                      │
│  Component A         │   1. Serialize (JSON)    │  Component C         │
│  ┌───────────┐       │   2. gRPC Send           │  ┌───────────┐       │
│  │  output() │───────│──────────────────────────│──│  Handle() │       │
│  └───────────┘       │   3. Deserialize         │  └───────────┘       │
│                      │                          │                      │
└──────────────────────┘                          └──────────────────────┘
  • Network call: gRPC over TCP
  • Serialization: JSON marshaling/unmarshaling
  • Discovery: Via TinyModule CRD

Blocking Behavior

Critical: The default execution is blocking.

When you call output(ctx, "output", data):

  1. The message is sent to the next node
  2. The next node's Handle() is called
  3. All downstream processing completes
  4. Only then does output() return
go
func (c *Component) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
    fmt.Println("Before output")

    // This BLOCKS until downstream completes
    err := output(ctx, "output", msg)

    fmt.Println("After output")  // Runs after ALL downstream is done
    return err
}

See Blocking vs Async for details.

Error Propagation

Errors propagate back through the call chain:

Node A                    Node B                    Node C
  │                         │                         │
  │ output() ───────────────│─────────────────────────│──▶ returns error
  │                         │                         │
  │ ◀───────────────────────│◀────────────────────────│─── error propagates
  │                         │                         │
  │ Handle() returns error  │                         │
  │                         │                         │
go
func (c *ComponentB) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
    result := process(msg)
    if result.Error != nil {
        return result.Error  // Error goes back to Node A
    }
    return output(ctx, "output", result)
}

Next Steps

Build flow-based applications on Kubernetes