chore: refactor to jetstream durable object fa68d029
Steve · 2026-04-11 22:33 9 file(s) · +452 −2
packages/server/src/durable-objects/jetstream-consumer.ts (added) +205 −0
1 +
import type { Bindings, JetstreamEvent } from "../types";
2 +
import { resolvePds } from "../utils/resolver";
3 +
import { ingestDocument, deleteDocument } from "../utils/ingest";
4 +
5 +
const JETSTREAM_URL =
6 +
	"wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=site.standard.document";
7 +
const ALARM_INTERVAL_MS = 30_000;
8 +
const CURSOR_SAVE_INTERVAL_MS = 10_000;
9 +
const CURSOR_SAVE_MESSAGE_COUNT = 100;
10 +
11 +
export class JetstreamConsumer implements DurableObject {
12 +
	private state: DurableObjectState;
13 +
	private env: Bindings;
14 +
	private ws: WebSocket | null = null;
15 +
	private cursor: string | null = null;
16 +
	private messageCount = 0;
17 +
	private lastCursorSave = 0;
18 +
	private pdsCache: Map<string, string | null> = new Map();
19 +
20 +
	constructor(state: DurableObjectState, env: Bindings) {
21 +
		this.state = state;
22 +
		this.env = env;
23 +
	}
24 +
25 +
	async fetch(request: Request): Promise<Response> {
26 +
		const url = new URL(request.url);
27 +
28 +
		switch (url.pathname) {
29 +
			case "/start":
30 +
				return this.handleStart();
31 +
			case "/status":
32 +
				return this.handleStatus();
33 +
			case "/stop":
34 +
				return this.handleStop();
35 +
			default:
36 +
				return new Response("Not found", { status: 404 });
37 +
		}
38 +
	}
39 +
40 +
	async alarm(): Promise<void> {
41 +
		if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
42 +
			console.log("Alarm: WebSocket not connected, reconnecting...");
43 +
			await this.connect();
44 +
		}
45 +
		// Reschedule alarm
46 +
		await this.state.storage.setAlarm(Date.now() + ALARM_INTERVAL_MS);
47 +
	}
48 +
49 +
	private async handleStart(): Promise<Response> {
50 +
		if (this.ws && this.ws.readyState === WebSocket.OPEN) {
51 +
			return Response.json({
52 +
				status: "already_connected",
53 +
				cursor: this.cursor,
54 +
			});
55 +
		}
56 +
57 +
		await this.connect();
58 +
		await this.state.storage.setAlarm(Date.now() + ALARM_INTERVAL_MS);
59 +
60 +
		return Response.json({ status: "started", cursor: this.cursor });
61 +
	}
62 +
63 +
	private handleStatus(): Response {
64 +
		return Response.json({
65 +
			connected: this.ws?.readyState === WebSocket.OPEN,
66 +
			cursor: this.cursor,
67 +
			messageCount: this.messageCount,
68 +
		});
69 +
	}
70 +
71 +
	private async handleStop(): Promise<Response> {
72 +
		if (this.ws) {
73 +
			this.ws.close();
74 +
			this.ws = null;
75 +
		}
76 +
		await this.state.storage.deleteAlarm();
77 +
		return Response.json({ status: "stopped" });
78 +
	}
79 +
80 +
	private async connect(): Promise<void> {
81 +
		// Close existing connection if any
82 +
		if (this.ws) {
83 +
			try {
84 +
				this.ws.close();
85 +
			} catch {}
86 +
			this.ws = null;
87 +
		}
88 +
89 +
		// Load cursor from storage
90 +
		if (!this.cursor) {
91 +
			this.cursor =
92 +
				(await this.state.storage.get<string>("cursor")) || null;
93 +
		}
94 +
95 +
		const url = this.cursor
96 +
			? `${JETSTREAM_URL}&cursor=${this.cursor}`
97 +
			: JETSTREAM_URL;
98 +
99 +
		console.log(`Connecting to Jetstream: ${url}`);
100 +
101 +
		try {
102 +
			const ws = new WebSocket(url);
103 +
104 +
			ws.addEventListener("message", (event) => {
105 +
				this.onMessage(event.data as string);
106 +
			});
107 +
108 +
			ws.addEventListener("close", () => {
109 +
				console.log("Jetstream WebSocket closed");
110 +
				this.ws = null;
111 +
				// Alarm will handle reconnection
112 +
			});
113 +
114 +
			ws.addEventListener("error", (event) => {
115 +
				console.error("Jetstream WebSocket error:", event);
116 +
				try {
117 +
					ws.close();
118 +
				} catch {}
119 +
				this.ws = null;
120 +
			});
121 +
122 +
			this.ws = ws;
123 +
		} catch (error) {
124 +
			console.error("Failed to connect to Jetstream:", error);
125 +
			this.ws = null;
126 +
		}
127 +
	}
