Skip to content

Control Ports

Control ports enable UI interaction with components through buttons and actions in the visual editor. They are essential for components that need user-triggered operations.

Control Port Overview

┌─────────────────────────────────────────────────────────────────────────────┐
│                         CONTROL PORT FLOW                                    │
└─────────────────────────────────────────────────────────────────────────────┘

    ┌───────────────┐        ┌───────────────┐        ┌───────────────┐
    │  Visual UI    │        │  TinySignal   │        │   Component   │
    │               │───────▶│               │───────▶│               │
    │ User clicks   │ Create │  CR created   │ Deliver│  Handle()     │
    │ button        │        │  in K8s       │        │  _control     │
    └───────────────┘        └───────────────┘        └───────────────┘

Defining Control Ports

go
// Control is the payload schema of the control port. Each bool field tagged
// format:"button" is rendered as a clickable button in the visual editor.
type Control struct {
    // Start is the "Start" button.
    Start bool `json:"start,omitempty" title:"Start" format:"button"`
    // Stop is the "Stop" button.
    Stop  bool `json:"stop,omitempty" title:"Stop" format:"button"`
}

// Ports lists the ports exposed by the component. The control port's schema is
// generated from the current control state, so the buttons it describes follow
// whatever getControl returns at the time Ports is called.
func (c *Ticker) Ports() []module.Port {
    controlPort := module.Port{
        Name:     v1alpha1.ControlPort, // "_control"
        Label:    "Control",
        Source:   true,
        Position: module.PositionTop,
        Schema:   schema.FromGo(c.getControl()),
    }

    return []module.Port{
        controlPort,
        // Other ports...
    }
}

// getControl builds the control state from isRunning while holding the mutex:
// Start is set when the ticker is idle, Stop when it is running.
func (c *Ticker) getControl() Control {
    c.mu.Lock()
    defer c.mu.Unlock()

    // Only show relevant buttons based on state
    var state Control
    if c.isRunning {
        state.Stop = true
    } else {
        state.Start = true
    }
    return state
}

Button Format

The format:"button" tag creates a clickable button in the UI:

go
// Control shows the two button variants: a plain button and a button that
// also carries a description tag.
type Control struct {
    // Shows as a button
    Send bool `json:"send" title:"Send" format:"button"`

    // Shows as a button with description
    Reset bool `json:"reset" title:"Reset" format:"button" description:"Clear all data"`
}

Button Visibility

A button is shown when its field value is true and hidden when it is false:

go
// getControl returns a fixed control state that illustrates button visibility.
func (c *Component) getControl() Control {
    var state Control
    state.Start = true // Button visible and enabled
    state.Stop = false // Button hidden
    return state
}

Handling Control Messages

Basic Pattern

go
// Handle reacts to control-port messages: Start is checked first and runs
// c.start, otherwise Stop runs c.stop. Messages on any other port, and control
// messages with neither flag set, are ignored.
func (c *Component) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
    if port != v1alpha1.ControlPort {
        return nil
    }

    control := msg.(Control)
    switch {
    case control.Start:
        return c.start(ctx, output)
    case control.Stop:
        return c.stop(ctx, output)
    default:
        return nil
    }
}

Leader-Only Control

Control actions should typically run only on the leader pod:

go
// Handle processes control-port messages on the leader only. Start begins
// ticking and returns whatever startTicking returns; Stop halts ticking.
// Everything else is a no-op.
func (c *Ticker) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
    // Leadership is only consulted for control messages (short-circuit ||).
    if port != v1alpha1.ControlPort || !utils.IsLeader(ctx) {
        return nil
    }

    control := msg.(Control)
    switch {
    case control.Start:
        return c.startTicking(ctx, output)
    case control.Stop:
        c.stopTicking()
    }
    return nil
}

Complete Ticker Example

A component that periodically emits messages:

go
package ticker

