# Sample Flow: Data Pipeline Workflow

Fetches data from an external REST API, transforms it with a Script Task, and upserts it into a PostgreSQL database, processing records in batches.
## Overview

**What it does:**
- Calls a REST API to fetch a list of records (e.g., orders, products, events)
- Uses a Script Task to transform/normalize the data
- Loops over each record and upserts it to a database
- Handles errors per record without stopping the whole pipeline
- Sends a summary notification when complete
**Best for:** ETL pipelines, sync workflows, data import jobs, scheduled data processing.
## Prerequisites
| Requirement | Type |
|---|---|
| Source REST API | REST API Integration |
| Target PostgreSQL database | Database Integration |
| Email provider | Email Integration |
## Input Variables

```json
{
  "processId": "proc_data_pipeline",
  "data": {
    "sourceEndpoint": "/orders?status=pending&limit=100",
    "targetTable": "orders_staging",
    "notifyEmail": "ops@yourcompany.com",
    "pipelineRunId": "run-20260220"
  }
}
```
## Workflow Steps

```
[Start]
  ↓
[Set Variable: Initialize counters]
  successCount = 0, failCount = 0, errors = []
  ↓
[Service Task (REST): Fetch Records]
  GET {sourceEndpoint}
  ↓ successFlow
[Script Task: Transform Records]
  Normalize field names, parse dates, validate
  ↓
[Loop: collection=records, variable=record, maxIterations=500]
  ↓
  [Service Task (SQL): Upsert Record]
    INSERT ... ON CONFLICT DO UPDATE
    ↓ successFlow                  ↓ errorFlow
  [Script: increment success]    [Script: increment fail, log error]
    ↓                              ↓
  [Loop back to next record]
  ↓ (after loop)
[Script Task: Build Summary]
  ↓
[Email: Pipeline Summary Report]
  ↓
[End]
```
## Node Configurations

### Set Variable: Initialize

```
successCount → 0
failCount    → 0
errors       → []
startTime    → (use a Script Task to get the current timestamp)
```
### Service Task (REST): Fetch Records

```
restApiConnectionId: int_source_api
endpointPath: {sourceEndpoint}
method: GET
```

**Output:** `{serviceTaskResponse}` contains the API response (array of records).
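For reference, the transform step below tolerates two common response shapes. A sketch of illustrative payloads (field names here are examples; the real ones depend on your source API):

```javascript
// Illustrative payloads only: real field names depend on the source API.
const bareArray = [
  { id: "ord-1", customer_email: "A@x.com", total: "19.99", status: "PENDING" }
];
const wrapped = { data: bareArray };   // { results: [...] } is also handled
```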
### Script Task: Transform Records

```javascript
// Normalize and validate the fetched data
let records;
try {
  // Handle both {data: [...]} and [...] response formats
  records = Array.isArray(serviceTaskResponse)
    ? serviceTaskResponse
    : serviceTaskResponse.data || serviceTaskResponse.results || [];
} catch (e) {
  records = [];
}

// Transform each record to our target schema
const transformed = records.map(r => ({
  id: r.id || r.order_id || r.orderId,
  externalId: String(r.id || r.order_id || r.orderId),
  customerEmail: (r.customer_email || r.customerEmail || '').toLowerCase().trim(),
  total: parseFloat(r.total || r.amount || 0),
  status: (r.status || 'unknown').toLowerCase(),
  createdAt: r.created_at || r.createdAt || new Date().toISOString(),
  rawData: JSON.stringify(r)
})).filter(r => r.id); // Drop records with no ID

return {
  records: transformed,
  totalFetched: records.length,
  validRecords: transformed.length,
  invalidRecords: records.length - transformed.length
};
```
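To exercise the transform logic outside the workflow engine, the same script can be wrapped in a plain function. A minimal sketch for local testing (the `transformRecords` name is illustrative):

```javascript
// Standalone version of the Transform Records script, wrapped in a
// function so it can be run locally. The function name is illustrative.
function transformRecords(serviceTaskResponse) {
  let records;
  try {
    records = Array.isArray(serviceTaskResponse)
      ? serviceTaskResponse
      : serviceTaskResponse.data || serviceTaskResponse.results || [];
  } catch (e) {
    records = [];
  }

  const transformed = records.map(r => {
    const id = r.id || r.order_id || r.orderId;
    return {
      id,
      externalId: String(id),
      customerEmail: (r.customer_email || r.customerEmail || '').toLowerCase().trim(),
      total: parseFloat(r.total || r.amount || 0),
      status: (r.status || 'unknown').toLowerCase(),
      createdAt: r.created_at || r.createdAt || new Date().toISOString(),
      rawData: JSON.stringify(r)
    };
  }).filter(r => r.id); // drop records with no usable ID

  return {
    records: transformed,
    totalFetched: records.length,
    validRecords: transformed.length,
    invalidRecords: records.length - transformed.length
  };
}

// One valid record, one without any ID field:
const out = transformRecords({ data: [
  { id: 'ord-1', customer_email: ' A@X.com ', total: '19.99', status: 'PENDING' },
  { customer_email: 'no-id@x.com' }
] });
// out.validRecords === 1, out.invalidRecords === 1,
// out.records[0].customerEmail === 'a@x.com'
```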
### Loop: Iterate Records

```
collection: records
loopVariable: record
maxIterations: 500
```
### Service Task (SQL): Upsert Record

```
databaseConnectionId: int_db_placeholder
mode: direct
transactional: false
```

queries:

```sql
INSERT INTO {targetTable}
  (external_id, customer_email, total, status, created_at, raw_data, synced_at)
VALUES
  ('{record.externalId}', '{record.customerEmail}', {record.total}, '{record.status}',
   '{record.createdAt}', '{record.rawData}', NOW())
ON CONFLICT (external_id)
DO UPDATE SET
  customer_email = EXCLUDED.customer_email,
  total = EXCLUDED.total,
  status = EXCLUDED.status,
  raw_data = EXCLUDED.raw_data,
  synced_at = NOW()
```
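Note that inline interpolation can break when a value contains quote characters, and `rawData` is serialized JSON, so it always does. If your database integration supports parameterized queries, the upsert can be built along these lines. This is a sketch assuming PostgreSQL-style `$n` placeholders; `buildUpsert` is a hypothetical helper, not a platform API:

```javascript
// Build a parameterized upsert (PostgreSQL-style $1..$n placeholders).
// The table name cannot be a parameter, so validate it explicitly.
function buildUpsert(table, record) {
  if (!/^[a-z_][a-z0-9_]*$/i.test(table)) {
    throw new Error('Unsafe table name: ' + table);
  }
  return {
    text:
      `INSERT INTO ${table} ` +
      '(external_id, customer_email, total, status, created_at, raw_data, synced_at) ' +
      'VALUES ($1, $2, $3, $4, $5, $6, NOW()) ' +
      'ON CONFLICT (external_id) DO UPDATE SET ' +
      'customer_email = EXCLUDED.customer_email, total = EXCLUDED.total, ' +
      'status = EXCLUDED.status, raw_data = EXCLUDED.raw_data, synced_at = NOW()',
    values: [record.externalId, record.customerEmail, record.total,
             record.status, record.createdAt, record.rawData]
  };
}
```

The `{text, values}` pair matches the query-config shape used by common PostgreSQL clients such as node-postgres; adapt it to whatever your SQL Service Task accepts.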
### Script Task: Increment Success (on successFlow)

```javascript
return {
  successCount: parseInt(successCount || 0) + 1
};
```
### Script Task: Log Error (on errorFlow)

```javascript
const errorList = Array.isArray(errors) ? errors : [];
errorList.push({
  recordId: record ? record.id : 'unknown',
  error: _error || 'Unknown error',
  timestamp: new Date().toISOString()
});

return {
  failCount: parseInt(failCount || 0) + 1,
  errors: errorList
};
```
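The `parseInt(... || 0) + 1` guard in both counter scripts tolerates a counter that is not yet set or that round-trips as a string. Its behavior in isolation:

```javascript
// Counter bump that tolerates undefined or string inputs, as in the
// Increment Success / Log Error scripts above.
const bump = v => parseInt(v || 0) + 1;
```

`bump(undefined)` returns 1; `bump('4')` and `bump(4)` both return 5.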
### Script Task: Build Summary

```javascript
const endTime = new Date();
const duration = Math.round((endTime - new Date(startTime)) / 1000);

return {
  summaryText: `Pipeline Run: ${pipelineRunId}
Total fetched: ${totalFetched}
Valid records: ${validRecords}
Successfully upserted: ${successCount}
Failed: ${failCount}
Duration: ${duration}s
${failCount > 0 ? '\nFailed records:\n' + JSON.stringify(errors, null, 2) : ''}`,
  hasErrors: parseInt(failCount) > 0,
  duration
};
```
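The summary logic can also be checked in isolation by passing the workflow variables in explicitly. A condensed sketch (the `buildSummary` wrapper and its argument object are illustrative, not platform APIs):

```javascript
// Condensed, testable version of the Build Summary logic.
function buildSummary(vars) {
  const endTime = vars.endTime ? new Date(vars.endTime) : new Date();
  const duration = Math.round((endTime - new Date(vars.startTime)) / 1000);
  return {
    summaryText:
      `Pipeline Run: ${vars.pipelineRunId}\n` +
      `Successfully upserted: ${vars.successCount}\n` +
      `Failed: ${vars.failCount}\n` +
      `Duration: ${duration}s`,
    hasErrors: parseInt(vars.failCount) > 0,
    duration
  };
}

const s = buildSummary({
  pipelineRunId: 'run-20260220',
  startTime: '2026-02-20T10:00:00Z',
  endTime: '2026-02-20T10:00:45Z',
  successCount: 84,
  failCount: 1
});
// s.duration === 45, s.hasErrors === true
```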
### Email: Pipeline Summary

```
recipientAddress: {notifyEmail}
subject: Data Pipeline {pipelineRunId} - {successCount} ✓ / {failCount} ✗
contentType: text/html
```

body:

```html
<h3>Pipeline Run Complete: {pipelineRunId}</h3>
<table>
  <tr><td>Total Fetched</td><td>{totalFetched}</td></tr>
  <tr><td>Valid Records</td><td>{validRecords}</td></tr>
  <tr><td>Successfully Synced</td><td><b style="color:green">{successCount}</b></td></tr>
  <tr><td>Failed</td><td><b style="color:{hasErrors == 'true' ? 'red' : 'green'}">{failCount}</b></td></tr>
  <tr><td>Duration</td><td>{duration}s</td></tr>
</table>
<pre>{summaryText}</pre>
```
## Error Handling Strategy

This pipeline uses per-record error handling:

- Each record has its own `errorFlow` path
- Errors are collected in the `errors` array
- The pipeline continues to the next record even when one fails
- The final email summarizes all errors

This is the recommended pattern for data pipelines: one bad record should not stop the entire run.
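Outside the engine, the same per-record pattern reduces to a loop with a try/catch around each upsert. A sketch in plain JavaScript, where `upsertRecord` is a stand-in for the SQL Service Task:

```javascript
// Per-record error handling: a failure is recorded and the loop moves on.
// upsertRecord is a hypothetical stand-in for the SQL Service Task.
async function runPipeline(records, upsertRecord) {
  let successCount = 0, failCount = 0;
  const errors = [];
  for (const record of records) {
    try {
      await upsertRecord(record);
      successCount++;
    } catch (err) {
      failCount++;
      errors.push({
        recordId: record.id || 'unknown',
        error: err.message || 'Unknown error',
        timestamp: new Date().toISOString()
      });
    }
  }
  return { successCount, failCount, errors };
}
```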
## Scheduling the Pipeline

To run this pipeline on a schedule (e.g., every hour), configure it with a Trigger:

- Navigate to the workflow → Triggers tab
- Add a Cron Trigger
- Set the cron expression: `0 * * * *` (every hour)
- Set default input data (or leave empty for the workflow to use hardcoded endpoints)

The scheduler will automatically trigger the workflow according to the schedule.
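A few common expressions in standard five-field cron syntax (minute, hour, day-of-month, month, day-of-week), for reference:

```
0 * * * *      every hour, on the hour
*/15 * * * *   every 15 minutes
0 2 * * *      daily at 02:00
0 9 * * 1-5    weekdays at 09:00
```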
## Output

After completion:

```json
{
  "pipelineRunId": "run-20260220",
  "totalFetched": 87,
  "validRecords": 85,
  "successCount": 84,
  "failCount": 1,
  "duration": 45,
  "hasErrors": true,
  "errors": [
    {
      "recordId": "ord-5521",
      "error": "violates not-null constraint: customer_email",
      "timestamp": "2026-02-20T10:00:32Z"
    }
  ]
}
```
## Build This Yourself

- Navigate to `/flows` → click **New Flow** → name it "Data Pipeline" → **Create**
- From the Action Browser, drag onto canvas:
  - Start Event
  - Set Variable (name: "Initialize Counters")
  - Service Task – REST (name: "Fetch Records")
  - Script Task (name: "Transform Records")
  - Loop Node (name: "Process Each Record")
  - Service Task – SQL (name: "Upsert Record") — inside the loop
  - Script Task (name: "Increment Success") — on successFlow
  - Script Task (name: "Log Error") — on errorFlow
  - Script Task (name: "Build Summary")
  - Service Task – Email (name: "Pipeline Summary Report")
  - End Event
- Connect: Start → Set Variable → REST → Script (Transform) → Loop → SQL
  - SQL successFlow → Script (Increment Success) → Loop (back)
  - SQL errorFlow → Script (Log Error) → Loop (back)
  - Loop (after completion) → Script (Build Summary) → Email → End
- Configure Set Variable: add variables `successCount = 0`, `failCount = 0`, `errors = []`
- Configure Loop Node:
  - Collection: `records`
  - Loop Variable: `record`
  - Max Iterations: `500`
- Configure the REST Service Task with your API connection and endpoint path
- Configure the SQL Service Task with the upsert query from the Node Configurations section above
- Configure the Email node with the summary template
- To schedule: open the Triggers tab → Add Trigger → Schedule → set the cron expression (e.g., `0 * * * *` for hourly)
- Click **Save** → **Publish**