128 +
129 +
	private async onMessage(data: string): Promise<void> {
130 +
		try {
131 +
			const event = JSON.parse(data) as JetstreamEvent;
132 +
133 +
			// Update cursor from every event
134 +
			this.cursor = String(event.time_us);
135 +
136 +
			// Only process commit events
137 +
			if (event.kind !== "commit") return;
138 +
139 +
			const { commit } = event;
140 +
			if (commit.collection !== "site.standard.document") return;
141 +
142 +
			// PDS filter: skip bridgy noise
143 +
			const pds = await this.resolvePdsWithCache(event.did);
144 +
			if (!pds || pds.includes("brid.gy")) return;
145 +
146 +
			if (
147 +
				commit.operation === "create" ||
148 +
				commit.operation === "update"
149 +
			) {
150 +
				await ingestDocument(this.env.DB, this.env.RESOLUTION_QUEUE, {
151 +
					did: event.did,
152 +
					rkey: commit.rkey,
153 +
					collection: commit.collection,
154 +
					cid: commit.cid,
155 +
					record: commit.record,
156 +
				});
157 +
			} else if (commit.operation === "delete") {
158 +
				await deleteDocument(this.env.DB, {
159 +
					did: event.did,
160 +
					collection: commit.collection,
161 +
					rkey: commit.rkey,
162 +
				});
163 +
			}
164 +
165 +
			this.messageCount++;
166 +
167 +
			// Periodically save cursor
168 +
			await this.maybeSaveCursor();
169 +
		} catch (error) {
170 +
			console.error("Error processing Jetstream message:", error);
171 +
		}
172 +
	}
173 +
174 +
	private async resolvePdsWithCache(did: string): Promise<string | null> {
175 +
		// Fast in-memory cache (cleared on DO eviction)
176 +
		if (this.pdsCache.has(did)) {
177 +
			return this.pdsCache.get(did)!;
178 +
		}
179 +
180 +
		const pds = await resolvePds(this.env.DB, did);
181 +
		this.pdsCache.set(did, pds);
182 +
183 +
		// Keep in-memory cache bounded
184 +
		if (this.pdsCache.size > 10_000) {
185 +
			const firstKey = this.pdsCache.keys().next().value;
186 +
			if (firstKey) this.pdsCache.delete(firstKey);
187 +
		}
188 +
189 +
		return pds;
190 +
	}
191 +
192 +
	private async maybeSaveCursor(): Promise<void> {
193 +
		if (!this.cursor) return;
194 +
195 +
		const now = Date.now();
196 +
		const shouldSave =
197 +
			this.messageCount % CURSOR_SAVE_MESSAGE_COUNT === 0 ||
198 +
			now - this.lastCursorSave > CURSOR_SAVE_INTERVAL_MS;
199 +
200 +
		if (shouldSave) {
201 +
			await this.state.storage.put("cursor", this.cursor);
202 +
			this.lastCursorSave = now;
203 +
		}
204 +
	}
