Skip to content

Component Patterns

This guide covers common patterns for building TinySystems components. These patterns have been proven in production modules.

Pattern Categories

  • Processing Patterns - Transform, filter, route data
  • Source Patterns - Generate data, respond to events
  • Sink Patterns - Send data to external systems
  • Stateful Patterns - Manage component state

Processing Patterns

Transformer

Converts input to output format:

go
type Transformer struct {
    settings Settings
}

type Settings struct {
    Format string `json:"format" title:"Output Format" enum:"json,xml,csv"`
}

type Input struct {
    Data any `json:"data"`
}

type Output struct {
    Result string `json:"result"`
}

func (t *Transformer) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
    switch port {
    case v1alpha1.SettingsPort:
        t.settings = msg.(Settings)
        return nil

    case "input":
        input := msg.(Input)
        result, err := t.transform(input.Data, t.settings.Format)
        if err != nil {
            return err
        }
        return output(ctx, "output", Output{Result: result})
    }
    return nil
}

Filter

Routes messages based on condition:

go
type Filter struct {
    settings Settings
}

type Settings struct {
    Condition string `json:"condition" title:"Filter Expression" required:"true"`
}

type Message struct {
    Data any `json:"data"`
}

func (f *Filter) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
    if port == "input" {
        message := msg.(Message)

        match, err := f.evaluate(message.Data, f.settings.Condition)
        if err != nil {
            return output(ctx, "error", err.Error())
        }

        if match {
            return output(ctx, "match", message)
        }
        return output(ctx, "no_match", message)
    }
    return nil
}

func (f *Filter) Ports() []module.Port {
    return []module.Port{
        {Name: "input", Label: "Input", Source: true, Position: module.PositionLeft},
        {Name: "match", Label: "Match", Source: false, Position: module.PositionRight},
        {Name: "no_match", Label: "No Match", Source: false, Position: module.PositionBottom},
        {Name: "error", Label: "Error", Source: false, Position: module.PositionBottom},
    }
}

Router

Routes to multiple outputs based on key:

go
type Router struct {
    settings Settings
}

type Settings struct {
    Field  string   `json:"field" title:"Routing Field" required:"true"`
    Routes []string `json:"routes" title:"Route Names" minItems:"1"`
}

func (r *Router) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
    if port == "input" {
        message := msg.(map[string]any)
        routeKey := fmt.Sprintf("%v", message[r.settings.Field])

        // Check if route exists
        for _, route := range r.settings.Routes {
            if route == routeKey {
                return output(ctx, route, message)
            }
        }

        // Default route
        return output(ctx, "default", message)
    }
    return nil
}

func (r *Router) Ports() []module.Port {
    ports := []module.Port{
        {Name: v1alpha1.SettingsPort, Source: true, Position: module.PositionTop},
        {Name: "input", Label: "Input", Source: true, Position: module.PositionLeft},
        {Name: "default", Label: "Default", Source: false, Position: module.PositionBottom},
    }

    // Dynamic ports from settings
    for _, route := range r.settings.Routes {
        ports = append(ports, module.Port{
            Name:     route,
            Label:    route,
            Source:   false,
            Position: module.PositionRight,
        })
    }

    return ports
}

Splitter

Splits arrays into individual items:

go
type Splitter struct{}

type Input struct {
    Items []any `json:"items"`
}

type Output struct {
    Item  any `json:"item"`
    Index int `json:"index"`
    Total int `json:"total"`
}

func (s *Splitter) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
    if port == "input" {
        input := msg.(Input)
        total := len(input.Items)

        for i, item := range input.Items {
            select {
            case <-ctx.Done():
                return ctx.Err()
            default:
                if err := output(ctx, "item", Output{
                    Item:  item,
                    Index: i,
                    Total: total,
                }); err != nil {
                    return err
                }
            }
        }

        // Signal completion
        return output(ctx, "done", struct{ Total int }{Total: total})
    }
    return nil
}

Aggregator

Collects items and emits batches:

