Skip to content

Conditional Router Component

A complete example of a routing component with multiple output paths.

Overview

This component routes messages to different output ports based on configurable conditions. It demonstrates:

  • Multiple output ports
  • Condition evaluation
  • Default routing
  • Expression-based rules

Complete Implementation

go
package router

import (
    "context"
    "fmt"
    "regexp"
    "strconv"

    "github.com/tiny-systems/module/api/v1alpha1"
    "github.com/tiny-systems/module/pkg/module"
)

const ComponentName = "conditional_router"

// Component struct
type Component struct {
    settings Settings
}

// Rule defines a routing condition
type Rule struct {
    Name      string `json:"name" title:"Rule Name" required:"true"
        description:"Identifier for this rule"`
    Field     string `json:"field" title:"Field" required:"true"
        description:"Field path to evaluate"`
    Operator  string `json:"operator" title:"Operator" required:"true"
        enum:"equals,not_equals,contains,starts_with,ends_with,greater_than,less_than,matches"
        description:"Comparison operator"`
    Value     string `json:"value" title:"Value" required:"true"
        description:"Value to compare against"`
    OutputPort string `json:"outputPort" title:"Output Port" required:"true"
        description:"Port to route matching messages to"`
}

// Settings for the router
type Settings struct {
    Rules       []Rule `json:"rules" title:"Routing Rules" minItems:"1"
        description:"List of routing rules (evaluated in order)"`
    DefaultPort string `json:"defaultPort" title:"Default Port" default:"default"
        description:"Port for messages that don't match any rule"`
    StopOnMatch bool   `json:"stopOnMatch" title:"Stop on First Match" default:"true"
        description:"Stop evaluating rules after first match"`
}

// Input message
type Input struct {
    Data any `json:"data" title:"Data" configurable:"true"
        description:"Message to route"`
}

// RouterOutput wraps the routed message
type RouterOutput struct {
    Data        any    `json:"data" title:"Data"`
    MatchedRule string `json:"matchedRule" title:"Matched Rule"`
}

// DefaultOutput for unmatched messages
type DefaultOutput struct {
    Data           any      `json:"data" title:"Data"`
    EvaluatedRules []string `json:"evaluatedRules" title:"Evaluated Rules"`
}

var _ module.Component = (*Component)(nil)

func (c *Component) GetInfo() module.ComponentInfo {
    return module.ComponentInfo{
        Name:        ComponentName,
        Title:       "Conditional Router",
        Description: "Routes messages based on configurable conditions",
        Category:    "Flow Control",
        Tags:        []string{"router", "condition", "branch"},
    }
}

func (c *Component) Ports() []module.Port {
    ports := []module.Port{
        {
            Name:          v1alpha1.SettingsPort,
            Label:         "Settings",
            Position:      module.PositionTop,
            Source:        true,
            Configuration: Settings{},
        },
        {
            Name:          "input",
            Label:         "Input",
            Position:      module.PositionLeft,
            Source:        true,
            Configuration: Input{},
        },
        // Route A output
        {
            Name:          "route_a",
            Label:         "Route A",
            Position:      module.PositionRight,
            Source:        false,
            Configuration: RouterOutput{},
        },
        // Route B output
        {
            Name:          "route_b",
            Label:         "Route B",
            Position:      module.PositionRight,
            Source:        false,
            Configuration: RouterOutput{},
        },
        // Route C output
        {
            Name:          "route_c",
            Label:         "Route C",
            Position:      module.PositionRight,
            Source:        false,
            Configuration: RouterOutput{},
        },
        // Default output
        {
            Name:          "default",
            Label:         "Default",
            Position:      module.PositionBottom,
            Source:        false,
            Configuration: DefaultOutput{},
        },
    }
    return ports
}

func (c *Component) Handle(
    ctx context.Context,
    output module.Handler,
    port string,
    msg any,
) error {
    switch port {
    case v1alpha1.SettingsPort:
        c.settings = msg.(Settings)
        return nil

    case "input":
        return c.handleInput(ctx, output, msg.(Input))

    default:
        return fmt.Errorf("unknown port: %s", port)
    }
}

func (c *Component) handleInput(
    ctx context.Context,
    output module.Handler,
    input Input,
) error {
    data, ok := input.Data.(map[string]any)
    if !ok {
        // Non-map data goes to default
        return output(ctx, c.settings.DefaultPort, DefaultOutput{
            Data:           input.Data,
            EvaluatedRules: []string{},
        })
    }

    var evaluatedRules []string

    for _, rule := range c.settings.Rules {
        evaluatedRules = append(evaluatedRules, rule.Name)

        matched, err := c.evaluateRule(data, rule)
        if err != nil {
            continue // Skip rules that error
        }

        if matched {
            err := output(ctx, rule.OutputPort, RouterOutput{
                Data:        input.Data,
                MatchedRule: rule.Name,
            })
            if err != nil {
                return err
            }

            if c.settings.StopOnMatch {
                return nil
            }
        }
    }

    // No rules matched, send to default
    return output(ctx, c.settings.DefaultPort, DefaultOutput{
        Data:           input.Data,
        EvaluatedRules: evaluatedRules,
    })
}