205 +
}
packages/server/src/index.ts +57 −2
1 1
import { Hono } from "hono";
2 2
import { cors } from "hono/cors";
3 3
import type { Bindings } from "./types";
4 -
import { health, webhook, feed, stats, records, admin, rss } from "./routes";
4 +
import {
5 +
	health,
6 +
	webhook,
7 +
	feed,
8 +
	stats,
9 +
	records,
10 +
	admin,
11 +
	rss,
12 +
	jetstream,
13 +
} from "./routes";
5 14
import { processDocument } from "./utils";
6 15
7 16
const app = new Hono<{ Bindings: Bindings }>();
17 26
app.route("/records", records);
18 27
app.route("/admin", admin);
19 28
app.route("/rss.xml", rss);
29 +
app.route("/jetstream", jetstream);
20 30
21 31
// 404 handler
22 32
app.notFound((c) => {
23 33
	return c.json({ error: "Not found" }, 404);
24 34
});
35 +
36 +
// Export Durable Object class
37 +
export { JetstreamConsumer } from "./durable-objects/jetstream-consumer";
25 38
26 39
// Export for Cloudflare Workers
27 40
export default {
28 41
	fetch: app.fetch,
29 -
	async scheduled(event: ScheduledEvent, env: Bindings, ctx: ExecutionContext) {
42 +
	async scheduled(_event: ScheduledEvent, env: Bindings, _ctx: ExecutionContext) {
30 43
		const batchSize = 50;
31 44
		// Select stale documents
32 45
		const { results } = await env.DB.prepare(
49 62
			// Send to queue
50 63
			await env.RESOLUTION_QUEUE.sendBatch(messages);
51 64
			console.log(`Queued ${messages.length} documents for resolution`);
65 +
		}
66 +
67 +
		// Cleanup: keep only the 300 most recent verified documents, delete everything else
68 +
		const deletedVerified = await env.DB.prepare(
69 +
			`DELETE FROM resolved_documents WHERE verified = 1 AND uri NOT IN (
70 +
				SELECT uri FROM resolved_documents WHERE verified = 1
71 +
				ORDER BY published_at DESC LIMIT 300
72 +
			)`,
73 +
		).run();
74 +
		if (deletedVerified.meta.changes > 0) {
75 +
			console.log(`Cleaned up ${deletedVerified.meta.changes} old verified documents`);
76 +
		}
77 +
78 +
		// Delete unverified/stale documents older than 24 hours
79 +
		const deletedUnverified = await env.DB.prepare(
80 +
			`DELETE FROM resolved_documents WHERE (verified IS NULL OR verified = 0)
81 +
			 AND resolved_at < datetime('now', '-24 hours')`,
82 +
		).run();
83 +
		if (deletedUnverified.meta.changes > 0) {
84 +
			console.log(`Cleaned up ${deletedUnverified.meta.changes} unverified documents`);
85 +
		}
86 +
87 +
		// Clean up orphaned repo_records
88 +
		if (deletedVerified.meta.changes > 0 || deletedUnverified.meta.changes > 0) {
89 +
			const orphaned = await env.DB.prepare(
90 +
				`DELETE FROM repo_records WHERE NOT EXISTS (
91 +
					SELECT 1 FROM resolved_documents WHERE resolved_documents.did = repo_records.did
92 +
					AND resolved_documents.rkey = repo_records.rkey
93 +
				)`,
94 +
			).run();
95 +
			if (orphaned.meta.changes > 0) {
96 +
				console.log(`Cleaned up ${orphaned.meta.changes} orphaned repo_records`);
97 +
			}
98 +
		}
99 +
100 +
		// Ensure Jetstream consumer DO is alive
101 +
		try {
102 +
			const id = env.JETSTREAM_CONSUMER.idFromName("singleton");
103 +
			const stub = env.JETSTREAM_CONSUMER.get(id);
104 +
			await stub.fetch(new Request("http://do/start"));
105 +
		} catch (error) {
106 +
			console.error("Failed to ping Jetstream consumer:", error);
52 107
		}
53 108
	},
54 109
	async queue(batch: MessageBatch<any>, env: Bindings) {
packages/server/src/routes/index.ts +1 −0
5 5
export { default as records } from "./records";
6 6
export { default as admin } from "./admin";
7 7
export { default as rss } from "./rss";
8 +
export { default as jetstream } from "./jetstream";
packages/server/src/routes/jetstream.ts (added) +29 −0
1 +
import { Hono } from "hono";
2 +
import type { Bindings } from "../types";
3 +
4 +
const jetstream = new Hono<{ Bindings: Bindings }>();
5 +
6 +
function getStub(env: Bindings) {
7 +
	const id = env.JETSTREAM_CONSUMER.idFromName("singleton");
8 +
	return env.JETSTREAM_CONSUMER.get(id);
9 +
}
10 +
11 +
jetstream.get("/start", async (c) => {
12 +
	const stub = getStub(c.env);
13 +
	const resp = await stub.fetch(new Request("http://do/start"));
14 +
	return c.json(await resp.json());
15 +
});
16 +
17 +
jetstream.get("/status", async (c) => {
18 +
	const stub = getStub(c.env);
19 +
	const resp = await stub.fetch(new Request("http://do/status"));
20 +
	return c.json(await resp.json());
21 +
});
22 +
23 +
jetstream.get("/stop", async (c) => {
24 +
	const stub = getStub(c.env);
25 +
	const resp = await stub.fetch(new Request("http://do/stop"));
26 +
	return c.json(await resp.json());
27 +
});
28 +
29 +
export default jetstream;
packages/server/src/types/index.ts +41 −0
2 2
  DB: D1Database;
3 3
  RESOLUTION_QUEUE: Queue;
4 4
  TAP_WEBHOOK_SECRET?: string;
5 +
  JETSTREAM_CONSUMER: DurableObjectNamespace;
5 6
};
7 +
8 +
// Jetstream WebSocket event types
9 +
export interface JetstreamCommitEvent {
10 +
  did: string;
11 +
  time_us: number;
12 +
  kind: "commit";
13 +
  commit: {
14 +
    rev: string;
15 +
    operation: "create" | "update" | "delete";
16 +
    collection: string;
17 +
    rkey: string;
18 +
    record?: Record<string, unknown>;
19 +
    cid?: string;
20 +
  };
21 +
}
22 +
23 +
export interface JetstreamIdentityEvent {
24 +
  did: string;
25 +
  time_us: number;
26 +
  kind: "identity";
27 +
  identity: { did: string; seq: number; time: string };
28 +
}
29 +
30 +
export interface JetstreamAccountEvent {
31 +
  did: string;
32 +
  time_us: number;
33 +
  kind: "account";
34 +
  account: {
35 +
    active: boolean;
36 +
    did: string;
37 +
    seq: number;
38 +
    time: string;
39 +
    status?: string;
40 +
  };
41 +
}
42 +
43 +
export type JetstreamEvent =
44 +
  | JetstreamCommitEvent
45 +
  | JetstreamIdentityEvent
46 +
  | JetstreamAccountEvent;
6 47
7 48
export interface TapRecordEvent {
8 49
  id: number;
packages/server/src/utils/index.ts +1 −0
3 3
export { resolveViewUrl, processDocument } from "./document";
4 4
export { buildBlobUrl, extractBlobCid } from "./blob";
5 5
export { verifyPublication, verifyDocument, verifyDocumentRecord } from "./verification";
6 +
export { ingestDocument, deleteDocument } from "./ingest";
packages/server/src/utils/ingest.ts (added) +101 −0
1 +
import { resolveViewUrl } from "./document";
2 +
3 +
const STALE_OFFSET_HOURS = 24;
4 +
5 +
export async function ingestDocument(
6 +
	db: D1Database,
7 +
	queue: Queue,
8 +
	params: {
9 +
		did: string;
10 +
		rkey: string;
11 +
		collection: string;
12 +
		cid?: string;
13 +
		record?: Record<string, unknown>;
14 +
	},
15 +
): Promise<void> {
16 +
	const { did, rkey, collection, cid, record } = params;
17 +
18 +
	// Upsert repo_records
19 +
	await db
20 +
		.prepare(
21 +
			`INSERT INTO repo_records (did, rkey, collection, cid, synced_at)
22 +
       VALUES (?, ?, ?, ?, datetime('now'))
23 +
       ON CONFLICT(did, collection, rkey) DO UPDATE SET
24 +
         cid = ?,
25 +
         synced_at = datetime('now')`,
26 +
		)
27 +
		.bind(did, rkey, collection, cid || null, cid || null)
28 +
		.run();
29 +
30 +
	// If we have the full record, upsert resolved_documents with initial data
31 +
	if (record) {
32 +
		const uri = `at://${did}/${collection}/${rkey}`;
33 +
		const doc = record as {
34 +
			title?: string;
35 +
			path?: string;
36 +
			site?: string;
37 +
			content?: unknown;
38 +
			textContent?: string;
39 +
			publishedAt?: string;
40 +
			coverImage?: unknown;
41 +
			bskyPostRef?: { uri: string; cid: string };
42 +
			tags?: string[];
43 +
		};
44 +
45 +
		let viewUrl: string | null = null;
46 +
		if (doc.site && doc.path) {
47 +
			viewUrl = await resolveViewUrl(db, doc.site, doc.path);
48 +
		}
49 +
50 +
		await db
51 +
			.prepare(
52 +
				`INSERT INTO resolved_documents (uri, did, rkey, title, path, site, content, text_content, published_at, view_url, resolved_at, stale_at)
53 +
         VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'), datetime('now', '+${STALE_OFFSET_HOURS} hours'))
54 +
         ON CONFLICT(uri) DO UPDATE SET
55 +
           title = ?, path = ?, site = ?, content = ?, text_content = ?, published_at = ?, view_url = ?, resolved_at = datetime('now'), stale_at = datetime('now', '+${STALE_OFFSET_HOURS} hours')`,
56 +
			)
57 +
			.bind(
58 +
				uri,
59 +
				did,
60 +
				rkey,
61 +
				doc.title || null,
62 +
				doc.path || null,
63 +
				doc.site || null,
64 +
				doc.content ? JSON.stringify(doc.content) : null,
65 +
				doc.textContent || null,
66 +
				doc.publishedAt || null,
67 +
				viewUrl,
68 +
				doc.title || null,
69 +
				doc.path || null,
70 +
				doc.site || null,
71 +
				doc.content ? JSON.stringify(doc.content) : null,
72 +
				doc.textContent || null,
73 +
				doc.publishedAt || null,
74 +
				viewUrl,
75 +
			)
76 +
			.run();
77 +
	}
78 +
79 +
	// Queue for full resolution (verification, publication lookup, etc.)
80 +
	await queue.send({ did, collection, rkey });
81 +
}
82 +
83 +
export async function deleteDocument(
84 +
	db: D1Database,
85 +
	params: { did: string; collection: string; rkey: string },
86 +
): Promise<void> {
87 +
	const { did, collection, rkey } = params;
88 +
89 +
	await db
90 +
		.prepare(
91 +
			"DELETE FROM repo_records WHERE did = ? AND collection = ? AND rkey = ?",
92 +
		)
93 +
		.bind(did, collection, rkey)
94 +
		.run();
95 +
96 +
	const uri = `at://${did}/${collection}/${rkey}`;
97 +
	await db
98 +
		.prepare("DELETE FROM resolved_documents WHERE uri = ?")
99 +
		.bind(uri)
100 +
		.run();
101 +
}
packages/server/worker-configuration.d.ts (added) +7 −0
1 +
// Generated by Wrangler by running `wrangler types`
2 +
3 +
interface Env {
4 +
	JETSTREAM_CONSUMER: DurableObjectNamespace<import("./src/index").JetstreamConsumer>;
5 +
	DB: D1Database;
6 +
	RESOLUTION_QUEUE: Queue;
7 +
}
packages/server/wrangler.toml +10 −0
19 19
max_batch_timeout = 30
20 20
21 21
22 +
# Durable Object for Jetstream WebSocket consumer
23 +
[durable_objects]
24 +
bindings = [
25 +
  { name = "JETSTREAM_CONSUMER", class_name = "JetstreamConsumer" }
26 +
]
27 +
28 +
[[migrations]]
29 +
tag = "v1"
30 +
new_classes = ["JetstreamConsumer"]
31 +
22 32
# Cron trigger to refresh stale documents
23 33
[triggers]
24 34
crons = ["0 * * * *"]  # Every hour (at minute 0)