// packages/server/src/utils/ingest.ts
import { resolveViewUrl } from "./document";
/**
 * Number of hours added to the current time when `ingestDocument` computes
 * `resolved_documents.stale_at` (via SQLite `datetime('now', '+N hours')`).
 */
const STALE_OFFSET_HOURS: number = 24;

/**
 * Returns `value` when it is a non-empty string, otherwise `null`.
 * Mirrors the `value || null` coercion for strings while rejecting non-string values.
 */
function nonEmptyString(value: unknown): string | null {
	return typeof value === "string" && value.length > 0 ? value : null;
}

/**
 * Records a document create/update and enqueues it for full resolution.
 *
 * 1. Upserts the `(did, collection, rkey)` row in `repo_records`, storing the
 *    CID (NULL when absent or empty) and refreshing `synced_at`.
 * 2. When the full record body is supplied, upserts a preliminary row in
 *    `resolved_documents` keyed by `at://{did}/{collection}/{rkey}`, with
 *    `stale_at` set `STALE_OFFSET_HOURS` hours after `resolved_at`.
 * 3. Sends `{ did, collection, rkey }` to `queue`.
 *
 * Record values are typed `unknown`, so each text field is narrowed at runtime.
 * A field that is missing, empty, or not a string is stored as NULL instead of
 * being passed to D1 unchecked. `content` is stored as JSON text when truthy.
 *
 * @param db - D1 database containing `repo_records` and `resolved_documents`.
 * @param queue - Queue that receives the resolution job.
 * @param params.did - DID that owns the record (first segment of the AT URI).
 * @param params.rkey - Record key (last segment of the AT URI).
 * @param params.collection - Collection of the record (middle segment of the AT URI).
 * @param params.cid - Record CID, if known.
 * @param params.record - Full record body. When omitted, only `repo_records` is
 *   written before the job is queued.
 */
export async function ingestDocument(
	db: D1Database,
	queue: Queue,
	params: {
		did: string;
		rkey: string;
		collection: string;
		cid?: string;
		record?: Record<string, unknown>;
	},
): Promise<void> {
	const { did, rkey, collection, cid, record } = params;

	// Upsert repo_records. `excluded.*` refers to the row that failed to insert,
	// so every value is bound exactly once.
	await db
		.prepare(
			`INSERT INTO repo_records (did, rkey, collection, cid, synced_at)
       VALUES (?, ?, ?, ?, datetime('now'))
       ON CONFLICT(did, collection, rkey) DO UPDATE SET
         cid = excluded.cid,
         synced_at = excluded.synced_at`,
		)
		.bind(did, rkey, collection, nonEmptyString(cid))
		.run();

	// If we have the full record, upsert resolved_documents with initial data.
	if (record) {
		const uri = `at://${did}/${collection}/${rkey}`;
		const title = nonEmptyString(record.title);
		const path = nonEmptyString(record.path);
		const site = nonEmptyString(record.site);
		const textContent = nonEmptyString(record.textContent);
		const publishedAt = nonEmptyString(record.publishedAt);
		const content = record.content ? JSON.stringify(record.content) : null;

		// A view URL can only be resolved when both site and path are present.
		const viewUrl: string | null =
			site && path ? await resolveViewUrl(db, site, path) : null;

		await db
			.prepare(
				`INSERT INTO resolved_documents (uri, did, rkey, title, path, site, content, text_content, published_at, view_url, resolved_at, stale_at)
         VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'), datetime('now', '+${STALE_OFFSET_HOURS} hours'))
         ON CONFLICT(uri) DO UPDATE SET
           title = excluded.title,
           path = excluded.path,
           site = excluded.site,
           content = excluded.content,
           text_content = excluded.text_content,
           published_at = excluded.published_at,
           view_url = excluded.view_url,
           resolved_at = excluded.resolved_at,
           stale_at = excluded.stale_at`,
			)
			.bind(uri, did, rkey, title, path, site, content, textContent, publishedAt, viewUrl)
			.run();
	}

	// Queue for full resolution (verification, publication lookup, etc.)
	await queue.send({ did, collection, rkey });
}

/**
 * Removes the stored rows for a deleted record.
 *
 * Deletes the `(did, collection, rkey)` row from `repo_records` and the
 * `resolved_documents` row keyed by `at://{did}/{collection}/{rkey}`. Both
 * statements are submitted in one `db.batch` call, which D1 executes as a
 * single transaction. Either both rows are removed or, on error, neither is.
 *
 * @param db - D1 database containing `repo_records` and `resolved_documents`.
 * @param params.did - DID that owns the record.
 * @param params.collection - Collection of the record.
 * @param params.rkey - Record key.
 */
export async function deleteDocument(
	db: D1Database,
	params: { did: string; collection: string; rkey: string },
): Promise<void> {
	const { did, collection, rkey } = params;
	const uri = `at://${did}/${collection}/${rkey}`;

	await db.batch([
		db
			.prepare(
				"DELETE FROM repo_records WHERE did = ? AND collection = ? AND rkey = ?",
			)
			.bind(did, collection, rkey),
		db.prepare("DELETE FROM resolved_documents WHERE uri = ?").bind(uri),
	]);
}