Skip to content

Array Iterator Component

A complete example of an array processing component with batch capabilities.

Overview

This component iterates over arrays, emitting each item individually or in batches. It demonstrates:

  • Array processing
  • Batch operations
  • Progress tracking
  • Completion signals

Complete Implementation

go
package iterator

import (
    "context"
    "fmt"

    "github.com/tiny-systems/module/api/v1alpha1"
    "github.com/tiny-systems/module/pkg/module"
)

// ComponentName is the identifier this component reports as its Name in GetInfo.
const ComponentName = "array_iterator"

// Component is the array iterator. It keeps the most recently received
// Settings and applies them to every message arriving on the input port.
type Component struct {
    // settings is replaced whenever a message arrives on the settings port.
    settings Settings
}

// Settings is the configuration delivered on the settings port.
//
//   - BatchSize: number of items per emitted message. Values above 1 switch
//     the iterator to batch mode; Handle raises values below 1 to 1.
//   - EmitIndex: copy the element's array index into ItemOutput.Index.
//   - EmitProgress: fill ItemOutput.Total and ItemOutput.Progress.
//   - ContinueOnError: keep iterating when a downstream call returns an
//     error, counting the failure instead of aborting.
//
// Every struct tag must stay on a single line. reflect.StructTag only skips
// spaces between key:"value" pairs, so a line break inside the tag stops
// parsing and hides every key after it (previously the description).
type Settings struct {
    BatchSize       int  `json:"batchSize" title:"Batch Size" default:"1" minimum:"1" description:"Number of items to emit per iteration (1 = single item)"`
    EmitIndex       bool `json:"emitIndex" title:"Include Index" default:"true" description:"Include the array index in output"`
    EmitProgress    bool `json:"emitProgress" title:"Include Progress" default:"true" description:"Include progress information (current/total)"`
    ContinueOnError bool `json:"continueOnError" title:"Continue on Error" default:"true" description:"Continue iteration if downstream processing fails"`
}

// Input is the message accepted on the "input" port.
//
//   - Array: the elements to iterate over. An empty array produces a single
//     message on the "empty" port instead of any items.
//   - Context: optional caller data copied unchanged onto every emitted
//     message.
//
// Every struct tag must stay on a single line. reflect.StructTag only skips
// spaces between key:"value" pairs, so a line break inside the tag stops
// parsing and hides every key after it (previously the description).
type Input struct {
    Array   []any          `json:"array" title:"Array" required:"true" configurable:"true" description:"Array to iterate over"`
    Context map[string]any `json:"context,omitempty" title:"Context" description:"Additional context passed to each item"`
}

// ItemOutput is the message emitted on the "item" port for each array element
// when the iterator runs in single-item mode.
//
// IsFirst and IsLast mark the first and last element of the array. Context is
// the Input.Context value passed through unchanged.
//
// NOTE(review): Index, Total and Progress are tagged omitempty, so when this
// struct is marshalled with encoding/json they disappear whenever they are
// zero. That is how leaving them unset (EmitIndex / EmitProgress disabled)
// removes them from the output, but it also means the first element's Index
// of 0 is omitted even when EmitIndex is enabled.
type ItemOutput struct {
    Item     any            `json:"item" title:"Item"`
    Index    int            `json:"index,omitempty" title:"Index"`
    Total    int            `json:"total,omitempty" title:"Total Items"`
    Progress float64        `json:"progress,omitempty" title:"Progress (%)"`
    IsFirst  bool           `json:"isFirst" title:"Is First Item"`
    IsLast   bool           `json:"isLast" title:"Is Last Item"`
    Context  map[string]any `json:"context,omitempty" title:"Context"`
}

// BatchOutput is the message emitted on the "batch" port for each group of
// elements when the iterator runs in batch mode.
//
//   - Items: the elements belonging to this batch.
//   - StartIndex / EndIndex: zero-based positions in the source array of the
//     first and last element of the batch (both inclusive).
//   - Total: length of the source array.
//   - BatchNum: one-based sequence number of this batch.
//   - TotalBatches: number of batches the array is split into.
//   - IsLastBatch: true only on the final batch.
//   - Context: the Input.Context value passed through unchanged.
type BatchOutput struct {
    Items        []any          `json:"items" title:"Items"`
    StartIndex   int            `json:"startIndex" title:"Start Index"`
    EndIndex     int            `json:"endIndex" title:"End Index"`
    Total        int            `json:"total" title:"Total Items"`
    BatchNum     int            `json:"batchNum" title:"Batch Number"`
    TotalBatches int            `json:"totalBatches" title:"Total Batches"`
    IsLastBatch  bool           `json:"isLastBatch" title:"Is Last Batch"`
    Context      map[string]any `json:"context,omitempty" title:"Context"`
}

