Sample Flow: Data Pipeline Workflow

Fetches a batch of records from an external REST API, transforms them with a Script Task, and upserts each one into a PostgreSQL database, continuing past per-record failures.


Overview

What it does:

  1. Calls a REST API to fetch a list of records (e.g., orders, products, events)
  2. Uses a Script Task to transform/normalize the data
  3. Loops over each record and upserts it to a database
  4. Handles errors per record without stopping the whole pipeline
  5. Sends a summary notification when complete

Best for: ETL pipelines, sync workflows, data import jobs, scheduled data processing.


Prerequisites

Requirement                   Type
---------------------------   --------------------
Source REST API               REST API Integration
Target PostgreSQL database    Database Integration
Email provider                Email Integration

Input Variables

{
  "processId": "proc_data_pipeline",
  "data": {
    "sourceEndpoint": "/orders?status=pending&limit=100",
    "targetTable": "orders_staging",
    "notifyEmail": "ops@yourcompany.com",
    "pipelineRunId": "run-20260220"
  }
}

Workflow Steps

[Start]

[Set Variable: Initialize counters]
successCount = 0, failCount = 0, errors = []

[Service Task (REST): Fetch Records]
GET {sourceEndpoint}
↓ successFlow
[Script Task: Transform Records]
Normalize field names, parse dates, validate

[Loop: collection=records, variable=record, maxIterations=500]

[Service Task (SQL): Upsert Record]
INSERT ... ON CONFLICT DO UPDATE
↓ successFlow ↓ errorFlow
[Script: increment success] [Script: increment fail, log error]
↓ ↓
[Loop back to next record]
↓ (after loop)
[Script Task: Build Summary]

[Email: Pipeline Summary Report]

[End]

Node Configurations

Set Variable: Initialize

successCount → 0
failCount → 0
errors → []
startTime → (use Script Task to get current timestamp)

Service Task (REST): Fetch Records

restApiConnectionId: int_source_api
endpointPath: {sourceEndpoint}
method: GET

Output: {serviceTaskResponse} contains the API response (array of records).

Script Task: Transform Records

// Normalize and validate the fetched data
let records;

try {
  // Handle both {data: [...]} and bare [...] response formats
  records = Array.isArray(serviceTaskResponse)
    ? serviceTaskResponse
    : serviceTaskResponse.data || serviceTaskResponse.results || [];
} catch (e) {
  records = [];
}

// Transform each record to the target schema.
// Resolve the ID once so externalId uses the same fallback chain
// (String(r.id) alone would yield "undefined" for records that only
// have order_id or orderId).
const transformed = records.map(r => {
  const id = r.id || r.order_id || r.orderId;
  return {
    id,
    externalId: String(id),
    customerEmail: (r.customer_email || r.customerEmail || '').toLowerCase().trim(),
    total: parseFloat(r.total || r.amount || 0),
    status: (r.status || 'unknown').toLowerCase(),
    createdAt: r.created_at || r.createdAt || new Date().toISOString(),
    rawData: JSON.stringify(r)
  };
}).filter(r => r.id); // Drop records with no ID

return {
  records: transformed,
  totalFetched: records.length,
  validRecords: transformed.length,
  invalidRecords: records.length - transformed.length
};
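To sanity-check the normalization before wiring it into the flow, the same logic can be run standalone. This is an illustrative sketch, not part of the workflow itself: the function name `transformRecords` and the sample records are invented for testing; the field fallbacks mirror the Script Task above.

```javascript
// Standalone version of the transform logic, for local testing.
function transformRecords(serviceTaskResponse) {
  let records;
  try {
    // Accept both {data: [...]} and bare [...] response shapes
    records = Array.isArray(serviceTaskResponse)
      ? serviceTaskResponse
      : serviceTaskResponse.data || serviceTaskResponse.results || [];
  } catch (e) {
    records = [];
  }

  const transformed = records.map(r => {
    const id = r.id || r.order_id || r.orderId;
    return {
      id,
      externalId: String(id),
      customerEmail: (r.customer_email || r.customerEmail || '').toLowerCase().trim(),
      total: parseFloat(r.total || r.amount || 0),
      status: (r.status || 'unknown').toLowerCase(),
      createdAt: r.created_at || r.createdAt || new Date().toISOString(),
      rawData: JSON.stringify(r)
    };
  }).filter(r => r.id); // Drop records with no usable ID

  return {
    records: transformed,
    totalFetched: records.length,
    validRecords: transformed.length,
    invalidRecords: records.length - transformed.length
  };
}

// Example: one valid record, one with no usable ID (gets dropped)
const result = transformRecords({
  data: [
    { order_id: 'ord-1', customer_email: '  Jane@Example.COM ', amount: '19.99', status: 'PENDING' },
    { customer_email: 'no-id@example.com' }
  ]
});
```

Running this against a captured API response is a quick way to confirm the field mappings before the first scheduled run.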

Loop: Iterate Records

collection: records
loopVariable: record
maxIterations: 500

Service Task (SQL): Upsert Record

databaseConnectionId: int_db_placeholder
mode: direct
transactional: false
queries:
INSERT INTO {targetTable}
  (external_id, customer_email, total, status, created_at, raw_data, synced_at)
VALUES
  ('{record.externalId}', '{record.customerEmail}', {record.total}, '{record.status}',
   '{record.createdAt}', '{record.rawData}', NOW())
ON CONFLICT (external_id)
DO UPDATE SET
  customer_email = EXCLUDED.customer_email,
  total = EXCLUDED.total,
  status = EXCLUDED.status,
  raw_data = EXCLUDED.raw_data,
  synced_at = NOW()
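One caveat with this query: `{record.rawData}` is a JSON string, and any single quote inside it (e.g. in a customer note) would terminate the SQL literal early. If your platform substitutes `{record.*}` placeholders as raw text rather than as bound parameters, values should be escaped in the Transform step first. A minimal sketch, assuming verbatim substitution; the helper name `sqlQuote` is illustrative:

```javascript
// Hypothetical helper: double up single quotes so a value stays valid
// inside a single-quoted SQL literal. Only needed when placeholders are
// interpolated as raw text instead of bound parameters.
function sqlQuote(value) {
  return String(value).replace(/'/g, "''");
}

// Example: applied to rawData before it reaches the upsert query
const raw = JSON.stringify({ note: "customer said 'call me'" });
const escaped = sqlQuote(raw);
```

If the platform supports parameterized queries, prefer those over string interpolation; escaping by hand is a fallback, not a substitute.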

Script Task: Increment Success (on successFlow)

return {
  successCount: parseInt(successCount || 0) + 1
};

Script Task: Log Error (on errorFlow)

const errorList = Array.isArray(errors) ? errors : [];
errorList.push({
  recordId: record ? record.id : 'unknown',
  error: _error || 'Unknown error',
  timestamp: new Date().toISOString()
});

return {
  failCount: parseInt(failCount || 0) + 1,
  errors: errorList
};
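Taken together, the loop plus these two branch scripts implement the per-record error handling described below: a failing record increments `failCount` and appends to `errors`, and iteration continues. A minimal simulation of that control flow, with an invented `upsert` stub and sample records standing in for the SQL Service Task:

```javascript
// Simulates the loop's per-record error handling: a throwing upsert
// increments failCount and records the error; the loop keeps going.
function runPipeline(records, upsert) {
  let successCount = 0;
  let failCount = 0;
  const errors = [];

  for (const record of records) {
    try {
      upsert(record);            // stand-in for the SQL Service Task
      successCount += 1;         // successFlow script
    } catch (e) {
      failCount += 1;            // errorFlow script
      errors.push({
        recordId: record ? record.id : 'unknown',
        error: e.message || 'Unknown error',
        timestamp: new Date().toISOString()
      });
    }
  }
  return { successCount, failCount, errors };
}

// Stub upsert that rejects empty emails, like a NOT NULL constraint would
const summary = runPipeline(
  [
    { id: 'ord-1', customerEmail: 'a@example.com' },
    { id: 'ord-2', customerEmail: '' },
    { id: 'ord-3', customerEmail: 'c@example.com' }
  ],
  r => {
    if (!r.customerEmail) throw new Error('violates not-null constraint: customer_email');
  }
);
```

Note how `ord-3` is still processed after `ord-2` fails, which is the whole point of routing errors through a side branch instead of letting them abort the run.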

Script Task: Build Summary

const endTime = new Date();
const duration = Math.round((endTime - new Date(startTime)) / 1000);

return {
  summaryText: `Pipeline Run: ${pipelineRunId}
Total fetched: ${totalFetched}
Valid records: ${validRecords}
Successfully upserted: ${successCount}
Failed: ${failCount}
Duration: ${duration}s
${failCount > 0 ? '\nFailed records:\n' + JSON.stringify(errors, null, 2) : ''}`,
  hasErrors: parseInt(failCount) > 0,
  duration
};

Email: Pipeline Summary

recipientAddress: {notifyEmail}
subject: Data Pipeline {pipelineRunId} - {successCount} ✓ / {failCount} ✗
contentType: text/html
body:
<h3>Pipeline Run Complete: {pipelineRunId}</h3>
<table>
<tr><td>Total Fetched</td><td>{totalFetched}</td></tr>
<tr><td>Valid Records</td><td>{validRecords}</td></tr>
<tr><td>Successfully Synced</td><td><b style="color:green">{successCount}</b></td></tr>
<tr><td>Failed</td><td><b style="color:{hasErrors == 'true' ? 'red' : 'green'}">{failCount}</b></td></tr>
<tr><td>Duration</td><td>{duration}s</td></tr>
</table>
<pre>{summaryText}</pre>

Error Handling Strategy

This pipeline uses per-record error handling:

  • Each record has its own errorFlow path
  • Errors are collected in the errors array
  • The pipeline continues to the next record even when one fails
  • The final email summarizes all errors

This is the recommended pattern for data pipelines — you don't want one bad record to stop the entire run.


Scheduling the Pipeline

To run this pipeline on a schedule (e.g., every hour), configure it with a Trigger:

  1. Navigate to the workflow → Triggers tab
  2. Add a Cron Trigger
  3. Set the cron expression: 0 * * * * (every hour)
  4. Set default input data (or leave empty for the workflow to use hardcoded endpoints)

The scheduler will automatically trigger the workflow according to the schedule.
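Assuming the trigger accepts standard 5-field cron syntax (minute, hour, day of month, month, day of week), a few common schedules for reference:

```
0 * * * *      every hour, on the hour
*/15 * * * *   every 15 minutes
0 2 * * *      daily at 02:00
0 6 * * 1      Mondays at 06:00
```

Check your platform's trigger documentation for extensions such as a seconds field or timezone settings, as cron dialects vary.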


Output

After completion:

{
  "pipelineRunId": "run-20260220",
  "totalFetched": 87,
  "validRecords": 85,
  "successCount": 84,
  "failCount": 1,
  "duration": 45,
  "hasErrors": true,
  "errors": [
    {
      "recordId": "ord-5521",
      "error": "violates not-null constraint: customer_email",
      "timestamp": "2026-02-20T10:00:32Z"
    }
  ]
}

Build This Yourself

  1. Navigate to /flows → click New Flow → name it "Data Pipeline" → Create
  2. From the Action Browser, drag onto canvas:
    • Start Event
    • Set Variable (name: "Initialize Counters")
    • Service Task – REST (name: "Fetch Records")
    • Script Task (name: "Transform Records")
    • Loop Node (name: "Process Each Record")
    • Service Task – SQL (name: "Upsert Record") — inside the loop
    • Script Task (name: "Increment Success") — on successFlow
    • Script Task (name: "Log Error") — on errorFlow
    • Script Task (name: "Build Summary")
    • Service Task – Email (name: "Pipeline Summary Report")
    • End Event
  3. Connect: Start → Set Variable → REST → Script (Transform) → Loop → SQL
    • SQL successFlow → Script (Increment Success) → Loop (back)
    • SQL errorFlow → Script (Log Error) → Loop (back)
    • Loop (after completion) → Script (Build Summary) → Email → End
  4. Configure Set Variable: add variables successCount = 0, failCount = 0, errors = []
  5. Configure Loop Node:
    • Collection: records
    • Loop Variable: record
    • Max Iterations: 500
  6. Configure the REST Service Task with your API connection and endpoint path
  7. Configure the SQL Service Task with the upsert query from the Node Configurations section above
  8. Configure the Email node with the summary template
  9. To schedule: open the Triggers tab → Add Trigger → Schedule → set cron expression (e.g., 0 * * * * for hourly)
  10. Click Save → Publish