packages/jetstream-consumer/src/index.ts 4.5 K raw
1
export {};
2
3
/**
 * Splits a comma-separated setting into trimmed, non-empty entries.
 *
 * Blank entries (from a trailing comma, doubled commas, or an empty value)
 * are discarded. If nothing usable remains, `fallback` is parsed instead so
 * the result is never empty as long as `fallback` has at least one entry.
 *
 * @param raw - The raw setting value, typically an environment variable.
 * @param fallback - Comma-separated default used when `raw` yields no entries.
 * @returns The parsed entries.
 */
function parseCsvList(raw: string | undefined, fallback: string): string[] {
	const split = (value: string): string[] =>
		value
			.split(",")
			.map((item) => item.trim())
			.filter((item) => item.length > 0);

	const parsed = split(raw ?? "");
	return parsed.length > 0 ? parsed : split(fallback);
}

/**
 * Parses a numeric setting that must be a positive, finite number.
 *
 * @param raw - The raw setting value, typically an environment variable.
 * @param fallback - Value returned when `raw` is unset, non-numeric, zero,
 *   negative, or infinite.
 * @returns The parsed number or `fallback`.
 */
function parsePositiveNumber(
	raw: string | undefined,
	fallback: number,
): number {
	const value = Number(raw);
	return Number.isFinite(value) && value > 0 ? value : fallback;
}

// Destination for batched events; required (checked at startup).
const WEBHOOK_URL = process.env.WEBHOOK_URL;
// Optional bearer token sent to the webhook.
const WEBHOOK_SECRET = process.env.WEBHOOK_SECRET;
// Jetstream hosts to rotate through on failover.
const JETSTREAM_INSTANCES = parseCsvList(
	process.env.JETSTREAM_INSTANCES,
	"jetstream1.us-east.bsky.network,jetstream2.us-east.bsky.network,jetstream1.us-west.bsky.network,jetstream2.us-west.bsky.network",
);
// Collections requested from Jetstream via `wantedCollections`.
const WANTED_COLLECTIONS = parseCsvList(
	process.env.WANTED_COLLECTIONS,
	"site.standard.document",
);
// File used to persist the resume cursor between runs.
const CURSOR_FILE = process.env.CURSOR_FILE ?? "./cursor.txt";
// How often queued events are flushed to the webhook.
const BATCH_INTERVAL_MS = parsePositiveNumber(
	process.env.BATCH_INTERVAL_MS,
	5000,
);
// How long without any message before switching to another instance.
const INACTIVITY_TIMEOUT_MS = parsePositiveNumber(
	process.env.INACTIVITY_TIMEOUT_MS,
	300_000,
);

// Cursor used when no cursor file exists yet.
// NOTE(review): the cursor is later overwritten with each commit's `time_us`,
// which suggests microsecond timestamps, but this literal has only 10 digits
// (seconds scale). Confirm whether it should be expressed in microseconds.
const DEFAULT_CURSOR = "1772036746";
22
23
// Fail fast: without a webhook destination there is nowhere to deliver
// events, so report the problem and exit with a non-zero status.
if (WEBHOOK_URL === undefined || WEBHOOK_URL === "") {
	console.error("WEBHOOK_URL environment variable is required");
	process.exit(1);
}
27
28
/** One commit queued for delivery to the webhook. */
type QueuedEvent = {
	/** The commit's `operation` value. */
	type: string;
	did: string;
	collection: string;
	rkey: string;
	cid: string;
	record?: Record<string, unknown>;
};

// Resume position sent as the `cursor` query parameter when connecting;
// replaced by the cursor file's content on load and by each commit's time_us.
let cursor = DEFAULT_CURSOR;

// Index into JETSTREAM_INSTANCES of the host currently in use.
let currentInstanceIndex = 0;

// The active connection, or null when none is assigned.
let ws: WebSocket | null = null;

// Timestamp (ms) of the last socket open or received message; drives the
// inactivity failover check.
let lastMessageTime = Date.now();

// Events received but not yet sent to the webhook.
let batch: QueuedEvent[] = [];
40
41
/**
 * Restores the resume cursor from CURSOR_FILE when it holds a usable value.
 *
 * The module-level `cursor` is only replaced when the file exists and its
 * trimmed content consists solely of decimal digits. In every other case
 * (missing file, empty file, malformed content, read error) the current
 * value is kept and the reason is logged where relevant. Never throws.
 */
