API Integration Flow
A complete example flow that integrates multiple external APIs with data enrichment.
Overview
This flow demonstrates:
- Multi-API orchestration
- Data enrichment from multiple sources
- Parallel API calls
- Response aggregation
- Caching patterns
- Rate limiting
Use Case
Build a customer profile by aggregating data from:
- CRM system (customer details)
- Payment provider (transaction history)
- Support system (ticket history)
- Analytics platform (usage metrics; not wired into the flow below, which calls only the first three sources)
Flow Architecture
              ┌──────────────┐
              │   Incoming   │
              │   Request    │
              └──────┬───────┘
                     │
                     ▼
              ┌──────────────┐
              │  Validate &  │
              │   Extract    │
              └──────┬───────┘
                     │
     ┌───────────────┼───────────────┐
     │               │               │
     ▼               ▼               ▼
┌──────────┐    ┌──────────┐    ┌──────────┐
│   CRM    │    │ Payment  │    │ Support  │
│   API    │    │   API    │    │   API    │
└────┬─────┘    └────┬─────┘    └────┬─────┘
     │               │               │
     └───────────────┼───────────────┘
                     │
                     ▼
              ┌──────────────┐
              │  Aggregate   │
              │   Results    │
              └──────┬───────┘
                     │
                     ▼
              ┌──────────────┐
              │   Enrich &   │
              │    Format    │
              └──────┬───────┘
                     │
                     ▼
              ┌──────────────┐
              │   Response   │
              └──────────────┘
Node Configurations
1. HTTP Server (API Endpoint)
Component: http-module/server
Settings:
path: "/api/v1/customers/:id/profile"
methods: ["GET"]
hostnames: ["api.myservice.com"]
timeout: "30s"
2. Request Validator
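To make the extraction concrete, here is a minimal Python sketch of what this node computes: the customer ID taken from the path registered on the HTTP server, plus an isValid flag. The function names are hypothetical and not part of the flow platform, and the stricter check on the surrounding path segments is an added assumption.

```python
from typing import Optional


def extract_customer_id(path: str) -> Optional[str]:
    """Return the :id segment of /api/v1/customers/:id/profile, or None."""
    parts = path.split("/")
    # A leading "/" yields an empty first element, so the ID sits at index 4.
    if len(parts) != 6:
        return None
    if parts[1:4] != ["api", "v1", "customers"] or parts[5] != "profile":
        return None
    return parts[4] or None  # an empty segment counts as a missing ID


def validate_request(path: str, request_id: str) -> dict:
    """Build the validator output: extracted ID plus a validity flag."""
    customer_id = extract_customer_id(path)
    return {
        "customerId": customer_id,
        "requestId": request_id,
        "isValid": customer_id is not None,
    }
```

An empty or missing ID yields isValid set to False, which is the case the validation router sends to the error port.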
Component: common-module/modify
Input Edge Configuration:
data:
  # Extract customer ID from path
  customerId: "{{$.request.path.split('/')[4]}}"
  requestId: "{{$.request.requestId}}"
  timestamp: "{{now()}}"
  # Validate
  isValid: "{{$.customerId != null && len($.customerId) > 0}}"
3. Validation Router
Component: common-module/router
Settings:
rules:
  - name: "valid_request"
    field: "isValid"
    operator: "equals"
    value: "true"
    outputPort: "route_a"
defaultPort: "error"
4. Signal Splitter (Parallel API Calls)
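Conceptually, the splitter starts all three API calls at once instead of one after another. A rough Python equivalent using asyncio is sketched here; fetch_source is a hypothetical stub standing in for the HTTP clients, and none of these names come from the flow platform.

```python
import asyncio
from typing import Dict, List

SOURCES = ["crm", "payment", "support"]


async def fetch_source(source: str, customer_id: str, request_id: str) -> Dict:
    """Hypothetical stand-in for one API client; a real one would do HTTP I/O."""
    await asyncio.sleep(0.01)  # simulate network latency
    return {
        "requestId": request_id,
        "source": source,
        "success": True,
        "data": {"customerId": customer_id},
    }


async def fan_out(customer_id: str, request_id: str) -> List[Dict]:
    """Start one call per source and wait for all of them."""
    calls = [fetch_source(s, customer_id, request_id) for s in SOURCES]
    # gather runs the coroutines concurrently and keeps the input order.
    return list(await asyncio.gather(*calls))
```

Running asyncio.run(fan_out("cust_12345", "req-1")) returns one result per source, each tagged with the shared requestId that the aggregator later groups on.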
Component: common-module/signal
Creates multiple signals for parallel execution:
Input Edge Configuration:
data:
  signals:
    - target: "crm-client"
      port: "request"
      data:
        customerId: "{{$.customerId}}"
        requestId: "{{$.requestId}}"
        source: "crm"
    - target: "payment-client"
      port: "request"
      data:
        customerId: "{{$.customerId}}"
        requestId: "{{$.requestId}}"
        source: "payment"
    - target: "support-client"
      port: "request"
      data:
        customerId: "{{$.customerId}}"
        requestId: "{{$.requestId}}"
        source: "support"
  context:
    customerId: "{{$.customerId}}"
    requestId: "{{$.requestId}}"
    expectedResponses: 3
5. CRM API Client
Component: http-module/client
Settings:
baseUrl: "https://api.crm-system.com/v2"
defaultHeaders:
  Authorization: "Bearer {{$.secrets.crmToken}}"
  X-API-Version: "2024-01"
timeout: "10s"
retryCount: 2
Input Edge Configuration:
data:
  method: "GET"
  url: "/contacts/{{$.customerId}}"
Output Transformation:
data:
  requestId: "{{$.requestId}}"
  source: "crm"
  success: "{{$.response.statusCode == 200}}"
  data:
    id: "{{$.response.bodyJson.id}}"
    email: "{{$.response.bodyJson.email}}"
    name: "{{$.response.bodyJson.first_name}} {{$.response.bodyJson.last_name}}"
    company: "{{$.response.bodyJson.company}}"
    createdAt: "{{$.response.bodyJson.created_at}}"
    tags: "{{$.response.bodyJson.tags}}"
6. Payment API Client
Component: http-module/client
Settings:
baseUrl: "https://api.payment-provider.com/v1"
defaultHeaders:
  Authorization: "Basic {{$.secrets.paymentApiKey}}"
timeout: "10s"
retryCount: 2
Input Edge Configuration:
data:
  method: "GET"
  url: "/customers/{{$.customerId}}/summary"
Output Transformation:
data:
  requestId: "{{$.requestId}}"
  source: "payment"
  success: "{{$.response.statusCode == 200}}"
  data:
    totalSpent: "{{$.response.bodyJson.lifetime_value}}"
    currency: "{{$.response.bodyJson.currency}}"
    subscriptionStatus: "{{$.response.bodyJson.subscription.status}}"
    lastPayment: "{{$.response.bodyJson.last_payment_date}}"
    paymentMethod: "{{$.response.bodyJson.default_payment_method.type}}"
7. Support API Client
Component: http-module/client
Settings:
baseUrl: "https://support.company.com/api"
defaultHeaders:
  X-API-Key: "{{$.secrets.supportApiKey}}"
timeout: "10s"
retryCount: 2
Input Edge Configuration:
data:
  method: "GET"
  url: "/users/by-external-id/{{$.customerId}}/stats"
Output Transformation:
data:
  requestId: "{{$.requestId}}"
  source: "support"
  success: "{{$.response.statusCode == 200}}"
  data:
    totalTickets: "{{$.response.bodyJson.total_tickets}}"
    openTickets: "{{$.response.bodyJson.open_tickets}}"
    avgResponseTime: "{{$.response.bodyJson.avg_response_hours}}"
    satisfactionScore: "{{$.response.bodyJson.csat_score}}"
    lastContactDate: "{{$.response.bodyJson.last_contact}}"
8. Response Aggregator
Component: common-module/aggregator
Settings:
groupBy: "requestId"
expectedCount: 3
timeout: "15s"
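partialOnTimeout: true
Purpose: Waits up to 15 seconds for all three API responses, grouped by requestId, and combines them. With partialOnTimeout enabled, it emits whatever has arrived when the timeout expires.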
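The combining step can be sketched in Python as follows. This is an illustration under assumptions, not the aggregator's actual implementation: the aggregate function and the missingSources field are hypothetical. Unlike the output mapping that follows, the sketch also reports sources that never responded before the timeout, since those would not appear in the collected items at all.

```python
from typing import Dict, List

EXPECTED_SOURCES = ["crm", "payment", "support"]


def aggregate(request_id: str, customer_id: str, items: List[dict]) -> dict:
    """Combine per-source results; tolerate failed and missing sources."""
    by_source: Dict[str, dict] = {item["source"]: item for item in items}
    # Sources that answered, but with a non-200 status.
    failed = [s for s in EXPECTED_SOURCES
              if s in by_source and not by_source[s]["success"]]
    # Sources that did not answer before the timeout (hypothetical field).
    missing = [s for s in EXPECTED_SOURCES if s not in by_source]
    return {
        "requestId": request_id,
        "customerId": customer_id,
        "responses": {
            s: by_source[s]["data"] if s in by_source else None
            for s in EXPECTED_SOURCES
        },
        "allSuccessful": not failed and not missing,
        "failedSources": failed,
        "missingSources": missing,
    }
```

Keeping failed and missing sources separate lets a later node distinguish an upstream error from a timeout when it decides whether the profile is complete.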
Output:
data:
  requestId: "{{$.requestId}}"
  customerId: "{{$.customerId}}"
  responses:
    crm: "{{$.items.find(i => i.source == 'crm').data}}"
    payment: "{{$.items.find(i => i.source == 'payment').data}}"
    support: "{{$.items.find(i => i.source == 'support').data}}"
  allSuccessful: "{{$.items.every(i => i.success)}}"
  failedSources: "{{$.items.filter(i => !i.success).map(i => i.source)}}"
9. Profile Builder
Component: common-module/modify
Input Edge Configuration:
data:
  profile:
    # Basic info from CRM
    id: "{{$.customerId}}"
    email: "{{$.responses.crm.email}}"
    name: "{{$.responses.crm.name}}"
    company: "{{$.responses.crm.company}}"
    customerSince: "{{$.responses.crm.createdAt}}"
    tags: "{{$.responses.crm.tags}}"
    # Financial info from Payment
    financial:
      totalSpent: "{{$.responses.payment.totalSpent}}"
      currency: "{{$.responses.payment.currency}}"
      subscriptionStatus: "{{$.responses.payment.subscriptionStatus}}"
      lastPayment: "{{$.responses.payment.lastPayment}}"
      paymentMethod: "{{$.responses.payment.paymentMethod}}"
    # Support info
    support:
      totalTickets: "{{$.responses.support.totalTickets}}"
      openTickets: "{{$.responses.support.openTickets}}"
      avgResponseTime: "{{$.responses.support.avgResponseTime}}"
      satisfactionScore: "{{$.responses.support.satisfactionScore}}"
      lastContact: "{{$.responses.support.lastContactDate}}"
    # Computed fields
    healthScore: "{{c.calculateHealthScore($.responses)}}"
    riskLevel: "{{c.calculateRiskLevel($.responses)}}"
    # Metadata
    _meta:
      fetchedAt: "{{RFC3339(now())}}"
      sources: ["crm", "payment", "support"]
      incompleteData: "{{$.failedSources}}"
  requestId: "{{$.requestId}}"
10. Response Sender
Component: common-module/modify
Input Edge Configuration:
target:
  node: http-server
  port: response
data:
  requestId: "{{$.requestId}}"
  statusCode: 200
  contentType: "application/json"
  headers:
    X-Data-Completeness: "{{$.profile._meta.incompleteData.length == 0 ? 'complete' : 'partial'}}"
    Cache-Control: "private, max-age=60"
  body: "{{JSON.stringify($.profile)}}"
11. Error Response Handler
Component: common-module/modify
Input Edge Configuration (from validation error):
target:
  node: http-server
  port: response
data:
  requestId: "{{$.requestId}}"
  statusCode: 400
  contentType: "application/json"
  body: "{\"error\":\"Invalid customer ID\",\"code\":\"INVALID_CUSTOMER_ID\"}"
Caching Layer
Add caching to reduce API calls:
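In plain Python terms, the check, route, and store nodes in this section amount to a cache-aside lookup. The sketch below is illustrative only: TTLCache and get_profile are hypothetical names, the in-memory dictionary stands in for whatever store the cache module uses, and the 300-second TTL mirrors the node settings.

```python
import time
from typing import Any, Callable, Dict, Optional, Tuple


class TTLCache:
    """Tiny in-memory cache with per-entry expiry (assumed behaviour)."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._entries: Dict[str, Tuple[float, Any]] = {}

    def get(self, key: str) -> Optional[Any]:
        entry = self._entries.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self._clock() >= expires_at:
            del self._entries[key]  # expired: treat as a miss
            return None
        return value

    def set(self, key: str, value: Any, ttl: float) -> None:
        self._entries[key] = (self._clock() + ttl, value)


def get_profile(cache: TTLCache, customer_id: str,
                build_profile: Callable[[str], dict],
                ttl: float = 300) -> Tuple[dict, bool]:
    """Return (profile, cache_hit); build and store the profile on a miss."""
    key = f"customer-profile:{customer_id}"
    cached = cache.get(key)
    if cached is not None:
        return cached, True       # the "cached" route
    profile = build_profile(customer_id)  # the "fetch" route
    cache.set(key, profile, ttl)
    return profile, False
```

The injectable clock is a design choice for testability; it lets expiry be exercised without waiting for real time to pass.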
Cache Check Node
Component: cache-module/get
Input Edge Configuration:
data:
  key: "customer-profile:{{$.customerId}}"
  ttl: 300
Cache Router
Component: common-module/router
Settings:
rules:
  - name: "cache_hit"
    field: "cacheHit"
    operator: "equals"
    value: "true"
    outputPort: "cached"
defaultPort: "fetch"
Cache Store Node
Component: cache-module/set
After building the profile:
data:
  key: "customer-profile:{{$.customerId}}"
  value: "{{$.profile}}"
  ttl: 300
Rate Limiting
Protect external APIs from overload:
Rate Limiter Node
Component: common-module/rate-limiter
Settings:
key: "api:{{$.source}}"
maxRequests: 100
windowSeconds: 60
strategy: "sliding_window"
Place one rate limiter before each API client.
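For readers unfamiliar with the strategy, the sketch below shows one common way a sliding window can be implemented: a log of request timestamps per key. It is an assumption about how such a limiter might behave, not the rate-limiter component's code, and SlidingWindowLimiter is a hypothetical name. The defaults mirror maxRequests and windowSeconds above.

```python
import time
from collections import deque
from typing import Callable, Deque, Dict


class SlidingWindowLimiter:
    """Allow at most max_requests per key within the trailing window."""

    def __init__(self, max_requests: int = 100, window_seconds: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self._clock = clock
        self._hits: Dict[str, Deque[float]] = {}

    def allow(self, key: str) -> bool:
        now = self._clock()
        hits = self._hits.setdefault(key, deque())
        # Drop timestamps that have slid out of the window.
        while hits and now - hits[0] >= self.window_seconds:
            hits.popleft()
        if len(hits) >= self.max_requests:
            return False  # over the limit: reject without recording
        hits.append(now)
        return True
```

Because the key includes the source (for example "api:crm"), each external API gets its own budget, so a burst against one provider does not throttle the others.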
Complete Response Example
{
  "id": "cust_12345",
  "email": "jane.doe@company.com",
  "name": "Jane Doe",
  "company": "Acme Corp",
  "customerSince": "2022-03-15T00:00:00Z",
  "tags": ["enterprise", "priority"],
  "financial": {
    "totalSpent": 15420.00,
    "currency": "USD",
    "subscriptionStatus": "active",
    "lastPayment": "2024-01-01T00:00:00Z",
    "paymentMethod": "card"
  },
  "support": {
    "totalTickets": 12,
    "openTickets": 1,
    "avgResponseTime": 2.5,
    "satisfactionScore": 4.8,
    "lastContact": "2024-01-10T00:00:00Z"
  },
  "healthScore": 92,
  "riskLevel": "low",
  "_meta": {
    "fetchedAt": "2024-01-15T10:30:00Z",
    "sources": ["crm", "payment", "support"],
    "incompleteData": []
  }
}
Key Patterns Used
1. Parallel API Calls with Signals
Fan out to multiple APIs simultaneously:
signals:
  - target: "crm-client"
  - target: "payment-client"
  - target: "support-client"
2. Response Aggregation
Collect all responses before proceeding:
groupBy: "requestId"
expectedCount: 3
timeout: "15s"
3. Graceful Degradation
Handle partial failures:
partialOnTimeout: true
incompleteData: "{{$.failedSources}}"
4. Response Caching
Cache aggregated results:
key: "customer-profile:{{$.customerId}}"
ttl: 300
5. Health Score Computation
Derive metrics from aggregated data:
healthScore: "{{c.calculateHealthScore($.responses)}}"
Error Handling
Individual API Failures
Mark the source as failed and continue with the others:
success: "{{$.response.statusCode == 200}}"
failedSources: "{{$.items.filter(i => !i.success).map(i => i.source)}}"
Complete Failure
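If all APIs fail, return an error. The rule reads an allFailed field, which the aggregator output shown earlier does not define; it has to be added there, for example by setting it when failedSources lists all three sources:
rules:
  - name: "all_failed"
    field: "allFailed"
    operator: "equals"
    value: "true"
    outputPort: "error"
Timeout Handling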
Return partial data on timeout:
partialOnTimeout: true
headers:
  X-Data-Completeness: "partial"
Monitoring Metrics
Track for observability:
- Response time per source: Identify slow APIs
- Cache hit rate: Measure caching effectiveness
- Partial response rate: Track API reliability
- Error rate by source: Identify problematic integrations
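As a rough illustration of how these figures could be derived, the sketch below summarizes a list of per-call records. The record shape (source, durationMs, success) and both function names are assumptions made for the example; the flow itself does not define a metrics format.

```python
from collections import defaultdict
from typing import Dict, List


def summarize_by_source(calls: List[dict]) -> Dict[str, dict]:
    """Average response time and error rate for each source."""
    grouped: Dict[str, List[dict]] = defaultdict(list)
    for call in calls:
        grouped[call["source"]].append(call)
    summary = {}
    for source, rows in grouped.items():
        summary[source] = {
            "avgDurationMs": sum(r["durationMs"] for r in rows) / len(rows),
            "errorRate": sum(1 for r in rows if not r["success"]) / len(rows),
        }
    return summary


def rate(numerator: int, denominator: int) -> float:
    """Safe ratio, usable for cache hit rate or partial response rate."""
    return numerator / denominator if denominator else 0.0
```

For example, rate(cache_hits, total_requests) gives the cache hit rate, and rate(partial_responses, total_requests) gives the partial response rate.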