Component Patterns
This guide covers common patterns for building TinySystems components. These patterns have been proven in production modules.
Pattern Categories
- Processing Patterns - Transform, filter, route data
- Source Patterns - Generate data, respond to events
- Sink Patterns - Send data to external systems
- Stateful Patterns - Manage component state
Processing Patterns
Transformer
Converts input to output format:
go
type Transformer struct {
settings Settings
}
type Settings struct {
Format string `json:"format" title:"Output Format" enum:"json,xml,csv"`
}
type Input struct {
Data any `json:"data"`
}
type Output struct {
Result string `json:"result"`
}
func (t *Transformer) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
switch port {
case v1alpha1.SettingsPort:
t.settings = msg.(Settings)
return nil
case "input":
input := msg.(Input)
result, err := t.transform(input.Data, t.settings.Format)
if err != nil {
return err
}
return output(ctx, "output", Output{Result: result})
}
return nil
}Filter
Routes messages based on condition:
go
type Filter struct {
settings Settings
}
type Settings struct {
Condition string `json:"condition" title:"Filter Expression" required:"true"`
}
type Message struct {
Data any `json:"data"`
}
func (f *Filter) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
if port == "input" {
message := msg.(Message)
match, err := f.evaluate(message.Data, f.settings.Condition)
if err != nil {
return output(ctx, "error", err.Error())
}
if match {
return output(ctx, "match", message)
}
return output(ctx, "no_match", message)
}
return nil
}
func (f *Filter) Ports() []module.Port {
return []module.Port{
{Name: "input", Label: "Input", Source: true, Position: module.PositionLeft},
{Name: "match", Label: "Match", Source: false, Position: module.PositionRight},
{Name: "no_match", Label: "No Match", Source: false, Position: module.PositionBottom},
{Name: "error", Label: "Error", Source: false, Position: module.PositionBottom},
}
}Router
Routes to multiple outputs based on key:
go
type Router struct {
settings Settings
}
type Settings struct {
Field string `json:"field" title:"Routing Field" required:"true"`
Routes []string `json:"routes" title:"Route Names" minItems:"1"`
}
func (r *Router) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
if port == "input" {
message := msg.(map[string]any)
routeKey := fmt.Sprintf("%v", message[r.settings.Field])
// Check if route exists
for _, route := range r.settings.Routes {
if route == routeKey {
return output(ctx, route, message)
}
}
// Default route
return output(ctx, "default", message)
}
return nil
}
func (r *Router) Ports() []module.Port {
ports := []module.Port{
{Name: v1alpha1.SettingsPort, Source: true, Position: module.PositionTop},
{Name: "input", Label: "Input", Source: true, Position: module.PositionLeft},
{Name: "default", Label: "Default", Source: false, Position: module.PositionBottom},
}
// Dynamic ports from settings
for _, route := range r.settings.Routes {
ports = append(ports, module.Port{
Name: route,
Label: route,
Source: false,
Position: module.PositionRight,
})
}
return ports
}Splitter
Splits arrays into individual items:
go
type Splitter struct{}
type Input struct {
Items []any `json:"items"`
}
type Output struct {
Item any `json:"item"`
Index int `json:"index"`
Total int `json:"total"`
}
func (s *Splitter) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
if port == "input" {
input := msg.(Input)
total := len(input.Items)
for i, item := range input.Items {
select {
case <-ctx.Done():
return ctx.Err()
default:
if err := output(ctx, "item", Output{
Item: item,
Index: i,
Total: total,
}); err != nil {
return err
}
}
}
// Signal completion
return output(ctx, "done", struct{ Total int }{Total: total})
}
return nil
}Aggregator
Collects items and emits batches:
go
type Aggregator struct {
settings Settings
items []any
mu sync.Mutex
}
type Settings struct {
BatchSize int `json:"batchSize" title:"Batch Size" default:"10" minimum:"1"`
}
func (a *Aggregator) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
switch port {
case v1alpha1.SettingsPort:
a.settings = msg.(Settings)
return nil
case "item":
a.mu.Lock()
a.items = append(a.items, msg)
shouldFlush := len(a.items) >= a.settings.BatchSize
var batch []any
if shouldFlush {
batch = a.items
a.items = nil
}
a.mu.Unlock()
if shouldFlush {
return output(ctx, "batch", batch)
}
return nil
case "flush":
a.mu.Lock()
batch := a.items
a.items = nil
a.mu.Unlock()
if len(batch) > 0 {
return output(ctx, "batch", batch)
}
return nil
}
return nil
}Source Patterns
Ticker
Emits messages at intervals:
go
type Ticker struct {
settings Settings
isRunning bool
cancelFunc context.CancelFunc
mu sync.Mutex
}
type Settings struct {
Interval int `json:"interval" title:"Interval (ms)" default:"1000"`
Context any `json:"context,omitempty" title:"Data to emit"`
}
type Control struct {
Start bool `json:"start,omitempty" title:"Start" format:"button"`
Stop bool `json:"stop,omitempty" title:"Stop" format:"button"`
}
func (t *Ticker) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
switch port {
case v1alpha1.SettingsPort:
t.settings = msg.(Settings)
return nil
case v1alpha1.ControlPort:
if !utils.IsLeader(ctx) {
return nil
}
control := msg.(Control)
t.mu.Lock()
defer t.mu.Unlock()
if control.Start && !t.isRunning {
ctx, t.cancelFunc = context.WithCancel(ctx)
t.isRunning = true
output(context.Background(), v1alpha1.ReconcilePort, nil)
// Blocking loop
return t.tick(ctx, output)
}
if control.Stop && t.isRunning {
t.cancelFunc()
t.isRunning = false
output(context.Background(), v1alpha1.ReconcilePort, nil)
}
return nil
}
return nil
}
func (t *Ticker) tick(ctx context.Context, output module.Handler) error {
ticker := time.NewTicker(time.Duration(t.settings.Interval) * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ticker.C:
output(ctx, "tick", t.settings.Context)
case <-ctx.Done():
t.mu.Lock()
t.isRunning = false
t.mu.Unlock()
return ctx.Err()
}
}
}HTTP Server
Receives HTTP requests:
go
type HTTPServer struct {
settings Settings
nodeName string
currentPort int
listener net.Listener
mu sync.Mutex
}
type Settings struct {
Hostnames []string `json:"hostnames" title:"Hostnames"`
}
func (h *HTTPServer) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
switch port {
case v1alpha1.SettingsPort:
h.settings = msg.(Settings)
return nil
case v1alpha1.ReconcilePort:
node, ok := msg.(v1alpha1.TinyNode)
if !ok {
return nil
}
return h.handleReconcile(ctx, output, node)
case v1alpha1.ClientPort:
// Forward request to flow
req := msg.(Request)
result := output(ctx, "request", req)
return result
}
return nil
}
func (h *HTTPServer) handleReconcile(ctx context.Context, output module.Handler, node v1alpha1.TinyNode) error {
h.nodeName = node.Name
// Read configured port from metadata
configuredPort := 0
if portStr, ok := node.Status.Metadata["http-server-port"]; ok {
configuredPort, _ = strconv.Atoi(portStr)
}
h.mu.Lock()
defer h.mu.Unlock()
// Already running on correct port
if configuredPort == h.currentPort && h.currentPort > 0 {
return nil
}
// No port assigned - leader starts and publishes
if configuredPort == 0 {
if utils.IsLeader(ctx) {
return h.startAndPublish(ctx, output)
}
return nil
}
// Port assigned - all pods start
return h.startOnPort(configuredPort)
}
func (h *HTTPServer) startAndPublish(ctx context.Context, output module.Handler) error {
listener, err := net.Listen("tcp", ":0")
if err != nil {
return err
}
port := listener.Addr().(*net.TCPAddr).Port
h.listener = listener
h.currentPort = port
// Publish port to metadata
output(ctx, v1alpha1.ReconcilePort, func(node *v1alpha1.TinyNode) {
if node.Status.Metadata == nil {
node.Status.Metadata = make(map[string]string)
}
node.Status.Metadata["http-server-port"] = strconv.Itoa(port)
})
return nil
}Signal (Manual Trigger)
Sends message on button click:
go
type Signal struct {
settings Settings
isSending bool
cancelFunc context.CancelFunc
mu sync.Mutex
}
type Settings struct {
Context any `json:"context,omitempty" title:"Data to send"`
}
type Control struct {
Send bool `json:"send,omitempty" title:"Send" format:"button"`
Reset bool `json:"reset,omitempty" title:"Reset" format:"button"`
}
func (s *Signal) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
if port == v1alpha1.ControlPort {
if !utils.IsLeader(ctx) {
return nil
}
control := msg.(Control)
s.mu.Lock()
if s.cancelFunc != nil {
s.cancelFunc()
s.cancelFunc = nil
}
if control.Send {
ctx, s.cancelFunc = context.WithCancel(ctx)
s.isSending = true
s.mu.Unlock()
output(context.Background(), v1alpha1.ReconcilePort, nil)
err := output(ctx, "output", s.settings.Context)
s.mu.Lock()
s.isSending = false
s.cancelFunc = nil
s.mu.Unlock()
output(context.Background(), v1alpha1.ReconcilePort, nil)
return err
}
if control.Reset {
s.isSending = false
s.mu.Unlock()
output(context.Background(), v1alpha1.ReconcilePort, nil)
<-ctx.Done()
return ctx.Err()
}
s.mu.Unlock()
}
return nil
}
func (s *Signal) getControl() Control {
s.mu.Lock()
defer s.mu.Unlock()
return Control{
Send: !s.isSending,
Reset: s.isSending,
}
}Sink Patterns
HTTP Client
Sends HTTP requests:
go
type HTTPClient struct {
settings Settings
client *http.Client
}
type Settings struct {
BaseURL string `json:"baseURL" title:"Base URL" format:"uri"`
Timeout int `json:"timeout" title:"Timeout (ms)" default:"30000"`
}
type Request struct {
Method string `json:"method" enum:"GET,POST,PUT,DELETE"`
Path string `json:"path"`
Headers map[string]string `json:"headers,omitempty"`
Body any `json:"body,omitempty"`
}
type Response struct {
StatusCode int `json:"statusCode"`
Headers map[string]string `json:"headers"`
Body any `json:"body"`
}
func (c *HTTPClient) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
switch port {
case v1alpha1.SettingsPort:
c.settings = msg.(Settings)
c.client = &http.Client{
Timeout: time.Duration(c.settings.Timeout) * time.Millisecond,
}
return nil
case "request":
req := msg.(Request)
resp, err := c.doRequest(ctx, req)
if err != nil {
return output(ctx, "error", map[string]string{"error": err.Error()})
}
return output(ctx, "response", resp)
}
return nil
}
func (c *HTTPClient) doRequest(ctx context.Context, req Request) (*Response, error) {
url := c.settings.BaseURL + req.Path
var body io.Reader
if req.Body != nil {
data, _ := json.Marshal(req.Body)
body = bytes.NewReader(data)
}
httpReq, err := http.NewRequestWithContext(ctx, req.Method, url, body)
if err != nil {
return nil, err
}
for k, v := range req.Headers {
httpReq.Header.Set(k, v)
}
resp, err := c.client.Do(httpReq)
if err != nil {
return nil, err
}
defer resp.Body.Close()
respBody, _ := io.ReadAll(resp.Body)
return &Response{
StatusCode: resp.StatusCode,
Headers: flattenHeaders(resp.Header),
Body: string(respBody),
}, nil
}
func (c *HTTPClient) Ports() []module.Port {
return []module.Port{
{Name: v1alpha1.SettingsPort, Source: true, Position: module.PositionTop},
{Name: "request", Label: "Request", Source: true, Position: module.PositionLeft},
{Name: "response", Label: "Response", Source: false, Position: module.PositionRight},
{Name: "error", Label: "Error", Source: false, Position: module.PositionBottom},
}
}Debug (Logger)
Logs messages for debugging:
go
type Debug struct {
settings Settings
}
type Settings struct {
Label string `json:"label" title:"Label" default:"debug"`
}
type Input struct {
Data any `json:"data"`
}
func (d *Debug) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
switch port {
case v1alpha1.SettingsPort:
d.settings = msg.(Settings)
return nil
case "input":
input := msg.(Input)
log.Info("debug",
"label", d.settings.Label,
"data", input.Data,
)
// Pass through
return output(ctx, "output", input)
}
return nil
}Stateful Patterns
Counter
Maintains count across messages:
go
type Counter struct {
count int64
mu sync.Mutex
}
func (c *Counter) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
switch port {
case "increment":
c.mu.Lock()
c.count++
count := c.count
c.mu.Unlock()
return output(ctx, "output", map[string]int64{"count": count})
case "reset":
c.mu.Lock()
c.count = 0
c.mu.Unlock()
return output(ctx, "output", map[string]int64{"count": 0})
case "get":
c.mu.Lock()
count := c.count
c.mu.Unlock()
return output(ctx, "output", map[string]int64{"count": count})
}
return nil
}Cache
Caches values with TTL:
go
type Cache struct {
settings Settings
cache map[string]cacheEntry
mu sync.RWMutex
}
type Settings struct {
TTL int `json:"ttl" title:"TTL (seconds)" default:"300"`
}
type cacheEntry struct {
Value any
ExpiresAt time.Time
}
func (c *Cache) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
switch port {
case v1alpha1.SettingsPort:
c.settings = msg.(Settings)
if c.cache == nil {
c.cache = make(map[string]cacheEntry)
}
return nil
case "get":
req := msg.(GetRequest)
c.mu.RLock()
entry, exists := c.cache[req.Key]
c.mu.RUnlock()
if exists && time.Now().Before(entry.ExpiresAt) {
return output(ctx, "hit", CacheHit{Key: req.Key, Value: entry.Value})
}
return output(ctx, "miss", CacheMiss{Key: req.Key})
case "set":
req := msg.(SetRequest)
c.mu.Lock()
c.cache[req.Key] = cacheEntry{
Value: req.Value,
ExpiresAt: time.Now().Add(time.Duration(c.settings.TTL) * time.Second),
}
c.mu.Unlock()
return nil
}
return nil
}Best Practices Summary
| Pattern | Key Points |
|---|---|
| Transformer | Stateless, settings-configurable |
| Filter | Two outputs: match/no_match |
| Router | Dynamic ports from settings |
| Splitter | Respects context cancellation |
| Aggregator | Thread-safe state, flush port |
| Ticker | Leader-only, cancelable loop |
| HTTP Server | CR-based port coordination |
| Signal | Blocking send, cancellation |
| HTTP Client | Error port, timeout handling |
| Cache | Thread-safe, TTL expiration |
Next Steps
- Component Interface - Interface details
- System Ports - System port reference
- Multi-Replica Coordination - Scaling patterns