async function loadCursor(): Promise<void> {
	try {
		const file = Bun.file(CURSOR_FILE);

		// Check existence explicitly so that genuine read failures are not
		// misreported as "no cursor file".
		if (!(await file.exists())) {
			console.log(`No cursor file found, using default: ${cursor}`);
			return;
		}

		const stored = (await file.text()).trim();
		if (stored === "") return;

		// The cursor is interpolated into the subscribe URL and is otherwise
		// always derived from a numeric time_us, so reject anything else
		// rather than connecting with a corrupt value.
		if (!/^\d+$/.test(stored)) {
			console.error(
				`Ignoring invalid cursor in ${CURSOR_FILE}: ${JSON.stringify(stored)}, using default: ${cursor}`,
			);
			return;
		}

		cursor = stored;
		console.log(`Loaded cursor from file: ${cursor}`);
	} catch (err) {
		console.error(
			`Failed to read cursor file, using default: ${cursor}`,
			err,
		);
	}
}
54
55
/** Upper bound on a single webhook request, including reading the response. */
const WEBHOOK_TIMEOUT_MS = 30_000;

/** True while a flush is awaiting the webhook; prevents overlapping flushes. */
let flushInFlight = false;

/**
 * Persists a cursor value to CURSOR_FILE.
 *
 * Write failures are logged and swallowed so that a persistence problem does
 * not interrupt event delivery.
 *
 * @param value - Cursor to store; defaults to the current module-level cursor.
 */
async function saveCursor(value: string = cursor): Promise<void> {
	try {
		await Bun.write(CURSOR_FILE, value);
	} catch (err) {
		console.error("Failed to save cursor:", err);
	}
}

/**
 * Sends all queued events to the webhook in a single POST.
 *
 * - Does nothing when the queue is empty or another flush is still running.
 * - On a 2xx response the cursor that was current when the batch was taken
 *   is persisted, so the stored cursor never runs ahead of delivered events
 *   even if more messages arrive while the request is in flight.
 * - On a non-2xx response, network error, or timeout the events are put back
 *   at the front of the queue and retried by the next flush.
 *
 * The response body is logged as received and is not required to be JSON.
 * Never throws.
 *
 * NOTE(review): a batch the webhook permanently rejects is retried
 * indefinitely, and the queue grows for as long as the webhook is failing.
 * Consider a cap or dead-letter policy if that matters for this deployment.
 */
async function flushBatch(): Promise<void> {
	if (flushInFlight || batch.length === 0) return;
	flushInFlight = true;

	const events = batch.splice(0, batch.length);
	// Snapshot now: the message handler keeps advancing `cursor` while we
	// await, and those newer events are not part of this batch.
	const deliveredUpTo = cursor;
	console.log(`Flushing batch of ${events.length} events`);

	const controller = new AbortController();
	const timeout = setTimeout(() => controller.abort(), WEBHOOK_TIMEOUT_MS);
	let delivered = false;

	try {
		const headers: Record<string, string> = {
			"Content-Type": "application/json",
		};
		if (WEBHOOK_SECRET) {
			headers["Authorization"] = `Bearer ${WEBHOOK_SECRET}`;
		}

		const res = await fetch(WEBHOOK_URL!, {
			method: "POST",
			headers,
			body: JSON.stringify(events),
			signal: controller.signal,
		});

		if (res.ok) {
			// The status alone decides success; the body is informational.
			delivered = true;
			console.log("Batch result:", await res.text());
		} else {
			console.error(
				`Webhook responded with ${res.status}: ${await res.text()}`,
			);
		}
	} catch (err) {
		console.error(
			delivered
				? "Failed to read webhook response:"
				: "Failed to send batch:",
			err,
		);
	} finally {
		clearTimeout(timeout);
	}

	if (delivered) {
		await saveCursor(deliveredUpTo);
	} else {
		// Re-queue ahead of anything received meanwhile to keep ordering.
		batch = events.concat(batch);
	}

	flushInFlight = false;
}
97
98
/**
 * Opens a WebSocket to the currently selected Jetstream instance and starts
 * queueing commit events.
 *
 * The subscribe URL carries one `wantedCollections` parameter per configured
 * collection plus the current `cursor`. The new socket becomes the module's
 * active `ws`. Its handlers ignore events once another socket has replaced
 * it, so a connection that is still closing cannot move the cursor, queue
 * events, or reset the inactivity timer.
 *
 * For each valid `commit` message the cursor is advanced to the message's
 * `time_us` and an event is appended to `batch`. Messages that are not
 * commits are ignored. Malformed commits are logged and skipped without
 * touching the cursor.
 */
