Messaging Architecture — QStash
Migration complete: RabbitMQ has been replaced by Upstash QStash for all async messaging in dot-cOS.
Overview
dot-cOS uses QStash (Upstash) for asynchronous event delivery. QStash is an HTTP-based message queue — instead of a persistent AMQP consumer, services expose HTTP webhook endpoints that QStash calls with automatic retries.
Architecture
graph LR
subgraph dot_cOS_Services
WS[Workflow Service\ncos-workflow.dotevolve.net]
RE[Rule Engine\ncos-rules.dotevolve.net]
FE[Frontend\nSSE stream]
end
subgraph QStash["QStash (Upstash)"]
Q[Message Queue\nHTTP delivery\nAuto-retry]
end
WS -->|publishProfileUpdate\nPOST to QStash| Q
WS -->|publishExtractionCompleted\nPOST to QStash| Q
Q -->|POST /webhook/profile-updated| WS
Q -->|POST /webhook/extraction-completed| WS
WS -->|sseEmitter.emit| FE
How It Works
sequenceDiagram
participant WS as Workflow Service
participant QS as QStash
participant WH as Webhook Endpoint\n(same service)
participant SSE as SSE Clients
WS->>QS: POST /v2/publish\n{ url: /webhook/profile-updated, body: payload }
QS-->>WS: 200 OK { messageId }
note over QS: QStash delivers with retries
QS->>WH: POST /api/v1/events/webhook/profile-updated\n(with Upstash-Signature header)
WH->>WH: Verify QStash signature
WH->>SSE: sseEmitter.emit("profile.updated", payload)
WH-->>QS: 200 OK (prevents retry)
Webhook Endpoints
The Workflow Service exposes three webhook endpoints that QStash calls:
| Endpoint | Event | Triggered by |
|---|---|---|
POST /api/v1/events/webhook/profile-updated |
Entity profile changed | publishProfileUpdate() |
POST /api/v1/events/webhook/extraction-completed |
IDP extraction done | publishExtractionCompleted() |
POST /api/v1/events/webhook/extraction-failed |
IDP extraction failed | publishExtractionCompleted({ error }) |
All endpoints verify the Upstash-Signature header before processing.
Publishing Events
The publisher API is unchanged from the RabbitMQ version — same function signatures:
const {
publishProfileUpdate,
publishExtractionCompleted,
} = require("./eventPublisher.service");
// Publish a profile update
await publishProfileUpdate({
entityId: "entity-uuid",
tenantId: "tenant-uuid",
userId: "user-uuid",
changes: [{ field: "name", oldValue: "Old", newValue: "New" }],
version: 5,
});
// Publish extraction result
await publishExtractionCompleted({
jobId: "job-uuid",
entityId: "entity-uuid",
tenantId: "tenant-uuid",
documentType: "PAN_CARD",
extractedData: { panNumber: "ABCDE1234F" },
});
Internally, publishProfileUpdate calls QStash:
await qstash.publishJSON({
url: `${SERVICE_BASE_URL}/api/v1/events/webhook/profile-updated`,
body: payload,
retries: 3,
});
Receiving Events (SSE)
Clients subscribe to real-time events via Server-Sent Events — unchanged from before:
const eventSource = new EventSource(
`${API_URL}/api/v1/events/stream?tenantId=${tenantId}`,
);
eventSource.addEventListener("message", (e) => {
const payload = JSON.parse(e.data);
if (payload.eventType === "profile.updated") {
// Update UI
}
});
Retry Behaviour
QStash retries failed deliveries automatically:
flowchart TD
A[QStash delivers webhook] --> B{HTTP 2xx?}
B -->|Yes| C[Message acknowledged]
B -->|No / Timeout| D[Retry with backoff]
D --> E{Retry 1}
E -->|Fail| F{Retry 2}
F -->|Fail| G{Retry 3}
G -->|Fail| H[Dead Letter Queue\nin QStash console]
Default: 3 retries with exponential backoff. Configurable per-publish.
Signature Verification
Every webhook verifies the QStash signature to prevent spoofed requests:
const { Receiver } = require("@upstash/qstash");
const receiver = new Receiver({
currentSigningKey: process.env.QSTASH_CURRENT_SIGNING_KEY,
nextSigningKey: process.env.QSTASH_NEXT_SIGNING_KEY,
});
await receiver.verify({
signature: req.headers["upstash-signature"],
body: JSON.stringify(req.body),
url: `${SERVICE_BASE_URL}/api/v1/events/webhook/profile-updated`,
});
Environment Variables
# QStash — single instance for all DotEvolve services
QSTASH_TOKEN=eyJ...
QSTASH_CURRENT_SIGNING_KEY=sig_6SP1B9R9ZLv2t4RT85A9Z9nrPnSU
QSTASH_NEXT_SIGNING_KEY=sig_5MC99iuFLfzKGtvpicpU6rUCmUwK
# Service base URL (QStash needs a public URL to call back)
# Set automatically by Vercel via VERCEL_URL
SERVICE_BASE_URL=https://cos-workflow.dotevolve.net
Local Development
In local development without a public URL, QStash cannot call back. The publisher falls back to direct sseEmitter.emit():
if (!process.env.QSTASH_TOKEN) {
// Dev fallback: emit directly without QStash
sseEmitter.emit(eventType, payload);
return true;
}
To test QStash locally, use ngrok to expose your local port:
ngrok http 8001
# Set SERVICE_BASE_URL=https://your-ngrok-url.ngrok.io
Comparison: Before vs After
| Aspect | RabbitMQ (old) | QStash (current) |
|---|---|---|
| Protocol | AMQP | HTTP |
| Consumer | Persistent process | HTTP webhook endpoint |
| Retries | Manual (nack/requeue) | Built-in, configurable |
| Local dev | Docker container | Direct emit fallback |
| Monitoring | RabbitMQ Management UI | QStash console |
| Cost | $19/mo (CloudAMQP) | $0–$1/mo (pay-per-use) |
| Vendor | CloudAMQP | Upstash (same as Redis) |