// CompletionOutput is the message emitted on the "complete" port once the
// iterator has gone through the whole array (items or batches).
//
// TotalProcessed is set to the length of the input array, regardless of how
// many downstream calls failed; Errors counts the item/batch emissions whose
// downstream call returned an error. Context is the Input.Context value passed
// through unchanged.
type CompletionOutput struct {
    TotalProcessed int            `json:"totalProcessed" title:"Total Processed"`
    Errors         int            `json:"errors" title:"Error Count"`
    Context        map[string]any `json:"context,omitempty" title:"Context"`
}

// EmptyOutput is the message emitted on the "empty" port when the input array
// has no elements. It carries only the Input.Context value, passed through
// unchanged.
type EmptyOutput struct {
    Context map[string]any `json:"context,omitempty" title:"Context"`
}

// Compile-time assertion that *Component satisfies module.Component.
var _ module.Component = (*Component)(nil)

// GetInfo returns the static metadata describing this component: its name,
// display title, description, category and search tags.
func (c *Component) GetInfo() module.ComponentInfo {
    var info module.ComponentInfo

    info.Name = ComponentName
    info.Title = "Array Iterator"
    info.Description = "Iterates over arrays, emitting items individually or in batches"
    info.Category = "Flow Control"
    info.Tags = []string{"array", "loop", "iterate", "batch"}

    return info
}

// Ports declares the six ports of the component, in this order: settings,
// input, item, batch, complete, empty. The two ports Handle accepts messages
// on (settings and input) are marked Source: true; the four ports the
// iterator emits on are marked Source: false. Each port's Configuration is a
// zero value of the message type used on that port.
func (c *Component) Ports() []module.Port {
    // Ports handled by Handle.
    settingsPort := module.Port{
        Name:          v1alpha1.SettingsPort,
        Label:         "Settings",
        Position:      module.PositionTop,
        Source:        true,
        Configuration: Settings{},
    }
    inputPort := module.Port{
        Name:          "input",
        Label:         "Input",
        Position:      module.PositionLeft,
        Source:        true,
        Configuration: Input{},
    }

    // Per-element / per-batch emissions.
    itemPort := module.Port{
        Name:          "item",
        Label:         "Item",
        Position:      module.PositionRight,
        Source:        false,
        Configuration: ItemOutput{},
    }
    batchPort := module.Port{
        Name:          "batch",
        Label:         "Batch",
        Position:      module.PositionRight,
        Source:        false,
        Configuration: BatchOutput{},
    }

    // End-of-iteration signals.
    completePort := module.Port{
        Name:          "complete",
        Label:         "Complete",
        Position:      module.PositionBottom,
        Source:        false,
        Configuration: CompletionOutput{},
    }
    emptyPort := module.Port{
        Name:          "empty",
        Label:         "Empty",
        Position:      module.PositionBottom,
        Source:        false,
        Configuration: EmptyOutput{},
    }

    return []module.Port{
        settingsPort,
        inputPort,
        itemPort,
        batchPort,
        completePort,
        emptyPort,
    }
}

// Handle dispatches an incoming message according to the port it arrived on.
//
//   - settings port: msg must be a Settings value. It replaces the stored
//     settings, with BatchSize raised to 1 when it is below 1.
//   - "input": msg must be an Input value. The array is iterated and the
//     results are emitted through output.
//
// It returns an error when the port is unknown, when msg does not have the
// type expected for the port, or when iteration itself returns an error.
func (c *Component) Handle(
    ctx context.Context,
    output module.Handler,
    port string,
    msg any,
) error {
    switch port {
    case v1alpha1.SettingsPort:
        // Checked assertion: a payload of the wrong type must surface as an
        // error rather than panic.
        settings, ok := msg.(Settings)
        if !ok {
            return fmt.Errorf("invalid settings message: expected %T, got %T", Settings{}, msg)
        }
        if settings.BatchSize < 1 {
            settings.BatchSize = 1
        }
        c.settings = settings
        return nil

    case "input":
        input, ok := msg.(Input)
        if !ok {
            return fmt.Errorf("invalid input message: expected %T, got %T", Input{}, msg)
        }
        return c.handleInput(ctx, output, input)

    default:
        return fmt.Errorf("unknown port: %s", port)
    }
}

