# Internal Routing

Internal routing handles message flow within a single module. Messages between components in the same module are passed in-process over Go channels instead of gRPC, avoiding serialization and network overhead.

## Overview

```
+-----------------------------------------------------------------------+
|                           INTERNAL ROUTING                            |
+-----------------------------------------------------------------------+

+---------------------------------------------+
|                 MODULE POD                  |
|                                             |
| +-----------------------------------------+ |
| |                SCHEDULER                | |
| |                                         | |
| | instancesMap:                           | |
| |   node-abc123 -> Runner A               | |
| |   node-def456 -> Runner B               | |
| |   node-ghi789 -> Runner C               | |
| |                                         | |
| +------------------+----------------------+ |
|                    |                        |
|       +------------+------------+           |
|       |            |            |           |
|       v            v            v           |
|    +------+     +------+     +------+       |
|    |Runner|     |Runner|     |Runner|       |
|    |  A   |---->|  B   |---->|  C   |       |
|    +------+     +------+     +------+       |
|      (Go)         (Go)         (Go)         |
|     channel      channel      channel       |
+---------------------------------------------+
```

## The Scheduler
The Scheduler manages all component instances within a module:

```go
type Scheduler struct {
    instancesMap *InstancesMap // node-name -> Runner
    clientPool   *Pool         // module-name -> gRPC connection
    module       *Module       // Module metadata
}

func (s *Scheduler) Handle(ctx context.Context, msg runner.Msg) error {
    nodeName, _ := parseDestination(msg.To) // port name is resolved by the Runner

    // Check local instances first
    if r, exists := s.instancesMap.Get(nodeName); exists {
        return r.MsgHandler(ctx, msg) // Direct Go call
    }

    // Not local - must be remote
    return s.sendToRemote(ctx, msg)
}
```
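
`parseDestination` is not defined in this section. A minimal sketch, assuming destinations use the same `node-name.port` form as the edge definitions shown later (the exact address format and this helper are assumptions, using the standard `strings` package):

```go
// parseDestination splits a destination such as "node-abc123.input" into the
// node name and port name. Splitting on the last "." is an assumption about
// the address format, not the platform's actual implementation.
func parseDestination(to string) (nodeName, portName string) {
    if i := strings.LastIndex(to, "."); i >= 0 {
        return to[:i], to[i+1:]
    }
    return to, ""
}
```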

## Instance Registration

When a TinyNode CR is created, the controller registers it with the Scheduler:

```go
func (r *TinyNodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    node := &v1alpha1.TinyNode{}
    if err := r.Get(ctx, req.NamespacedName, node); err != nil {
        return ctrl.Result{}, client.IgnoreNotFound(err)
    }

    // Create or update instance
    r.Scheduler.GetOrCreateInstance(node.Name, node.Spec.Component)

    // Deliver messages to instance
    r.Scheduler.Update(ctx, node)

    return ctrl.Result{}, nil
}
```

## Message Flow

### Step 1: Output Call

The component calls the `output()` function:

```go
func (c *Component) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
    result := c.process(msg)
    return output(ctx, "output", result) // This triggers routing
}
```
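
The `module.Handler` type is not shown here; judging from the call above, a plausible shape is the following (an assumption for illustration, not the actual definition):

```go
// Handler is the routing callback a component uses to emit a message on one
// of its output ports. Assumed signature, inferred from the usage above.
type Handler func(ctx context.Context, port string, data any) error
```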

### Step 2: Edge Resolution

The Runner resolves the output port to connected edges:

```go
func (r *Runner) MsgHandler(ctx context.Context, msg runner.Msg) error {
    // Find edges from this port
    edges := r.findEdges(msg.From)

    for _, edge := range edges {
        // Evaluate data transformation
        data := r.evaluateExpression(edge.Data, msg.Data)

        // Send to destination
        err := r.scheduler.Handle(ctx, runner.Msg{
            To:   edge.To,
            From: msg.From,
            Data: data,
        })
        if err != nil {
            return err
        }
    }
    return nil
}
```

### Step 3: Local Delivery
For local nodes, delivery is direct:

```go
func (s *Scheduler) Handle(ctx context.Context, msg runner.Msg) error {
    nodeName, _ := parseDestination(msg.To)

    if r, exists := s.instancesMap.Get(nodeName); exists {
        // Direct function call - no network!
        return r.MsgHandler(ctx, msg)
    }

    // ... remote handling
    return s.sendToRemote(ctx, msg)
}
```

## Edge Evaluation
Edges define data transformation between ports:

```yaml
edges:
  - from: node-a.output
    to: node-b.input
    data:
      field: "{{$.value}}"
      computed: "{{$.a + $.b}}"
```

The evaluator transforms data:

```go
func (r *Runner) evaluateExpression(template any, sourceData any) any {
    // Walk the template, evaluate {{...}} expressions
    return evaluator.Evaluate(template, sourceData)
}
```
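
The real evaluator supports arithmetic and other expressions; as a rough, self-contained sketch of the idea (simple `{{$.field}}` lookups only, all names hypothetical):

```go
package main

import (
    "fmt"
    "regexp"
    "strings"
)

// exprRe matches a value that is a single {{...}} template expression.
var exprRe = regexp.MustCompile(`^\{\{(.+)\}\}$`)

// evaluate walks the template and substitutes {{$.field}} references with
// values from the source data. Arithmetic such as {{$.a + $.b}} and optional
// chaining are deliberately left out of this sketch.
func evaluate(template any, source map[string]any) any {
    switch t := template.(type) {
    case map[string]any:
        out := make(map[string]any, len(t))
        for k, v := range t {
            out[k] = evaluate(v, source)
        }
        return out
    case string:
        if m := exprRe.FindStringSubmatch(t); m != nil {
            field := strings.TrimPrefix(strings.TrimSpace(m[1]), "$.")
            return source[field]
        }
        return t
    default:
        return t
    }
}

func main() {
    tmpl := map[string]any{"field": "{{$.value}}"}
    src := map[string]any{"value": 10}
    fmt.Println(evaluate(tmpl, src)) // map[field:10]
}
```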

## Blocking Semantics

Internal routing maintains blocking semantics: a call to `output()` does not return until every downstream handler in the chain has completed:

```
Component A          Scheduler          Component B         Component C
     |                   |                   |                   |
     | output("out")     |                   |                   |
     |==================>|                   |                   |
     |                   | Handle(B.input)   |                   |
     |                   |==================>|                   |
     |                   |                   | Handle()          |
     |                   |                   |                   |
     |                   |                   | output()          |
     |                   |                   |==================>|
     |                   |                   |                   | Handle()
     |                   |                   |                   | return
     |                   |                   |<==================|
     |                   | return            |                   |
     |                   |<==================|                   |
     | return            |                   |                   |
     |<==================|                   |                   |
     |                   |                   |                   |
     v                   v                   v                   v
```

## Concurrent Execution
Multiple instances can process messages concurrently:

```go
type Runner struct {
    component  module.Component
    msgHandler func(ctx context.Context, msg Msg) error

    // Each Runner handles its own messages
    // No global lock on the Scheduler
}
```

However, a single instance processes one message at a time (unless the component uses asynchronous patterns; see the sketch below).
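
A rough sketch of such an async pattern, assuming the same handler signature as the Step 1 example; the component name, goroutine handling, and error strategy are illustrative assumptions, not the platform's API:

```go
// SlowComponent is a hypothetical component whose work is too slow to run
// inline. Handle returns immediately and emits the result from a goroutine.
func (c *SlowComponent) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
    go func() {
        // Note: ctx may already be cancelled once Handle has returned;
        // a real component would derive or replace the context here.
        result := c.process(msg) // long-running work off the handler path

        // The error can no longer be returned to the original caller;
        // log it or emit it on an error port instead.
        _ = output(ctx, "output", result)
    }()
    return nil // the instance is free to accept the next message
}
```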

## Instance Map
The instances map provides thread-safe access:

```go
type InstancesMap struct {
    m  map[string]*Runner
    mu sync.RWMutex
}

func (im *InstancesMap) Get(name string) (*Runner, bool) {
    im.mu.RLock()
    defer im.mu.RUnlock()
    r, ok := im.m[name]
    return r, ok
}

func (im *InstancesMap) Set(name string, runner *Runner) {
    im.mu.Lock()
    defer im.mu.Unlock()
    im.m[name] = runner
}
```
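
The debugging example below iterates with `s.instancesMap.All()`; a plausible sketch of such a helper (an assumption, the real method may differ) returns a snapshot taken under the read lock:

```go
// All returns a copy of the registered instances so callers can iterate
// without holding the lock. Hypothetical helper, not the actual API.
func (im *InstancesMap) All() map[string]*Runner {
    im.mu.RLock()
    defer im.mu.RUnlock()
    out := make(map[string]*Runner, len(im.m))
    for name, r := range im.m {
        out[name] = r
    }
    return out
}
```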

## Performance Characteristics

| Aspect | Internal | External (gRPC) |
|---|---|---|
| Latency | Microseconds | Milliseconds |
| Serialization | None | Protobuf |
| Network | None | TCP/HTTP2 |
| Blocking | Same goroutine | Separate goroutine |

## Debugging Internal Routing

### Trace Messages
Enable debug logging:

```go
log.Debug("routing message",
    "from", msg.From,
    "to", msg.To,
    "local", isLocal,
)
```

### Check Instance Map

```go
// List all registered instances
for name, runner := range s.instancesMap.All() {
    log.Info("registered instance", "name", name, "component", runner.ComponentName())
}
```

### Verify Edges

```go
// Check edge configuration
for _, edge := range node.Spec.Edges {
    log.Info("edge",
        "from", edge.From,
        "to", edge.To,
        "hasData", edge.Data != nil,
    )
}
```

## Common Issues

### Missing Instance

```
Error: instance not found: node-xyz123
```

Cause: TinyNode not yet reconciled, or deleted.

Solution: Check that the TinyNode resource exists in Kubernetes.

### Edge Not Connected

```
Warning: no edges from port: node-abc.output
```

Cause: The output port has no connections.
Solution: Connect the port in the visual editor.

### Expression Error

```
Error: expression evaluation failed: undefined variable $.missing
```

Cause: The source data doesn't have the expected field.
Solution: Check the source schema or use optional chaining (`$.missing ?? 'default'`).

## Next Steps
- Cross-Module Communication - Between modules
- gRPC Fundamentals - gRPC details
- Message Flow - Complete flow