Scheduled Data Sync Flow
A complete example flow that periodically syncs data between systems.
Overview
This flow demonstrates:
- Scheduled/periodic execution
- Pagination handling
- Batch processing
- Database operations
- Progress tracking
- Error recovery
Flow Architecture
┌──────────────┐     ┌───────────────┐     ┌──────────────┐
│   Periodic   │────►│  Fetch Page   │────►│   Process    │
│   Emitter    │     │   from API    │     │    Items     │
└──────────────┘     └───────┬───────┘     └──────┬───────┘
                ▲            │                    │
                │            │                    ▼
                │     ┌──────┴──────┐      ┌──────────────┐
                │     │  Has More   │      │    Upsert    │
                │     │   Pages?    │      │   Database   │
                │     └──────┬──────┘      └──────┬───────┘
                │            │                    │
                │      yes   │ no                 ▼
                │     ┌──────┴──────┐      ┌──────────────┐
                │     │  Next Page  │      │   Complete   │
                │     │   Trigger   │      │   Handler    │
                │     └──────┬──────┘      └──────────────┘
                │            │
                └────────────┘
Node Configurations
1. Periodic Emitter (Scheduler)
Component: common-module/ticker
Settings:
interval: "1h"
emitOnStart: false
maxEmissions: 0
payload:
syncType: "full"
startTime: null
Control State:
# Start the scheduler
start: true
Purpose: Triggers the sync process every hour.
2. Initialize Sync State
Component: common-module/modify
Input Edge Configuration:
data:
syncId: "{{now()}}"
syncType: "{{$.payload.syncType}}"
page: 1
pageSize: 100
totalProcessed: 0
totalPages: null
startTime: "{{RFC3339(now())}}"
errors: []
Purpose: Sets up the initial state for pagination tracking. Since syncId is set from now(), it also serves as a numeric start timestamp (node 10 uses it to compute the duration).
3. API Client (Fetch Page)
Component: http-module/client
Settings:
baseUrl: "https://api.external-system.com/v1"
defaultHeaders:
Authorization: "Bearer {{$.secrets.apiToken}}"
Accept: "application/json"
timeout: "60s"
retryCount: 3
retryDelay: "5s"
Input Edge Configuration:
data:
method: "GET"
url: "/users"
queryParams:
page: "{{string($.page)}}"
limit: "{{string($.pageSize)}}"
updated_since: "{{$.lastSyncTime || ''}}"
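Node 4 reads $.response.bodyJson.data and $.response.bodyJson.total, so the flow assumes the endpoint returns a paginated envelope of roughly this shape (field names inferred from the expressions in this document; values are illustrative):
{
  "data": [
    {
      "id": 4217,
      "email": "Ada.Lovelace@Example.com",
      "first_name": "Ada",
      "last_name": "Lovelace",
      "active": true
    }
  ],
  "total": 1543
}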
4. Extract Page Data
Component: common-module/modify
Input Edge Configuration:
data:
# Preserve sync state
syncId: "{{$.syncId}}"
page: "{{$.page}}"
pageSize: "{{$.pageSize}}"
startTime: "{{$.startTime}}"
errors: "{{$.errors}}"
# Extract response data
items: "{{$.response.bodyJson.data}}"
totalItems: "{{$.response.bodyJson.total}}"
totalPages: "{{ceil($.response.bodyJson.total / $.pageSize)}}"
hasMore: "{{$.page < ceil($.response.bodyJson.total / $.pageSize)}}"
# Track progress
totalProcessed: "{{$.totalProcessed + len($.response.bodyJson.data)}}"
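For example, with total = 1543 and pageSize = 100, page 1 produces:
totalPages: 16       # ceil(1543 / 100)
hasMore: true        # 1 < 16
totalProcessed: 100  # 0 + len(data)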
5. Array Iterator
Component: common-module/iterator
Settings:
batchSize: 10
emitIndex: true
emitProgress: true
continueOnError: true
Input Edge Configuration:
data:
array: "{{$.items}}"
context:
syncId: "{{$.syncId}}"
page: "{{$.page}}"
totalPages: "{{$.totalPages}}"
hasMore: "{{$.hasMore}}"
startTime: "{{$.startTime}}"
errors: "{{$.errors}}"6. Transform User Data
6. Transform User Data
Component: common-module/modify
Input Edge Configuration (from iterator batch port):
data:
users: |
{{$.items.map(item => ({
external_id: item.id,
email: lower(item.email),
name: item.first_name + ' ' + item.last_name,
status: item.active ? 'active' : 'inactive',
metadata: JSON.stringify({
source: 'external-system',
original_id: item.id,
synced_at: RFC3339(now())
}),
updated_at: RFC3339(now())
}))}}
context: "{{$.context}}"
batchNum: "{{$.batchNum}}"
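Applied to the sample user shown earlier, one transformed record comes out as:
external_id: 4217
email: "ada.lovelace@example.com"
name: "Ada Lovelace"
status: "active"
metadata: '{"source":"external-system","original_id":4217,"synced_at":"2024-06-20T14:00:00Z"}'
updated_at: "2024-06-20T14:00:00Z"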
7. Database Upsert
Component: database-module/connector
Settings:
driver: "postgres"
host: "{{$.secrets.dbHost}}"
port: 5432
database: "myapp"
username: "{{$.secrets.dbUser}}"
password: "{{$.secrets.dbPassword}}"
sslMode: "require"
maxOpenConns: 10
Input Edge Configuration:
data:
query: |
INSERT INTO users (external_id, email, name, status, metadata, updated_at)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (external_id)
DO UPDATE SET
email = EXCLUDED.email,
name = EXCLUDED.name,
status = EXCLUDED.status,
metadata = EXCLUDED.metadata,
updated_at = EXCLUDED.updated_at
# Note: this binds only the first user in the batch; in practice you'd use a
# batched insert for efficiency (see Performance Optimization below)
parameters: "{{$.users[0] ? [$.users[0].external_id, $.users[0].email, $.users[0].name, $.users[0].status, $.users[0].metadata, $.users[0].updated_at] : []}}"
operation: "execute"
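ON CONFLICT (external_id) only resolves if external_id carries a unique constraint. The upsert therefore assumes a target table along these lines (hypothetical DDL; column types chosen to match the casts in Performance Optimization below):
CREATE TABLE users (
  id SERIAL PRIMARY KEY,
  external_id TEXT NOT NULL UNIQUE,
  email TEXT NOT NULL,
  name TEXT,
  status VARCHAR(20) NOT NULL,
  metadata JSONB,
  updated_at TIMESTAMP WITH TIME ZONE
);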
8. Pagination Router
Component: common-module/router
Settings:
rules:
- name: "has_more_pages"
field: "hasMore"
operator: "equals"
value: "true"
outputPort: "route_a"
defaultPort: "default"
Purpose: Routes to the next page (route_a) or to completion (default port) based on the pagination state computed in node 4.
9. Next Page Trigger
Component: common-module/modify
Input Edge Configuration (from route_a):
target:
node: api-client
port: request
data:
syncId: "{{$.context.syncId}}"
page: "{{$.context.page + 1}}"
pageSize: 100
totalProcessed: "{{$.context.totalProcessed}}"
totalPages: "{{$.context.totalPages}}"
startTime: "{{$.context.startTime}}"
errors: "{{$.context.errors}}"Purpose: Increments page and triggers next API call.
10. Sync Complete Handler
Component: common-module/modify
Input Edge Configuration (from default):
data:
syncId: "{{$.context.syncId}}"
status: "completed"
totalProcessed: "{{$.context.totalProcessed}}"
totalPages: "{{$.context.totalPages}}"
# syncId was set from now() at the start, so the difference is elapsed nanoseconds
duration: "{{(now() - $.context.syncId) / 1000000000}}"
errors: "{{$.context.errors}}"
completedAt: "{{RFC3339(now())}}"
11. Log Sync Result
Component: database-module/connector
Input Edge Configuration:
data:
query: |
INSERT INTO sync_logs (sync_id, status, records_processed, pages, duration_seconds, errors, completed_at)
VALUES ($1, $2, $3, $4, $5, $6, $7)
parameters:
- "{{$.syncId}}"
- "{{$.status}}"
- "{{$.totalProcessed}}"
- "{{$.totalPages}}"
- "{{$.duration}}"
- "{{JSON.stringify($.errors)}}"
- "{{$.completedAt}}"12. Error Handler
Component: common-module/modify
Collects errors without stopping the sync:
Input Edge Configuration:
data:
# Add error to list and continue
context:
syncId: "{{$.context.syncId}}"
errors: "{{[...$.context.errors, {page: $.context.page, error: $.error, timestamp: RFC3339(now())}]}}"Incremental Sync Variant
Incremental Sync Variant
For incremental syncs, store the last sync timestamp:
Modified Initialize Node
data:
syncId: "{{now()}}"
syncType: "incremental"
page: 1
pageSize: 100
# Get last sync time from database or use default
lastSyncTime: "{{$.lastSyncTime || '2024-01-01T00:00:00Z'}}"
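The comment above implies a lookup before this node runs. A sketch of that lookup against the sync_metadata table used below (same connector settings as node 7; the "query" operation name is an assumption):
data:
  query: "SELECT value FROM sync_metadata WHERE key = 'last_user_sync'"
  operation: "query"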
Store Last Sync Time
After successful completion:
data:
query: |
INSERT INTO sync_metadata (key, value, updated_at)
VALUES ('last_user_sync', $1, $2)
ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value, updated_at = EXCLUDED.updated_at
parameters:
- "{{$.completedAt}}"
- "{{$.completedAt}}"Flow Monitoring
Progress Tracking
The iterator emits progress information:
{
"batchNum": 5,
"totalBatches": 20,
"progress": 25,
"isLastBatch": false
}
Sync Logs Table
CREATE TABLE sync_logs (
id SERIAL PRIMARY KEY,
sync_id BIGINT NOT NULL,
status VARCHAR(20) NOT NULL,
records_processed INT NOT NULL,
pages INT NOT NULL,
duration_seconds DECIMAL(10,2),
errors JSONB,
completed_at TIMESTAMP WITH TIME ZONE,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
Query Recent Syncs
SELECT
sync_id,
status,
records_processed,
pages,
duration_seconds,
jsonb_array_length(errors) as error_count,
completed_at
FROM sync_logs
ORDER BY completed_at DESC
LIMIT 10;
Error Recovery
Retry Failed Batches
Store failed batch info for later retry:
data:
query: |
INSERT INTO sync_retry_queue (sync_id, page, batch_num, error, created_at)
VALUES ($1, $2, $3, $4, NOW())
parameters:
- "{{$.context.syncId}}"
- "{{$.context.page}}"
- "{{$.batchNum}}"
- "{{$.error}}"Manual Retry Trigger
Manual Retry Trigger
Create a separate flow to process the retry queue:
# Fetch items from retry queue
query: "SELECT * FROM sync_retry_queue WHERE status = 'pending' LIMIT 10"Performance Optimization
Performance Optimization
Batch Database Inserts
Instead of single-row inserts, use a batched insert:
INSERT INTO users (external_id, email, name, status, metadata, updated_at)
SELECT * FROM unnest(
$1::text[],
$2::text[],
$3::text[],
$4::text[],
$5::jsonb[],
$6::timestamptz[]
)
ON CONFLICT (external_id) DO UPDATE SET
  email = EXCLUDED.email,
  name = EXCLUDED.name,
  status = EXCLUDED.status,
  metadata = EXCLUDED.metadata,
  updated_at = EXCLUDED.updated_at;
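The six array parameters can be assembled from a transformed batch with the same map syntax node 6 uses (a sketch; assumes the connector passes array-valued parameters through to the driver):
parameters:
  - "{{$.users.map(u => u.external_id)}}"
  - "{{$.users.map(u => u.email)}}"
  - "{{$.users.map(u => u.name)}}"
  - "{{$.users.map(u => u.status)}}"
  - "{{$.users.map(u => u.metadata)}}"
  - "{{$.users.map(u => u.updated_at)}}"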
Parallel Page Processing
For independent pages, process several pages concurrently:
# Use async component to fetch multiple pages
pages:
- "{{$.currentPage}}"
- "{{$.currentPage + 1}}"
- "{{$.currentPage + 2}}"Key Patterns Used
1. State Preservation Through Flow
Pass sync context through all nodes:
context:
syncId: "{{$.syncId}}"
page: "{{$.page}}"
totalProcessed: "{{$.totalProcessed}}"2. Loop via Self-Triggering
Create pagination loop by triggering the API client again:
target:
node: api-client
port: request
data:
page: "{{$.context.page + 1}}"3. Error Collection Without Stopping
Collect errors in array, continue processing:
errors: "{{[...$.context.errors, {error: $.error}]}}"4. Completion Detection
Use router to detect end of pagination:
hasMore: "{{$.page < $.totalPages}}"