# Observability
How apptor flow captures, stores, and surfaces execution telemetry across all workflow runs.
## Overview
apptor flow persists rich execution data in PostgreSQL as a natural by-product of running workflows. Every process execution, every individual node run, and every AI call writes detailed records with timing, state, and error information. This data powers the admin analytics dashboards without any additional instrumentation.
Two scopes are available:
| Scope | Who | Covers |
|---|---|---|
| Organization | Org Admin | Executions within a single tenant |
| System-wide | Super Admin | Executions across all tenants |
## Data Model

### Process Instance (`process_instance`)

Every workflow execution creates one `process_instance` record.

| Column | Type | Description |
|---|---|---|
| id | UUID | Unique execution identifier |
| process_meta_id | UUID | The workflow definition that was executed |
| process_name | string | Workflow name at the time of execution |
| state_cd | int | Execution state (see below) |
| start_time | timestamp | When execution started |
| end_time | timestamp | When execution finished (null while still running) |
| parent_instance_id | UUID | Parent process ID for subprocesses |
| organization_id | string | Tenant scoping field |
| variables | JSONB | Final variable state |
Process states:

| state_cd | Label | Description |
|---|---|---|
| 1 | Active | Currently executing |
| 2 | Waiting | Paused at an Input Node or Catch Event |
| 3 | Aborted | Cancelled by a user or admin |
| 4 | Completed | Finished successfully |
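For dashboard or client code, the state table above can be captured as a small lookup. A minimal sketch in TypeScript (the constant and helper names are illustrative, not an apptor flow API):

```typescript
// Lookup table mirroring the process states above (illustrative names;
// not an apptor flow API).
const PROCESS_STATES: Record<number, string> = {
  1: "Active",
  2: "Waiting",
  3: "Aborted",
  4: "Completed",
};

// Active and Waiting runs are still live; Aborted and Completed are terminal.
function isTerminal(stateCd: number): boolean {
  return stateCd === 3 || stateCd === 4;
}

console.log(PROCESS_STATES[4], isTerminal(4)); // Completed true
```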
### Node Instance (`node_instance`)

One `node_instance` row is created for each node that executes within a process instance.

| Column | Type | Description |
|---|---|---|
| id | UUID | Unique node execution identifier |
| process_instance_id | UUID | Parent execution |
| node_id | string | Node ID in the workflow definition |
| node_name | string | Display name of the node |
| node_type | string | Type (e.g., aiTask, serviceTask, ifElse) |
| status_cd | int | Node execution status (see below) |
| start_time | timestamp | When node execution started |
| end_time | timestamp | When node execution finished |
| error_details | string | Error message when status is 7 (Error) |
| iteration | int | Loop iteration number (for nodes inside a Loop) |
| variables | JSONB | Variables produced by this node |
| organization_id | string | Tenant scoping field |
Node statuses:

| status_cd | Label | Description |
|---|---|---|
| 5 | In Progress | Currently executing |
| 6 | Cancelled | Cancelled mid-execution |
| 7 | Error | Failed with an error |
| 8 | Completed | Finished successfully |
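Because `start_time` and `end_time` are plain timestamps, per-node durations can be derived on the client. A small sketch, with the row shape assumed from the columns above:

```typescript
// Assumed row shape based on the node_instance columns above.
interface NodeInstanceRow {
  node_name: string;
  status_cd: number; // 5..8, per the status table
  start_time: string; // ISO timestamp
  end_time: string | null; // null while the node is still running
}

// Wall-clock duration in milliseconds, or null for in-flight nodes.
function durationMs(row: NodeInstanceRow): number | null {
  if (row.end_time === null) return null;
  return Date.parse(row.end_time) - Date.parse(row.start_time);
}

const row: NodeInstanceRow = {
  node_name: "Classify Ticket",
  status_cd: 8, // Completed
  start_time: "2025-01-15T10:00:00.000Z",
  end_time: "2025-01-15T10:00:03.500Z",
};
console.log(durationMs(row)); // 3500
```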
### AI Execution History (`ai_execution_history`)

Every AI Task and Voice Task execution writes an `ai_execution_history` record.

| Column | Type | Description |
|---|---|---|
| id | UUID | Record identifier |
| process_instance_id | UUID | Parent execution |
| node_instance_id | UUID | The AI Task node that produced this record |
| model_provider | string | Provider name (e.g., OPENAI, ANTHROPIC) |
| model_name | string | Model used (e.g., gpt-4o, claude-3-5-sonnet) |
| tokens_used | int | Total tokens consumed (prompt + completion) |
| cost_cents | int | Estimated cost in hundredths of a US cent (e.g., 1250 = $0.125) |
| execution_time_ms | long | Time from request to response in milliseconds |
| success | boolean | Whether the AI call succeeded |
| tools_used | string[] | Tool names called during this execution |
| organization_id | string | Tenant scoping field |
| created_at | timestamp | When this record was written |
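Since `cost_cents` is stored in hundredths of a cent, converting to dollars means dividing by 10,000 (1250 = $0.125 per the table above). A sketch of that conversion plus a simple aggregation; the helper names and row shape are illustrative, not an apptor flow API:

```typescript
// cost_cents stores hundredths of a US cent, so dollars = cost_cents / 10000.
function costUsd(costCents: number): number {
  return costCents / 10_000;
}

// Aggregate tokens and spend across a batch of ai_execution_history rows.
function totals(rows: { tokens_used: number; cost_cents: number }[]) {
  let tokens = 0;
  let usd = 0;
  for (const r of rows) {
    tokens += r.tokens_used;
    usd += costUsd(r.cost_cents);
  }
  return { tokens, usd };
}

console.log(costUsd(1250)); // 0.125
```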
### API Key Usage (`api_key_usage`)

Tracks every API request made with an API Key credential.

| Column | Type | Description |
|---|---|---|
| api_key_id | UUID | Which API key was used |
| endpoint | string | Request path (e.g., /process/execute) |
| method | string | HTTP method |
| status_code | int | HTTP response status |
| response_time_ms | long | Latency of the request |
| ip_address | string | Caller IP |
| error_message | string | Error if the request failed |
## Admin Analytics Dashboards
The data above is surfaced through the Admin dashboards. See the Process Analytics UI guide for how to navigate and interpret the dashboards.
## API Endpoints

All analytics data is served under the `/api/admin/observability/` path. Endpoints:

| Endpoint | Description |
|---|---|
| GET /summary | High-level counts and averages |
| GET /process-stats | Process state distribution |
| GET /process-trends | Daily execution counts over time |
| GET /workflow-breakdown | Per-workflow execution counts |
| GET /node-distribution | Node type usage counts |
| GET /node-errors | Error hotspot rankings |
| GET /node-performance | Slowest nodes by average duration |
| GET /ai-summary | AI totals (calls, tokens, cost) |
| GET /ai-model-breakdown | Per-model token and cost breakdown |
| GET /ai-trends | Daily AI usage trends |
| GET /ai-tools | Tool usage frequency |
| GET /org-breakdown | Per-org comparison (Super Admin only) |
| GET /recent-executions | Latest execution records |
Query parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| days | int | 30 | Look-back window in days |
| orgId | string | (caller's org) | Override org scope (Super Admin only) |
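A short sketch of building requests against these endpoints from TypeScript. The base URL is a placeholder for your deployment, and `analyticsUrl` is an illustrative helper, not part of apptor flow:

```typescript
// Placeholder base URL; substitute your apptor flow host.
const BASE = "https://flow.example.com/api/admin/observability";

// Build a request URL for one analytics endpoint, applying the documented
// query parameters (days defaults to 30 server-side as well).
function analyticsUrl(endpoint: string, days = 30, orgId?: string): string {
  const url = new URL(`${BASE}/${endpoint}`);
  url.searchParams.set("days", String(days));
  if (orgId !== undefined) url.searchParams.set("orgId", orgId); // Super Admin only
  return url.toString();
}

console.log(analyticsUrl("summary", 7));
// https://flow.example.com/api/admin/observability/summary?days=7
```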
## Real-Time Log Streaming
In addition to the analytics data, apptor flow streams live execution logs via Server-Sent Events during an active execution.
```
GET /process/instance/{processInstanceId}/logs
Accept: text/event-stream
```
Each SSE event is a JSON `ExecutionLogEntry`:

```json
{
  "nodeId": "aiTask-1",
  "nodeName": "Classify Ticket",
  "nodeType": "aiTask",
  "message": "Completed | classification=billing",
  "level": "INFO",
  "timestamp": "2025-01-15T10:00:03.500Z",
  "variables": { "classification": "billing", "priority": "high" }
}
```
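A minimal consumer sketch: a TypeScript type mirroring the JSON above, plus a parser for one event's data payload. The type and function names are illustrative, and the EventSource wiring is shown only as a comment because it requires a browser (or an SSE polyfill):

```typescript
// Shape of one SSE event's JSON payload, mirroring the example above.
interface ExecutionLogEntry {
  nodeId: string;
  nodeName: string;
  nodeType: string;
  message: string;
  level: string; // e.g., "INFO"
  timestamp: string; // ISO-8601
  variables: Record<string, unknown>;
}

// Parse the data field of a single SSE event.
function parseLogEvent(data: string): ExecutionLogEntry {
  return JSON.parse(data) as ExecutionLogEntry;
}

// Browser usage (not runnable in plain Node):
// const es = new EventSource(`/process/instance/${instanceId}/logs`);
// es.onmessage = (e) => console.log(parseLogEvent(e.data).message);

const sample =
  '{"nodeId":"aiTask-1","nodeName":"Classify Ticket","nodeType":"aiTask",' +
  '"message":"Completed | classification=billing","level":"INFO",' +
  '"timestamp":"2025-01-15T10:00:03.500Z",' +
  '"variables":{"classification":"billing","priority":"high"}}';
console.log(parseLogEvent(sample).nodeId); // aiTask-1
```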
The Angular Execution Console subscribes to this stream and highlights the GoJS diagram as nodes complete. See Executing Workflows for the UI details.
## Error Handling in Executions

### Error Flow

When a node fails, the engine checks for an error flow connection originating from that node. If one exists, execution routes to the connected error handler node instead of stopping.

```
[AI Task] --error--> [Log Error Node] --> [End Event]
```
If no error flow is configured, the process instance still ends in state 4 (Completed), since there is no dedicated failed process state; the error itself is recorded in the failed node's `error_details` column.
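The routing rule can be sketched as a pure decision function. Field and function names here are invented for illustration; this is not engine code:

```typescript
// Toy model of the error-routing decision described above.
interface FailedNode {
  errorFlowTarget?: string; // id of a connected error handler, if any
}

// Prefer the error flow; otherwise end the process (the error itself is
// persisted in the failed node's error_details column).
function routeOnError(node: FailedNode): { next?: string; endProcess: boolean } {
  if (node.errorFlowTarget !== undefined) {
    return { next: node.errorFlowTarget, endProcess: false };
  }
  return { endProcess: true };
}

console.log(routeOnError({ errorFlowTarget: "logError-1" }).next); // logError-1
```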
### Retry Configuration

Nodes that support retries (AI Task, Service Task) can be configured with:

| Property | Description |
|---|---|
| retry.maxAttempts | How many times to retry on failure |
| retry.delay | Wait time in seconds between retries |
| retry.backoffMultiplier | Multiplier for exponential backoff |
The node's status_cd remains 5 (In Progress) during retries. If all retries are exhausted without success, the node transitions to 7 (Error).
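Assuming the common exponential-backoff interpretation of these properties (the wait before retry n is delay * backoffMultiplier^(n-1) seconds, which is an assumption rather than documented engine behavior), the schedule looks like:

```typescript
// Sketch of the retry schedule implied by the three properties above:
// wait before retry n = delay * backoffMultiplier^(n-1), in seconds.
function retryDelays(maxAttempts: number, delay: number, backoffMultiplier: number): number[] {
  return Array.from({ length: maxAttempts }, (_, i) => delay * backoffMultiplier ** i);
}

console.log(retryDelays(3, 2, 2)); // [ 2, 4, 8 ]
```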
### Timeout Configuration

Nodes can be given a timeout in seconds. If the node does not complete within that time:

- The node transitions to status 7 (Error) with the message "TIMEOUT"
- The engine routes execution along the timeout flow connection (if configured)
- If no timeout flow exists, the process continues to the next node on the default sequence flow
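The three timeout rules above can be expressed as a pure decision function. Field names are illustrative, not engine internals:

```typescript
// Toy model of the timeout-routing decision described above.
interface TimedOutNode {
  timeoutFlowTarget?: string; // node id on the timeout flow, if configured
  defaultNext: string; // next node on the default sequence flow
}

function routeOnTimeout(node: TimedOutNode): { statusCd: number; message: string; next: string } {
  return {
    statusCd: 7, // Error
    message: "TIMEOUT",
    next: node.timeoutFlowTarget ?? node.defaultNext,
  };
}

console.log(routeOnTimeout({ defaultNext: "end-1" }).next); // end-1
```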
## Planned: Runtime Observability (Phase 5)
The following observability features are planned for a future release and are not yet implemented:
| Feature | Description |
|---|---|
| Live queue depths | Real-time count of messages queued per actor type (from Hazelcast) |
| Actor thread utilization | How many threads are currently busy vs idle |
| JVM metrics | Heap memory, GC activity, thread counts |
| Dead Letter Queue (DLQ) | Messages that failed all retries and were moved to a DLQ for manual review |
| OpenTelemetry export | Distributed trace export to Jaeger, Zipkin, or OTLP-compatible backends |
| API Key Usage Dashboard | Analytics page for API key usage patterns |
| System Health endpoint | GET /api/admin/system-health with JVM and Hazelcast metrics |
The instrumentation hooks for OpenTelemetry are already in place in `QueueConsumerProxy`. Enabling them requires configuring an OTel exporter in `application.yml`.
## Querying the Data Directly

For advanced analysis or one-off investigations, you can query the PostgreSQL database directly.

```sql
-- Process state distribution
SELECT state_cd, COUNT(*) FROM process_instance GROUP BY state_cd;
```
```sql
-- Today's execution counts by workflow. Note: process_instance has no
-- error state (states are 1-4), so errored runs are detected via failed
-- nodes (node_instance.status_cd = 7).
SELECT pi.process_name, COUNT(*) AS total,
       SUM(CASE WHEN pi.state_cd = 4 THEN 1 ELSE 0 END) AS completed,
       SUM(CASE WHEN EXISTS (
             SELECT 1 FROM node_instance ni
             WHERE ni.process_instance_id = pi.id AND ni.status_cd = 7
           ) THEN 1 ELSE 0 END) AS errored
FROM process_instance pi
WHERE DATE(pi.start_time) = CURRENT_DATE
GROUP BY pi.process_name ORDER BY total DESC;
```
```sql
-- AI cost per workflow (last 30 days).
-- cost_cents is in hundredths of a cent, so divide by 10000 for USD.
SELECT pi.process_name,
       COUNT(aeh.id) AS ai_calls,
       SUM(aeh.tokens_used) AS tokens,
       SUM(aeh.cost_cents) / 10000.0 AS cost_usd
FROM ai_execution_history aeh
JOIN process_instance pi ON pi.id = aeh.process_instance_id
WHERE aeh.created_at >= NOW() - INTERVAL '30 days'
GROUP BY pi.process_name ORDER BY cost_usd DESC;
```
```sql
-- Error hotspots: nodes with the most failures (last 30 days)
SELECT node_type, node_name, COUNT(*) AS error_count
FROM node_instance
WHERE status_cd = 7
  AND start_time >= NOW() - INTERVAL '30 days'
GROUP BY node_type, node_name
ORDER BY error_count DESC
LIMIT 10;
```
```sql
-- Slowest nodes by average execution time (last 30 days)
SELECT node_type, node_name,
       ROUND(AVG(EXTRACT(EPOCH FROM (end_time - start_time)))::numeric, 2) AS avg_secs,
       COUNT(*) AS total_runs
FROM node_instance
WHERE end_time IS NOT NULL
  AND start_time >= NOW() - INTERVAL '30 days'
GROUP BY node_type, node_name
ORDER BY avg_secs DESC
LIMIT 10;
```