import (
    "context"
    "sync"
    "time"

    "github.com/tiny-systems/module/api/v1alpha1"
    "github.com/tiny-systems/module/module"
    "github.com/tiny-systems/module/pkg/schema"
    "github.com/tiny-systems/module/pkg/utils"
)

// Settings is the configuration the ticker receives on its settings port.
type Settings struct {
    // Delay is the interval between ticks, in milliseconds.
    Delay   int `json:"delay" title:"Delay (ms)" default:"1000" minimum:"100"`
    // Context is the payload sent to the "output" port on every tick.
    Context any `json:"context,omitempty" title:"Context" description:"Data to emit"`
}

// Control is the ticker's control-port payload: one button to start ticking
// and one to stop it.
type Control struct {
    // Start is the "Start" button.
    Start bool `json:"start,omitempty" title:"Start" format:"button"`
    // Stop is the "Stop" button.
    Stop  bool `json:"stop,omitempty" title:"Stop" format:"button"`
}

// Ticker is a component that emits its configured context at a fixed interval
// while it is running.
type Ticker struct {
    // settings is the most recent value received on the settings port.
    settings   Settings
    // cancelFunc cancels the context of the current emit loop.
    cancelFunc context.CancelFunc
    // isRunning reports whether an emit loop is active; it decides which
    // control button getControl exposes.
    isRunning  bool
    // mu guards the mutable state above.
    mu         sync.Mutex
}

// GetInfo returns the static metadata that describes the ticker component:
// its name, description, icon and tags.
func (t *Ticker) GetInfo() module.Info {
    var info module.Info
    info.Name = "ticker"
    info.Description = "Emits messages at regular intervals"
    info.Icon = "IconClock"
    info.Tags = []string{"utility", "timer"}
    return info
}

// Ports declares the ticker's three ports, in order: settings, control and
// the "output" port on which ticks are emitted.
func (t *Ticker) Ports() []module.Port {
    settingsPort := module.Port{
        Name:     v1alpha1.SettingsPort,
        Label:    "Settings",
        Source:   true,
        Position: module.PositionTop,
        Schema:   schema.FromGo(Settings{}),
    }

    // The control schema is derived from the current state, so it changes
    // with isRunning.
    controlPort := module.Port{
        Name:     v1alpha1.ControlPort,
        Label:    "Control",
        Source:   true,
        Position: module.PositionTop,
        Schema:   schema.FromGo(t.getControl()),
    }

    tickPort := module.Port{
        Name:     "output",
        Label:    "Tick",
        Source:   false,
        Position: module.PositionRight,
        Schema:   schema.FromGo(any(nil)),
    }

    return []module.Port{settingsPort, controlPort, tickPort}
}

// Handle routes an incoming message by port.
//
//   - Settings port: stores the new settings.
//   - Control port (leader only): Start begins emitting ticks and blocks until
//     the run's context is cancelled; Stop cancels the current run.
//
// The mutex is held only while state is read or mutated and is always released
// before emit blocks. Holding it for the whole Start call (for example via a
// deferred unlock) would make every later Stop, every getControl call and
// emit's own cleanup wait on the lock forever.
func (t *Ticker) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
    switch port {
    case v1alpha1.SettingsPort:
        settings := msg.(Settings)

        // A Start call may be blocked in emit and reading settings, so the
        // write is guarded by the same mutex.
        t.mu.Lock()
        t.settings = settings
        t.mu.Unlock()
        return nil

    case v1alpha1.ControlPort:
        // Only leader handles control
        if !utils.IsLeader(ctx) {
            return nil
        }

        control := msg.(Control)

        t.mu.Lock()
        switch {
        case control.Start && !t.isRunning:
            runCtx, cancel := context.WithCancel(ctx)
            t.cancelFunc = cancel
            t.isRunning = true
            t.mu.Unlock()

            // Release the derived context's resources however the run ends.
            defer cancel()

            // Update UI to show Stop button
            output(context.Background(), v1alpha1.ReconcilePort, nil)

            // Start emitting (blocks until stopped)
            return t.emit(runCtx, output)

        case control.Stop && t.isRunning:
            cancel := t.cancelFunc
            t.cancelFunc = nil
            t.isRunning = false
            t.mu.Unlock()

            // Cancel outside the lock; emit takes the lock when it winds down.
            if cancel != nil {
                cancel()
            }

            // Update UI to show Start button
            output(context.Background(), v1alpha1.ReconcilePort, nil)
            return nil
        }
        t.mu.Unlock()
        return nil
    }
    return nil
}