func (c *Component) evaluateRule(data map[string]any, rule Rule) (bool, error) {
    // Get field value
    fieldValue, ok := data[rule.Field]
    if !ok {
        return false, fmt.Errorf("field not found: %s", rule.Field)
    }

    // Convert to string for comparison
    fieldStr := fmt.Sprintf("%v", fieldValue)
    ruleValue := rule.Value

    switch rule.Operator {
    case "equals":
        return fieldStr == ruleValue, nil

    case "not_equals":
        return fieldStr != ruleValue, nil

    case "contains":
        return contains(fieldStr, ruleValue), nil

    case "starts_with":
        return startsWith(fieldStr, ruleValue), nil

    case "ends_with":
        return endsWith(fieldStr, ruleValue), nil

    case "greater_than":
        return compareNumeric(fieldValue, ruleValue, ">"), nil

    case "less_than":
        return compareNumeric(fieldValue, ruleValue, "<"), nil

    case "matches":
        re, err := regexp.Compile(ruleValue)
        if err != nil {
            return false, err
        }
        return re.MatchString(fieldStr), nil

    default:
        return false, fmt.Errorf("unknown operator: %s", rule.Operator)
    }
}

func contains(s, substr string) bool {
    return len(s) >= len(substr) &&
        (s == substr || len(substr) == 0 ||
         findSubstring(s, substr) >= 0)
}

func findSubstring(s, substr string) int {
    for i := 0; i <= len(s)-len(substr); i++ {
        if s[i:i+len(substr)] == substr {
            return i
        }
    }
    return -1
}

func startsWith(s, prefix string) bool {
    return len(s) >= len(prefix) && s[:len(prefix)] == prefix
}

func endsWith(s, suffix string) bool {
    return len(s) >= len(suffix) && s[len(s)-len(suffix):] == suffix
}

func compareNumeric(fieldValue any, ruleValue string, op string) bool {
    var fieldNum float64
    switch v := fieldValue.(type) {
    case float64:
        fieldNum = v
    case int:
        fieldNum = float64(v)
    case string:
        var err error
        fieldNum, err = strconv.ParseFloat(v, 64)
        if err != nil {
            return false
        }
    default:
        return false
    }

    ruleNum, err := strconv.ParseFloat(ruleValue, 64)
    if err != nil {
        return false
    }

    switch op {
    case ">":
        return fieldNum > ruleNum
    case "<":
        return fieldNum < ruleNum
    default:
        return false
    }
}

func (c *Component) Instance() module.Component {
    return &Component{}
}

Usage Example

Settings Configuration

yaml
edges:
  - port: _settings
    data:
      rules:
        - name: "high_priority"
          field: "priority"
          operator: "equals"
          value: "high"
          outputPort: "route_a"
        - name: "error_status"
          field: "status"
          operator: "greater_than"
          value: "399"
          outputPort: "route_b"
        - name: "admin_user"
          field: "role"
          operator: "equals"
          value: "admin"
          outputPort: "route_c"
      defaultPort: "default"
      stopOnMatch: true

Input Examples

High Priority Message:

json
{
  "data": {
    "priority": "high",
    "message": "Urgent task"
  }
}

Routes to: route_a

Error Response:

json
{
  "data": {
    "status": 500,
    "error": "Internal server error"
  }
}

Routes to: route_b

Normal Message:

json
{
  "data": {
    "priority": "low",
    "status": 200
  }
}

Routes to: default

Visual Flow

                    ┌─────────────────────┐
                    │  Conditional Router │
                    │                     │
    Input ─────────►│  Rules:             │
                    │  1. priority=high   │──────► Route A (High Priority)
                    │  2. status>399      │──────► Route B (Errors)
                    │  3. role=admin      │──────► Route C (Admin)
                    │                     │
                    │  Default            │──────► Default (Everything else)
                    └─────────────────────┘

Key Patterns Demonstrated

1. Multiple Output Ports

go
{
    Name:     "route_a",
    Label:    "Route A",
    Position: module.PositionRight,
    Source:   false,
},
{
    Name:     "route_b",
    Label:    "Route B",
    Position: module.PositionRight,
    Source:   false,
},

2. Dynamic Rule Evaluation

Rules are evaluated in order, allowing priority-based routing:

go
for _, rule := range c.settings.Rules {
    if matched {
        output(ctx, rule.OutputPort, ...)
        if c.settings.StopOnMatch {
            return nil
        }
    }
}

3. Default Fallback

Always provide a default path for unmatched messages:

go
return output(ctx, c.settings.DefaultPort, DefaultOutput{
    Data:           input.Data,
    EvaluatedRules: evaluatedRules,
})

4. Rule Metadata in Output

Include information about which rule matched:

go
type RouterOutput struct {
    Data        any    `json:"data"`
    MatchedRule string `json:"matchedRule"`
}

Advanced Configuration

Multi-Match Mode

When stopOnMatch is false, a message can be sent to multiple routes:

yaml
stopOnMatch: false
rules:
  - name: "log_all"
    field: "type"
    operator: "not_equals"
    value: ""
    outputPort: "logging"
  - name: "high_priority"
    field: "priority"
    operator: "equals"
    value: "high"
    outputPort: "priority_queue"

This sends messages to both logging and (if high priority) priority_queue.

Regex Matching

yaml
rules:
  - name: "email_pattern"
    field: "email"
    operator: "matches"
    value: "^[^@]+@company\\.com$"
    outputPort: "internal"

Extension Ideas

  1. Nested Field Access: Support paths like user.profile.role
  2. Multiple Conditions: AND/OR logic for complex rules
  3. Dynamic Ports: Create output ports based on field values
  4. Rule Templates: Pre-defined rule sets for common patterns

Build flow-based applications on Kubernetes