go
type Aggregator struct {
    settings Settings
    items    []any
    mu       sync.Mutex
}

type Settings struct {
    BatchSize int `json:"batchSize" title:"Batch Size" default:"10" minimum:"1"`
}

func (a *Aggregator) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
    switch port {
    case v1alpha1.SettingsPort:
        a.settings = msg.(Settings)
        return nil

    case "item":
        a.mu.Lock()
        a.items = append(a.items, msg)
        shouldFlush := len(a.items) >= a.settings.BatchSize
        var batch []any
        if shouldFlush {
            batch = a.items
            a.items = nil
        }
        a.mu.Unlock()

        if shouldFlush {
            return output(ctx, "batch", batch)
        }
        return nil

    case "flush":
        a.mu.Lock()
        batch := a.items
        a.items = nil
        a.mu.Unlock()

        if len(batch) > 0 {
            return output(ctx, "batch", batch)
        }
        return nil
    }
    return nil
}

Source Patterns

Ticker

Emits messages at intervals:

go
type Ticker struct {
    settings   Settings
    isRunning  bool
    cancelFunc context.CancelFunc
    mu         sync.Mutex
}

type Settings struct {
    Interval int `json:"interval" title:"Interval (ms)" default:"1000"`
    Context  any `json:"context,omitempty" title:"Data to emit"`
}

type Control struct {
    Start bool `json:"start,omitempty" title:"Start" format:"button"`
    Stop  bool `json:"stop,omitempty" title:"Stop" format:"button"`
}

func (t *Ticker) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
    switch port {
    case v1alpha1.SettingsPort:
        t.settings = msg.(Settings)
        return nil

    case v1alpha1.ControlPort:
        if !utils.IsLeader(ctx) {
            return nil
        }

        control := msg.(Control)

        t.mu.Lock()
        defer t.mu.Unlock()

        if control.Start && !t.isRunning {
            ctx, t.cancelFunc = context.WithCancel(ctx)
            t.isRunning = true
            output(context.Background(), v1alpha1.ReconcilePort, nil)

            // Blocking loop
            return t.tick(ctx, output)
        }

        if control.Stop && t.isRunning {
            t.cancelFunc()
            t.isRunning = false
            output(context.Background(), v1alpha1.ReconcilePort, nil)
        }

        return nil
    }
    return nil
}

func (t *Ticker) tick(ctx context.Context, output module.Handler) error {
    ticker := time.NewTicker(time.Duration(t.settings.Interval) * time.Millisecond)
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            output(ctx, "tick", t.settings.Context)
        case <-ctx.Done():
            t.mu.Lock()
            t.isRunning = false
            t.mu.Unlock()
            return ctx.Err()
        }
    }
}

HTTP Server

Receives HTTP requests:

go
type HTTPServer struct {
    settings    Settings
    nodeName    string
    currentPort int
    listener    net.Listener
    mu          sync.Mutex
}

type Settings struct {
    Hostnames []string `json:"hostnames" title:"Hostnames"`
}

func (h *HTTPServer) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
    switch port {
    case v1alpha1.SettingsPort:
        h.settings = msg.(Settings)
        return nil

    case v1alpha1.ReconcilePort:
        node, ok := msg.(v1alpha1.TinyNode)
        if !ok {
            return nil
        }
        return h.handleReconcile(ctx, output, node)

    case v1alpha1.ClientPort:
        // Forward request to flow
        req := msg.(Request)
        result := output(ctx, "request", req)
        return result
    }
    return nil
}

func (h *HTTPServer) handleReconcile(ctx context.Context, output module.Handler, node v1alpha1.TinyNode) error {
    h.nodeName = node.Name

    // Read configured port from metadata
    configuredPort := 0
    if portStr, ok := node.Status.Metadata["http-server-port"]; ok {
        configuredPort, _ = strconv.Atoi(portStr)
    }

    h.mu.Lock()
    defer h.mu.Unlock()

    // Already running on correct port
    if configuredPort == h.currentPort && h.currentPort > 0 {
        return nil
    }

    // No port assigned - leader starts and publishes
    if configuredPort == 0 {
        if utils.IsLeader(ctx) {
            return h.startAndPublish(ctx, output)
        }
        return nil
    }

    // Port assigned - all pods start
    return h.startOnPort(configuredPort)
}

