
Observability

How apptor flow captures, stores, and surfaces execution telemetry across all workflow runs.


Overview

apptor flow persists rich execution data in PostgreSQL as a natural by-product of running workflows. Every process execution, every individual node run, and every AI call writes detailed records with timing, state, and error information. This data powers the admin analytics dashboards without any additional instrumentation.

Two scopes are available:

| Scope | Who | Covers |
| --- | --- | --- |
| Organization | Org Admin | Executions within a single tenant |
| System-wide | Super Admin | Executions across all tenants |

Data Model

Process Instance (process_instance)

Every workflow execution creates one process_instance record.

| Column | Type | Description |
| --- | --- | --- |
| id | UUID | Unique execution identifier |
| process_meta_id | UUID | The workflow definition that was executed |
| process_name | string | Workflow name at time of execution |
| state_cd | int | Execution state (see below) |
| start_time | timestamp | When execution started |
| end_time | timestamp | When execution finished (null if still running) |
| parent_instance_id | UUID | Parent process ID for subprocesses |
| organization_id | string | Tenant scoping field |
| variables | JSONB | Final variable state |

Process states:

| state_cd | Label | Description |
| --- | --- | --- |
| 1 | Active | Currently executing |
| 2 | Waiting | Paused at an Input Node or Catch Event |
| 3 | Aborted | Cancelled by a user or admin |
| 4 | Completed | Finished successfully |
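For code that consumes these records, the state codes above map naturally onto an enum. This is a hypothetical helper mirroring the state_cd table, not a type shipped by apptor flow:

```python
from enum import IntEnum

class ProcessState(IntEnum):
    """Mirrors the process_instance.state_cd codes documented above."""
    ACTIVE = 1
    WAITING = 2
    ABORTED = 3
    COMPLETED = 4

def is_finished(state_cd: int) -> bool:
    """A run is finished once it has been aborted or has completed."""
    return state_cd in (ProcessState.ABORTED, ProcessState.COMPLETED)
```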

Node Instance (node_instance)

One node_instance row is created for each node that executes within a process instance.

| Column | Type | Description |
| --- | --- | --- |
| id | UUID | Unique node execution identifier |
| process_instance_id | UUID | Parent execution |
| node_id | string | Node ID in the workflow definition |
| node_name | string | Display name of the node |
| node_type | string | Type (e.g., aiTask, serviceTask, ifElse) |
| status_cd | int | Node execution status (see below) |
| start_time | timestamp | When node execution started |
| end_time | timestamp | When node execution finished |
| error_details | string | Error message if status is 7 |
| iteration | int | Loop iteration number (for nodes inside a Loop) |
| variables | JSONB | Variables produced by this node |
| organization_id | string | Tenant scoping field |

Node statuses:

| status_cd | Label | Description |
| --- | --- | --- |
| 5 | In Progress | Currently executing |
| 6 | Cancelled | Cancelled mid-execution |
| 7 | Error | Failed with an error |
| 8 | Completed | Finished successfully |

AI Execution History (ai_execution_history)

Every AI Task and Voice Task execution writes an ai_execution_history record.

| Column | Type | Description |
| --- | --- | --- |
| id | UUID | Record identifier |
| process_instance_id | UUID | Parent execution |
| node_instance_id | UUID | The AI Task node that produced this record |
| model_provider | string | Provider name (e.g., OPENAI, ANTHROPIC) |
| model_name | string | Model used (e.g., gpt-4o, claude-3-5-sonnet) |
| tokens_used | int | Total tokens consumed (prompt + completion) |
| cost_cents | int | Estimated cost in hundredths of a US cent (e.g., 1250 = $0.125) |
| execution_time_ms | long | Time from request to response in milliseconds |
| success | boolean | Whether the AI call succeeded |
| tools_used | string[] | Tool names called during this execution |
| organization_id | string | Tenant scoping field |
| created_at | timestamp | When this record was written |
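Because cost_cents is stored at 100× cent resolution, converting to dollars means dividing by 10,000, not 100. A minimal sketch of the conversion:

```python
def cost_cents_to_usd(cost_cents: int) -> float:
    """Convert the stored cost_cents value to US dollars.

    cost_cents stores hundredths of a cent, so 1250 means
    12.50 cents, i.e. $0.125.
    """
    return cost_cents / 10_000
```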

API Key Usage (api_key_usage)

Tracks every API request made with an API Key credential.

| Column | Type | Description |
| --- | --- | --- |
| api_key_id | UUID | Which API key was used |
| endpoint | string | Request path (e.g., /process/execute) |
| method | string | HTTP method |
| status_code | int | HTTP response status |
| response_time_ms | long | Latency of the request |
| ip_address | string | Caller IP |
| error_message | string | Error if request failed |

Admin Analytics Dashboards

The data above is surfaced through the Admin dashboards. See the Process Analytics UI guide for how to navigate and interpret the dashboards.

API Endpoints

All analytics data is served from the /api/admin/observability/ path. Endpoints:

| Endpoint | Description |
| --- | --- |
| GET /summary | High-level counts and averages |
| GET /process-stats | Process state distribution |
| GET /process-trends | Daily execution counts over time |
| GET /workflow-breakdown | Per-workflow execution counts |
| GET /node-distribution | Node type usage counts |
| GET /node-errors | Error hotspot rankings |
| GET /node-performance | Slowest nodes by average duration |
| GET /ai-summary | AI totals (calls, tokens, cost) |
| GET /ai-model-breakdown | Per-model token and cost breakdown |
| GET /ai-trends | Daily AI usage trends |
| GET /ai-tools | Tool usage frequency |
| GET /org-breakdown | Per-org comparison (Super Admin only) |
| GET /recent-executions | Latest execution records |

Query parameters:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| days | int | 30 | Look-back window |
| orgId | string | (caller's org) | Override org scope (Super Admin only) |

Real-Time Log Streaming

In addition to the analytics data, apptor flow streams live execution logs via Server-Sent Events during an active execution.

GET /process/instance/{processInstanceId}/logs
Accept: text/event-stream

Each SSE event is a JSON ExecutionLogEntry:

{
  "nodeId": "aiTask-1",
  "nodeName": "Classify Ticket",
  "nodeType": "aiTask",
  "message": "Completed | classification=billing",
  "level": "INFO",
  "timestamp": "2025-01-15T10:00:03.500Z",
  "variables": { "classification": "billing", "priority": "high" }
}

The Angular Execution Console subscribes to this stream and highlights the GoJS diagram as nodes complete. See Executing Workflows for the UI details.
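For clients other than the Angular console, the stream can be consumed with any SSE library; a minimal sketch of extracting ExecutionLogEntry objects from a raw text/event-stream body (assuming each event's payload arrives on a single `data:` line, as above):

```python
import json

def parse_sse_events(stream_text: str) -> list[dict]:
    """Parse the data: payloads of a text/event-stream body.

    Blank lines delimit events; each data: line here carries one
    JSON-encoded ExecutionLogEntry.
    """
    events = []
    for line in stream_text.splitlines():
        if line.startswith("data:"):
            events.append(json.loads(line[len("data:"):].strip()))
    return events
```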


Error Handling in Executions

Error Flow

When a node fails, the engine checks for an error flow connection originating from that node. If one exists, execution routes to the connected error handler node instead of stopping.

[AI Task] --error--> [Log Error Node] --> [End Event]

If no error flow is configured, the process instance still transitions to state 4 (Completed); the failure is recorded in the failed node's error_details, and the node itself ends in status 7 (Error).

Retry Configuration

Nodes that support retries (AI Task, Service Task) can be configured with:

| Property | Description |
| --- | --- |
| retry.maxAttempts | How many times to retry on failure |
| retry.delay | Wait time in seconds between retries |
| retry.backoffMultiplier | Multiplier for exponential backoff |

The node's status_cd remains 5 (In Progress) during retries. If all retries are exhausted without success, the node transitions to 7 (Error).
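Under exponential backoff, each retry waits the previous delay times the multiplier. A small sketch of the resulting schedule (our interpretation of the three properties above):

```python
def retry_delays(max_attempts: int, delay: float,
                 backoff_multiplier: float) -> list[float]:
    """Wait time (seconds) before each retry attempt.

    e.g. maxAttempts=3, delay=2, backoffMultiplier=2 -> [2, 4, 8]:
    the first retry waits `delay`, and each later one is multiplied
    by `backoff_multiplier`.
    """
    return [delay * backoff_multiplier ** i for i in range(max_attempts)]
```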

Timeout Configuration

Nodes can be given a timeout in seconds. If the node does not complete within that time:

  1. The node transitions to status 7 (Error) with message "TIMEOUT"
  2. The engine routes execution along the timeout flow connection (if configured)
  3. If no timeout flow exists, the process continues to the next node on the default sequence flow
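The timeout behavior can be illustrated with a worker thread and a bounded wait. This is a sketch of the documented outcome (status 7 with message "TIMEOUT"), not the engine's actual mechanism; note the worker is not forcibly stopped here, whereas a real engine would also cancel the work:

```python
import concurrent.futures

def run_with_timeout(work, timeout_secs: float) -> dict:
    """Run a node's work function with a timeout.

    On success, return status 8 (Completed) and the result; if the
    work does not finish in time, return status 7 (Error) with the
    documented "TIMEOUT" message.
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(work)
        try:
            return {"status_cd": 8, "result": future.result(timeout=timeout_secs)}
        except concurrent.futures.TimeoutError:
            return {"status_cd": 7, "error_details": "TIMEOUT"}
```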

Planned: Runtime Observability (Phase 5)

The following observability features are planned for a future release and are not yet implemented:

| Feature | Description |
| --- | --- |
| Live queue depths | Real-time count of messages queued per actor type (from Hazelcast) |
| Actor thread utilization | How many threads are currently busy vs idle |
| JVM metrics | Heap memory, GC activity, thread counts |
| Dead Letter Queue (DLQ) | Messages that failed all retries and were moved to a DLQ for manual review |
| OpenTelemetry export | Distributed trace export to Jaeger, Zipkin, or OTLP-compatible backends |
| API Key Usage Dashboard | Analytics page for API key usage patterns |
| System Health endpoint | GET /api/admin/system-health with JVM and Hazelcast metrics |

The instrumentation hooks for OpenTelemetry are already in place in QueueConsumerProxy. Enabling them requires configuring an OTel exporter in application.yml.
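Assuming the backend is a Spring Boot 3 service with the Actuator OTLP tracing starter on the classpath, the exporter wiring in application.yml might look like the following. The endpoint value is illustrative; check your OTel collector's address and your starter's documentation:

```yaml
management:
  otlp:
    tracing:
      endpoint: http://localhost:4318/v1/traces  # OTLP/HTTP collector
  tracing:
    sampling:
      probability: 1.0  # export every trace while validating the setup
```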


Querying the Data Directly

For advanced analysis or one-off investigations, query the PostgreSQL database directly.

-- Process state distribution
SELECT state_cd, COUNT(*) FROM process_instance GROUP BY state_cd;

-- Today's execution counts by workflow
SELECT pi.process_name,
  COUNT(DISTINCT pi.id) AS total,
  COUNT(DISTINCT pi.id) FILTER (WHERE pi.state_cd = 4) AS completed,
  COUNT(DISTINCT ni.process_instance_id) AS errored
FROM process_instance pi
LEFT JOIN node_instance ni
  ON ni.process_instance_id = pi.id AND ni.status_cd = 7
WHERE DATE(pi.start_time) = CURRENT_DATE
GROUP BY pi.process_name ORDER BY total DESC;

-- Average AI cost per workflow (last 30 days)
SELECT pi.process_name,
COUNT(aeh.id) as ai_calls,
SUM(aeh.tokens_used) as tokens,
SUM(aeh.cost_cents) / 10000.0 as cost_usd
FROM ai_execution_history aeh
JOIN process_instance pi ON pi.id = aeh.process_instance_id
WHERE aeh.created_at >= NOW() - INTERVAL '30 days'
GROUP BY pi.process_name ORDER BY cost_usd DESC;

-- Error hotspots: nodes with most failures
SELECT node_type, node_name, COUNT(*) as error_count
FROM node_instance
WHERE status_cd = 7
AND start_time >= NOW() - INTERVAL '30 days'
GROUP BY node_type, node_name ORDER BY error_count DESC LIMIT 10;

-- Slowest nodes by average execution time
SELECT node_type, node_name,
ROUND(AVG(EXTRACT(EPOCH FROM (end_time - start_time)))::numeric, 2) as avg_secs,
COUNT(*) as total_runs
FROM node_instance
WHERE end_time IS NOT NULL
AND start_time >= NOW() - INTERVAL '30 days'
GROUP BY node_type, node_name ORDER BY avg_secs DESC LIMIT 10;