| 1 | export {}; |
| 2 | |
/**
 * Splits a comma-separated value into trimmed, non-empty entries.
 *
 * Falls back to `fallback` (parsed the same way) when `raw` is undefined,
 * empty, or contains only separators/whitespace, so an env var that is set
 * but blank never produces an empty host or collection name.
 */
function parseList(raw: string | undefined, fallback: string): string[] {
  const split = (value: string): string[] =>
    value
      .split(",")
      .map((item) => item.trim())
      .filter((item) => item.length > 0);

  const parsed = split(raw ?? "");
  return parsed.length > 0 ? parsed : split(fallback);
}

/**
 * Parses a duration-like env value, returning `fallback` unless the value is
 * a finite number greater than zero.
 */
function parsePositiveNumber(raw: string | undefined, fallback: number): number {
  const value = Number(raw);
  return Number.isFinite(value) && value > 0 ? value : fallback;
}

// Destination for batched events; its presence is validated right after this block.
const WEBHOOK_URL = process.env.WEBHOOK_URL;
// Optional; when set it is sent as a Bearer token with every webhook request.
const WEBHOOK_SECRET = process.env.WEBHOOK_SECRET;

// Hosts to rotate through on failover, in order.
const JETSTREAM_INSTANCES = parseList(
  process.env.JETSTREAM_INSTANCES,
  "jetstream1.us-east.bsky.network,jetstream2.us-east.bsky.network,jetstream1.us-west.bsky.network,jetstream2.us-west.bsky.network",
);

// Collection names passed as `wantedCollections` on the subscribe URL.
const WANTED_COLLECTIONS = parseList(
  process.env.WANTED_COLLECTIONS,
  "site.standard.document",
);

// File the cursor is read from at startup and written to after a flush.
const CURSOR_FILE = process.env.CURSOR_FILE ?? "./cursor.txt";
// How often buffered events are flushed to the webhook.
const BATCH_INTERVAL_MS = parsePositiveNumber(
  process.env.BATCH_INTERVAL_MS,
  5000,
);
// Silence longer than this triggers a switch to the next instance.
const INACTIVITY_TIMEOUT_MS = parsePositiveNumber(
  process.env.INACTIVITY_TIMEOUT_MS,
  300_000,
);

// Cursor used when no cursor file can be loaded.
// NOTE(review): this 10-digit value looks like Unix *seconds*, while the cursor
// is otherwise set from each event's `time_us` field — confirm the intended unit.
const DEFAULT_CURSOR = "1772036746";
| 22 | |
// Refuse to start without a webhook destination.
if (WEBHOOK_URL === undefined || WEBHOOK_URL === "") {
  console.error("WEBHOOK_URL environment variable is required");
  process.exit(1);
}

/** One buffered event awaiting delivery to the webhook. */
type QueuedEvent = {
  type: string;
  did: string;
  collection: string;
  rkey: string;
  cid: string;
  record?: Record<string, unknown>;
};

// --- Mutable process state ---

// Events collected since the last flush.
let batch: QueuedEvent[] = [];
// Resume position sent on the subscribe URL; starts at the default.
let cursor = DEFAULT_CURSOR;
// Index into JETSTREAM_INSTANCES of the host currently in use.
let currentInstanceIndex = 0;
// Active socket, or null when none has been opened / after a switch closed it.
let ws: WebSocket | null = null;
// Timestamp (ms) of the most recent socket open or message; read by the inactivity check.
let lastMessageTime = Date.now();
| 40 | |
/**
 * Restores `cursor` from CURSOR_FILE if the file holds a usable value.
 *
 * The stored value is accepted only when it consists solely of digits; any
 * other content is reported and ignored so that a corrupted file cannot end
 * up in the subscribe URL. A missing or empty file leaves the current
 * (default) cursor in place. Never throws: read errors are logged and the
 * current cursor is kept.
 */