func (h *HTTPServer) startAndPublish(ctx context.Context, output module.Handler) error {
    listener, err := net.Listen("tcp", ":0")
    if err != nil {
        return err
    }

    port := listener.Addr().(*net.TCPAddr).Port
    h.listener = listener
    h.currentPort = port

    // Publish port to metadata
    output(ctx, v1alpha1.ReconcilePort, func(node *v1alpha1.TinyNode) {
        if node.Status.Metadata == nil {
            node.Status.Metadata = make(map[string]string)
        }
        node.Status.Metadata["http-server-port"] = strconv.Itoa(port)
    })

    return nil
}

Signal (Manual Trigger)

Sends message on button click:

go
type Signal struct {
    settings   Settings
    isSending  bool
    cancelFunc context.CancelFunc
    mu         sync.Mutex
}

type Settings struct {
    Context any `json:"context,omitempty" title:"Data to send"`
}

type Control struct {
    Send  bool `json:"send,omitempty" title:"Send" format:"button"`
    Reset bool `json:"reset,omitempty" title:"Reset" format:"button"`
}

func (s *Signal) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
    if port == v1alpha1.ControlPort {
        if !utils.IsLeader(ctx) {
            return nil
        }

        control := msg.(Control)

        s.mu.Lock()
        if s.cancelFunc != nil {
            s.cancelFunc()
            s.cancelFunc = nil
        }

        if control.Send {
            ctx, s.cancelFunc = context.WithCancel(ctx)
            s.isSending = true
            s.mu.Unlock()

            output(context.Background(), v1alpha1.ReconcilePort, nil)

            err := output(ctx, "output", s.settings.Context)

            s.mu.Lock()
            s.isSending = false
            s.cancelFunc = nil
            s.mu.Unlock()

            output(context.Background(), v1alpha1.ReconcilePort, nil)
            return err
        }

        if control.Reset {
            s.isSending = false
            s.mu.Unlock()
            output(context.Background(), v1alpha1.ReconcilePort, nil)
            <-ctx.Done()
            return ctx.Err()
        }

        s.mu.Unlock()
    }
    return nil
}

func (s *Signal) getControl() Control {
    s.mu.Lock()
    defer s.mu.Unlock()
    return Control{
        Send:  !s.isSending,
        Reset: s.isSending,
    }
}

Sink Patterns

HTTP Client

Sends HTTP requests:

go
type HTTPClient struct {
    settings Settings
    client   *http.Client
}

type Settings struct {
    BaseURL string `json:"baseURL" title:"Base URL" format:"uri"`
    Timeout int    `json:"timeout" title:"Timeout (ms)" default:"30000"`
}

type Request struct {
    Method  string            `json:"method" enum:"GET,POST,PUT,DELETE"`
    Path    string            `json:"path"`
    Headers map[string]string `json:"headers,omitempty"`
    Body    any               `json:"body,omitempty"`
}

type Response struct {
    StatusCode int               `json:"statusCode"`
    Headers    map[string]string `json:"headers"`
    Body       any               `json:"body"`
}

func (c *HTTPClient) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
    switch port {
    case v1alpha1.SettingsPort:
        c.settings = msg.(Settings)
        c.client = &http.Client{
            Timeout: time.Duration(c.settings.Timeout) * time.Millisecond,
        }
        return nil

    case "request":
        req := msg.(Request)
        resp, err := c.doRequest(ctx, req)
        if err != nil {
            return output(ctx, "error", map[string]string{"error": err.Error()})
        }
        return output(ctx, "response", resp)
    }
    return nil
}