// emit sends the configured context to the "output" port once per interval
// until ctx is cancelled. On cancellation it clears isRunning and returns
// ctx.Err(). Settings are re-read under the mutex on every tick, so changes
// made while running take effect on the next tick.
func (t *Ticker) emit(ctx context.Context, output module.Handler) error {
    timer := time.NewTimer(t.interval())
    defer timer.Stop()

    for {
        select {
        case <-timer.C:
            t.mu.Lock()
            payload := t.settings.Context
            t.mu.Unlock()

            // Emit the configured context
            output(ctx, "output", payload)
            timer.Reset(t.interval())

        case <-ctx.Done():
            t.mu.Lock()
            t.isRunning = false
            t.mu.Unlock()
            return ctx.Err()
        }
    }
}

// interval returns the configured delay as a duration. A non-positive delay
// (for example when Start arrives before any settings) falls back to one
// second, because a zero-length timer would fire continuously.
func (t *Ticker) interval() time.Duration {
    t.mu.Lock()
    delay := t.settings.Delay
    t.mu.Unlock()

    if delay <= 0 {
        delay = 1000 // same value as the default tag on Settings.Delay
    }
    return time.Duration(delay) * time.Millisecond
}

// getControl snapshots isRunning under the mutex and derives the control
// state from it: Start is set while idle, Stop while running.
func (t *Ticker) getControl() Control {
    t.mu.Lock()
    running := t.isRunning
    t.mu.Unlock()

    return Control{Start: !running, Stop: running}
}

// Instance returns a new, zero-valued Ticker as a module.Component.
func (t *Ticker) Instance() module.Component {
    return new(Ticker)
}

Signal Component Example

A component that sends a single message on button click:

go
package signal

// Control is the signal component's control-port payload: a button to send
// the message and a button to reset an in-progress send.
type Control struct {
    // Send is the "Send" button.
    Send  bool `json:"send,omitempty" title:"Send" format:"button"`
    // Reset is the "Reset" button.
    Reset bool `json:"reset,omitempty" title:"Reset" format:"button"`
}

// Signal is a component that sends its configured context to the "output"
// port when the Send button is clicked.
type Signal struct {
    // settings holds the component configuration; Handle reads settings.Context.
    settings   Settings
    // cancelFunc cancels the context of the send currently in progress, if any.
    cancelFunc context.CancelFunc
    // isSending reports whether a send is in progress; it decides which
    // control button getControl exposes.
    isSending  bool
    // mu guards cancelFunc and isSending.
    mu         sync.Mutex
}

// Handle processes control-port messages on the leader only. Any control
// message first cancels a send that is still in progress. Reset then clears
// isSending and refreshes the UI; Send emits settings.Context on the "output"
// port and returns the result of that call. Messages on other ports are
// ignored.
//
// The mutex is unlocked by hand on every path so that it is never held while
// output is being called.
func (s *Signal) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
    if port == v1alpha1.ControlPort {
        if !utils.IsLeader(ctx) {
            return nil
        }

        control := msg.(Control)

        s.mu.Lock()

        // Cancel any in-progress send
        if s.cancelFunc != nil {
            s.cancelFunc()
            s.cancelFunc = nil
        }

        if control.Reset {
            s.isSending = false
            s.mu.Unlock()
            output(context.Background(), v1alpha1.ReconcilePort, nil)
            // NOTE(review): this waits until the handler's own ctx is done
            // before returning, so a Reset call does not return right away.
            // Confirm this is intended.
            <-ctx.Done()
            return ctx.Err()
        }

        if control.Send {
            // Derive a cancellable context so a later control message can
            // abort this send through cancelFunc.
            ctx, s.cancelFunc = context.WithCancel(ctx)
            s.isSending = true
            s.mu.Unlock()

            // Update UI
            output(context.Background(), v1alpha1.ReconcilePort, nil)

            // Send message (blocks until downstream completes)
            err := output(ctx, "output", s.settings.Context)

            // Send complete
            s.mu.Lock()
            s.isSending = false
            s.cancelFunc = nil
            s.mu.Unlock()

            output(context.Background(), v1alpha1.ReconcilePort, nil)
            return err
        }

        // Neither Reset nor Send was set: release the lock taken above.
        s.mu.Unlock()
    }
    return nil
}

