Skip to content

Event Stream Client Reference

Obra exposes two event-consumption paths: WS /api/events/stream for one connection that can multiplex workflow, run, and derivation subscriptions, and per-resource SSE routes for simpler single-resource consumers. Use this page as the copy-paste client reference, and see Gateway API for the broader route contract.

WebSocket Reference Example

The example below shows a standalone TypeScript client that authenticates, waits for the first non-error server message before treating the socket as ready, tracks the highest seen sequence per subscription, sends keepalive pings, and reconnects with after_sequence recovery.

type ResourceType = "workflow" | "workflow_run" | "workflow_derivation";

type SubscriptionTarget =
  | { run_id: string; filter?: string[] }
  | { workflow_id: string; filter?: string[] }
  | { derivation_id: string; filter?: string[] };

type EventEnvelope = {
  resource_type: string;
  event_id: string;
  event_name: string;
  sequence: number;
  timestamp: string;
  workflow_run_id: string | null;
  workflow_derivation_id: string | null;
  payload: Record<string, unknown>;
  correlation: Record<string, unknown>;
  stream_resource_type: ResourceType;
  stream_resource_id: string;
  subscription_id: string;
};

class EventStreamClient {
  private socket: WebSocket | null = null;
  private attempts = 0;
  private ready = false;
  private keepaliveTimer: ReturnType<typeof setInterval> | null = null;
  private pongTimer: ReturnType<typeof setTimeout> | null = null;
  private reconnectTimer: ReturnType<typeof setTimeout> | null = null;
  private readonly lastSeenSequence = new Map<string, number>();

  constructor(
    private readonly baseUrl: string,
    private readonly token: string,
    private readonly subscriptions: SubscriptionTarget[],
  ) {}

  connect() {
    const wsUrl = this.baseUrl.replace(/^http/, "ws").replace(/\/$/, "") + "/api/events/stream";
    this.socket = new WebSocket(wsUrl);

    this.socket.addEventListener("open", () => {
      this.ready = false;
      this.socket?.send(JSON.stringify({ type: "auth", token: this.token }));
      this.socket?.send(
        JSON.stringify({
          type: "subscribe",
          subscriptions: this.subscriptions.map((subscription) =>
            this.withAfterSequence(subscription),
          ),
        }),
      );
    });

    this.socket.addEventListener("message", (message) => {
      const payload = JSON.parse(String(message.data)) as
        | EventEnvelope
        | { type?: string; detail?: string };

      if (payload.type === "error") {
        console.error("Event stream error:", payload.detail ?? "Authentication failed");
        return;
      }

      if (!this.ready) {
        this.ready = true;
        this.attempts = 0;
        this.startKeepalive();
      }

      if (payload.type === "subscribed" || payload.type === "unsubscribed") {
        return;
      }

      if (payload.type === "pong") {
        this.clearPongTimer();
        return;
      }

      const event = payload as EventEnvelope;
      const key = `${event.stream_resource_type}:${event.stream_resource_id}`;
      this.lastSeenSequence.set(key, Math.max(this.lastSeenSequence.get(key) ?? 0, event.sequence));

      console.log(`[${event.sequence}] ${event.event_name}`, event.payload);
    });

    this.socket.addEventListener("close", () => {
      this.stopKeepalive();
      this.ready = false;
      const timeoutMs = Math.min(1000 * 2 ** this.attempts, 10000);
      this.attempts += 1;
      this.reconnectTimer = setTimeout(() => this.connect(), timeoutMs);
    });
  }

  disconnect() {
    const socket = this.socket;
    this.stopKeepalive();
    if (socket?.readyState === WebSocket.OPEN) {
      socket.send(
        JSON.stringify({
          type: "unsubscribe",
          subscriptions: this.subscriptions.map((subscription) =>
            this.withAfterSequence(subscription),
          ),
        }),
      );
      socket.close();
    }
  }

  private startKeepalive() {
    this.stopKeepalive();
    this.keepaliveTimer = setInterval(() => {
      if (this.socket?.readyState !== WebSocket.OPEN) {
        return;
      }
      this.socket.send(JSON.stringify({ type: "ping" }));
      this.clearPongTimer();
      this.pongTimer = setTimeout(() => {
        this.socket?.close();
      }, 5000);
    }, 30000);
  }