func (c *HTTPClient) doRequest(ctx context.Context, req Request) (*Response, error) {
    url := c.settings.BaseURL + req.Path

    var body io.Reader
    if req.Body != nil {
        data, _ := json.Marshal(req.Body)
        body = bytes.NewReader(data)
    }

    httpReq, err := http.NewRequestWithContext(ctx, req.Method, url, body)
    if err != nil {
        return nil, err
    }

    for k, v := range req.Headers {
        httpReq.Header.Set(k, v)
    }

    resp, err := c.client.Do(httpReq)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()

    respBody, _ := io.ReadAll(resp.Body)

    return &Response{
        StatusCode: resp.StatusCode,
        Headers:    flattenHeaders(resp.Header),
        Body:       string(respBody),
    }, nil
}

func (c *HTTPClient) Ports() []module.Port {
    return []module.Port{
        {Name: v1alpha1.SettingsPort, Source: true, Position: module.PositionTop},
        {Name: "request", Label: "Request", Source: true, Position: module.PositionLeft},
        {Name: "response", Label: "Response", Source: false, Position: module.PositionRight},
        {Name: "error", Label: "Error", Source: false, Position: module.PositionBottom},
    }
}

Debug (Logger)

Logs messages for debugging:

go
type Debug struct {
    settings Settings
}

type Settings struct {
    Label string `json:"label" title:"Label" default:"debug"`
}

type Input struct {
    Data any `json:"data"`
}

func (d *Debug) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
    switch port {
    case v1alpha1.SettingsPort:
        d.settings = msg.(Settings)
        return nil

    case "input":
        input := msg.(Input)
        log.Info("debug",
            "label", d.settings.Label,
            "data", input.Data,
        )
        // Pass through
        return output(ctx, "output", input)
    }
    return nil
}

Stateful Patterns

Counter

Maintains count across messages:

go
type Counter struct {
    count int64
    mu    sync.Mutex
}

func (c *Counter) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
    switch port {
    case "increment":
        c.mu.Lock()
        c.count++
        count := c.count
        c.mu.Unlock()

        return output(ctx, "output", map[string]int64{"count": count})

    case "reset":
        c.mu.Lock()
        c.count = 0
        c.mu.Unlock()

        return output(ctx, "output", map[string]int64{"count": 0})

    case "get":
        c.mu.Lock()
        count := c.count
        c.mu.Unlock()

        return output(ctx, "output", map[string]int64{"count": count})
    }
    return nil
}

Cache

Caches values with TTL:

go
type Cache struct {
    settings Settings
    cache    map[string]cacheEntry
    mu       sync.RWMutex
}

type Settings struct {
    TTL int `json:"ttl" title:"TTL (seconds)" default:"300"`
}

type cacheEntry struct {
    Value     any
    ExpiresAt time.Time
}

func (c *Cache) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
    switch port {
    case v1alpha1.SettingsPort:
        c.settings = msg.(Settings)
        if c.cache == nil {
            c.cache = make(map[string]cacheEntry)
        }
        return nil

    case "get":
        req := msg.(GetRequest)

        c.mu.RLock()
        entry, exists := c.cache[req.Key]
        c.mu.RUnlock()

        if exists && time.Now().Before(entry.ExpiresAt) {
            return output(ctx, "hit", CacheHit{Key: req.Key, Value: entry.Value})
        }
        return output(ctx, "miss", CacheMiss{Key: req.Key})

    case "set":
        req := msg.(SetRequest)

        c.mu.Lock()
        c.cache[req.Key] = cacheEntry{
            Value:     req.Value,
            ExpiresAt: time.Now().Add(time.Duration(c.settings.TTL) * time.Second),
        }
        c.mu.Unlock()

        return nil
    }
    return nil
}

Best Practices Summary

PatternKey Points
TransformerStateless, settings-configurable
FilterTwo outputs: match/no_match
RouterDynamic ports from settings
SplitterRespects context cancellation
AggregatorThread-safe state, flush port
TickerLeader-only, cancelable loop
HTTP ServerCR-based port coordination
SignalBlocking send, cancellation
HTTP ClientError port, timeout handling
CacheThread-safe, TTL expiration

Next Steps

Build flow-based applications on Kubernetes