packages/server/src/index.ts 3.5 K raw
1
import { Hono } from "hono";
2
import { cors } from "hono/cors";
3
import type { Bindings } from "./types";
4
import {
5
	health,
6
	webhook,
7
	feed,
8
	stats,
9
	records,
10
	admin,
11
	rss,
12
	jetstream,
13
} from "./routes";
14
import { processDocument } from "./utils";
15
16
const app = new Hono<{ Bindings: Bindings }>();
17
18
// Middleware
19
app.use("*", cors());
20
21
// Mount routes
22
app.route("/health", health);
23
app.route("/webhook", webhook);
24
app.route("/feed", feed);
25
app.route("/stats", stats);
26
app.route("/records", records);
27
app.route("/admin", admin);
28
app.route("/rss.xml", rss);
29
app.route("/jetstream", jetstream);
30
31
// 404 handler
32
app.notFound((c) => {
33
	return c.json({ error: "Not found" }, 404);
34
});
35
36
// Export Durable Object class
37
export { JetstreamConsumer } from "./durable-objects/jetstream-consumer";
38
39
// Export for Cloudflare Workers
40
export default {
41
	fetch: app.fetch,
42
	async scheduled(_event: ScheduledEvent, env: Bindings, _ctx: ExecutionContext) {
43
		const batchSize = 50;
44
		// Select stale documents
45
		const { results } = await env.DB.prepare(
46
			`SELECT did, rkey FROM resolved_documents
47
       WHERE stale_at < datetime('now') OR stale_at IS NULL
48
       LIMIT ?`,
49
		)
50
			.bind(batchSize)
51
			.all<{ did: string; rkey: string }>();
52
53
		if (results && results.length > 0) {
54
			const messages = results.map((row) => ({
55
				body: {
56
					did: row.did,
57
					collection: "site.standard.document",
58
					rkey: row.rkey,
59
				},
60
			}));
61
62
			// Send to queue
63
			await env.RESOLUTION_QUEUE.sendBatch(messages);
64
			console.log(`Queued ${messages.length} documents for resolution`);
65
		}
66
67
		// Cleanup: keep only the 300 most recent verified documents, delete everything else
68
		const deletedVerified = await env.DB.prepare(
69
			`DELETE FROM resolved_documents WHERE verified = 1 AND uri NOT IN (
70
				SELECT uri FROM resolved_documents WHERE verified = 1
71
				ORDER BY published_at DESC LIMIT 300
72
			)`,
73
		).run();
74
		if (deletedVerified.meta.changes > 0) {
75
			console.log(`Cleaned up ${deletedVerified.meta.changes} old verified documents`);
76
		}
77
78
		// Delete unverified/stale documents older than 24 hours
79
		const deletedUnverified = await env.DB.prepare(
80
			`DELETE FROM resolved_documents WHERE (verified IS NULL OR verified = 0)
81
			 AND resolved_at < datetime('now', '-24 hours')`,
82
		).run();
83
		if (deletedUnverified.meta.changes > 0) {
84
			console.log(`Cleaned up ${deletedUnverified.meta.changes} unverified documents`);
85
		}
86
87
		// Clean up orphaned repo_records
88
		if (deletedVerified.meta.changes > 0 || deletedUnverified.meta.changes > 0) {
89
			const orphaned = await env.DB.prepare(
90
				`DELETE FROM repo_records WHERE NOT EXISTS (
91
					SELECT 1 FROM resolved_documents WHERE resolved_documents.did = repo_records.did
92
					AND resolved_documents.rkey = repo_records.rkey
93
				)`,
94
			).run();
95
			if (orphaned.meta.changes > 0) {
96
				console.log(`Cleaned up ${orphaned.meta.changes} orphaned repo_records`);
97
			}
98
		}
99
100
		// Ensure Jetstream consumer DO is alive
101
		try {
102
			const id = env.JETSTREAM_CONSUMER.idFromName("singleton");
103
			const stub = env.JETSTREAM_CONSUMER.get(id);
104
			await stub.fetch(new Request("http://do/start"));
105
		} catch (error) {
106
			console.error("Failed to ping Jetstream consumer:", error);
107
		}
108
	},
109
	async queue(batch: MessageBatch<any>, env: Bindings) {
110
		for (const message of batch.messages) {
111
			try {
112
				const { did, collection, rkey } = message.body;
113
				await processDocument(env.DB, did, collection, rkey);
114
				message.ack();
115
			} catch (error) {
116
				console.error("Queue processing error:", error);
117
				message.retry();
118
			}
119
		}
120
	},
121
};