// handleInput iterates over input.Array and emits the results through output.
//
//   - An empty array produces one EmptyOutput on the "empty" port and nothing
//     else.
//   - When the configured BatchSize is greater than 1 the work is delegated to
//     processBatches.
//   - Otherwise one ItemOutput per element is sent to the "item" port, followed
//     by a CompletionOutput on the "complete" port.
//
// A downstream error is counted; it aborts the iteration (and is returned)
// only when ContinueOnError is false. The iteration also stops with ctx.Err()
// as soon as ctx is done. The returned error is otherwise whatever the final
// "complete"/"empty" emission returns.
func (c *Component) handleInput(
    ctx context.Context,
    output module.Handler,
    input Input,
) error {
    total := len(input.Array)
    if total == 0 {
        return output(ctx, "empty", EmptyOutput{
            Context: input.Context,
        })
    }

    errorCount := 0

    // Batch mode
    if c.settings.BatchSize > 1 {
        return c.processBatches(ctx, output, input, &errorCount)
    }

    // Single item mode
    for i, item := range input.Array {
        // Stop once the caller has cancelled or timed out; without this check
        // ContinueOnError would keep calling output for every remaining item.
        if err := ctx.Err(); err != nil {
            return err
        }

        itemOutput := ItemOutput{
            Item:    item,
            IsFirst: i == 0,
            IsLast:  i == total-1,
            Context: input.Context,
        }

        if c.settings.EmitIndex {
            itemOutput.Index = i
        }

        if c.settings.EmitProgress {
            itemOutput.Total = total
            // Percentage of elements emitted so far, counting this one.
            itemOutput.Progress = float64(i+1) / float64(total) * 100
        }

        if err := output(ctx, "item", itemOutput); err != nil {
            errorCount++
            if !c.settings.ContinueOnError {
                return err
            }
        }
    }

    // Send completion
    return output(ctx, "complete", CompletionOutput{
        TotalProcessed: total,
        Errors:         errorCount,
        Context:        input.Context,
    })
}

// processBatches splits input.Array into consecutive groups of at most
// BatchSize elements and sends each group as a BatchOutput to the "batch"
// port, followed by a CompletionOutput on the "complete" port.
//
// errorCount must be non-nil. It is incremented for every batch whose
// downstream call returns an error, and its final value is reported in the
// completion message. Such an error aborts the iteration (and is returned)
// only when ContinueOnError is false. The iteration also stops with ctx.Err()
// as soon as ctx is done.
func (c *Component) processBatches(
    ctx context.Context,
    output module.Handler,
    input Input,
    errorCount *int,
) error {
    total := len(input.Array)

    // Never divide by zero, even if this is reached with settings that were
    // not normalised by Handle.
    batchSize := c.settings.BatchSize
    if batchSize < 1 {
        batchSize = 1
    }

    // Ceiling division: a trailing partial batch still counts as a batch.
    totalBatches := (total + batchSize - 1) / batchSize

    for batchNum := 0; batchNum < totalBatches; batchNum++ {
        // Stop once the caller has cancelled or timed out; without this check
        // ContinueOnError would keep calling output for every remaining batch.
        if err := ctx.Err(); err != nil {
            return err
        }

        startIndex := batchNum * batchSize
        endIndex := startIndex + batchSize
        if endIndex > total {
            endIndex = total
        }

        batchOutput := BatchOutput{
            // Full slice expression caps the batch's capacity at its length,
            // so a downstream append allocates a new array instead of
            // overwriting the elements of the following batch.
            Items:        input.Array[startIndex:endIndex:endIndex],
            StartIndex:   startIndex,
            EndIndex:     endIndex - 1, // inclusive index of the last element
            Total:        total,
            BatchNum:     batchNum + 1,
            TotalBatches: totalBatches,
            IsLastBatch:  batchNum == totalBatches-1,
            Context:      input.Context,
        }

        if err := output(ctx, "batch", batchOutput); err != nil {
            *errorCount++
            if !c.settings.ContinueOnError {
                return err
            }
        }
    }

    // Send completion
    return output(ctx, "complete", CompletionOutput{
        TotalProcessed: total,
        Errors:         *errorCount,
        Context:        input.Context,
    })
}

// Instance returns a new Component whose settings are all zero values; it
// shares no state with the receiver.
func (c *Component) Instance() module.Component {
    return new(Component)
}

Usage Examples

Single Item Mode

yaml
edges:
  - port: _settings
    data:
      batchSize: 1
      emitIndex: true
      emitProgress: true

Input:

json
{
  "array": ["apple", "banana", "cherry"],
  "context": {"source": "fruit_list"}
}

