Skip to content

Messaging Architecture — QStash

Migration complete: RabbitMQ has been replaced by Upstash QStash for all async messaging in dot-cOS.

Overview

dot-cOS uses QStash (Upstash) for asynchronous event delivery. QStash is an HTTP-based message queue — instead of a persistent AMQP consumer, services expose HTTP webhook endpoints that QStash calls with automatic retries.

Architecture

graph LR
    subgraph dot_cOS_Services
        WS[Workflow Service\ncos-workflow.dotevolve.net]
        RE[Rule Engine\ncos-rules.dotevolve.net]
        FE[Frontend\nSSE stream]
    end

    subgraph QStash["QStash (Upstash)"]
        Q[Message Queue\nHTTP delivery\nAuto-retry]
    end

    WS -->|publishProfileUpdate\nPOST to QStash| Q
    WS -->|publishExtractionCompleted\nPOST to QStash| Q
    Q -->|POST /webhook/profile-updated| WS
    Q -->|POST /webhook/extraction-completed| WS
    WS -->|sseEmitter.emit| FE

How It Works

sequenceDiagram
    participant WS as Workflow Service
    participant QS as QStash
    participant WH as Webhook Endpoint\n(same service)
    participant SSE as SSE Clients

    WS->>QS: POST /v2/publish\n{ url: /webhook/profile-updated, body: payload }
    QS-->>WS: 200 OK { messageId }

    note over QS: QStash delivers with retries
    QS->>WH: POST /api/v1/events/webhook/profile-updated\n(with Upstash-Signature header)
    WH->>WH: Verify QStash signature
    WH->>SSE: sseEmitter.emit("profile.updated", payload)
    WH-->>QS: 200 OK (prevents retry)

Webhook Endpoints

The Workflow Service exposes three webhook endpoints that QStash calls:

Endpoint Event Triggered by
POST /api/v1/events/webhook/profile-updated Entity profile changed publishProfileUpdate()
POST /api/v1/events/webhook/extraction-completed IDP extraction done publishExtractionCompleted()
POST /api/v1/events/webhook/extraction-failed IDP extraction failed publishExtractionCompleted({ error })

All endpoints verify the Upstash-Signature header before processing.

Publishing Events

The publisher API is unchanged from the RabbitMQ version — same function signatures:

const {
  publishProfileUpdate,
  publishExtractionCompleted,
} = require("./eventPublisher.service");

// Publish a profile update
await publishProfileUpdate({
  entityId: "entity-uuid",
  tenantId: "tenant-uuid",
  userId: "user-uuid",
  changes: [{ field: "name", oldValue: "Old", newValue: "New" }],
  version: 5,
});

// Publish extraction result
await publishExtractionCompleted({
  jobId: "job-uuid",
  entityId: "entity-uuid",
  tenantId: "tenant-uuid",
  documentType: "PAN_CARD",
  extractedData: { panNumber: "ABCDE1234F" },
});

Internally, publishProfileUpdate calls QStash:

await qstash.publishJSON({
  url: `${SERVICE_BASE_URL}/api/v1/events/webhook/profile-updated`,
  body: payload,
  retries: 3,
});

Receiving Events (SSE)

Clients subscribe to real-time events via Server-Sent Events — unchanged from before:

const eventSource = new EventSource(
  `${API_URL}/api/v1/events/stream?tenantId=${tenantId}`,
);

eventSource.addEventListener("message", (e) => {
  const payload = JSON.parse(e.data);
  if (payload.eventType === "profile.updated") {
    // Update UI
  }
});

Retry Behaviour

QStash retries failed deliveries automatically:

flowchart TD
    A[QStash delivers webhook] --> B{HTTP 2xx?}
    B -->|Yes| C[Message acknowledged]
    B -->|No / Timeout| D[Retry with backoff]
    D --> E{Retry 1}
    E -->|Fail| F{Retry 2}
    F -->|Fail| G{Retry 3}
    G -->|Fail| H[Dead Letter Queue\nin QStash console]

Default: 3 retries with exponential backoff. Configurable per-publish.

Signature Verification

Every webhook verifies the QStash signature to prevent spoofed requests:

const { Receiver } = require("@upstash/qstash");

const receiver = new Receiver({
  currentSigningKey: process.env.QSTASH_CURRENT_SIGNING_KEY,
  nextSigningKey: process.env.QSTASH_NEXT_SIGNING_KEY,
});

await receiver.verify({
  signature: req.headers["upstash-signature"],
  body: JSON.stringify(req.body),
  url: `${SERVICE_BASE_URL}/api/v1/events/webhook/profile-updated`,
});

Environment Variables

# QStash — single instance for all DotEvolve services
QSTASH_TOKEN=eyJ...
QSTASH_CURRENT_SIGNING_KEY=sig_6SP1B9R9ZLv2t4RT85A9Z9nrPnSU
QSTASH_NEXT_SIGNING_KEY=sig_5MC99iuFLfzKGtvpicpU6rUCmUwK

# Service base URL (QStash needs a public URL to call back)
# Set automatically by Vercel via VERCEL_URL
SERVICE_BASE_URL=https://cos-workflow.dotevolve.net

Local Development

In local development without a public URL, QStash cannot call back. The publisher falls back to direct sseEmitter.emit():

if (!process.env.QSTASH_TOKEN) {
  // Dev fallback: emit directly without QStash
  sseEmitter.emit(eventType, payload);
  return true;
}

To test QStash locally, use ngrok to expose your local port:

ngrok http 8001
# Set SERVICE_BASE_URL=https://your-ngrok-url.ngrok.io

Comparison: Before vs After

Aspect RabbitMQ (old) QStash (current)
Protocol AMQP HTTP
Consumer Persistent process HTTP webhook endpoint
Retries Manual (nack/requeue) Built-in, configurable
Local dev Docker container Direct emit fallback
Monitoring RabbitMQ Management UI QStash console
Cost $19/mo (CloudAMQP) $0–$1/mo (pay-per-use)
Vendor CloudAMQP Upstash (same as Redis)