
API Integration Flow

A complete example flow that calls several external APIs in parallel and merges their responses into a single enriched customer profile.

Overview

This flow demonstrates:

  • Multi-API orchestration
  • Data enrichment from multiple sources
  • Parallel API calls
  • Response aggregation
  • Caching patterns
  • Rate limiting

Use Case

Build a customer profile by aggregating data from:

  • CRM system (customer details)
  • Payment provider (transaction history)
  • Support system (ticket history)
  • Analytics platform (usage metrics); listed as a source, but the node configurations below only wire up the first three

Flow Architecture

                           ┌──────────────┐
                           │   Incoming   │
                           │   Request    │
                           └──────┬───────┘
                                  │
                                  ▼
                           ┌──────────────┐
                           │  Validate &  │
                           │   Extract    │
                           └──────┬───────┘
                                  │
                  ┌───────────────┼───────────────┐
                  │               │               │
                  ▼               ▼               ▼
             ┌──────────┐    ┌──────────┐    ┌──────────┐
             │   CRM    │    │ Payment  │    │ Support  │
             │   API    │    │   API    │    │   API    │
             └────┬─────┘    └────┬─────┘    └────┬─────┘
                  │               │               │
                  └───────────────┼───────────────┘
                                  │
                                  ▼
                           ┌──────────────┐
                           │  Aggregate   │
                           │   Results    │
                           └──────┬───────┘
                                  │
                                  ▼
                           ┌──────────────┐
                           │   Enrich &   │
                           │   Format     │
                           └──────┬───────┘
                                  │
                                  ▼
                           ┌──────────────┐
                           │   Response   │
                           └──────────────┘

Node Configurations

1. HTTP Server (API Endpoint)

Component: http-module/server

Settings:

yaml
path: "/api/v1/customers/:id/profile"
methods: ["GET"]
hostnames: ["api.myservice.com"]
timeout: "30s"

2. Request Validator

Component: common-module/modify

Input Edge Configuration:

yaml
data:
  # Extract customer ID from path
  customerId: "{{$.request.path.split('/')[4]}}"
  requestId: "{{$.request.requestId}}"
  timestamp: "{{now()}}"

  # Validate
  isValid: "{{$.customerId != null && len($.customerId) > 0}}"
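
As a rough illustration of what the extraction and validation expressions do, the sketch below reproduces them in Python. The function names are hypothetical and not part of the platform; the only facts taken from the configuration are the route shape and the fact that the ID sits at index 4 after splitting on "/".

```python
from typing import Optional


def extract_customer_id(path: str) -> Optional[str]:
    """Return the :id segment of /api/v1/customers/:id/profile, or None."""
    # A leading slash yields an empty first element, so a matching path splits
    # into ["", "api", "v1", "customers", "<id>", "profile"]: the ID is index 4.
    segments = path.split("/")
    if len(segments) != 6 or segments[1:4] != ["api", "v1", "customers"]:
        return None
    if segments[5] != "profile":
        return None
    return segments[4] or None


def is_valid(customer_id: Optional[str]) -> bool:
    # Mirrors the isValid expression: non-null and non-empty.
    return customer_id is not None and len(customer_id) > 0
```

Checking the whole route shape, rather than only index 4, is a design choice of this sketch: it keeps an unrelated path with five or more segments from being mistaken for a customer ID.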

3. Validation Router

Component: common-module/router

Settings:

yaml
rules:
  - name: "valid_request"
    field: "isValid"
    operator: "equals"
    value: "true"
    outputPort: "route_a"
defaultPort: "error"

4. Signal Splitter (Parallel API Calls)

Component: common-module/signal

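Emits one signal per downstream client so the three API calls run in parallel. The context block carries the shared IDs and the number of responses to expect: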

Input Edge Configuration:

yaml
data:
  signals:
    - target: "crm-client"
      port: "request"
      data:
        customerId: "{{$.customerId}}"
        requestId: "{{$.requestId}}"
        source: "crm"
    - target: "payment-client"
      port: "request"
      data:
        customerId: "{{$.customerId}}"
        requestId: "{{$.requestId}}"
        source: "payment"
    - target: "support-client"
      port: "request"
      data:
        customerId: "{{$.customerId}}"
        requestId: "{{$.requestId}}"
        source: "support"
  context:
    customerId: "{{$.customerId}}"
    requestId: "{{$.requestId}}"
    expectedResponses: 3
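
The fan-out idea can be sketched outside the platform with Python's asyncio. This is an analogy rather than how the signal component is implemented: `fan_out` and the three `fake_*` coroutines are hypothetical stand-ins for the splitter and the HTTP clients.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List

Fetcher = Callable[[str], Awaitable[Dict[str, Any]]]


async def fan_out(customer_id: str, request_id: str,
                  fetchers: Dict[str, Fetcher]) -> List[Dict[str, Any]]:
    """Call every source concurrently and tag each result with its source."""

    async def call(source: str, fetch: Fetcher) -> Dict[str, Any]:
        try:
            data = await fetch(customer_id)
            return {"requestId": request_id, "source": source,
                    "success": True, "data": data}
        except Exception:
            # One failing source must not cancel the others.
            return {"requestId": request_id, "source": source,
                    "success": False, "data": None}

    return await asyncio.gather(*(call(s, f) for s, f in fetchers.items()))


# Hypothetical stand-ins for the three HTTP clients.
async def fake_crm(cid: str) -> Dict[str, Any]:
    return {"id": cid, "email": "jane.doe@company.com"}


async def fake_payment(cid: str) -> Dict[str, Any]:
    return {"totalSpent": 15420.0}


async def fake_support(cid: str) -> Dict[str, Any]:
    raise TimeoutError("support API did not answer")


results = asyncio.run(fan_out("cust_12345", "req-1", {
    "crm": fake_crm, "payment": fake_payment, "support": fake_support}))
```

Every result carries the same `requestId`, which is what lets a later aggregation step group the three responses back together.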

5. CRM API Client

Component: http-module/client

Settings:

yaml
baseUrl: "https://api.crm-system.com/v2"
defaultHeaders:
  Authorization: "Bearer {{$.secrets.crmToken}}"
  X-API-Version: "2024-01"
timeout: "10s"
retryCount: 2

Input Edge Configuration:

yaml
data:
  method: "GET"
  url: "/contacts/{{$.customerId}}"

Output Transformation:

yaml
data:
  requestId: "{{$.requestId}}"
  source: "crm"
  success: "{{$.response.statusCode == 200}}"
  data:
    id: "{{$.response.bodyJson.id}}"
    email: "{{$.response.bodyJson.email}}"
    name: "{{$.response.bodyJson.first_name}} {{$.response.bodyJson.last_name}}"
    company: "{{$.response.bodyJson.company}}"
    createdAt: "{{$.response.bodyJson.created_at}}"
    tags: "{{$.response.bodyJson.tags}}"
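
The output transformation normalizes each provider's payload into a common envelope (requestId, source, success, data). A minimal Python sketch of the CRM mapping follows; `crm_envelope` is a hypothetical name, and leaving `data` empty on a non-200 response is an assumption of this sketch, since the configuration above maps the fields regardless of status.

```python
from typing import Any, Dict


def crm_envelope(request_id: str, status_code: int,
                 body: Dict[str, Any]) -> Dict[str, Any]:
    """Normalize a CRM response into the requestId/source/success/data shape."""
    success = status_code == 200
    data = None
    if success:
        first = body.get("first_name", "")
        last = body.get("last_name", "")
        data = {
            "id": body.get("id"),
            "email": body.get("email"),
            # Join first and last name; strip() tidies up a missing half.
            "name": f"{first} {last}".strip(),
            "company": body.get("company"),
            "createdAt": body.get("created_at"),
            "tags": body.get("tags", []),
        }
    return {"requestId": request_id, "source": "crm",
            "success": success, "data": data}
```

The payment and support clients follow the same pattern with their own field names.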

6. Payment API Client

Component: http-module/client

Settings:

yaml
baseUrl: "https://api.payment-provider.com/v1"
defaultHeaders:
  Authorization: "Basic {{$.secrets.paymentApiKey}}"
timeout: "10s"
retryCount: 2

Input Edge Configuration:

yaml
data:
  method: "GET"
  url: "/customers/{{$.customerId}}/summary"

Output Transformation:

yaml
data:
  requestId: "{{$.requestId}}"
  source: "payment"
  success: "{{$.response.statusCode == 200}}"
  data:
    totalSpent: "{{$.response.bodyJson.lifetime_value}}"
    currency: "{{$.response.bodyJson.currency}}"
    subscriptionStatus: "{{$.response.bodyJson.subscription.status}}"
    lastPayment: "{{$.response.bodyJson.last_payment_date}}"
    paymentMethod: "{{$.response.bodyJson.default_payment_method.type}}"

7. Support API Client

Component: http-module/client

Settings:

yaml
baseUrl: "https://support.company.com/api"
defaultHeaders:
  X-API-Key: "{{$.secrets.supportApiKey}}"
timeout: "10s"
retryCount: 2

Input Edge Configuration:

yaml
data:
  method: "GET"
  url: "/users/by-external-id/{{$.customerId}}/stats"

Output Transformation:

yaml
data:
  requestId: "{{$.requestId}}"
  source: "support"
  success: "{{$.response.statusCode == 200}}"
  data:
    totalTickets: "{{$.response.bodyJson.total_tickets}}"
    openTickets: "{{$.response.bodyJson.open_tickets}}"
    avgResponseTime: "{{$.response.bodyJson.avg_response_hours}}"
    satisfactionScore: "{{$.response.bodyJson.csat_score}}"
    lastContactDate: "{{$.response.bodyJson.last_contact}}"

8. Response Aggregator

Component: common-module/aggregator

Settings:

yaml
groupBy: "requestId"
expectedCount: 3
timeout: "15s"
partialOnTimeout: true

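Purpose: Groups responses by requestId and waits up to 15s for all three to arrive. Because partialOnTimeout is true, whatever has arrived by then is emitted instead of failing the request.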

Output:

yaml
data:
  requestId: "{{$.requestId}}"
  customerId: "{{$.customerId}}"
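  # Note: with partialOnTimeout, a source that timed out has no entry in $.items.
  # The lookups below assume every source produced an item, and failedSources
  # only lists sources that responded unsuccessfully.
  responses: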
    crm: "{{$.items.find(i => i.source == 'crm').data}}"
    payment: "{{$.items.find(i => i.source == 'payment').data}}"
    support: "{{$.items.find(i => i.source == 'support').data}}"
  allSuccessful: "{{$.items.every(i => i.success)}}"
  failedSources: "{{$.items.filter(i => !i.success).map(i => i.source)}}"
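
The aggregation step can be sketched in Python in a way that also tolerates a source that never responded, which can happen when a partial result is emitted on timeout. `aggregate` and `EXPECTED_SOURCES` are hypothetical names for illustration; the `allFailed` field is an addition of this sketch and not part of the output shown above.

```python
from typing import Any, Dict, List

EXPECTED_SOURCES = ("crm", "payment", "support")


def aggregate(request_id: str, customer_id: str,
              items: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Combine per-source envelopes; absent or failed sources become None."""
    by_source = {i["source"]: i for i in items if i["requestId"] == request_id}
    responses: Dict[str, Any] = {}
    failed: List[str] = []
    for source in EXPECTED_SOURCES:
        item = by_source.get(source)
        if item is not None and item["success"]:
            responses[source] = item["data"]
        else:
            # Covers both an unsuccessful response and no response at all.
            responses[source] = None
            failed.append(source)
    return {
        "requestId": request_id,
        "customerId": customer_id,
        "responses": responses,
        "allSuccessful": not failed,
        "allFailed": len(failed) == len(EXPECTED_SOURCES),
        "failedSources": failed,
    }
```

Iterating over the expected sources, rather than over the items that arrived, is what makes a timed-out source show up in `failedSources`.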

9. Profile Builder

Component: common-module/modify

Input Edge Configuration:

yaml
data:
  profile:
    # Basic info from CRM
    id: "{{$.customerId}}"
    email: "{{$.responses.crm.email}}"
    name: "{{$.responses.crm.name}}"
    company: "{{$.responses.crm.company}}"
    customerSince: "{{$.responses.crm.createdAt}}"
    tags: "{{$.responses.crm.tags}}"

    # Financial info from Payment
    financial:
      totalSpent: "{{$.responses.payment.totalSpent}}"
      currency: "{{$.responses.payment.currency}}"
      subscriptionStatus: "{{$.responses.payment.subscriptionStatus}}"
      lastPayment: "{{$.responses.payment.lastPayment}}"
      paymentMethod: "{{$.responses.payment.paymentMethod}}"

    # Support info
    support:
      totalTickets: "{{$.responses.support.totalTickets}}"
      openTickets: "{{$.responses.support.openTickets}}"
      avgResponseTime: "{{$.responses.support.avgResponseTime}}"
      satisfactionScore: "{{$.responses.support.satisfactionScore}}"
      lastContact: "{{$.responses.support.lastContactDate}}"

    # Computed fields
    healthScore: "{{c.calculateHealthScore($.responses)}}"
    riskLevel: "{{c.calculateRiskLevel($.responses)}}"

    # Metadata
    _meta:
      fetchedAt: "{{RFC3339(now())}}"
      sources: ["crm", "payment", "support"]
      incompleteData: "{{$.failedSources}}"

  requestId: "{{$.requestId}}"
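
The document does not define `c.calculateHealthScore` or `c.calculateRiskLevel`, so the following is only a hypothetical sketch of what such helpers might look like. The weights, the 0-5 CSAT scale, and the risk thresholds are all invented for illustration and are not expected to reproduce the values in the sample response.

```python
from typing import Any, Dict, Optional

Responses = Dict[str, Optional[Dict[str, Any]]]


def calculate_health_score(responses: Responses) -> int:
    """Hypothetical 0-100 score; every weight here is illustrative only."""
    payment = responses.get("payment") or {}
    support = responses.get("support") or {}
    score = 50
    if payment.get("subscriptionStatus") == "active":
        score += 25
    # CSAT is assumed to be on a 0-5 scale and worth up to 25 points.
    csat = support.get("satisfactionScore")
    if csat is not None:
        score += round(25 * min(max(float(csat), 0.0), 5.0) / 5.0)
    # Each open ticket costs 5 points, capped at 20.
    score -= min(5 * int(support.get("openTickets") or 0), 20)
    return max(0, min(100, score))


def calculate_risk_level(responses: Responses) -> str:
    """Hypothetical thresholds mapping the score to a coarse label."""
    score = calculate_health_score(responses)
    if score >= 80:
        return "low"
    if score >= 50:
        return "medium"
    return "high"
```

Treating a missing source as an empty dictionary keeps the helpers usable when the aggregated data is partial.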

10. Response Sender

Component: common-module/modify

Input Edge Configuration:

yaml
target:
  node: http-server
  port: response
data:
  requestId: "{{$.requestId}}"
  statusCode: 200
  contentType: "application/json"
  headers:
    X-Data-Completeness: "{{$.profile._meta.incompleteData.length == 0 ? 'complete' : 'partial'}}"
    Cache-Control: "private, max-age=60"
  body: "{{JSON.stringify($.profile)}}"
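
For readers who want to see the response mapping spelled out, here is a small Python sketch of the same logic. `build_response` is a hypothetical helper; the header names and values are the ones used in the configuration above.

```python
import json
from typing import Any, Dict


def build_response(request_id: str, profile: Dict[str, Any]) -> Dict[str, Any]:
    """Build the HTTP response payload, flagging partial profiles in a header."""
    incomplete = profile.get("_meta", {}).get("incompleteData", [])
    return {
        "requestId": request_id,
        "statusCode": 200,
        "contentType": "application/json",
        "headers": {
            "X-Data-Completeness": "partial" if incomplete else "complete",
            "Cache-Control": "private, max-age=60",
        },
        "body": json.dumps(profile),
    }
```

The status code stays 200 for partial profiles; the header is the only signal that some sources are missing.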

11. Error Response Handler

Component: common-module/modify

Input Edge Configuration (from validation error):

yaml
target:
  node: http-server
  port: response
data:
  requestId: "{{$.requestId}}"
  statusCode: 400
  contentType: "application/json"
  body: "{\"error\":\"Invalid customer ID\",\"code\":\"INVALID_CUSTOMER_ID\"}"

Caching Layer

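Cache the aggregated profile so that repeated requests for the same customer skip all three API calls. The cache is checked before fetching and written after the profile is built: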

Cache Check Node

Component: cache-module/get

Input Edge Configuration:

yaml
data:
  key: "customer-profile:{{$.customerId}}"
  ttl: 300

Cache Router

Component: common-module/router

Settings:

yaml
rules:
  - name: "cache_hit"
    field: "cacheHit"
    operator: "equals"
    value: "true"
    outputPort: "cached"
defaultPort: "fetch"

Cache Store Node

Component: cache-module/set

After building the profile:

yaml
data:
  # The Profile Builder output exposes the customer ID as profile.id
  key: "customer-profile:{{$.profile.id}}"
  value: "{{$.profile}}"
  ttl: 300
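
The check, route, and store nodes together form a cache-aside pattern. The Python sketch below shows that pattern with an in-memory cache; `TTLCache` and `get_profile` are hypothetical names, and the injectable clock exists only so that expiry can be exercised without waiting.

```python
import time
from typing import Any, Callable, Dict, Optional, Tuple


class TTLCache:
    """In-memory cache whose entries expire after a per-entry TTL."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._entries: Dict[str, Tuple[float, Any]] = {}

    def get(self, key: str) -> Optional[Any]:
        entry = self._entries.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self._clock() >= expires_at:
            del self._entries[key]  # expired: treat as a miss
            return None
        return value

    def set(self, key: str, value: Any, ttl: float) -> None:
        self._entries[key] = (self._clock() + ttl, value)


def get_profile(cache: TTLCache, customer_id: str,
                build: Callable[[str], Dict[str, Any]]
                ) -> Tuple[Dict[str, Any], bool]:
    """Return (profile, cache_hit), building and storing the profile on a miss."""
    key = f"customer-profile:{customer_id}"
    cached = cache.get(key)
    if cached is not None:
        return cached, True
    profile = build(customer_id)
    cache.set(key, profile, ttl=300)
    return profile, False
```

One thing to decide in a real flow is whether partial profiles should be cached at all, since a 300-second TTL would keep serving the incomplete version.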

Rate Limiting

Protect external APIs from overload. The key below includes the source, so each API gets its own budget of 100 requests per 60 seconds:

Rate Limiter Node

Component: common-module/rate-limiter

Settings:

yaml
key: "api:{{$.source}}"
maxRequests: 100
windowSeconds: 60
strategy: "sliding_window"

Place before each API client.
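
To make the "sliding_window" strategy concrete, here is a minimal Python sketch of a sliding-window log limiter keyed the same way as the configuration. It is an assumption about what that strategy means, not a description of the component's internals; `SlidingWindowLimiter` is a hypothetical class.

```python
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict


class SlidingWindowLimiter:
    """Allow at most max_requests per key within any window_seconds span."""

    def __init__(self, max_requests: int, window_seconds: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self._clock = clock
        self._hits: Dict[str, Deque[float]] = defaultdict(deque)

    def allow(self, key: str) -> bool:
        now = self._clock()
        hits = self._hits[key]
        # Drop timestamps that have slid out of the window.
        while hits and now - hits[0] >= self.window_seconds:
            hits.popleft()
        if len(hits) >= self.max_requests:
            return False
        hits.append(now)
        return True
```

With `SlidingWindowLimiter(100, 60)` and keys such as `"api:crm"` and `"api:payment"`, each source is throttled independently, which matches the per-source key in the settings.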

Complete Response Example

json
{
  "id": "cust_12345",
  "email": "jane.doe@company.com",
  "name": "Jane Doe",
  "company": "Acme Corp",
  "customerSince": "2022-03-15T00:00:00Z",
  "tags": ["enterprise", "priority"],

  "financial": {
    "totalSpent": 15420.00,
    "currency": "USD",
    "subscriptionStatus": "active",
    "lastPayment": "2024-01-01T00:00:00Z",
    "paymentMethod": "card"
  },

  "support": {
    "totalTickets": 12,
    "openTickets": 1,
    "avgResponseTime": 2.5,
    "satisfactionScore": 4.8,
    "lastContact": "2024-01-10T00:00:00Z"
  },

  "healthScore": 92,
  "riskLevel": "low",

  "_meta": {
    "fetchedAt": "2024-01-15T10:30:00Z",
    "sources": ["crm", "payment", "support"],
    "incompleteData": []
  }
}

Key Patterns Used

1. Parallel API Calls with Signals

Fan out to multiple APIs simultaneously:

yaml
signals:
  - target: "crm-client"
  - target: "payment-client"
  - target: "support-client"

2. Response Aggregation

Collect all responses before proceeding:

yaml
groupBy: "requestId"
expectedCount: 3
timeout: "15s"

3. Graceful Degradation

Handle partial failures:

yaml
partialOnTimeout: true
incompleteData: "{{$.failedSources}}"

4. Response Caching

Cache aggregated results:

yaml
key: "customer-profile:{{$.customerId}}"
ttl: 300

5. Health Score Computation

Derive metrics from aggregated data:

yaml
healthScore: "{{c.calculateHealthScore($.responses)}}"

Error Handling

Individual API Failures

Mark the source as failed and continue with the others:

yaml
success: "{{$.response.statusCode == 200}}"
failedSources: "{{$.items.filter(i => !i.success).map(i => i.source)}}"

Complete Failure

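If every API call fails, return an error instead of an empty profile. The rule reads an allFailed field, which the aggregator output shown earlier does not define, so that field has to be added there first: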

yaml
rules:
  - name: "all_failed"
    field: "allFailed"
    operator: "equals"
    value: "true"
    outputPort: "error"

Timeout Handling

Return partial data on timeout:

yaml
partialOnTimeout: true
headers:
  X-Data-Completeness: "partial"
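
The timeouts configured throughout this flow interact, and a little arithmetic shows how. The sketch assumes that retryCount counts retries in addition to the first attempt, that each attempt gets the full client timeout, and that there is no backoff delay; none of these is confirmed by the configuration.

```python
def worst_case_client_seconds(timeout_s: float, retry_count: int) -> float:
    """Upper bound if every attempt runs to its timeout (no backoff assumed)."""
    return timeout_s * (1 + retry_count)


# Values taken from the node settings in this flow.
client_worst = worst_case_client_seconds(10, 2)  # client timeout 10s, retryCount 2
aggregator_timeout = 15                          # aggregator timeout
server_timeout = 30                              # HTTP server timeout

# A client that keeps retrying can still be running when the aggregator gives up.
retry_can_outlast_aggregator = client_worst > aggregator_timeout
# The aggregator always emits before the HTTP server itself times out.
aggregator_fits_in_server = aggregator_timeout < server_timeout
```

Under these assumptions a slow source may use up to 30 seconds, while the aggregator emits after 15, so a second retry can never contribute to the response; the partial path is what the caller sees in that case.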

Monitoring Metrics

Track for observability:

  • Response time per source: Identify slow APIs
  • Cache hit rate: Measure caching effectiveness
  • Partial response rate: Track API reliability
  • Error rate by source: Identify problematic integrations
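
A minimal sketch of how these four metrics could be collected in-process follows. `FlowMetrics` is a hypothetical helper for illustration; a real deployment would more likely export counters to whatever monitoring system is already in use.

```python
from collections import defaultdict
from typing import DefaultDict, List


class FlowMetrics:
    """Collects per-source latency and error counts plus cache and partial rates."""

    def __init__(self) -> None:
        self.latencies: DefaultDict[str, List[float]] = defaultdict(list)
        self.errors: DefaultDict[str, int] = defaultdict(int)
        self.cache_hits = 0
        self.cache_misses = 0
        self.responses = 0
        self.partial_responses = 0

    def record_call(self, source: str, seconds: float, success: bool) -> None:
        self.latencies[source].append(seconds)
        if not success:
            self.errors[source] += 1

    def record_cache(self, hit: bool) -> None:
        if hit:
            self.cache_hits += 1
        else:
            self.cache_misses += 1

    def record_response(self, partial: bool) -> None:
        self.responses += 1
        if partial:
            self.partial_responses += 1

    def mean_latency(self, source: str) -> float:
        samples = self.latencies[source]
        return sum(samples) / len(samples) if samples else 0.0

    def error_rate(self, source: str) -> float:
        calls = len(self.latencies[source])
        return self.errors[source] / calls if calls else 0.0

    def cache_hit_rate(self) -> float:
        total = self.cache_hits + self.cache_misses
        return self.cache_hits / total if total else 0.0

    def partial_rate(self) -> float:
        return self.partial_responses / self.responses if self.responses else 0.0
```

Recording latency and success in the same call keeps the error rate and the response time per source based on the same set of requests.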