Outputs (3 item messages followed by 1 completion message; progress values are rounded here for readability):

json
// Message 1
{
  "item": "apple",
  "index": 0,
  "total": 3,
  "progress": 33.33,
  "isFirst": true,
  "isLast": false,
  "context": {"source": "fruit_list"}
}

// Message 2
{
  "item": "banana",
  "index": 1,
  "total": 3,
  "progress": 66.67,
  "isFirst": false,
  "isLast": false,
  "context": {"source": "fruit_list"}
}

// Message 3
{
  "item": "cherry",
  "index": 2,
  "total": 3,
  "progress": 100,
  "isFirst": false,
  "isLast": true,
  "context": {"source": "fruit_list"}
}

// Completion
{
  "totalProcessed": 3,
  "errors": 0,
  "context": {"source": "fruit_list"}
}

Batch Mode

yaml
edges:
  - port: _settings
    data:
      batchSize: 2

Input:

json
{
  "array": [1, 2, 3, 4, 5]
}

Outputs (3 batch messages; a completion message is then sent on the complete port):

json
// Batch 1
{
  "items": [1, 2],
  "startIndex": 0,
  "endIndex": 1,
  "total": 5,
  "batchNum": 1,
  "totalBatches": 3,
  "isLastBatch": false
}

// Batch 2
{
  "items": [3, 4],
  "startIndex": 2,
  "endIndex": 3,
  "total": 5,
  "batchNum": 2,
  "totalBatches": 3,
  "isLastBatch": false
}

// Batch 3
{
  "items": [5],
  "startIndex": 4,
  "endIndex": 4,
  "total": 5,
  "batchNum": 3,
  "totalBatches": 3,
  "isLastBatch": true
}

Visual Flow

┌─────────────────────────────────────────────────────────┐
│                    Array Iterator                        │
│                                                          │
│  Input Array: [A, B, C, D, E]                           │
│  Batch Size: 2                                          │
│                                                          │
│  ┌─────────────────────────────────────────────────┐    │
│  │ Batch 1: [A, B] ─────────────────────────────►  │────┼───► Batch Port
│  │ Batch 2: [C, D] ─────────────────────────────►  │    │
│  │ Batch 3: [E]    ─────────────────────────────►  │    │
│  └─────────────────────────────────────────────────┘    │
│                                                          │
│  After all batches: ────────────────────────────────────┼───► Complete Port
└─────────────────────────────────────────────────────────┘

Key Patterns Demonstrated

1. Multiple Output Types

Different output ports for different scenarios:

go
"item"     // Single item processing
"batch"    // Batch processing
"complete" // All items processed
"empty"    // Empty array handling

2. Context Propagation

Pass context through the iteration:

go
// Input pairs the array with an optional context map; the iterator copies
// Context unchanged onto every message it emits.
type Input struct {
    Array   []any          `json:"array"`
    Context map[string]any `json:"context,omitempty"`
}

This allows correlating items with their source.

3. Progress Tracking

Include progress information for UI/monitoring:

go
if c.settings.EmitProgress {
    itemOutput.Total = total
    itemOutput.Progress = float64(i+1) / float64(total) * 100
}

4. Error Resilience

Continue processing despite downstream errors:

go
if err != nil {
    errorCount++
    if !c.settings.ContinueOnError {
        return err
    }
}

5. Blocking Iteration

The iterator blocks on each output() call:

go
for i, item := range input.Array {
    err := output(ctx, "item", itemOutput)  // Blocks until downstream completes
    // ...
}

Common Use Cases

1. Processing API Results

yaml
# Iterate over API response items
input:
  array: "{{$.response.data.items}}"
  context:
    apiEndpoint: "{{$.request.url}}"

2. Batch Database Inserts

yaml
settings:
  batchSize: 100
  continueOnError: true

# Each batch goes to database insert component

3. Sequential Processing with Aggregation

                    ┌─────────────┐
Array ─────────────►│  Iterator   │─────► Item Processing ─────┐
                    └─────────────┘                             │
                           │                                    │
                           │ complete                           │
                           ▼                                    │
                    ┌─────────────┐                             │
                    │  Aggregator │◄────────────────────────────┘
                    └─────────────┘

Extension Ideas

  1. Parallel Iteration: Process multiple items concurrently
  2. Filtering: Skip items based on conditions
  3. Mapping: Transform items during iteration
  4. Chunking: Split array based on custom logic (not just size)
  5. Async Mode: Non-blocking iteration with callbacks

Build flow-based applications on Kubernetes