function connect(): void {
	const host = JETSTREAM_INSTANCES[currentInstanceIndex];

	// URLSearchParams handles percent-encoding of the values.
	const query = new URLSearchParams();
	for (const collection of WANTED_COLLECTIONS) {
		query.append("wantedCollections", collection);
	}
	query.append("cursor", cursor);
	const url = `wss://${host}/subscribe?${query.toString()}`;

	console.log(`Connecting to ${host} with cursor ${cursor}`);

	const socket = new WebSocket(url);
	ws = socket;

	const isRecord = (value: unknown): value is Record<string, unknown> =>
		typeof value === "object" && value !== null;

	socket.addEventListener("open", () => {
		if (ws !== socket) return; // superseded before it finished opening
		console.log(`Connected to ${host}`);
		lastMessageTime = Date.now();
	});

	socket.addEventListener("message", (event) => {
		if (ws !== socket) return; // late message from a replaced connection
		lastMessageTime = Date.now();

		let data: unknown;
		try {
			data = JSON.parse(String(event.data));
		} catch (err) {
			console.error("Failed to parse message:", err);
			return;
		}

		if (!isRecord(data) || data.kind !== "commit") return;

		const { did, time_us: timeUs, commit } = data;
		if (!isRecord(commit)) {
			console.error("Skipping malformed commit message:", data);
			return;
		}

		const { operation, collection, rkey, cid, record } = commit;
		if (
			typeof did !== "string" ||
			typeof timeUs !== "number" ||
			!Number.isSafeInteger(timeUs) ||
			timeUs <= 0 ||
			typeof operation !== "string" ||
			typeof collection !== "string" ||
			typeof rkey !== "string"
		) {
			console.error("Skipping malformed commit message:", data);
			return;
		}

		// Validate before mutating state: the cursor is persisted and reused
		// in the next subscribe URL, so it must never become "undefined".
		cursor = String(timeUs);

		batch.push({
			type: operation,
			did,
			collection,
			rkey,
			// cid and record are forwarded as received. Presumably they are
			// absent on delete operations (verify against the Jetstream
			// event format); JSON.stringify omits them when undefined.
			cid: cid as string,
			record: record as Record<string, unknown> | undefined,
		});
	});

	socket.addEventListener("close", (event) => {
		console.log(`WebSocket closed: code=${event.code} reason=${event.reason}`);
	});

	socket.addEventListener("error", (event) => {
		console.error("WebSocket error:", event);
	});
}
145
146
/**
 * Fails over to the next configured Jetstream instance.
 *
 * Advances the instance index (wrapping around at the end of the list),
 * closes and discards the current socket if there is one, then opens a new
 * connection via connect().
 */
function switchInstance(): void {
	const nextIndex = (currentInstanceIndex + 1) % JETSTREAM_INSTANCES.length;
	currentInstanceIndex = nextIndex;
	console.log(`Switching to instance: ${JETSTREAM_INSTANCES[nextIndex]}`);

	const previous = ws;
	if (previous !== null) {
		try {
			previous.close();
		} catch {
			// Best effort: a failure to close must not block the failover.
		}
		ws = null;
	}

	connect();
}
163
164
// --- Main ---
165
166
// Restore the persisted position before opening the first connection.
await loadCursor();
connect();

// Deliver whatever has been queued once per batch interval.
setInterval(() => void flushBatch(), BATCH_INTERVAL_MS);

// Every 30s, fail over to another instance if the stream has gone quiet for
// longer than the inactivity timeout.
setInterval(() => {
	const idleMs = Date.now() - lastMessageTime;
	if (idleMs <= INACTIVITY_TIMEOUT_MS) return;

	console.log(
		`No messages for ${Math.round(idleMs / 1000)}s, triggering failover`,
	);
	switchInstance();
}, 30_000);

// Startup banner describing the effective configuration.
const startupLines = [
	"Jetstream consumer started",
	`  Webhook URL: ${WEBHOOK_URL}`,
	`  Instances: ${JETSTREAM_INSTANCES.join(", ")}`,
	`  Collections: ${WANTED_COLLECTIONS.join(", ")}`,
	`  Batch interval: ${BATCH_INTERVAL_MS}ms`,
	`  Inactivity timeout: ${INACTIVITY_TIMEOUT_MS}ms`,
];
for (const line of startupLines) {
	console.log(line);
}