// getControl snapshots isSending under the mutex and derives the control
// state from it: Send is set while idle, Reset while a send is in progress.
func (s *Signal) getControl() Control {
    s.mu.Lock()
    sending := s.isSending
    s.mu.Unlock()

    return Control{Send: !sending, Reset: sending}
}

Updating UI State

After control actions, update the UI by sending to the reconcile port:

go
// Handle marks the component as running when a Start control message arrives,
// asks for a UI refresh through the reconcile port, and then hands over to
// c.run. Any other message is ignored.
func (c *Component) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
    if port != v1alpha1.ControlPort {
        return nil
    }
    if control := msg.(Control); !control.Start {
        return nil
    }

    c.isRunning = true

    // Trigger UI update
    // The nil message tells the system to re-fetch port schemas
    output(context.Background(), v1alpha1.ReconcilePort, nil)

    return c.run(ctx, output)
}

Control with Parameters

Controls can include input parameters:

go
// Control combines buttons with an ordinary input field whose value is
// delivered in the same control message as the button click.
type Control struct {
    // Button with no parameters
    Start bool `json:"start,omitempty" title:"Start" format:"button"`

    // Button with associated data
    SendCustom bool   `json:"sendCustom,omitempty" title:"Send Custom" format:"button"`
    // CustomData is the value the handler forwards when SendCustom is set.
    CustomData string `json:"customData,omitempty" title:"Custom Data"`
}

// Handle forwards control.CustomData to the "output" port when a control
// message has SendCustom set, returning the result of that call. Every other
// message is ignored.
func (c *Component) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
    if port != v1alpha1.ControlPort {
        return nil
    }

    control := msg.(Control)
    if !control.SendCustom {
        return nil
    }
    return output(ctx, "output", control.CustomData)
}

Best Practices

1. Leader-Only for State Changes

go
// Fragment from inside a Handle method: check leadership before acting on a
// control message.
if port == v1alpha1.ControlPort {
    if !utils.IsLeader(ctx) {
        return nil  // Skip on non-leader pods
    }
    // Process control
}

2. Update UI After State Changes

go
// Fragment: change the state first, then send nil to the reconcile port so
// the UI is refreshed.
c.isRunning = true
output(context.Background(), v1alpha1.ReconcilePort, nil)  // Update UI

3. Handle Cancellation

go
// Fragment: on Stop, cancel the running operation. The nil check covers the
// case where no cancel function has been stored.
if control.Stop {
    if c.cancelFunc != nil {
        c.cancelFunc()
    }
}

4. Thread-Safe State

go
// getControl reads isRunning while holding the mutex and returns the matching
// control state: Start when idle, Stop when running.
func (c *Component) getControl() Control {
    c.mu.Lock()
    defer c.mu.Unlock()

    var state Control
    if c.isRunning {
        state.Stop = true
    } else {
        state.Start = true
    }
    return state
}

5. Clear Button Labels

go
// Control contrasts a descriptive button title with a vague one.
type Control struct {
    // Good: Clear action
    StartProcessing bool `json:"start" title:"Start Processing" format:"button"`

    // Bad: Unclear
    Go bool `json:"go" title:"Go" format:"button"`
}

Next Steps

Build flow-based applications on Kubernetes