Internal Routing

Internal routing handles message flow within a single module. Messages between components in the same module are delivered in-process, as direct Go calls in the examples on this page, rather than over gRPC. This avoids serialization and network overhead.

Overview

+-----------------------------------------------------------------------+
|                          INTERNAL ROUTING                             |
+-----------------------------------------------------------------------+

                 +---------------------------------------------+
                 |                 MODULE POD                   |
                 |                                              |
                 |  +-----------------------------------------+ |
                 |  |               SCHEDULER                 | |
                 |  |                                         | |
                 |  |  instancesMap:                          | |
                 |  |    node-abc123 -> Runner A              | |
                 |  |    node-def456 -> Runner B              | |
                 |  |    node-ghi789 -> Runner C              | |
                 |  |                                         | |
                 |  +------------------+----------------------+ |
                 |                     |                        |
                 |        +------------+------------+           |
                 |        |            |            |           |
                 |        v            v            v           |
                 |     +------+    +------+    +------+         |
                 |     |Runner|    |Runner|    |Runner|         |
                 |     |  A   |--->|  B   |--->|  C   |         |
                 |     +------+    +------+    +------+         |
                 |      (Go)        (Go)        (Go)            |
                 |      call        call        call            |
                 +---------------------------------------------+

The Scheduler

The Scheduler manages all component instances within a module:

go
type Scheduler struct {
    instancesMap *InstancesMap  // node-name -> Runner
    clientPool   *Pool          // module-name -> gRPC connection
    module       *Module        // Module metadata
}

func (s *Scheduler) Handle(ctx context.Context, msg runner.Msg) error {
    // Only the node name is needed to pick the instance; an unused
    // portName variable would not compile.
    nodeName, _ := parseDestination(msg.To)

    // Check local instances first
    if r, exists := s.instancesMap.Get(nodeName); exists {
        return r.MsgHandler(ctx, msg)  // Direct Go call
    }

    // Not local - must be remote
    return s.sendToRemote(ctx, msg)
}
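
Handle depends on parseDestination, which the source does not show. The following is a minimal sketch of what such a helper might look like, assuming addresses use the `node.port` form seen in the edge definitions (for example `node-b.input`) and that the port is everything after the last dot. The split rule and the fallback for a missing dot are assumptions, not the actual implementation.

```go
package main

import (
	"fmt"
	"strings"
)

// parseDestination splits an address such as "node-b.input" into its node
// and port parts. Hypothetical helper: the split-on-last-dot rule is an
// assumption made for this sketch.
func parseDestination(to string) (nodeName, portName string) {
	i := strings.LastIndex(to, ".")
	if i < 0 {
		// No separator: treat the whole string as the node name.
		return to, ""
	}
	return to[:i], to[i+1:]
}

func main() {
	node, port := parseDestination("node-b.input")
	fmt.Println(node, port) // prints: node-b input
}
```

Splitting on the last dot rather than the first keeps the helper usable if node names ever contain dots themselves.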

Instance Registration

When a TinyNode CR is created, the controller registers it with the Scheduler:

go
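func (r *TinyNodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    node := &v1alpha1.TinyNode{}
    if err := r.Get(ctx, req.NamespacedName, node); err != nil {
        // A deleted TinyNode is not an error; other failures are retried.
        // client is sigs.k8s.io/controller-runtime/pkg/client
        return ctrl.Result{}, client.IgnoreNotFound(err)
    }

    // Create or update instance (the returned instance is not needed here)
    r.Scheduler.GetOrCreateInstance(node.Name, node.Spec.Component)

    // Deliver messages to instance
    r.Scheduler.Update(ctx, node)

    return ctrl.Result{}, nil
}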
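
Reconcile can run many times for the same TinyNode, so registration has to be idempotent. The sketch below shows one way GetOrCreateInstance could achieve that. The Runner fields, the NewScheduler constructor, the mutex, and the component name "router" are all illustrative assumptions rather than the real types.

```go
package main

import (
	"fmt"
	"sync"
)

// Runner is a stand-in for the real runner type; its fields are illustrative.
type Runner struct {
	Name      string
	Component string
}

// Scheduler holds only the state needed for registration in this sketch.
type Scheduler struct {
	mu        sync.Mutex
	instances map[string]*Runner
}

// NewScheduler is a hypothetical constructor that allocates the map.
func NewScheduler() *Scheduler {
	return &Scheduler{instances: make(map[string]*Runner)}
}

// GetOrCreateInstance returns the Runner registered under name, creating it
// on first use. Repeated reconciles of the same node reuse one Runner.
func (s *Scheduler) GetOrCreateInstance(name, component string) *Runner {
	s.mu.Lock()
	defer s.mu.Unlock()
	if r, ok := s.instances[name]; ok {
		return r
	}
	r := &Runner{Name: name, Component: component}
	s.instances[name] = r
	return r
}

func main() {
	s := NewScheduler()
	first := s.GetOrCreateInstance("node-abc123", "router")
	second := s.GetOrCreateInstance("node-abc123", "router")
	fmt.Println(first == second) // prints: true
}
```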

Message Flow

Step 1: Output Call

The component calls the output() function to emit its result:

go
func (c *Component) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
    result := c.process(msg)
    return output(ctx, "output", result)  // This triggers routing
}

Step 2: Edge Resolution

The Runner resolves the output port to connected edges:

go
func (r *Runner) MsgHandler(ctx context.Context, msg runner.Msg) error {
    // Find edges from this port
    edges := r.findEdges(msg.From)

    for _, edge := range edges {
        // Evaluate data transformation
        data := r.evaluateExpression(edge.Data, msg.Data)

        // Send to destination
        err := r.scheduler.Handle(ctx, runner.Msg{
            To:   edge.To,
            From: msg.From,
            Data: data,
        })
        if err != nil {
            return err
        }
    }
    return nil
}
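
MsgHandler relies on findEdges, which is not shown in the source. A minimal sketch of the lookup follows, assuming edges are held in a slice and matched on their `from` address. The Edge struct mirrors the from/to/data fields of the YAML edge definition, but the Go field names are assumptions, and the sketch uses a free function with an explicit slice instead of the one-argument method used above.

```go
package main

import "fmt"

// Edge mirrors the YAML edge shape (from, to, data); the Go field names
// are assumed for this sketch.
type Edge struct {
	From string
	To   string
	Data any
}

// findEdges returns every edge whose source port matches from, preserving
// the order in which the edges were declared.
func findEdges(edges []Edge, from string) []Edge {
	var out []Edge
	for _, e := range edges {
		if e.From == from {
			out = append(out, e)
		}
	}
	return out
}

func main() {
	edges := []Edge{
		{From: "node-a.output", To: "node-b.input"},
		{From: "node-a.output", To: "node-c.input"},
		{From: "node-b.output", To: "node-c.input"},
	}
	for _, e := range findEdges(edges, "node-a.output") {
		fmt.Println(e.From, "->", e.To)
	}
}
```

One output port can fan out to several destinations, which is why the lookup returns a slice and MsgHandler loops over it.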

Step 3: Local Delivery

For local nodes, delivery is direct:

go
func (s *Scheduler) Handle(ctx context.Context, msg runner.Msg) error {
    nodeName, _ := parseDestination(msg.To)

    // Named r so the variable does not shadow the runner package
    if r, exists := s.instancesMap.Get(nodeName); exists {
        // Direct function call - no network!
        return r.MsgHandler(ctx, msg)
    }

    // Not local: fall through to remote handling
    return s.sendToRemote(ctx, msg)
}
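
The local-first decision can be tried out in isolation. The sketch below is a simplified, self-contained model of it: the dispatcher type, the string messages, and the remote callback are stand-ins invented for illustration, and the not-found error text is borrowed from the Common Issues section.

```go
package main

import (
	"errors"
	"fmt"
)

// handlerFunc stands in for Runner.MsgHandler.
type handlerFunc func(msg string) error

// dispatcher keeps local handlers and an optional remote fallback.
type dispatcher struct {
	local  map[string]handlerFunc
	remote func(node, msg string) error // stand-in for sendToRemote
}

// handle delivers to a local handler when one is registered and falls back
// to the remote sender otherwise.
func (d *dispatcher) handle(node, msg string) error {
	if h, ok := d.local[node]; ok {
		return h(msg) // plain function call on the caller's goroutine
	}
	if d.remote == nil {
		return errors.New("instance not found: " + node)
	}
	return d.remote(node, msg)
}

func main() {
	d := &dispatcher{
		local: map[string]handlerFunc{
			"node-abc123": func(msg string) error {
				fmt.Println("local delivery:", msg)
				return nil
			},
		},
	}
	fmt.Println(d.handle("node-abc123", "hello"))
	fmt.Println(d.handle("node-xyz123", "hello"))
}
```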

Edge Evaluation

Edges define data transformation between ports:

yaml
edges:
  - from: node-a.output
    to: node-b.input
    data:
      field: "{{$.value}}"
      computed: "{{$.a + $.b}}"

The evaluator transforms data:

go
func (r *Runner) evaluateExpression(template any, sourceData any) any {
    // Walk the template, evaluate {{...}} expressions
    return evaluator.Evaluate(template, sourceData)
}
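
To make the template walk concrete, here is a deliberately small evaluator sketch. It assumes source data is a map and only resolves strings of the exact form `{{$.field}}`; it does not implement the real expression language, so an arithmetic expression such as `{{$.a + $.b}}` is reported as an undefined variable. The error return is also an assumption, since the call above returns a single value.

```go
package main

import (
	"fmt"
	"strings"
)

// Evaluate walks template and replaces strings of the exact form
// "{{$.field}}" with source[field]. Maps and slices are walked recursively;
// every other value is copied unchanged. Illustrative only: operators and
// nested paths are not supported.
func Evaluate(template any, source map[string]any) (any, error) {
	switch t := template.(type) {
	case map[string]any:
		out := make(map[string]any, len(t))
		for k, v := range t {
			ev, err := Evaluate(v, source)
			if err != nil {
				return nil, err
			}
			out[k] = ev
		}
		return out, nil
	case []any:
		out := make([]any, len(t))
		for i, v := range t {
			ev, err := Evaluate(v, source)
			if err != nil {
				return nil, err
			}
			out[i] = ev
		}
		return out, nil
	case string:
		if !strings.HasPrefix(t, "{{$.") || !strings.HasSuffix(t, "}}") {
			return t, nil // plain literal, not an expression
		}
		field := strings.TrimSuffix(strings.TrimPrefix(t, "{{$."), "}}")
		v, ok := source[field]
		if !ok {
			return nil, fmt.Errorf("undefined variable $.%s", field)
		}
		return v, nil
	default:
		return t, nil
	}
}

func main() {
	tmpl := map[string]any{"field": "{{$.value}}", "label": "static"}
	out, err := Evaluate(tmpl, map[string]any{"value": 42})
	fmt.Println(out, err) // prints: map[field:42 label:static] <nil>
}
```

Returning an error instead of a zero value makes a missing field visible at the edge where it occurs, which matches the expression error described under Common Issues.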

Blocking Semantics

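Internal routing is synchronous: a call to output() returns only after every downstream handler in the chain has returned, so errors propagate back to the original caller.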

Component A          Scheduler          Component B          Component C
     |                  |                   |                    |
     | output("out")    |                   |                    |
     | ================>|                   |                    |
     |                  | Handle(B.input)   |                    |
     |                  | ==================+================+   |
     |                  |                   | Handle()       |   |
     |                  |                   | ==============>|   |
     |                  |                   |                |   |
     |                  |                   |   output()     |   |
     |                  |                   | ===============+==>|
     |                  |                   |                |   | Handle()
     |                  |                   |                |   | return
     |                  |                   |                |<==+
     |                  |                   | return         |
     |                  |<==================+                |
     | return           |                   |                |
     |<=================+                   |                |
     |                  |                   |                |
     v                  v                   v                v
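
The call order in the diagram can be reproduced with plain nested function calls. The sketch below is an illustration, not the real Runner code: the handler type, stage, and runChain are hypothetical names, and each stage records when it starts and when it returns.

```go
package main

import "fmt"

// handler is the shape of a message handler in this sketch.
type handler func(msg string) error

// stage wraps next: it records entry, calls next synchronously, and records
// its own return only after next has returned.
func stage(name string, trace *[]string, next handler) handler {
	return func(msg string) error {
		*trace = append(*trace, name+" start")
		if next != nil {
			if err := next(msg); err != nil {
				return err // downstream errors reach the original caller
			}
		}
		*trace = append(*trace, name+" return")
		return nil
	}
}

// runChain links the named stages in order, sends one message through the
// first stage, and returns the recorded call order.
func runChain(names ...string) []string {
	var trace []string
	var next handler
	for i := len(names) - 1; i >= 0; i-- {
		next = stage(names[i], &trace, next)
	}
	if next != nil {
		_ = next("hello")
	}
	return trace
}

func main() {
	fmt.Println(runChain("A", "B", "C"))
	// prints: [A start B start C start C return B return A return]
}
```

A practical consequence is that a slow component at the end of a chain holds up every caller upstream of it for the duration of its Handle call.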

Concurrent Execution

Multiple instances can process messages concurrently:

go
type Runner struct {
    component  module.Component
    msgHandler func(ctx context.Context, msg Msg) error
    // Each Runner handles its own messages
    // No global lock on Scheduler
}

However, a single instance processes one message at a time (unless using async patterns).
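
The source does not say how the one-message-at-a-time guarantee is enforced. One plausible mechanism is a per-instance mutex, sketched below under that assumption. The counters exist only to make the behavior observable, and deliver is a hypothetical test driver.

```go
package main

import (
	"fmt"
	"sync"
)

// Runner serializes its own messages with a per-instance mutex. The mutex
// is an assumption of this sketch; there is no lock shared across runners.
type Runner struct {
	mu        sync.Mutex
	active    int // handlers currently inside the critical section
	maxActive int // highest value of active ever observed
	handled   int // total messages processed
}

// Handle processes one message while holding the instance lock.
func (r *Runner) Handle(msg string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.active++
	if r.active > r.maxActive {
		r.maxActive = r.active
	}
	r.handled++ // stand-in for real work
	r.active--
}

// deliver sends n messages to every runner, each from its own goroutine.
func deliver(runners []*Runner, n int) {
	var wg sync.WaitGroup
	for _, r := range runners {
		for i := 0; i < n; i++ {
			wg.Add(1)
			go func(r *Runner) {
				defer wg.Done()
				r.Handle("msg")
			}(r)
		}
	}
	wg.Wait()
}

func main() {
	runners := []*Runner{{}, {}, {}}
	deliver(runners, 100)
	for i, r := range runners {
		// Each runner handled all 100 messages with at most one in flight.
		fmt.Println(i, r.handled, r.maxActive)
	}
}
```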

Instance Map

The instances map provides thread-safe access:

go
type InstancesMap struct {
    m  map[string]*Runner
    mu sync.RWMutex
}

func (im *InstancesMap) Get(name string) (*Runner, bool) {
    im.mu.RLock()
    defer im.mu.RUnlock()
    r, ok := im.m[name]
    return r, ok
}

func (im *InstancesMap) Set(name string, runner *Runner) {
    im.mu.Lock()
    defer im.mu.Unlock()
    im.m[name] = runner
}
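
Two details are not covered by the snippet above: the inner map must be allocated before Set is called, and the debugging example later on this page ranges over `All()`. The sketch below fills both in under assumptions. NewInstancesMap, Delete, and the snapshot behavior of All are hypothetical, and Runner is reduced to a name.

```go
package main

import (
	"fmt"
	"sync"
)

// Runner is reduced to a name for this sketch.
type Runner struct{ Name string }

type InstancesMap struct {
	mu sync.RWMutex
	m  map[string]*Runner
}

// NewInstancesMap allocates the inner map. Calling Set on a zero-value
// InstancesMap would panic because writes to a nil map are not allowed.
func NewInstancesMap() *InstancesMap {
	return &InstancesMap{m: make(map[string]*Runner)}
}

func (im *InstancesMap) Set(name string, r *Runner) {
	im.mu.Lock()
	defer im.mu.Unlock()
	im.m[name] = r
}

// Delete removes an instance, for example when its TinyNode is deleted.
func (im *InstancesMap) Delete(name string) {
	im.mu.Lock()
	defer im.mu.Unlock()
	delete(im.m, name)
}

// All returns a snapshot copy so callers can range over it without holding
// the lock or racing with concurrent Set and Delete calls.
func (im *InstancesMap) All() map[string]*Runner {
	im.mu.RLock()
	defer im.mu.RUnlock()
	out := make(map[string]*Runner, len(im.m))
	for k, v := range im.m {
		out[k] = v
	}
	return out
}

func main() {
	im := NewInstancesMap()
	im.Set("node-abc123", &Runner{Name: "A"})
	im.Set("node-def456", &Runner{Name: "B"})
	snapshot := im.All()
	im.Delete("node-abc123")
	fmt.Println(len(snapshot), len(im.All())) // prints: 2 1
}
```

Returning a copy costs an allocation per call, which is acceptable for debugging and listing but would be worth reconsidering on a hot path.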

Performance Characteristics

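| Aspect        | Internal       | External (gRPC)    |
|---------------|----------------|--------------------|
| Latency       | Microseconds   | Milliseconds       |
| Serialization | None           | Protobuf           |
| Network       | None           | TCP/HTTP2          |
| Blocking      | Same goroutine | Separate goroutine |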

Debugging Internal Routing

Trace Messages

Enable debug logging:

go
log.Debug("routing message",
    "from", msg.From,
    "to", msg.To,
    "local", isLocal,
)
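
The call above only produces output if the logger's level admits debug records. Which logging library the project uses is not stated; the sketch below assumes the standard library's log/slog (Go 1.21 or later), whose Debug method has the same message-plus-key/value shape. The routeLog helper is hypothetical and returns the logged text so the effect of the level is easy to see.

```go
package main

import (
	"bytes"
	"fmt"
	"log/slog"
)

// routeLog writes one routing trace line and returns what was logged.
// With debug disabled the logger runs at Info level and drops the record.
func routeLog(debug bool, from, to string, local bool) string {
	level := slog.LevelInfo
	if debug {
		level = slog.LevelDebug
	}
	var buf bytes.Buffer
	log := slog.New(slog.NewTextHandler(&buf, &slog.HandlerOptions{Level: level}))
	log.Debug("routing message",
		"from", from,
		"to", to,
		"local", local,
	)
	return buf.String()
}

func main() {
	// Debug enabled: one line containing the from, to, and local attributes.
	fmt.Print(routeLog(true, "node-a.output", "node-b.input", true))
	// Debug disabled: nothing is printed.
	fmt.Print(routeLog(false, "node-a.output", "node-b.input", true))
}
```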

Check Instance Map

go
// List all registered instances
for name, runner := range s.instancesMap.All() {
    log.Info("registered instance", "name", name, "component", runner.ComponentName())
}

Verify Edges

go
// Check edge configuration
for _, edge := range node.Spec.Edges {
    log.Info("edge",
        "from", edge.From,
        "to", edge.To,
        "hasData", edge.Data != nil,
    )
}

Common Issues

Missing Instance

Error: instance not found: node-xyz123

Cause: TinyNode not yet reconciled or deleted.

Solution: Check TinyNode exists in Kubernetes.

Edge Not Connected

Warning: no edges from port: node-abc.output

Cause: Output port has no connections.

Solution: Connect the port in the visual editor.

Expression Error

Error: expression evaluation failed: undefined variable $.missing

Cause: Source data doesn't have expected field.

Solution: Check the source schema, or supply a fallback value with the null-coalescing operator ($.missing ?? 'default').
