Internal Routing
Internal routing handles message flow within a single module. Messages between components in the same module use Go channels instead of gRPC for efficiency.
Overview
┌─────────────────────────────────────────────────────────────────────────────┐
│                              INTERNAL ROUTING                               │
└─────────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│               MODULE POD                │
│                                         │
│  ┌───────────────────────────────────┐  │
│  │             SCHEDULER             │  │
│  │                                   │  │
│  │  instancesMap:                    │  │
│  │    node-abc123 → Runner A         │  │
│  │    node-def456 → Runner B         │  │
│  │    node-ghi789 → Runner C         │  │
│  │                                   │  │
│  └─────────────────┬─────────────────┘  │
│                    │                    │
│        ┌───────────┼───────────┐        │
│        │           │           │        │
│        ▼           ▼           ▼        │
│     ┌──────┐    ┌──────┐    ┌──────┐    │
│     │Runner│    │Runner│    │Runner│    │
│     │  A   │───▶│  B   │───▶│  C   │    │
│     └──────┘    └──────┘    └──────┘    │
│       (Go)        (Go)        (Go)      │
│      channel     channel     channel    │
└─────────────────────────────────────────┘
The Scheduler
The Scheduler manages all component instances within a module:
type Scheduler struct {
	instancesMap *InstancesMap // node-name → Runner
	clientPool   *Pool         // module-name → gRPC connection
	module       *Module       // Module metadata
}

func (s *Scheduler) Handle(ctx context.Context, msg runner.Msg) error {
	// Only the node name is needed to select a Runner here
	nodeName, _ := parseDestination(msg.To)
	// Check local instances first
	if r, exists := s.instancesMap.Get(nodeName); exists {
		return r.MsgHandler(ctx, msg) // Direct Go call
	}
	// Not local - must be remote
	return s.sendToRemote(ctx, msg)
}
Instance Registration
When a TinyNode CR is created, the controller registers it with the Scheduler:
func (r *TinyNodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	node := &v1alpha1.TinyNode{}
	if err := r.Get(ctx, req.NamespacedName, node); err != nil {
		// The TinyNode may already be deleted; client is controller-runtime's pkg/client
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}
	// Create or update instance
	r.Scheduler.GetOrCreateInstance(node.Name, node.Spec.Component)
	// Deliver messages to instance
	r.Scheduler.Update(ctx, node)
	return ctrl.Result{}, nil
}
Message Flow
Step 1: Output Call
The component calls the output() function:
func (c *Component) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
	result := c.process(msg)
	return output(ctx, "output", result) // This triggers routing
}
Step 2: Edge Resolution
The Runner resolves the output port to connected edges:
func (r *Runner) MsgHandler(ctx context.Context, msg runner.Msg) error {
	// Find edges from this port
	edges := r.findEdges(msg.From)
	for _, edge := range edges {
		// Evaluate data transformation
		data := r.evaluateExpression(edge.Data, msg.Data)
		// Send to destination
		err := r.scheduler.Handle(ctx, runner.Msg{
			To:   edge.To,
			From: msg.From,
			Data: data,
		})
		if err != nil {
			return err
		}
	}
	return nil
}
Step 3: Local Delivery
For local nodes, delivery is a direct function call:
func (s *Scheduler) Handle(ctx context.Context, msg runner.Msg) error {
	nodeName, _ := parseDestination(msg.To)
	if r, exists := s.instancesMap.Get(nodeName); exists {
		// Direct function call - no network!
		return r.MsgHandler(ctx, msg)
	}
	// ... remote handling
	return s.sendToRemote(ctx, msg)
}
Edge Evaluation
Edges define data transformation between ports:
edges:
  - from: node-a.output
    to: node-b.input
    data:
      field: "{{$.value}}"
      computed: "{{$.a + $.b}}"
The evaluator transforms data:
func (r *Runner) evaluateExpression(template any, sourceData any) any {
	// Walk the template, evaluate {{...}} expressions
	return evaluator.Evaluate(template, sourceData)
}
Blocking Semantics
Internal routing maintains blocking semantics: each call returns only after every downstream handler has returned:
Component A       Scheduler          Component B       Component C
│                 │                  │                 │
│ output("out")   │                  │                 │
│════════════════▶│                  │                 │
│                 │ Handle(B.input)  │                 │
│                 │═════════════════▶│                 │
│                 │                  │ output()        │
│                 │                  │════════════════▶│
│                 │                  │                 │ Handle()
│                 │                  │ return          │
│                 │                  │◀════════════════│
│                 │ return           │                 │
│                 │◀═════════════════│                 │
│ return          │                  │                 │
│◀════════════════│                  │                 │
▼                 ▼                  ▼                 ▼
Concurrent Execution
Multiple instances can process messages concurrently:
type Runner struct {
	component  module.Component
	msgHandler func(ctx context.Context, msg Msg) error
	// Each Runner handles its own messages
	// No global lock on Scheduler
}
However, a single instance processes one message at a time (unless using async patterns).
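The two claims above can be illustrated with a small sketch. It assumes each instance guards its handler with its own mutex, which is only one way to get one-message-at-a-time behaviour; the source does not say how the real Runner achieves it. The names `serialRunner` and `runAll` are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// serialRunner is a hypothetical stand-in for a Runner. Its own mutex
// serializes messages for this instance only; there is no shared lock.
type serialRunner struct {
	mu          sync.Mutex
	inFlight    int // messages currently inside handle
	maxInFlight int // highest value inFlight ever reached
	handled     int // total messages processed
}

// handle processes one message while holding the instance's mutex.
func (r *serialRunner) handle() {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.inFlight++
	if r.inFlight > r.maxInFlight {
		r.maxInFlight = r.inFlight
	}
	r.handled++
	r.inFlight--
}

// runAll sends msgs messages to each of n runners, one goroutine per message,
// and waits for all of them to finish.
func runAll(n, msgs int) []*serialRunner {
	runners := make([]*serialRunner, n)
	var wg sync.WaitGroup
	for i := range runners {
		runners[i] = &serialRunner{}
		for j := 0; j < msgs; j++ {
			wg.Add(1)
			go func(r *serialRunner) {
				defer wg.Done()
				r.handle()
			}(runners[i])
		}
	}
	wg.Wait()
	return runners
}

func main() {
	for i, r := range runAll(3, 100) {
		fmt.Printf("runner %d: handled=%d maxInFlight=%d\n", i, r.handled, r.maxInFlight)
	}
}
```

Different runners never contend with each other because each one locks only its own mutex, while `maxInFlight` staying at 1 shows that a single runner never handles two messages at once.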
Instance Map
The instances map provides thread-safe access:
type InstancesMap struct {
	m  map[string]*Runner
	mu sync.RWMutex
}

func (im *InstancesMap) Get(name string) (*Runner, bool) {
	im.mu.RLock()
	defer im.mu.RUnlock()
	r, ok := im.m[name]
	return r, ok
}

func (im *InstancesMap) Set(name string, runner *Runner) {
	im.mu.Lock()
	defer im.mu.Unlock()
	im.m[name] = runner
}
Performance Characteristics
| Aspect | Internal | External (gRPC) |
|---|---|---|
| Latency | Microseconds | Milliseconds |
| Serialization | None | Protobuf |
| Network | None | TCP/HTTP2 |
| Blocking | Same goroutine | Separate goroutine |
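The Internal column can be illustrated with a minimal sketch of local delivery. It is not the project's Scheduler: `Msg`, `localScheduler`, `payload`, and `deliverLocal` are hypothetical stand-ins, and remote routing is left out. The sketch shows that the receiver gets the very same pointer the sender passed (nothing is serialized) and that the handler has already run when `Handle` returns (the call stays on the caller's goroutine).

```go
package main

import (
	"context"
	"fmt"
)

// Msg is a hypothetical, reduced stand-in for runner.Msg.
type Msg struct {
	To   string
	Data any
}

// handler is a hypothetical stand-in for a Runner's message handler.
type handler func(ctx context.Context, msg Msg) error

// localScheduler only knows local instances; remote routing is omitted.
type localScheduler struct {
	instances map[string]handler
}

// Handle looks up the destination and calls its handler directly.
func (s *localScheduler) Handle(ctx context.Context, msg Msg) error {
	h, ok := s.instances[msg.To]
	if !ok {
		return fmt.Errorf("instance not found: %s", msg.To)
	}
	return h(ctx, msg) // plain function call on the caller's goroutine
}

type payload struct{ Value int }

// deliverLocal sends p to a local node and reports whether the receiver saw
// the same pointer and whether it ran before Handle returned.
func deliverLocal(p *payload) (samePointer, ranBeforeReturn bool, err error) {
	var received *payload
	s := &localScheduler{instances: map[string]handler{
		"node-b": func(ctx context.Context, msg Msg) error {
			received, _ = msg.Data.(*payload)
			return nil
		},
	}}
	err = s.Handle(context.Background(), Msg{To: "node-b", Data: p})
	ranBeforeReturn = received != nil
	samePointer = received == p
	return samePointer, ranBeforeReturn, err
}

func main() {
	same, ran, err := deliverLocal(&payload{Value: 42})
	fmt.Println("same pointer:", same, "ran before return:", ran, "err:", err)
}
```

Because the payload is shared rather than copied, a receiver that mutates it also changes what the sender sees; a gRPC hop would instead hand the receiver a decoded copy.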
Debugging Internal Routing
Trace Messages
Enable debug logging and log each routing decision:
// isLocal is assumed to hold the result of the instancesMap lookup
log.Debug("routing message",
	"from", msg.From,
	"to", msg.To,
	"local", isLocal,
)
Check Instance Map
// List all registered instances
for name, runner := range s.instancesMap.All() {
	log.Info("registered instance", "name", name, "component", runner.ComponentName())
}
Verify Edges
// Check edge configuration
for _, edge := range node.Spec.Edges {
	log.Info("edge",
		"from", edge.From,
		"to", edge.To,
		"hasData", edge.Data != nil,
	)
}
Common Issues
Missing Instance
Error: instance not found: node-xyz123
Cause: The TinyNode has not been reconciled yet, or it has been deleted.
Solution: Check that the TinyNode exists in Kubernetes.
Edge Not Connected
Warning: no edges from port: node-abc.output
Cause: The output port has no connections.
Solution: Connect the port in the visual editor.
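A rough sketch of the lookup behind this warning, assuming edges are matched by comparing their source port with the port that emitted the message. `Edge` and `findEdges` here are simplified, hypothetical versions, not the Runner's actual code.

```go
package main

import "fmt"

// Edge is a hypothetical, reduced form of an edge: source and destination ports.
type Edge struct {
	From string
	To   string
}

// findEdges returns every edge whose source is the given port.
func findEdges(edges []Edge, from string) []Edge {
	var out []Edge
	for _, e := range edges {
		if e.From == from {
			out = append(out, e)
		}
	}
	return out
}

func main() {
	edges := []Edge{{From: "node-a.output", To: "node-b.input"}}
	for _, port := range []string{"node-a.output", "node-abc.output"} {
		found := findEdges(edges, port)
		if len(found) == 0 {
			// Nothing is connected, so the message has nowhere to go.
			fmt.Printf("Warning: no edges from port: %s\n", port)
			continue
		}
		fmt.Printf("%s has %d edge(s)\n", port, len(found))
	}
}
```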
Expression Error
Error: expression evaluation failed: undefined variable $.missing
Cause: The source data does not contain the expected field.
Solution: Check the source schema, or supply a fallback value ($.missing ?? 'default').
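To make the failure mode concrete, here is a deliberately tiny sketch of field lookup with a fallback. It is not the project's evaluator: `evalField` is a hypothetical function that handles only a single `{{$.name}}` or `{{$.name ?? 'default'}}` expression against a flat map, and it does not support arithmetic such as `$.a + $.b`.

```go
package main

import (
	"fmt"
	"strings"
)

// evalField resolves one "{{$.name}}" or "{{$.name ?? 'default'}}" template
// against data. Hypothetical and far smaller than a real evaluator.
func evalField(template string, data map[string]any) (any, error) {
	expr := strings.TrimSpace(template)
	if !strings.HasPrefix(expr, "{{") || !strings.HasSuffix(expr, "}}") {
		return template, nil // plain literal, nothing to evaluate
	}
	expr = strings.TrimSpace(expr[2 : len(expr)-2])

	// Split off an optional fallback after "??".
	path, fallback, hasFallback := strings.Cut(expr, "??")
	path = strings.TrimSpace(path)
	if !strings.HasPrefix(path, "$.") {
		return nil, fmt.Errorf("expression evaluation failed: unsupported expression %q", path)
	}
	name := strings.TrimPrefix(path, "$.")

	if v, ok := data[name]; ok {
		return v, nil
	}
	if hasFallback {
		// Only single-quoted string fallbacks are handled in this sketch.
		return strings.Trim(strings.TrimSpace(fallback), "'"), nil
	}
	return nil, fmt.Errorf("expression evaluation failed: undefined variable $.%s", name)
}

func main() {
	data := map[string]any{"value": 7}
	for _, tpl := range []string{"{{$.value}}", "{{$.missing}}", "{{$.missing ?? 'default'}}"} {
		v, err := evalField(tpl, data)
		fmt.Printf("%-28s value=%v err=%v\n", tpl, v, err)
	}
}
```

The second template fails because the field is absent and no fallback is given; the third succeeds with the fallback string.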
Next Steps
- Cross-Module Communication - Between modules
- gRPC Fundamentals - gRPC details
- Message Flow - Complete flow