  private stopKeepalive() {
    if (this.keepaliveTimer) {
      clearInterval(this.keepaliveTimer);
      this.keepaliveTimer = null;
    }
    this.clearPongTimer();
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
  }

  private clearPongTimer() {
    if (this.pongTimer) {
      clearTimeout(this.pongTimer);
      this.pongTimer = null;
    }
  }

  private withAfterSequence(subscription: SubscriptionTarget) {
    const key = subscriptionKey(subscription);
    return {
      ...subscription,
      filter: subscription.filter ?? [],
      after_sequence: this.lastSeenSequence.get(key) ?? 0,
    };
  }
}

function subscriptionKey(subscription: SubscriptionTarget): string {
  if ("run_id" in subscription) {
    return `workflow_run:${subscription.run_id}`;
  }
  if ("workflow_id" in subscription) {
    return `workflow:${subscription.workflow_id}`;
  }
  return `workflow_derivation:${subscription.derivation_id}`;
}

const client = new EventStreamClient("http://127.0.0.1:18790", "<token>", [
  { run_id: "<run-id>", filter: [] },
]);

client.connect();

setTimeout(() => client.disconnect(), 120000);

SSE Reference Example

Use the SSE routes when you only need one resource stream. EventSource is not a fit here because it cannot send the required bearer token header, so use fetch plus a ReadableStream.

async function streamRunEvents(baseUrl: string, token: string, runId: string) {
  const decoder = new TextDecoder();

  while (true) {
    const response = await fetch(
      `${baseUrl.replace(/\/$/, "")}/api/workflow-runs/${runId}/events`,
      {
        headers: {
          Authorization: `Bearer ${token}`,
        },
      },
    );

    if (!response.ok || !response.body) {
      throw new Error(`Unable to open SSE stream: ${response.status}`);
    }

    const reader = response.body.getReader();
    let buffer = "";
    let eventType = "message";

    try {
      while (true) {
        const { done, value } = await reader.read();
        if (done) {
          break;
        }

        buffer += decoder.decode(value, { stream: true });
        const frames = buffer.split("\n");
        buffer = frames.pop() ?? "";

        for (const line of frames) {
          if (line.startsWith("event:")) {
            eventType = line.slice(6).trim();
            continue;
          }
          if (line.startsWith("data:")) {
            const payload = JSON.parse(line.slice(5).trim()) as Record<string, unknown>;
            console.log(eventType, payload);
            continue;
          }
          if (line.trim() === "") {
            eventType = "message";
          }
        }
      }
    } finally {
      reader.releaseLock();
    }

    await new Promise((resolve) => setTimeout(resolve, 1000));
  }
}

void streamRunEvents("http://127.0.0.1:18790", "<token>", "<run-id>");

Event Envelope

The WebSocket event payload is the AsyncAPI EventEnvelope. Fields added by the WebSocket transport layer are noted explicitly.

Field Type Required Source Notes
resource_type string Yes Projection-native Projection envelope type. Current value is workflow_event.
event_id string Yes Projection-native Canonical event identifier.
event_name string Yes Projection-native Canonical event name such as workflow_run.completed.
sequence integer Yes Projection-native Monotonically increasing sequence for the resource stream.
timestamp string (date-time) Yes Projection-native Event timestamp in ISO 8601 form.
workflow_run_id string or null Yes Projection-native Populated for run-backed events.
workflow_derivation_id string or null Yes Projection-native Populated for derivation-backed events.
payload object Yes Projection-native Event-specific data. Progress payloads often include action.
correlation object Yes Projection-native Correlation bundle containing workflow/run/revision/domain context.
stream_resource_type workflow | workflow_run | workflow_derivation Yes Transport-added Added by the WebSocket layer for the active subscription.
stream_resource_id string Yes Transport-added Resource identifier tied to the active subscription.
subscription_id string Yes Transport-added Subscription key in resource_type:resource_id form.

Example workflow_run.completed envelope:

{
  "resource_type": "workflow_event",
  "event_id": "run-123:42",
  "event_name": "workflow_run.completed",
  "sequence": 42,
  "timestamp": "2026-03-30T18:30:12.123456+00:00",
  "workflow_run_id": "run-123",
  "workflow_derivation_id": null,
  "payload": {
    "run_id": "run-123",
    "workflow_run_id": "run-123",
    "session_id": "session-123",
    "summary": "Run completed"
  },
  "correlation": {
    "workflow_run_id": "run-123",
    "workflow_derivation_id": null,
    "workflow_id": "WORKFLOW-CLAIMS-001",
    "workflow_revision_id": "rev-12",
    "domain_id": "business",
    "checkpoint_id": null,
    "escalation_id": null,
    "trace_id": null,
    "base_revision_id": null,
    "current_revision_id": null,
    "resulting_revision_id": null
  },
  "stream_resource_type": "workflow_run",
  "stream_resource_id": "run-123",
  "subscription_id": "workflow_run:run-123"
}

Protocol Walkthrough

The WebSocket lifecycle is short and strict: connect, authenticate, subscribe, process event envelopes and keepalive frames, then resubscribe with after_sequence after reconnects.

1. Connect

Open a WebSocket connection to the gateway event-stream route.

{
  "url": "ws://127.0.0.1:18790/api/events/stream"
}

2. Authenticate

Send the bearer token as the first frame. The server does not send an explicit auth-success frame. On failure it sends an error and closes with code 4001.

{
  "type": "auth",
  "token": "<token>"
}

3. Subscribe

Send one or more subscriptions after the auth frame.

{
  "type": "subscribe",
  "subscriptions": [
    {
      "run_id": "<id>",
      "filter": ["workflow_run.completed"],
      "after_sequence": 0
    }
  ]
}

Server acknowledgement:

{
  "type": "subscribed",
  "subscriptions": [
    {
      "subscription_id": "workflow_run:<id>",
      "resource_type": "workflow_run",
      "resource_id": "<id>",
      "after_sequence": 0,
      "filter": ["workflow_run.completed"]
    }
  ]
}

4. Receive Events

Application data arrives as EventEnvelope JSON objects.

{
  "event_name": "workflow_run.completed",
  "sequence": 42,
  "timestamp": "2026-03-30T18:30:12.123456+00:00",
  "payload": {
    "summary": "Run completed"
  },
  "stream_resource_type": "workflow_run",
  "stream_resource_id": "run-123",
  "subscription_id": "workflow_run:run-123"
}

5. Keepalive

Send a ping every 30 seconds. Treat the connection as dead if the matching pong does not arrive within 5 seconds.

{
  "type": "ping"
}

Server response:

{
  "type": "pong"
}

6. Reconnect

Reconnect with exponential backoff and resubscribe using the highest seen after_sequence per subscription.

{
  "type": "subscribe",
  "subscriptions": [
    {
      "run_id": "<id>",
      "filter": ["workflow_run.completed"],
      "after_sequence": 42
    }
  ]
}

7. Unsubscribe

Remove an active subscription before shutdown when you no longer need it.

{
  "type": "unsubscribe",
  "subscriptions": [
    {
      "run_id": "<id>"
    }
  ]
}

Server response:

{
  "type": "unsubscribed",
  "subscriptions": [
    {
      "subscription_id": "workflow_run:<id>",
      "resource_type": "workflow_run",
      "resource_id": "<id>"
    }
  ]
}

8. Close

Close the WebSocket when the client is finished.

{
  "action": "socket.close()"
}

Common Patterns

Watch A Single Run To Completion

function waitForRunTerminalEvent(client: EventStreamClient): Promise<EventEnvelope> {
  return new Promise((resolve) => {
    const originalLog = console.log;
    console.log = (...args) => {
      const [label, payload] = args as [string, EventEnvelope];
      if (
        payload?.event_name === "workflow_run.completed" ||
        payload?.event_name === "workflow_run.failed"
      ) {
        console.log = originalLog;
        resolve(payload);
      }
      originalLog(label, payload);
    };
    client.connect();
  });
}

Subscribe To Multiple Resources

const subscriptions = [
  { run_id: "run-123", filter: [] },
  { workflow_id: "WORKFLOW-CLAIMS-001", filter: ["workflow.updated"] },
];

const client = new EventStreamClient("http://127.0.0.1:18790", "<token>", subscriptions);
client.connect();

Filter Events By Name

const filtered = {
  run_id: "run-123",
  filter: ["workflow_run.completed", "workflow_run.blocked"],
};

const client = new EventStreamClient("http://127.0.0.1:18790", "<token>", [filtered]);
client.connect();