Array Iterator Component
A complete example of an array processing component with batch capabilities.
Overview
This component iterates over arrays, emitting each item individually or in batches. It demonstrates:
- Array processing
- Batch operations
- Progress tracking
- Completion signals
Complete Implementation
go
package iterator
import (
"context"
"fmt"
"github.com/tiny-systems/module/api/v1alpha1"
"github.com/tiny-systems/module/pkg/module"
)
// ComponentName is the identifier the component reports as its Name in GetInfo.
const ComponentName = "array_iterator"
// Component walks an input array and emits its elements one at a time or in
// batches, depending on the settings it currently holds.
type Component struct {
	// settings is the last value received on the settings port; it stays at
	// its zero value until a settings message has been handled.
	settings Settings
}
// Settings configures how the iterator emits the elements of an input array.
//
// Every struct tag is written on a single line on purpose: reflect.StructTag
// only recognises space-separated key:"value" pairs, so a line break inside
// the tag hides every key that follows it (previously the description keys).
type Settings struct {
	// BatchSize selects the mode: values above 1 emit batches of that many
	// items, anything else emits items one at a time.
	BatchSize int `json:"batchSize" title:"Batch Size" default:"1" minimum:"1" description:"Number of items to emit per iteration (1 = single item)"`
	// EmitIndex controls whether ItemOutput.Index is filled in.
	EmitIndex bool `json:"emitIndex" title:"Include Index" default:"true" description:"Include the array index in output"`
	// EmitProgress controls whether ItemOutput.Total and ItemOutput.Progress
	// are filled in.
	EmitProgress bool `json:"emitProgress" title:"Include Progress" default:"true" description:"Include progress information (current/total)"`
	// ContinueOnError keeps the iteration going after an emit fails; the
	// failure is then only counted instead of being returned.
	ContinueOnError bool `json:"continueOnError" title:"Continue on Error" default:"true" description:"Continue iteration if downstream processing fails"`
}
// Input is the message accepted on the "input" port.
//
// Every struct tag is written on a single line on purpose: reflect.StructTag
// only recognises space-separated key:"value" pairs, so a line break inside
// the tag hides every key that follows it (previously the description keys).
type Input struct {
	// Array holds the elements to iterate over; an empty array is routed to
	// the "empty" port instead of being iterated.
	Array []any `json:"array" title:"Array" required:"true" configurable:"true" description:"Array to iterate over"`
	// Context is copied unchanged into every message emitted for this input.
	Context map[string]any `json:"context,omitempty" title:"Context" description:"Additional context passed to each item"`
}
// ItemOutput is the message emitted on the "item" port, once per array element,
// when the iterator runs in single-item mode.
type ItemOutput struct {
	// Item is the array element itself.
	Item any `json:"item" title:"Item"`
	// Index is the zero-based position of Item; it is only set when the
	// EmitIndex setting is on.
	// NOTE(review): if this struct is serialized with encoding/json, omitempty
	// also drops the field for the first element (index 0) — confirm whether
	// consumers rely on "index": 0 being present.
	Index int `json:"index,omitempty" title:"Index"`
	// Total is the array length; only set when the EmitProgress setting is on.
	Total int `json:"total,omitempty" title:"Total Items"`
	// Progress is (Index+1)/Total expressed as a percentage; only set when the
	// EmitProgress setting is on.
	Progress float64 `json:"progress,omitempty" title:"Progress (%)"`
	// IsFirst is true for the element at index 0.
	IsFirst bool `json:"isFirst" title:"Is First Item"`
	// IsLast is true for the element at the final index.
	IsLast bool `json:"isLast" title:"Is Last Item"`
	// Context is the Context value of the Input being iterated.
	Context map[string]any `json:"context,omitempty" title:"Context"`
}
// BatchOutput is the message emitted on the "batch" port, once per batch, when
// the configured batch size is greater than 1.
type BatchOutput struct {
	// Items is the sub-slice of the input array that forms this batch.
	Items []any `json:"items" title:"Items"`
	// StartIndex is the zero-based index of the first element in Items.
	StartIndex int `json:"startIndex" title:"Start Index"`
	// EndIndex is the zero-based index of the last element in Items (inclusive).
	EndIndex int `json:"endIndex" title:"End Index"`
	// Total is the length of the whole input array.
	Total int `json:"total" title:"Total Items"`
	// BatchNum is the one-based sequence number of this batch.
	BatchNum int `json:"batchNum" title:"Batch Number"`
	// TotalBatches is the number of batches the input array is split into.
	TotalBatches int `json:"totalBatches" title:"Total Batches"`
	// IsLastBatch is true when BatchNum equals TotalBatches.
	IsLastBatch bool `json:"isLastBatch" title:"Is Last Batch"`
	// Context is the Context value of the Input being iterated.
	Context map[string]any `json:"context,omitempty" title:"Context"`
}
// CompletionOutput is the message emitted on the "complete" port after the last
// item or batch has been emitted.
type CompletionOutput struct {
	// TotalProcessed is the length of the input array; it is not reduced by
	// emits that failed.
	TotalProcessed int `json:"totalProcessed" title:"Total Processed"`
	// Errors is the number of item or batch emits that returned an error.
	Errors int `json:"errors" title:"Error Count"`
	// Context is the Context value of the Input that was iterated.
	Context map[string]any `json:"context,omitempty" title:"Context"`
}
// EmptyOutput is the message emitted on the "empty" port when the input array
// has no elements.
type EmptyOutput struct {
	// Context is the Context value of the Input that was received.
	Context map[string]any `json:"context,omitempty" title:"Context"`
}
// Compile-time assertion that *Component satisfies module.Component.
var _ module.Component = (*Component)(nil)
// GetInfo returns the static metadata describing this component: its name,
// display title, description, category and search tags.
func (c *Component) GetInfo() module.ComponentInfo {
	var info module.ComponentInfo

	info.Name = ComponentName
	info.Title = "Array Iterator"
	info.Description = "Iterates over arrays, emitting items individually or in batches"
	info.Category = "Flow Control"
	info.Tags = []string{"array", "loop", "iterate", "batch"}

	return info
}
// Ports lists the six ports of the component in a fixed order: settings,
// input, item, batch, complete and empty.
//
// The two ports that Handle accepts messages on (settings and input) are
// declared with Source set to true; the four ports the component emits on are
// declared with Source set to false.
func (c *Component) Ports() []module.Port {
	// Ports the component receives messages on.
	settingsPort := module.Port{
		Name:          v1alpha1.SettingsPort,
		Label:         "Settings",
		Position:      module.PositionTop,
		Source:        true,
		Configuration: Settings{},
	}
	inputPort := module.Port{
		Name:          "input",
		Label:         "Input",
		Position:      module.PositionLeft,
		Source:        true,
		Configuration: Input{},
	}

	// Ports carrying the per-item and per-batch messages.
	itemPort := module.Port{
		Name:          "item",
		Label:         "Item",
		Position:      module.PositionRight,
		Source:        false,
		Configuration: ItemOutput{},
	}
	batchPort := module.Port{
		Name:          "batch",
		Label:         "Batch",
		Position:      module.PositionRight,
		Source:        false,
		Configuration: BatchOutput{},
	}

	// Ports signalling the end of an iteration or an empty input.
	completePort := module.Port{
		Name:          "complete",
		Label:         "Complete",
		Position:      module.PositionBottom,
		Source:        false,
		Configuration: CompletionOutput{},
	}
	emptyPort := module.Port{
		Name:          "empty",
		Label:         "Empty",
		Position:      module.PositionBottom,
		Source:        false,
		Configuration: EmptyOutput{},
	}

	return []module.Port{
		settingsPort,
		inputPort,
		itemPort,
		batchPort,
		completePort,
		emptyPort,
	}
}
// Handle dispatches an incoming message according to the port it arrived on.
//
//   - settings port: stores the Settings for later iterations, raising a
//     BatchSize below 1 to 1.
//   - "input": iterates over the received Input and emits the results
//     through output.
//
// It returns an error for an unknown port or when msg does not have the type
// the port expects. The type is checked explicitly so that a malformed
// message is reported as an error instead of panicking the caller.
func (c *Component) Handle(
	ctx context.Context,
	output module.Handler,
	port string,
	msg any,
) error {
	switch port {
	case v1alpha1.SettingsPort:
		settings, ok := msg.(Settings)
		if !ok {
			return fmt.Errorf("invalid message on port %s: expected %T, got %T", port, Settings{}, msg)
		}
		if settings.BatchSize < 1 {
			settings.BatchSize = 1
		}
		// Assign only after validation so a bad message never leaves the
		// component with half-applied settings.
		c.settings = settings
		return nil
	case "input":
		input, ok := msg.(Input)
		if !ok {
			return fmt.Errorf("invalid message on port %s: expected %T, got %T", port, Input{}, msg)
		}
		return c.handleInput(ctx, output, input)
	default:
		return fmt.Errorf("unknown port: %s", port)
	}
}
// handleInput iterates over input.Array and emits the results through output.
//
//   - An empty array produces a single EmptyOutput on the "empty" port.
//   - With a batch size above 1 the work is delegated to processBatches.
//   - Otherwise every element is emitted on the "item" port, followed by a
//     CompletionOutput on the "complete" port.
//
// An emit that fails is counted; it aborts the iteration and is returned only
// when the ContinueOnError setting is off. If ctx is cancelled between items
// the iteration stops and ctx.Err() is returned, so a cancelled run does not
// keep pushing the rest of the array downstream.
func (c *Component) handleInput(
	ctx context.Context,
	output module.Handler,
	input Input,
) error {
	total := len(input.Array)
	if total == 0 {
		return output(ctx, "empty", EmptyOutput{
			Context: input.Context,
		})
	}

	errorCount := 0

	// Batch mode.
	if c.settings.BatchSize > 1 {
		return c.processBatches(ctx, output, input, &errorCount)
	}

	// Single item mode.
	for i, item := range input.Array {
		// Without this check ContinueOnError would swallow the cancellation
		// and walk the whole remaining array.
		if err := ctx.Err(); err != nil {
			return err
		}

		itemOutput := ItemOutput{
			Item:    item,
			IsFirst: i == 0,
			IsLast:  i == total-1,
			Context: input.Context,
		}
		if c.settings.EmitIndex {
			itemOutput.Index = i
		}
		if c.settings.EmitProgress {
			itemOutput.Total = total
			// Percentage of elements emitted so far, counting this one.
			itemOutput.Progress = float64(i+1) / float64(total) * 100
		}

		if err := output(ctx, "item", itemOutput); err != nil {
			errorCount++
			if !c.settings.ContinueOnError {
				return err
			}
		}
	}

	// Send completion.
	return output(ctx, "complete", CompletionOutput{
		TotalProcessed: total,
		Errors:         errorCount,
		Context:        input.Context,
	})
}
// processBatches splits input.Array into consecutive batches of the configured
// batch size, emits each one on the "batch" port and finishes with a
// CompletionOutput on the "complete" port.
//
// errorCount is incremented for every batch emit that fails; a nil pointer is
// tolerated and replaced by a local counter. A failed emit aborts the run and
// is returned only when the ContinueOnError setting is off. If ctx is
// cancelled between batches the run stops and ctx.Err() is returned.
func (c *Component) processBatches(
	ctx context.Context,
	output module.Handler,
	input Input,
	errorCount *int,
) error {
	if errorCount == nil {
		errorCount = new(int)
	}

	total := len(input.Array)

	// A batch size below 1 would divide by zero below; treat it as 1.
	batchSize := c.settings.BatchSize
	if batchSize < 1 {
		batchSize = 1
	}

	// Ceiling division written so that it cannot overflow for huge batch sizes.
	totalBatches := total / batchSize
	if total%batchSize != 0 {
		totalBatches++
	}

	for batchNum := 0; batchNum < totalBatches; batchNum++ {
		// Without this check ContinueOnError would swallow the cancellation
		// and keep emitting the remaining batches.
		if err := ctx.Err(); err != nil {
			return err
		}

		startIndex := batchNum * batchSize
		endIndex := total
		if batchSize < total-startIndex {
			endIndex = startIndex + batchSize
		}

		// The full slice expression caps the batch's capacity at its length,
		// so a receiver appending to Items gets a fresh backing array instead
		// of overwriting the elements of the next batch.
		batch := input.Array[startIndex:endIndex:endIndex]

		batchOutput := BatchOutput{
			Items:        batch,
			StartIndex:   startIndex,
			EndIndex:     endIndex - 1, // inclusive index of the last element
			Total:        total,
			BatchNum:     batchNum + 1, // one-based for display
			TotalBatches: totalBatches,
			IsLastBatch:  batchNum == totalBatches-1,
			Context:      input.Context,
		}

		if err := output(ctx, "batch", batchOutput); err != nil {
			*errorCount++
			if !c.settings.ContinueOnError {
				return err
			}
		}
	}

	// Send completion.
	return output(ctx, "complete", CompletionOutput{
		TotalProcessed: total,
		Errors:         *errorCount,
		Context:        input.Context,
	})
}
func (c *Component) Instance() module.Component {
return &Component{}
}Usage Examples
Single Item Mode
yaml
edges:
  - port: _settings
    data:
      batchSize: 1
      emitIndex: true
      emitProgress: true

