Scheduled Data Sync Flow

A complete example flow that periodically syncs data between systems.

Overview

This flow demonstrates:

  • Scheduled/periodic execution
  • Pagination handling
  • Batch processing
  • Database operations
  • Progress tracking
  • Error recovery

Flow Architecture

┌──────────────┐     ┌──────────────┐     ┌──────────────┐
│   Periodic   │────►│  Fetch Page  │────►│   Process    │
│   Emitter    │     │  from API    │     │   Items      │
└──────────────┘     └──────┬───────┘     └──────┬───────┘
       ▲                    │                    │
       │                    │                    ▼
       │             ┌──────┴──────┐     ┌──────────────┐
       │             │  Has More   │     │   Upsert     │
       │             │   Pages?    │     │   Database   │
       │             └──────┬──────┘     └──────┬───────┘
       │                    │                   │
       │             yes    │    no             ▼
       │             ┌──────┴──────┐     ┌──────────────┐
       │             │  Next Page  │     │   Complete   │
       │             │   Trigger   │     │   Handler    │
       │             └──────┬──────┘     └──────────────┘
       │                    │
       └────────────────────┘

Node Configurations

1. Periodic Emitter (Scheduler)

Component: common-module/ticker

Settings:

yaml
interval: "1h"
emitOnStart: false
maxEmissions: 0
payload:
  syncType: "full"
  startTime: null

Control State:

yaml
# Start the scheduler
start: true

Purpose: Triggers the sync process every hour.

2. Initialize Sync State

Component: common-module/modify

Input Edge Configuration:

yaml
data:
  syncId: "{{now()}}"
  syncType: "{{$.payload.syncType}}"
  page: 1
  pageSize: 100
  totalProcessed: 0
  totalPages: null
  startTime: "{{RFC3339(now())}}"
  errors: []

Purpose: Sets up initial state for pagination tracking.
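Conceptually, this state object is just a map that travels with every message in the flow. A minimal Python sketch of the same shape (Python stands in for the platform's template expressions; `init_sync_state` is an illustrative helper, not part of the platform):

```python
import time
from datetime import datetime, timezone

def init_sync_state(sync_type: str = "full", page_size: int = 100) -> dict:
    """Build the initial pagination-tracking state for a sync run."""
    return {
        "syncId": time.time_ns(),  # stand-in for the engine's now()
        "syncType": sync_type,
        "page": 1,
        "pageSize": page_size,
        "totalProcessed": 0,
        "totalPages": None,  # unknown until the first API response
        "startTime": datetime.now(timezone.utc).isoformat(),  # RFC3339(now())
        "errors": [],
    }

state = init_sync_state()
```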

3. API Client (Fetch Page)

Component: http-module/client

Settings:

yaml
baseUrl: "https://api.external-system.com/v1"
defaultHeaders:
  Authorization: "Bearer {{$.secrets.apiToken}}"
  Accept: "application/json"
timeout: "60s"
retryCount: 3
retryDelay: "5s"

Input Edge Configuration:

yaml
data:
  method: "GET"
  url: "/users"
  queryParams:
    page: "{{string($.page)}}"
    limit: "{{string($.pageSize)}}"
    updated_since: "{{$.lastSyncTime || ''}}"

4. Extract Page Data

Component: common-module/modify

Input Edge Configuration:

yaml
data:
  # Preserve sync state
  syncId: "{{$.syncId}}"
  page: "{{$.page}}"
  pageSize: "{{$.pageSize}}"
  startTime: "{{$.startTime}}"
  errors: "{{$.errors}}"

  # Extract response data
  items: "{{$.response.bodyJson.data}}"
  totalItems: "{{$.response.bodyJson.total}}"
  totalPages: "{{ceil($.response.bodyJson.total / $.pageSize)}}"
  hasMore: "{{$.page < ceil($.response.bodyJson.total / $.pageSize)}}"

  # Track progress
  totalProcessed: "{{$.totalProcessed + len($.response.bodyJson.data)}}"
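The pagination arithmetic above is the heart of the loop. A small Python equivalent of those template expressions (function and argument names are illustrative):

```python
import math

def page_math(total_items: int, page_size: int, page: int,
              processed_so_far: int, items_on_page: int) -> dict:
    """Mirror the template expressions: totalPages, hasMore, totalProcessed."""
    total_pages = math.ceil(total_items / page_size)
    return {
        "totalPages": total_pages,
        "hasMore": page < total_pages,
        "totalProcessed": processed_so_far + items_on_page,
    }

# 250 items at 100 per page -> 3 pages; page 1 of 3 still has more.
print(page_math(250, 100, 1, 0, 100))
# {'totalPages': 3, 'hasMore': True, 'totalProcessed': 100}
```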

5. Array Iterator

Component: common-module/iterator

Settings:

yaml
batchSize: 10
emitIndex: true
emitProgress: true
continueOnError: true

Input Edge Configuration:

yaml
data:
  array: "{{$.items}}"
  context:
    syncId: "{{$.syncId}}"
    page: "{{$.page}}"
    totalPages: "{{$.totalPages}}"
    hasMore: "{{$.hasMore}}"
    startTime: "{{$.startTime}}"
    errors: "{{$.errors}}"
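With `batchSize: 10`, the iterator splits the array into chunks and emits one message per chunk, each carrying its index and a last-batch flag. A rough Python sketch of that behavior (the field names mirror the iterator's progress output; the function itself is illustrative):

```python
from typing import Any, Iterator, List

def iterate_batches(items: List[Any], batch_size: int = 10) -> Iterator[dict]:
    """Emit batches with index and last-batch flag, like the iterator's batch port."""
    total_batches = -(-len(items) // batch_size)  # ceiling division
    for batch_num, start in enumerate(range(0, len(items), batch_size), start=1):
        yield {
            "items": items[start:start + batch_size],
            "batchNum": batch_num,
            "totalBatches": total_batches,
            "isLastBatch": batch_num == total_batches,
        }

batches = list(iterate_batches(list(range(25)), batch_size=10))
# 3 batches of sizes 10, 10, 5
```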

6. Transform User Data

Component: common-module/modify

Input Edge Configuration (from iterator batch port):

yaml
data:
  users: |
    {{$.items.map(item => ({
      external_id: item.id,
      email: lower(item.email),
      name: item.first_name + ' ' + item.last_name,
      status: item.active ? 'active' : 'inactive',
      metadata: JSON.stringify({
        source: 'external-system',
        original_id: item.id,
        synced_at: RFC3339(now())
      }),
      updated_at: RFC3339(now())
    }))}}
  context: "{{$.context}}"
  batchNum: "{{$.batchNum}}"

7. Database Upsert

Component: database-module/connector

Settings:

yaml
driver: "postgres"
host: "{{$.secrets.dbHost}}"
port: 5432
database: "myapp"
username: "{{$.secrets.dbUser}}"
password: "{{$.secrets.dbPassword}}"
sslMode: "require"
maxOpenConns: 10

Input Edge Configuration:

yaml
data:
  query: |
    INSERT INTO users (external_id, email, name, status, metadata, updated_at)
    VALUES ($1, $2, $3, $4, $5, $6)
    ON CONFLICT (external_id)
    DO UPDATE SET
      email = EXCLUDED.email,
      name = EXCLUDED.name,
      status = EXCLUDED.status,
      metadata = EXCLUDED.metadata,
      updated_at = EXCLUDED.updated_at
  # Note: In practice, you'd use batch insert for efficiency
  parameters: "{{$.users[0] ? [$.users[0].external_id, $.users[0].email, $.users[0].name, $.users[0].status, $.users[0].metadata, $.users[0].updated_at] : []}}"
  operation: "execute"

8. Pagination Router

Component: common-module/router

Settings:

yaml
rules:
  - name: "has_more_pages"
    field: "hasMore"
    operator: "equals"
    value: "true"
    outputPort: "route_a"
defaultPort: "default"

Purpose: Routes to next page or completion based on pagination state.

9. Next Page Trigger

Component: common-module/modify

Input Edge Configuration (from route_a):

yaml
target:
  node: api-client
  port: request
data:
  syncId: "{{$.context.syncId}}"
  page: "{{$.context.page + 1}}"
  pageSize: 100
  totalProcessed: "{{$.context.totalProcessed}}"
  totalPages: "{{$.context.totalPages}}"
  startTime: "{{$.context.startTime}}"
  errors: "{{$.context.errors}}"

Purpose: Increments page and triggers next API call.
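Taken together, nodes 3, 4, 8, and 9 implement an ordinary while-loop. The same control flow in plain Python, with the HTTP client replaced by an injected `fetch_page` function (a hypothetical stand-in, not a real client):

```python
import math
from typing import Callable

def run_sync(fetch_page: Callable[[int, int], dict], page_size: int = 100) -> dict:
    """Drive the fetch -> process -> has-more loop from the diagram.

    fetch_page(page, limit) stands in for the http-module/client node and
    must return {"data": [...], "total": N} like the external API's body.
    """
    page, processed = 1, 0
    while True:
        body = fetch_page(page, page_size)
        processed += len(body["data"])
        total_pages = math.ceil(body["total"] / page_size)
        if page >= total_pages:        # router's "default" port: complete
            return {"totalProcessed": processed, "totalPages": total_pages}
        page += 1                      # "route_a": next-page trigger

# Fake API exposing 230 users, for demonstration only
def fake_fetch(page: int, limit: int) -> dict:
    total = 230
    start = (page - 1) * limit
    return {"data": list(range(start, min(start + limit, total))), "total": total}

result = run_sync(fake_fetch)
```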

10. Sync Complete Handler

Component: common-module/modify

Input Edge Configuration (from default):

yaml
data:
  syncId: "{{$.context.syncId}}"
  status: "completed"
  totalProcessed: "{{$.context.totalProcessed}}"
  totalPages: "{{$.context.totalPages}}"
  # syncId was set to now() at init, so the difference is in nanoseconds
  duration: "{{(now() - $.context.syncId) / 1000000000}}"
  errors: "{{$.context.errors}}"
  completedAt: "{{RFC3339(now())}}"

11. Log Sync Result

Component: database-module/connector

Input Edge Configuration:

yaml
data:
  query: |
    INSERT INTO sync_logs (sync_id, status, records_processed, pages, duration_seconds, errors, completed_at)
    VALUES ($1, $2, $3, $4, $5, $6, $7)
  parameters:
    - "{{$.syncId}}"
    - "{{$.status}}"
    - "{{$.totalProcessed}}"
    - "{{$.totalPages}}"
    - "{{$.duration}}"
    - "{{JSON.stringify($.errors)}}"
    - "{{$.completedAt}}"

12. Error Handler

Component: common-module/modify

Collects errors without stopping the sync:

Input Edge Configuration:

yaml
data:
  # Add error to list and continue
  context:
    syncId: "{{$.context.syncId}}"
    errors: "{{[...$.context.errors, {page: $.context.page, error: $.error, timestamp: RFC3339(now())}]}}"
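The same append-and-continue pattern in Python (a sketch; `record_error` is illustrative):

```python
from datetime import datetime, timezone

def record_error(context: dict, page: int, error: str) -> dict:
    """Append an error entry to the sync context without interrupting the run."""
    entry = {
        "page": page,
        "error": error,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    # Copy rather than mutate, mirroring the [...errors, entry] spread above.
    return {**context, "errors": context["errors"] + [entry]}

ctx = {"syncId": 1, "errors": []}
ctx = record_error(ctx, page=4, error="HTTP 502 from upstream")
```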

Incremental Sync Variant

For incremental syncs, store the last sync timestamp:

Modified Initialize Node

yaml
data:
  syncId: "{{now()}}"
  syncType: "incremental"
  page: 1
  pageSize: 100
  # Get last sync time from database or use default
  lastSyncTime: "{{$.lastSyncTime || '2024-01-01T00:00:00Z'}}"

Store Last Sync Time

After successful completion:

yaml
data:
  query: |
    INSERT INTO sync_metadata (key, value, updated_at)
    VALUES ('last_user_sync', $1, $2)
    ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value, updated_at = EXCLUDED.updated_at
  parameters:
    - "{{$.completedAt}}"
    - "{{$.completedAt}}"

Flow Monitoring

Progress Tracking

The iterator emits progress information:

json
{
  "batchNum": 5,
  "totalBatches": 20,
  "progress": 25,
  "isLastBatch": false
}
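The `progress` field follows directly from `batchNum / totalBatches`. A Python sketch reproducing the payload above:

```python
def progress_event(batch_num: int, total_batches: int) -> dict:
    """Reproduce the iterator's progress payload for a given batch."""
    return {
        "batchNum": batch_num,
        "totalBatches": total_batches,
        "progress": round(100 * batch_num / total_batches),
        "isLastBatch": batch_num == total_batches,
    }

print(progress_event(5, 20))
# {'batchNum': 5, 'totalBatches': 20, 'progress': 25, 'isLastBatch': False}
```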

Sync Logs Table

sql
CREATE TABLE sync_logs (
    id SERIAL PRIMARY KEY,
    sync_id BIGINT NOT NULL,
    status VARCHAR(20) NOT NULL,
    records_processed INT NOT NULL,
    pages INT NOT NULL,
    duration_seconds DECIMAL(10,2),
    errors JSONB,
    completed_at TIMESTAMP WITH TIME ZONE,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

Query Recent Syncs

sql
SELECT
    sync_id,
    status,
    records_processed,
    pages,
    duration_seconds,
    jsonb_array_length(errors) as error_count,
    completed_at
FROM sync_logs
ORDER BY completed_at DESC
LIMIT 10;

Error Recovery

Retry Failed Batches

Store failed batch info for later retry:

yaml
data:
  query: |
    INSERT INTO sync_retry_queue (sync_id, page, batch_num, error, created_at)
    VALUES ($1, $2, $3, $4, NOW())
  parameters:
    - "{{$.context.syncId}}"
    - "{{$.context.page}}"
    - "{{$.batchNum}}"
    - "{{$.error}}"

Manual Retry Trigger

Create a separate flow to process retry queue:

yaml
# Fetch items from retry queue
query: "SELECT * FROM sync_retry_queue WHERE status = 'pending' LIMIT 10"

Performance Optimization

Batch Database Inserts

Instead of issuing one INSERT per row, batch the rows into a single statement:

sql
INSERT INTO users (external_id, email, name, status, metadata, updated_at)
SELECT * FROM unnest(
    $1::text[],
    $2::text[],
    $3::text[],
    $4::text[],
    $5::jsonb[],
    $6::timestamptz[]
)
ON CONFLICT (external_id) DO UPDATE SET ...
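The `unnest` form takes one array per column, so rows must be pivoted into column arrays before binding. A Python helper sketch that does the pivot (pure and database-free; the column list matches the users table above):

```python
from typing import List, Tuple

COLUMNS = ("external_id", "email", "name", "status", "metadata", "updated_at")

def rows_to_unnest_params(users: List[dict]) -> Tuple[list, ...]:
    """Pivot row dicts into per-column arrays for the unnest() batch insert."""
    return tuple([u[col] for u in users] for col in COLUMNS)

users = [
    {"external_id": "1", "email": "a@x.io", "name": "A", "status": "active",
     "metadata": "{}", "updated_at": "2024-01-01T00:00:00Z"},
    {"external_id": "2", "email": "b@x.io", "name": "B", "status": "inactive",
     "metadata": "{}", "updated_at": "2024-01-01T00:00:00Z"},
]
params = rows_to_unnest_params(users)
# params[0] -> bound to $1::text[], params[1] -> $2::text[], and so on
```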

Parallel Page Processing

For independent pages, process multiple concurrently:

yaml
# Use async component to fetch multiple pages
pages:
  - "{{$.currentPage}}"
  - "{{$.currentPage + 1}}"
  - "{{$.currentPage + 2}}"
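Assuming no built-in fan-out component, the idea can be sketched in Python with a thread pool (`fetch_page` is again a hypothetical stand-in for the HTTP client); this is only safe when pages are independent:

```python
from concurrent.futures import ThreadPoolExecutor

def fetch_pages_concurrently(fetch_page, current_page: int, fan_out: int = 3) -> list:
    """Fetch `fan_out` consecutive pages in parallel, preserving page order."""
    pages = range(current_page, current_page + fan_out)
    with ThreadPoolExecutor(max_workers=fan_out) as pool:
        return list(pool.map(fetch_page, pages))  # map keeps input order

# Stub standing in for the HTTP client node
def fake_fetch(page: int) -> dict:
    return {"page": page, "data": [f"item-{page}-{i}" for i in range(2)]}

results = fetch_pages_concurrently(fake_fetch, current_page=4)
# fetches pages 4, 5, 6 and returns them in order
```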

Key Patterns Used

1. State Preservation Through Flow

Pass sync context through all nodes:

yaml
context:
  syncId: "{{$.syncId}}"
  page: "{{$.page}}"
  totalProcessed: "{{$.totalProcessed}}"

2. Loop via Self-Triggering

Create pagination loop by triggering the API client again:

yaml
target:
  node: api-client
  port: request
data:
  page: "{{$.context.page + 1}}"

3. Error Collection Without Stopping

Collect errors in array, continue processing:

yaml
errors: "{{[...$.context.errors, {error: $.error}]}}"

4. Completion Detection

Use router to detect end of pagination:

yaml
hasMore: "{{$.page < $.totalPages}}"