// packages/server/src/durable-objects/jetstream-consumer.ts
1
import type { Bindings, JetstreamEvent } from "../types";
2
import { resolvePds } from "../utils/resolver";
3
import { ingestDocument, deleteDocument } from "../utils/ingest";
4
5
/**
 * Jetstream WebSocket subscription endpoint. The `wantedCollections` query
 * parameter asks for `site.standard.document` records; a `cursor` parameter is
 * appended at connect time when a saved cursor exists.
 */
const JETSTREAM_URL =
	"wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=site.standard.document";

/** Milliseconds between Durable Object alarms that re-check the connection. */
const ALARM_INTERVAL_MS = 30_000;

/** The cursor is persisted once more than this many milliseconds have passed since the last save. */
const CURSOR_SAVE_INTERVAL_MS = 10_000;

/** The cursor is also persisted whenever the processed-message count is a multiple of this. */
const CURSOR_SAVE_MESSAGE_COUNT = 100;
10
11
/**
 * Durable Object that holds a WebSocket subscription to Jetstream and forwards
 * `site.standard.document` commit events to `ingestDocument` / `deleteDocument`.
 *
 * Control surface (via `fetch`): `/start`, `/status`, `/stop`.
 * An alarm fires every `ALARM_INTERVAL_MS` and reconnects when the socket is
 * missing or not open. Progress is tracked as a cursor (the `time_us` of the
 * most recent event) that is periodically written to Durable Object storage
 * under the key `"cursor"` and appended to the subscription URL on reconnect.
 */
export class JetstreamConsumer implements DurableObject {
	private readonly state: DurableObjectState;
	private readonly env: Bindings;
	/** Currently active socket, or `null` when disconnected. */
	private ws: WebSocket | null = null;
	/** `time_us` of the latest event seen, as a string; `null` until known. */
	private cursor: string | null = null;
	/** Number of commit events that passed all filters and were processed. */
	private messageCount = 0;
	/** `Date.now()` of the last cursor write to storage (0 = never). */
	private lastCursorSave = 0;
	/** DID -> result of `resolvePds` (including `null` results). */
	private pdsCache: Map<string, string | null> = new Map();

	constructor(state: DurableObjectState, env: Bindings) {
		this.state = state;
		this.env = env;
	}

	/**
	 * Routes control requests by pathname.
	 *
	 * @param request Incoming request; only its URL pathname is inspected.
	 * @returns JSON response for `/start`, `/status`, `/stop`; 404 otherwise.
	 */
	async fetch(request: Request): Promise<Response> {
		const url = new URL(request.url);

		switch (url.pathname) {
			case "/start":
				return this.handleStart();
			case "/status":
				return this.handleStatus();
			case "/stop":
				return this.handleStop();
			default:
				return new Response("Not found", { status: 404 });
		}
	}