async function loadCursor(): Promise<void> {
  try {
    const file = Bun.file(CURSOR_FILE);

    // Distinguish "no file yet" from genuine read failures (permissions, I/O).
    if (!(await file.exists())) {
      console.log(`No cursor file found, using default: ${cursor}`);
      return;
    }

    const trimmed = (await file.text()).trim();
    if (/^\d+$/.test(trimmed)) {
      cursor = trimmed;
      console.log(`Loaded cursor from file: ${cursor}`);
    } else if (trimmed) {
      console.error(
        `Ignoring invalid cursor ${JSON.stringify(trimmed)} in ${CURSOR_FILE}, using: ${cursor}`,
      );
    }
  } catch (err) {
    console.error(`Failed to read cursor file, using: ${cursor}`, err);
  }
}
| 54 | |
/**
 * Writes the current `cursor` value to CURSOR_FILE.
 *
 * Best-effort: a failed write is logged and otherwise ignored, so callers
 * never see a rejection from this function.
 */
async function saveCursor(): Promise<void> {
  const value = cursor;
  try {
    await Bun.write(CURSOR_FILE, value);
  } catch (writeError) {
    console.error("Failed to save cursor:", writeError);
  }
}
| 62 | |
/** True while a webhook POST is outstanding, so timer ticks cannot start overlapping flushes. */
let flushInFlight = false;

/** Upper bound on one webhook request, so a hung request cannot block later flushes. */
const WEBHOOK_TIMEOUT_MS = 30_000;

/**
 * Delivers all buffered events to the webhook in a single POST.
 *
 * - Does nothing when the buffer is empty or another flush is still running.
 * - On a 2xx response, persists the cursor as it stood when the batch was
 *   taken. The live `cursor` may already have advanced past events that
 *   arrived during the request and are not yet delivered, so it is not used.
 * - On any delivery failure (network error, timeout, non-2xx), puts the
 *   events back at the front of the buffer so the next flush retries them
 *   instead of dropping them. The buffer therefore grows while the webhook
 *   stays unavailable.
 *
 * Never throws; all failures are logged.
 */
async function flushBatch(): Promise<void> {
  if (flushInFlight || batch.length === 0) return;
  flushInFlight = true;

  // Taken synchronously together, so `cursorAtFlush` matches the last event in `events`.
  const events = batch.splice(0, batch.length);
  const cursorAtFlush = cursor;
  console.log(`Flushing batch of ${events.length} events`);

  try {
    let delivered = false;

    try {
      const headers: Record<string, string> = {
        "Content-Type": "application/json",
      };
      if (WEBHOOK_SECRET) {
        headers["Authorization"] = `Bearer ${WEBHOOK_SECRET}`;
      }

      const res = await fetch(WEBHOOK_URL!, {
        method: "POST",
        headers,
        body: JSON.stringify(events),
        signal: AbortSignal.timeout(WEBHOOK_TIMEOUT_MS),
      });

      // The status alone decides delivery; the body is only used for logging.
      delivered = res.ok;
      const bodyText = await res.text();

      if (delivered) {
        let summary = bodyText;
        try {
          summary = JSON.stringify(JSON.parse(bodyText));
        } catch {
          // Non-JSON success body: log it verbatim.
        }
        console.log("Batch result:", summary);
      } else {
        console.error(`Webhook responded with ${res.status}: ${bodyText}`);
      }
    } catch (err) {
      console.error(
        delivered ? "Failed to read webhook response:" : "Failed to send batch:",
        err,
      );
    }

    if (!delivered) {
      // Re-queue ahead of anything received meanwhile to preserve ordering.
      batch = events.concat(batch);
      return;
    }

    try {
      await Bun.write(CURSOR_FILE, cursorAtFlush);
    } catch (err) {
      console.error("Failed to save cursor:", err);
    }
  } finally {
    flushInFlight = false;
  }
}
| 97 | |
/**
 * Opens a WebSocket to the currently selected Jetstream instance, resuming
 * from `cursor`, and stores it in `ws`.
 *
 * Each incoming "commit" message is appended to `batch` and advances
 * `cursor` to that message's `time_us`. Messages of other kinds only refresh
 * `lastMessageTime`. Malformed commit messages are logged and skipped without
 * touching `cursor`.
 *
 * This function does not reconnect by itself; close and error events are
 * only logged.
 */
