Event Stream Client Reference¶
Obra exposes two event-consumption paths: WS /api/events/stream for one connection that can multiplex workflow, run, and derivation subscriptions, and per-resource SSE routes for simpler single-resource consumers. Use this page as the copy-paste client reference, and see Gateway API for the broader route contract.
WebSocket Reference Example¶
The example below shows a standalone TypeScript client that authenticates, waits for the first non-error server message before treating the socket as ready, tracks the highest seen sequence per subscription, sends keepalive pings, and reconnects with after_sequence recovery.
/** Resource kinds that an event-stream subscription can target. */
type ResourceType = "workflow" | "workflow_run" | "workflow_derivation";

/** Optional event-name filter that every subscription target may carry. */
type SubscriptionFilter = { filter?: string[] };

/**
 * One subscription request entry. The resource is identified by whichever id
 * key is present: `run_id`, `workflow_id`, or `derivation_id`.
 */
type SubscriptionTarget =
  | ({ run_id: string } & SubscriptionFilter)
  | ({ workflow_id: string } & SubscriptionFilter)
  | ({ derivation_id: string } & SubscriptionFilter);

/**
 * Event object delivered over the WebSocket stream.
 *
 * The first nine fields are projection-native; the three `stream_*` /
 * `subscription_id` fields are added by the WebSocket transport layer.
 */
type EventEnvelope = {
  /** Projection envelope type. */
  resource_type: string;
  /** Canonical event identifier. */
  event_id: string;
  /** Canonical event name such as `workflow_run.completed`. */
  event_name: string;
  /** Monotonically increasing sequence for the resource stream. */
  sequence: number;
  /** Event timestamp in ISO 8601 form. */
  timestamp: string;
  /** Populated for run-backed events, otherwise `null`. */
  workflow_run_id: string | null;
  /** Populated for derivation-backed events, otherwise `null`. */
  workflow_derivation_id: string | null;
  /** Event-specific data. */
  payload: Record<string, unknown>;
  /** Correlation bundle (workflow/run/revision/domain context). */
  correlation: Record<string, unknown>;
  /** Transport-added: resource kind of the active subscription. */
  stream_resource_type: ResourceType;
  /** Transport-added: resource identifier tied to the active subscription. */
  stream_resource_id: string;
  /** Transport-added: subscription key in `resource_type:resource_id` form. */
  subscription_id: string;
};
/**
 * Reference client for the multiplexed `WS /api/events/stream` route.
 *
 * `connect()` opens the socket, sends the auth frame followed by one subscribe
 * frame, and treats the first non-error server message as proof that the
 * socket is usable (the server sends no explicit auth-success frame). While
 * connected it records the highest sequence seen per subscription, sends a
 * ping every 30 seconds, and closes the socket when no pong arrives within
 * 5 seconds. Unexpected closes are retried with exponential backoff (1s, 2s,
 * 4s ... capped at 10s) and the subscriptions are replayed with
 * `after_sequence` set to the highest sequence already seen.
 *
 * `disconnect()` is final until `connect()` is called again, and a close with
 * code 4001 (auth rejected) is not retried because the same token would be
 * rejected again.
 */
class EventStreamClient {
  /** Close code the server uses after rejecting the auth frame. */
  private static readonly AUTH_REJECTED_CLOSE_CODE = 4001;

  private socket: WebSocket | null = null;
  private attempts = 0;
  private ready = false;
  /** Cleared by `disconnect()` or an auth rejection so the close handler does not reconnect. */
  private shouldReconnect = true;
  private keepaliveTimer: ReturnType<typeof setInterval> | null = null;
  private pongTimer: ReturnType<typeof setTimeout> | null = null;
  private reconnectTimer: ReturnType<typeof setTimeout> | null = null;
  /** Highest sequence seen so far, keyed by `resource_type:resource_id`. */
  private readonly lastSeenSequence = new Map<string, number>();

  /**
   * @param baseUrl Gateway origin using http(s); it is rewritten to ws(s).
   * @param token Bearer token sent in the auth frame.
   * @param subscriptions Targets to subscribe to on every (re)connect.
   */
  constructor(
    private readonly baseUrl: string,
    private readonly token: string,
    private readonly subscriptions: SubscriptionTarget[],
  ) {}

  /** Opens a new socket, replacing any previous one, then authenticates and subscribes. */
  connect(): void {
    this.shouldReconnect = true;
    // Cancel a pending retry and old keepalive so only one connection is ever live.
    this.stopKeepalive();

    const wsUrl = this.baseUrl.replace(/^http/, "ws").replace(/\/$/, "") + "/api/events/stream";
    const previous = this.socket;
    const socket = new WebSocket(wsUrl);
    this.socket = socket;
    // The previous socket is now stale; its handlers ignore their own events.
    previous?.close();

    socket.addEventListener("open", () => {
      if (socket !== this.socket) {
        return;
      }
      this.ready = false;
      socket.send(JSON.stringify({ type: "auth", token: this.token }));
      socket.send(
        JSON.stringify({
          type: "subscribe",
          subscriptions: this.subscriptions.map((subscription) =>
            this.withAfterSequence(subscription),
          ),
        }),
      );
    });

    socket.addEventListener("message", (message) => {
      if (socket !== this.socket) {
        return;
      }
      this.handleMessage(message.data);
    });

    socket.addEventListener("close", (event) => {
      if (socket !== this.socket) {
        return;
      }
      this.stopKeepalive();
      this.ready = false;
      if (event.code === EventStreamClient.AUTH_REJECTED_CLOSE_CODE) {
        // Retrying with the same token would only be rejected again.
        this.shouldReconnect = false;
      }
      if (!this.shouldReconnect) {
        return;
      }
      const timeoutMs = Math.min(1000 * 2 ** this.attempts, 10000);
      this.attempts += 1;
      this.reconnectTimer = setTimeout(() => {
        this.reconnectTimer = null;
        this.connect();
      }, timeoutMs);
    });
  }

  /**
   * Stops keepalive and any pending retry, unsubscribes when the socket is
   * open, and closes the socket without scheduling a reconnect.
   */
  disconnect(): void {
    this.shouldReconnect = false;
    this.stopKeepalive();
    const socket = this.socket;
    if (!socket) {
      return;
    }
    if (socket.readyState === WebSocket.OPEN) {
      socket.send(
        JSON.stringify({
          type: "unsubscribe",
          subscriptions: this.subscriptions.map((subscription) =>
            this.withAfterSequence(subscription),
          ),
        }),
      );
    }
    // close() is a no-op on an already closed socket and aborts one that is still connecting.
    socket.close();
  }

  /** Parses one server frame and routes it: error, ack, pong, or event envelope. */
  private handleMessage(data: unknown): void {
    let parsed: unknown;
    try {
      parsed = JSON.parse(String(data));
    } catch {
      console.error("Event stream error:", "Received a frame that is not valid JSON");
      return;
    }
    if (typeof parsed !== "object" || parsed === null) {
      return;
    }

    const frame = parsed as { type?: unknown; detail?: unknown };
    if (frame.type === "error") {
      console.error("Event stream error:", frame.detail ?? "Authentication failed");
      return;
    }

    // First non-error message: the server accepted the auth frame.
    if (!this.ready) {
      this.ready = true;
      this.attempts = 0;
      this.startKeepalive();
    }
    if (frame.type === "subscribed" || frame.type === "unsubscribed") {
      return;
    }
    if (frame.type === "pong") {
      this.clearPongTimer();
      return;
    }

    const event = parsed as EventEnvelope;
    if (typeof event.sequence !== "number" || !Number.isFinite(event.sequence)) {
      // Not an event envelope; recording it would store NaN as the resume point.
      return;
    }
    const key = `${event.stream_resource_type}:${event.stream_resource_id}`;
    this.lastSeenSequence.set(key, Math.max(this.lastSeenSequence.get(key) ?? 0, event.sequence));
    console.log(`[${event.sequence}] ${event.event_name}`, event.payload);
  }

  /** Sends a ping every 30 seconds and closes the socket if no pong arrives within 5 seconds. */
  private startKeepalive(): void {
    this.stopKeepalive();
    this.keepaliveTimer = setInterval(() => {
      const socket = this.socket;
      if (socket?.readyState !== WebSocket.OPEN) {
        return;
      }
      socket.send(JSON.stringify({ type: "ping" }));
      this.clearPongTimer();
      this.pongTimer = setTimeout(() => {
        socket.close();
      }, 5000);
    }, 30000);
  }

  /** Clears the keepalive interval, the pong deadline, and any pending reconnect timer. */
  private stopKeepalive(): void {
    if (this.keepaliveTimer) {
      clearInterval(this.keepaliveTimer);
      this.keepaliveTimer = null;
    }
    this.clearPongTimer();
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
  }

  /** Cancels the pending pong deadline, if any. */
  private clearPongTimer(): void {
    if (this.pongTimer) {
      clearTimeout(this.pongTimer);
      this.pongTimer = null;
    }
  }

  /** Copies a target, defaulting `filter` to `[]` and adding the highest seen sequence (0 when none). */
  private withAfterSequence(subscription: SubscriptionTarget) {
    const key = subscriptionKey(subscription);
    return {
      ...subscription,
      filter: subscription.filter ?? [],
      after_sequence: this.lastSeenSequence.get(key) ?? 0,
    };
  }
}
/**
 * Builds the `resource_type:resource_id` key for a subscription target.
 *
 * The id key present on the target selects the resource type; `run_id` is
 * checked first, then `workflow_id`, and anything else is treated as a
 * derivation target.
 */
function subscriptionKey(subscription: SubscriptionTarget): string {
  const [resourceType, resourceId] =
    "run_id" in subscription
      ? ["workflow_run", subscription.run_id]
      : "workflow_id" in subscription
        ? ["workflow", subscription.workflow_id]
        : ["workflow_derivation", subscription.derivation_id];
  return `${resourceType}:${resourceId}`;
}
// Demo: follow a single run (no event-name filter) on a local gateway.
const client = new EventStreamClient(
  "http://127.0.0.1:18790",
  "<token>",
  [{ run_id: "<run-id>", filter: [] }],
);
client.connect();
// Shut the demo client down after two minutes.
setTimeout(() => {
  client.disconnect();
}, 120000);
SSE Reference Example¶
Use the SSE routes when you only need one resource stream. EventSource is not a fit here because it cannot send the required bearer token header, so use fetch plus a ReadableStream.
/**
 * Streams events for one workflow run from the per-resource SSE route and
 * logs each event as `(eventType, payload)`.
 *
 * The stream is read with `fetch` because the route needs a bearer token
 * header. When the server ends the stream, the function waits one second and
 * opens it again, until `signal` is aborted.
 *
 * Parsing follows the SSE framing rules: `data:` lines are collected and
 * joined with newlines, and the event is dispatched on the blank line that
 * ends it. Other lines (for example `:` comments) are ignored.
 *
 * @param baseUrl Gateway origin; one trailing slash is tolerated.
 * @param token Bearer token sent in the `Authorization` header.
 * @param runId Workflow run identifier; it is URL-encoded before being placed in the path.
 * @param signal Optional abort signal. Aborting cancels the in-flight request and resolves the promise.
 * @throws Error when the route answers with a non-OK status or no body, or when an event's data is not valid JSON.
 */
async function streamRunEvents(
  baseUrl: string,
  token: string,
  runId: string,
  signal?: AbortSignal,
): Promise<void> {
  const url = `${baseUrl.replace(/\/$/, "")}/api/workflow-runs/${encodeURIComponent(runId)}/events`;
  const isAborted = (): boolean => signal?.aborted === true;

  while (!isAborted()) {
    let response: Response;
    try {
      response = await fetch(url, {
        headers: {
          Authorization: `Bearer ${token}`,
        },
        signal,
      });
    } catch (error: unknown) {
      if (isAborted()) {
        return;
      }
      throw error;
    }
    if (!response.ok || !response.body) {
      throw new Error(`Unable to open SSE stream: ${response.status}`);
    }

    const reader = response.body.getReader();
    // A streaming decoder keeps partial multi-byte sequences between calls, so
    // each connection gets its own instance.
    const decoder = new TextDecoder();
    let buffer = "";
    let eventType = "message";
    let dataLines: string[] = [];
    try {
      while (true) {
        const { done, value } = await reader.read();
        if (done) {
          break;
        }
        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split("\n");
        // The last piece may be an incomplete line; keep it for the next chunk.
        buffer = lines.pop() ?? "";
        for (const rawLine of lines) {
          const line = rawLine.endsWith("\r") ? rawLine.slice(0, -1) : rawLine;
          if (line.trim() === "") {
            // Blank line: the event is complete.
            if (dataLines.length > 0) {
              const payload = JSON.parse(dataLines.join("\n")) as Record<string, unknown>;
              console.log(eventType, payload);
            }
            eventType = "message";
            dataLines = [];
            continue;
          }
          if (line.startsWith("event:")) {
            eventType = line.slice(6).trim();
            continue;
          }
          if (line.startsWith("data:")) {
            dataLines.push(line.slice(5).trim());
          }
        }
      }
    } catch (error: unknown) {
      if (isAborted()) {
        return;
      }
      throw error;
    } finally {
      // Cancel rather than only unlock, so the connection is freed when parsing fails mid-stream.
      try {
        await reader.cancel();
      } catch {
        // The stream was already errored or aborted; nothing left to cancel.
      }
      reader.releaseLock();
    }

    if (isAborted()) {
      return;
    }
    await sleepUnlessAborted(1000, signal);
  }
}

/** Resolves after `ms` milliseconds, or as soon as `signal` aborts. */
function sleepUnlessAborted(ms: number, signal?: AbortSignal): Promise<void> {
  return new Promise((resolve) => {
    if (signal?.aborted) {
      resolve();
      return;
    }
    const finish = (): void => {
      clearTimeout(timer);
      signal?.removeEventListener("abort", finish);
      resolve();
    };
    const timer = setTimeout(finish, ms);
    signal?.addEventListener("abort", finish, { once: true });
  });
}
void streamRunEvents("http://127.0.0.1:18790", "<token>", "<run-id>");
Event Envelope¶
The WebSocket event payload is the AsyncAPI EventEnvelope. Fields added by the WebSocket transport layer are noted explicitly.
| Field | Type | Required | Source | Notes |
|---|---|---|---|---|
| `resource_type` | string | Yes | Projection-native | Projection envelope type. Current value is `workflow_event`. |
| `event_id` | string | Yes | Projection-native | Canonical event identifier. |
| `event_name` | string | Yes | Projection-native | Canonical event name such as `workflow_run.completed`. |
| `sequence` | integer | Yes | Projection-native | Monotonically increasing sequence for the resource stream. |
| `timestamp` | string (date-time) | Yes | Projection-native | Event timestamp in ISO 8601 form. |
| `workflow_run_id` | string or null | Yes | Projection-native | Populated for run-backed events. |
| `workflow_derivation_id` | string or null | Yes | Projection-native | Populated for derivation-backed events. |
| `payload` | object | Yes | Projection-native | Event-specific data. Progress payloads often include `action`. |
| `correlation` | object | Yes | Projection-native | Correlation bundle containing workflow/run/revision/domain context. |
| `stream_resource_type` | `workflow` \| `workflow_run` \| `workflow_derivation` | Yes | Transport-added | Added by the WebSocket layer for the active subscription. |
| `stream_resource_id` | string | Yes | Transport-added | Resource identifier tied to the active subscription. |
| `subscription_id` | string | Yes | Transport-added | Subscription key in `resource_type:resource_id` form. |
Example workflow_run.completed envelope:
{
"resource_type": "workflow_event",
"event_id": "run-123:42",
"event_name": "workflow_run.completed",
"sequence": 42,
"timestamp": "2026-03-30T18:30:12.123456+00:00",
"workflow_run_id": "run-123",
"workflow_derivation_id": null,
"payload": {
"run_id": "run-123",
"workflow_run_id": "run-123",
"session_id": "session-123",
"summary": "Run completed"
},
"correlation": {
"workflow_run_id": "run-123",
"workflow_derivation_id": null,
"workflow_id": "WORKFLOW-CLAIMS-001",
"workflow_revision_id": "rev-12",
"domain_id": "business",
"checkpoint_id": null,
"escalation_id": null,
"trace_id": null,
"base_revision_id": null,
"current_revision_id": null,
"resulting_revision_id": null
},
"stream_resource_type": "workflow_run",
"stream_resource_id": "run-123",
"subscription_id": "workflow_run:run-123"
}
Protocol Walkthrough¶
The WebSocket lifecycle is short and strict: connect, authenticate, subscribe, process event envelopes and keepalive frames, then resubscribe with after_sequence after reconnects.
1. Connect¶
Open a WebSocket connection to the gateway event-stream route.
2. Authenticate¶
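Send the bearer token as the first frame, before any subscribe frame:
{
"type": "auth",
"token": "<token>"
}
The server does not send an explicit auth-success frame, so treat the first non-error message as confirmation. On failure the server sends an error frame and closes the connection with code 4001.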
3. Subscribe¶
Send one or more subscriptions after the auth frame.
{
"type": "subscribe",
"subscriptions": [
{
"run_id": "<id>",
"filter": ["workflow_run.completed"],
"after_sequence": 0
}
]
}
Server acknowledgement:
{
"type": "subscribed",
"subscriptions": [
{
"subscription_id": "workflow_run:<id>",
"resource_type": "workflow_run",
"resource_id": "<id>",
"after_sequence": 0,
"filter": ["workflow_run.completed"]
}
]
}
4. Receive Events¶
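Application data arrives as EventEnvelope JSON objects. The example below is abbreviated to the fields most clients read; see Event Envelope above for the full set of required fields.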
{
"event_name": "workflow_run.completed",
"sequence": 42,
"timestamp": "2026-03-30T18:30:12.123456+00:00",
"payload": {
"summary": "Run completed"
},
"stream_resource_type": "workflow_run",
"stream_resource_id": "run-123",
"subscription_id": "workflow_run:run-123"
}
5. Keepalive¶
Send a ping frame every 30 seconds. Treat the connection as dead if the matching pong does not arrive within 5 seconds.
{
"type": "ping"
}
Server response (a frame whose type is pong):
{
"type": "pong"
}
6. Reconnect¶
Reconnect with exponential backoff and resubscribe using the highest seen after_sequence per subscription.
{
"type": "subscribe",
"subscriptions": [
{
"run_id": "<id>",
"filter": ["workflow_run.completed"],
"after_sequence": 42
}
]
}
7. Unsubscribe¶
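Remove an active subscription before shutdown when you no longer need it. Send a frame with type unsubscribe and a subscriptions array that lists the same targets used to subscribe; the reference client above does this in disconnect().
Server response: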
{
"type": "unsubscribed",
"subscriptions": [
{
"subscription_id": "workflow_run:<id>",
"resource_type": "workflow_run",
"resource_id": "<id>"
}
]
}
8. Close¶
Close the WebSocket when the client is finished.
Common Patterns¶
Watch A Single Run To Completion¶
/**
 * Connects the client and resolves once it logs a terminal run event
 * (`workflow_run.completed` or `workflow_run.failed`).
 *
 * `EventStreamClient` reports events only through
 * `console.log("[<sequence>] <event_name>", payload)`, so this helper
 * temporarily wraps `console.log` and reads the event name and sequence from
 * that label. The logged payload is the event's inner `payload`, which carries
 * no `event_name`, so the label is the only place the name can be read from.
 * All log calls are forwarded unchanged, and `console.log` is restored when
 * the terminal event is seen.
 *
 * @param client Client to connect; it must not be connected yet.
 * @returns The name, sequence, and payload of the terminal event. The other
 *   envelope fields are not available from the log call.
 */
function waitForRunTerminalEvent(
  client: EventStreamClient,
): Promise<Pick<EventEnvelope, "event_name" | "sequence" | "payload">> {
  const terminalEvents = new Set(["workflow_run.completed", "workflow_run.failed"]);
  const labelPattern = /^\[(\d+)\] (\S+)$/;

  return new Promise((resolve) => {
    const originalLog = console.log;
    console.log = (...args: unknown[]) => {
      // Forward every argument so unrelated log calls are not truncated.
      originalLog(...args);
      const [label, payload] = args;
      const match = typeof label === "string" ? labelPattern.exec(label) : null;
      if (match === null) {
        return;
      }
      const [, sequence, eventName] = match;
      if (eventName === undefined || !terminalEvents.has(eventName)) {
        return;
      }
      console.log = originalLog;
      resolve({
        event_name: eventName,
        sequence: Number(sequence),
        payload: (payload ?? {}) as Record<string, unknown>,
      });
    };
    client.connect();
  });
}
Subscribe To Multiple Resources¶
// One connection carries several subscriptions, each with its own event-name filter.
const subscriptions: SubscriptionTarget[] = [
  { run_id: "run-123", filter: [] },
  { workflow_id: "WORKFLOW-CLAIMS-001", filter: ["workflow.updated"] },
];
const client = new EventStreamClient(
  "http://127.0.0.1:18790",
  "<token>",
  subscriptions,
);
client.connect();