	/** Alarm handler: reconnects if the socket is not open, then re-arms the alarm. */
	async alarm(): Promise<void> {
		if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
			console.log("Alarm: WebSocket not connected, reconnecting...");
			await this.connect();
		}
		// Reschedule alarm
		await this.state.storage.setAlarm(Date.now() + ALARM_INTERVAL_MS);
	}

	/** Connects (unless already open) and schedules the first alarm. */
	private async handleStart(): Promise<Response> {
		if (this.ws && this.ws.readyState === WebSocket.OPEN) {
			return Response.json({
				status: "already_connected",
				cursor: this.cursor,
			});
		}

		await this.connect();
		await this.state.storage.setAlarm(Date.now() + ALARM_INTERVAL_MS);

		return Response.json({ status: "started", cursor: this.cursor });
	}

	/** Reports connection state, current cursor and processed-message count. */
	private handleStatus(): Response {
		return Response.json({
			connected: this.ws?.readyState === WebSocket.OPEN,
			cursor: this.cursor,
			messageCount: this.messageCount,
		});
	}

	/** Closes the socket, cancels the alarm and flushes the cursor to storage. */
	private async handleStop(): Promise<Response> {
		if (this.ws) {
			this.ws.close();
			this.ws = null;
		}
		await this.state.storage.deleteAlarm();

		// Persist progress now: the periodic save may lag behind the in-memory
		// cursor, and nothing else will write it once the stream is stopped.
		if (this.cursor) {
			await this.state.storage.put("cursor", this.cursor);
			this.lastCursorSave = Date.now();
		}

		return Response.json({ status: "stopped" });
	}

	/**
	 * Replaces any existing socket with a fresh Jetstream connection, resuming
	 * from the in-memory cursor or, failing that, the cursor saved in storage.
	 * Connection failures are logged and leave `this.ws` as `null`; the alarm
	 * is expected to retry.
	 */
	private async connect(): Promise<void> {
		// Close existing connection if any
		if (this.ws) {
			try {
				this.ws.close();
			} catch {
				// Best effort: the socket is being discarded either way.
			}
			this.ws = null;
		}

		// Load cursor from storage
		if (!this.cursor) {
			this.cursor =
				(await this.state.storage.get<string>("cursor")) ?? null;
		}

		const url = this.cursor
			? `${JETSTREAM_URL}&cursor=${this.cursor}`
			: JETSTREAM_URL;

		console.log(`Connecting to Jetstream: ${url}`);

		try {
			const ws = new WebSocket(url);

			ws.addEventListener("message", (event) => {
				// onMessage catches its own errors, so the promise never rejects.
				void this.onMessage(event.data as string);
			});

			ws.addEventListener("close", () => {
				console.log("Jetstream WebSocket closed");
				// Only clear the reference if this socket is still the active
				// one; a late close from a replaced socket must not drop the
				// socket that superseded it.
				if (this.ws === ws) this.ws = null;
				// Alarm will handle reconnection
			});

			ws.addEventListener("error", (event) => {
				console.error("Jetstream WebSocket error:", event);
				try {
					ws.close();
				} catch {
					// Best effort: the socket is already in a failed state.
				}
				if (this.ws === ws) this.ws = null;
			});

			this.ws = ws;
		} catch (error) {
			console.error("Failed to connect to Jetstream:", error);
			this.ws = null;
		}
	}

	/**
	 * Handles one raw Jetstream message: advances the cursor, filters out
	 * non-commit events, foreign collections and DIDs whose PDS is unresolved
	 * or contains "brid.gy", then ingests or deletes the document.
	 * All errors are caught and logged so one bad message cannot break the stream.
	 *
	 * @param data JSON text of a single Jetstream event.
	 */
	private async onMessage(data: string): Promise<void> {
		try {
			const event = JSON.parse(data) as JetstreamEvent;

			// Update cursor from every event that carries a timestamp. Guarded
			// so a malformed event cannot turn the cursor into "undefined".
			if (event.time_us != null) {
				this.cursor = String(event.time_us);
			}

			// Only process commit events
			if (event.kind !== "commit") return;

			const { commit } = event;
			if (commit.collection !== "site.standard.document") return;

			// PDS filter: skip bridgy noise
			const pds = await this.resolvePdsWithCache(event.did);
			if (!pds || pds.includes("brid.gy")) return;

			if (
				commit.operation === "create" ||
				commit.operation === "update"
			) {
				await ingestDocument(this.env.DB, this.env.RESOLUTION_QUEUE, {
					did: event.did,
					rkey: commit.rkey,
					collection: commit.collection,
					cid: commit.cid,
					record: commit.record,
				});
			} else if (commit.operation === "delete") {
				await deleteDocument(this.env.DB, {
					did: event.did,
					collection: commit.collection,
					rkey: commit.rkey,
				});
			}

			this.messageCount++;

			// Periodically save cursor
			await this.maybeSaveCursor();
		} catch (error) {
			console.error("Error processing Jetstream message:", error);
		}
	}

	/**
	 * Resolves the PDS for a DID through an in-memory cache in front of
	 * `resolvePds`. `null` results are cached too.
	 *
	 * @param did DID to resolve.
	 * @returns Whatever `resolvePds` returned for this DID (possibly `null`).
	 */
	private async resolvePdsWithCache(did: string): Promise<string | null> {
		// Fast in-memory cache (cleared on DO eviction). `has` is checked
		// separately because a cached value may legitimately be null.
		if (this.pdsCache.has(did)) {
			return this.pdsCache.get(did) ?? null;
		}

		const pds = await resolvePds(this.env.DB, did);
		this.pdsCache.set(did, pds);

		// Keep in-memory cache bounded: drop the oldest-inserted entry
		// (Map iterates in insertion order).
		if (this.pdsCache.size > 10_000) {
			const oldestKey = this.pdsCache.keys().next().value;
			if (oldestKey !== undefined) this.pdsCache.delete(oldestKey);
		}

		return pds;
	}

	/**
	 * Writes the cursor to storage when the processed-message count is a
	 * multiple of `CURSOR_SAVE_MESSAGE_COUNT` or more than
	 * `CURSOR_SAVE_INTERVAL_MS` has elapsed since the last write.
	 */
	private async maybeSaveCursor(): Promise<void> {
		if (!this.cursor) return;

		const now = Date.now();
		const shouldSave =
			this.messageCount % CURSOR_SAVE_MESSAGE_COUNT === 0 ||
			now - this.lastCursorSave > CURSOR_SAVE_INTERVAL_MS;

		if (shouldSave) {
			await this.state.storage.put("cursor", this.cursor);
			this.lastCursorSave = now;
		}
	}
}