function connect(): void {
  const host = JETSTREAM_INSTANCES[currentInstanceIndex];

  // URLSearchParams percent-encodes values, so unusual collection names or
  // cursor contents cannot break or alter the query string.
  const query = new URLSearchParams();
  for (const collection of WANTED_COLLECTIONS) {
    query.append("wantedCollections", collection);
  }
  query.set("cursor", cursor);
  const url = `wss://${host}/subscribe?${query.toString()}`;

  console.log(`Connecting to ${host} with cursor ${cursor}`);

  const socket = new WebSocket(url);
  ws = socket;

  // A socket replaced by a later connect() may still emit events while it is
  // closing; those must not touch shared state.
  const isCurrent = (): boolean => ws === socket;

  socket.addEventListener("open", () => {
    if (!isCurrent()) return;
    console.log(`Connected to ${host}`);
    lastMessageTime = Date.now();
  });

  socket.addEventListener("message", (event) => {
    if (!isCurrent()) return;
    lastMessageTime = Date.now();

    try {
      const data = JSON.parse(String(event.data));

      if (data?.kind !== "commit") return;

      const commit = data.commit;
      if (
        typeof data.time_us !== "number" ||
        typeof commit !== "object" ||
        commit === null
      ) {
        console.error("Ignoring malformed commit event:", String(event.data));
        return;
      }

      batch.push({
        type: commit.operation,
        did: data.did,
        collection: commit.collection,
        rkey: commit.rkey,
        cid: commit.cid,
        record: commit.record,
      });

      // Advance only after the event is safely queued.
      cursor = String(data.time_us);
    } catch (err) {
      console.error("Failed to parse message:", err);
    }
  });

  socket.addEventListener("close", (event) => {
    console.log(`WebSocket closed: code=${event.code} reason=${event.reason}`);
  });

  socket.addEventListener("error", (event) => {
    console.error("WebSocket error:", event);
  });
}
| 145 | |
/**
 * Fails over to the next configured Jetstream host.
 *
 * Advances `currentInstanceIndex` (wrapping at the end of the list), drops
 * and closes the current socket if there is one, then opens a new connection.
 */
function switchInstance(): void {
  const nextIndex = (currentInstanceIndex + 1) % JETSTREAM_INSTANCES.length;
  currentInstanceIndex = nextIndex;
  console.log(`Switching to instance: ${JETSTREAM_INSTANCES[nextIndex]}`);

  const previous = ws;
  ws = null;
  if (previous) {
    try {
      previous.close();
    } catch {
      // A failure to close the old socket must not block the failover.
    }
  }

  connect();
}
| 163 | |
// --- Main ---

// Restore the resume position before the first connection is opened.
await loadCursor();
connect();

// Periodically deliver whatever has been buffered. flushBatch handles its own
// errors, so the returned promise is intentionally not awaited.
setInterval(() => {
  void flushBatch();
}, BATCH_INTERVAL_MS);

// Every 30s, fail over to the next instance if the stream has been silent
// for longer than the configured timeout.
setInterval(() => {
  const silentForMs = Date.now() - lastMessageTime;
  if (silentForMs <= INACTIVITY_TIMEOUT_MS) return;

  console.log(
    `No messages for ${Math.round(silentForMs / 1000)}s, triggering failover`,
  );
  switchInstance();
}, 30_000);

// Startup banner summarising the effective configuration.
const startupLines = [
  "Jetstream consumer started",
  ` Webhook URL: ${WEBHOOK_URL}`,
  ` Instances: ${JETSTREAM_INSTANCES.join(", ")}`,
  ` Collections: ${WANTED_COLLECTIONS.join(", ")}`,
  ` Batch interval: ${BATCH_INTERVAL_MS}ms`,
  ` Inactivity timeout: ${INACTIVITY_TIMEOUT_MS}ms`,
];
for (const line of startupLines) {
  console.log(line);
}