Input:
json
{
"array": ["apple", "banana", "cherry"],
"context": {"source": "fruit_list"}
}

Outputs (3 item messages followed by 1 completion message; progress values are shown rounded to two decimals):
json
// Message 1
{
"item": "apple",
"index": 0,
"total": 3,
"progress": 33.33,
"isFirst": true,
"isLast": false,
"context": {"source": "fruit_list"}
}
// Message 2
{
"item": "banana",
"index": 1,
"total": 3,
"progress": 66.67,
"isFirst": false,
"isLast": false,
"context": {"source": "fruit_list"}
}
// Message 3
{
"item": "cherry",
"index": 2,
"total": 3,
"progress": 100,
"isFirst": false,
"isLast": true,
"context": {"source": "fruit_list"}
}
// Completion
{
"totalProcessed": 3,
"errors": 0,
"context": {"source": "fruit_list"}
}

Batch Mode
yaml
edges:
  - port: _settings
    data:
      batchSize: 2

Input:
json
{
"array": [1, 2, 3, 4, 5]
}

Outputs (3 batch messages; a completion message on the complete port follows and is not shown):
json
// Batch 1
{
"items": [1, 2],
"startIndex": 0,
"endIndex": 1,
"total": 5,
"batchNum": 1,
"totalBatches": 3,
"isLastBatch": false
}
// Batch 2
{
"items": [3, 4],
"startIndex": 2,
"endIndex": 3,
"total": 5,
"batchNum": 2,
"totalBatches": 3,
"isLastBatch": false
}
// Batch 3
{
"items": [5],
"startIndex": 4,
"endIndex": 4,
"total": 5,
"batchNum": 3,
"totalBatches": 3,
"isLastBatch": true
}

