Skip to content

Internal Routing

Internal routing handles message flow within a single module. Messages between components in the same module use Go channels instead of gRPC for efficiency.

Overview

┌─────────────────────────────────────────────────────────────────────────────┐
│                         INTERNAL ROUTING                                     │
└─────────────────────────────────────────────────────────────────────────────┘

                    ┌─────────────────────────────────────────┐
                    │              MODULE POD                  │
                    │                                          │
                    │  ┌─────────────────────────────────────┐│
                    │  │            SCHEDULER                 ││
                    │  │                                      ││
                    │  │  instancesMap:                       ││
                    │  │    node-abc123 → Runner A            ││
                    │  │    node-def456 → Runner B            ││
                    │  │    node-ghi789 → Runner C            ││
                    │  │                                      ││
                    │  └──────────────┬───────────────────────┘│
                    │                 │                        │
                    │     ┌───────────┼───────────┐           │
                    │     │           │           │           │
                    │     ▼           ▼           ▼           │
                    │  ┌──────┐   ┌──────┐   ┌──────┐        │
                    │  │Runner│   │Runner│   │Runner│        │
                    │  │  A   │──▶│  B   │──▶│  C   │        │
                    │  └──────┘   └──────┘   └──────┘        │
                    │   (Go)       (Go)       (Go)           │
                    │  channel    channel    channel          │
                    └─────────────────────────────────────────┘

The Scheduler

The Scheduler manages all component instances within a module:

go
type Scheduler struct {
    instancesMap *InstancesMap  // node-name → Runner
    clientPool   *Pool          // module-name → gRPC connection
    module       *Module        // Module metadata
}

func (s *Scheduler) Handle(ctx context.Context, msg runner.Msg) error {
    nodeName, portName := parseDestination(msg.To)

    // Check local instances first
    if r, exists := s.instancesMap.Get(nodeName); exists {
        return r.MsgHandler(ctx, msg)  // Direct Go call
    }

    // Not local - must be remote
    return s.sendToRemote(ctx, msg)
}

Instance Registration

When a TinyNode CR is created, the controller registers it with the Scheduler:

go
func (r *TinyNodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    node := &v1alpha1.TinyNode{}
    r.Get(ctx, req.NamespacedName, node)

    // Create or update instance
    instance := r.Scheduler.GetOrCreateInstance(node.Name, node.Spec.Component)

    // Deliver messages to instance
    r.Scheduler.Update(ctx, node)

    return ctrl.Result{}, nil
}

Message Flow

Step 1: Output Call

Component calls output() function:

go
func (c *Component) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
    result := c.process(msg)
    return output(ctx, "output", result)  // This triggers routing
}

Step 2: Edge Resolution

The Runner resolves the output port to connected edges:

go
func (r *Runner) MsgHandler(ctx context.Context, msg runner.Msg) error {
    // Find edges from this port
    edges := r.findEdges(msg.From)

    for _, edge := range edges {
        // Evaluate data transformation
        data := r.evaluateExpression(edge.Data, msg.Data)

        // Send to destination
        err := r.scheduler.Handle(ctx, runner.Msg{
            To:   edge.To,
            From: msg.From,
            Data: data,
        })
        if err != nil {
            return err
        }
    }
    return nil
}

Step 3: Local Delivery

For local nodes, delivery is direct:

go
func (s *Scheduler) Handle(ctx context.Context, msg runner.Msg) error {
    nodeName, portName := parseDestination(msg.To)

    if runner, exists := s.instancesMap.Get(nodeName); exists {
        // Direct function call - no network!
        return runner.MsgHandler(ctx, msg)
    }

    // ... remote handling
}

Edge Evaluation

Edges define data transformation between ports:

yaml
edges:
  - from: node-a.output
    to: node-b.input
    data:
      field: "{{$.value}}"
      computed: "{{$.a + $.b}}"

The evaluator transforms data:

go
func (r *Runner) evaluateExpression(template any, sourceData any) any {
    // Walk the template, evaluate {{...}} expressions
    return evaluator.Evaluate(template, sourceData)
}

Blocking Semantics

Internal routing maintains blocking semantics:

Component A          Scheduler          Component B          Component C
     │                  │                   │                    │
     │ output("out")    │                   │                    │
     │ ════════════════▶│                   │                    │
     │                  │ Handle(B.input)   │                    │
     │                  │ ══════════════════╪════════════════╗   │
     │                  │                   │ Handle()       ║   │
     │                  │                   │ ══════════════▶║   │
     │                  │                   │                ║   │
     │                  │                   │   output()     ║   │
     │                  │                   │ ═══════════════╪══▶│
     │                  │                   │                │   │ Handle()
     │                  │                   │                │   │ return
     │                  │                   │                │◀══╝
     │                  │                   │ return         │
     │                  │◀══════════════════╝                │
     │ return           │                   │                │
     │◀═════════════════╝                   │                │
     │                  │                   │                │
     ▼                  ▼                   ▼                ▼

Concurrent Execution

Multiple instances can process messages concurrently:

go
type Runner struct {
    component  module.Component
    msgHandler func(ctx context.Context, msg Msg) error
    // Each Runner handles its own messages
    // No global lock on Scheduler
}

However, a single instance processes one message at a time (unless using async patterns).

Instance Map

The instances map provides thread-safe access:

go
type InstancesMap struct {
    m  map[string]*Runner
    mu sync.RWMutex
}

func (im *InstancesMap) Get(name string) (*Runner, bool) {
    im.mu.RLock()
    defer im.mu.RUnlock()
    r, ok := im.m[name]
    return r, ok
}

func (im *InstancesMap) Set(name string, runner *Runner) {
    im.mu.Lock()
    defer im.mu.Unlock()
    im.m[name] = runner
}

Performance Characteristics

AspectInternalExternal (gRPC)
LatencyMicrosecondsMilliseconds
SerializationNoneProtobuf
NetworkNoneTCP/HTTP2
BlockingSame goroutineSeparate goroutine

Debugging Internal Routing

Trace Messages

Enable debug logging:

go
log.Debug("routing message",
    "from", msg.From,
    "to", msg.To,
    "local", isLocal,
)

Check Instance Map

go
// List all registered instances
for name, runner := range s.instancesMap.All() {
    log.Info("registered instance", "name", name, "component", runner.ComponentName())
}

Verify Edges

go
// Check edge configuration
for _, edge := range node.Spec.Edges {
    log.Info("edge",
        "from", edge.From,
        "to", edge.To,
        "hasData", edge.Data != nil,
    )
}

Common Issues

Missing Instance

Error: instance not found: node-xyz123

Cause: TinyNode not yet reconciled or deleted.

Solution: Check TinyNode exists in Kubernetes.

Edge Not Connected

Warning: no edges from port: node-abc.output

Cause: Output port has no connections.

Solution: Connect the port in the visual editor.

Expression Error

Error: expression evaluation failed: undefined variable $.missing

Cause: Source data doesn't have expected field.

Solution: Check source schema or use optional chaining ($.missing ?? 'default').

Next Steps

Build flow-based applications on Kubernetes