Visual Flow
┌─────────────────────────────────────────────────────────┐
│ Array Iterator │
│ │
│ Input Array: [A, B, C, D, E] │
│ Batch Size: 2 │
│ │
│ ┌─────────────────────────────────────────────────┐ │
│ │ Batch 1: [A, B] ─────────────────────────────► │────┼───► Batch Port
│ │ Batch 2: [C, D] ─────────────────────────────► │ │
│ │ Batch 3: [E] ─────────────────────────────► │ │
│ └─────────────────────────────────────────────────┘ │
│ │
│ After all batches: ────────────────────────────────────┼───► Complete Port
└─────────────────────────────────────────────────────────┘

Key Patterns Demonstrated
1. Multiple Output Types
Different output ports for different scenarios:
go
"item" // Single item processing
"batch" // Batch processing
"complete" // All items processed
"empty" // Empty array handling

2. Context Propagation
Pass context through the iteration:
go
type Input struct {
Array []any `json:"array"`
Context map[string]any `json:"context,omitempty"`
}

This allows correlating items with their source.
3. Progress Tracking
Include progress information for UI/monitoring:
go
if c.settings.EmitProgress {
itemOutput.Total = total
itemOutput.Progress = float64(i+1) / float64(total) * 100
}

4. Error Resilience
Continue processing despite downstream errors:
go
if err != nil {
errorCount++
if !c.settings.ContinueOnError {
return err
}
}

5. Blocking Iteration
The iterator blocks on each output() call:
go
for i, item := range input.Array {
err := output(ctx, "item", itemOutput) // Blocks until downstream completes
// ...
}

Common Use Cases
1. Processing API Results
yaml
# Iterate over API response items
input:
  array: "{{$.response.data.items}}"
  context:
    apiEndpoint: "{{$.request.url}}"

2. Batch Database Inserts
yaml
settings:
  batchSize: 100
  continueOnError: true
# Each batch goes to the database insert component

3. Parallel Processing with Aggregation
┌─────────────┐
Array ─────────────►│ Iterator │─────► Item Processing ─────┐
└─────────────┘ │
│ │
│ complete │
▼ │
┌─────────────┐ │
│ Aggregator │◄────────────────────────────┘
└─────────────┘

Extension Ideas
- Parallel Iteration: Process multiple items concurrently
- Filtering: Skip items based on conditions
- Mapping: Transform items during iteration
- Chunking: Split array based on custom logic (not just size)
- Async Mode: Non-blocking iteration with callbacks