packages/server/src/utils/document.ts 8.1 K raw
1
import { resolvePds } from "./resolver";
2
import { parseAtUri } from "./at-uri";
3
import { buildBlobUrl, extractBlobCid } from "./blob";
4
import { verifyDocumentRecord } from "./verification";
5
6
/**
 * Document record exactly as returned in the `value` of a PDS
 * `com.atproto.repo.getRecord` response. Every field is optional because
 * the record is external input and is not validated before use.
 */
interface DocumentRecord {
	/** Either an at:// URI of a publication record or a direct site URL. */
	site?: string;
	/** Path that is resolved against the site/publication URL to form the view URL. */
	path?: string;
	title?: string;
	description?: string;
	/** Blob reference; its CID is pulled out with `extractBlobCid`. */
	coverImage?: unknown;
	/** Structured content; persisted as a JSON string. */
	content?: unknown;
	textContent?: string;
	/** Reference to a Bluesky post; persisted as a JSON string. */
	bskyPostRef?: {
		uri: string;
		cid: string;
	};
	/** Persisted as a JSON string. */
	tags?: Array<string>;
	publishedAt?: string;
	updatedAt?: string;
}
20
21
/**
 * Publication record exactly as returned in the `value` of a PDS
 * `com.atproto.repo.getRecord` response. All fields are optional because
 * the record is external input.
 */
interface PublicationRecord {
	/** Base URL of the publication; required (with `name`) for it to resolve. */
	url?: string;
	/** Display name of the publication; required (with `url`) for it to resolve. */
	name?: string;
	description?: string;
	/** Blob reference; its CID is pulled out with `extractBlobCid`. */
	icon?: unknown;
}
28
29
/**
 * Publication data after a successful lookup: `url` and `name` are
 * guaranteed present, the remaining fields are `null` when unavailable.
 */
interface ResolvedPublication {
	url: string;
	name: string;
	/** `null` when the record has no (or an empty) description. */
	description: string | null;
	/** CID of the icon blob, or `null` when no CID could be extracted. */
	iconCid: string | null;
	/** Blob URL built for `iconCid`, or `null` when there is no icon CID. */
	iconUrl: string | null;
}
37
38
/**
 * Looks up the publication record that an at:// URI points to.
 *
 * The URI is parsed, the owning DID's PDS is resolved, and the record is
 * fetched via `com.atproto.repo.getRecord`.
 *
 * @param db - Database handle passed through to `resolvePds`.
 * @param siteUri - at:// URI of the publication record.
 * @returns The resolved publication, or `null` when the URI cannot be
 *   parsed, the PDS cannot be resolved, the request is not OK, the record
 *   lacks `url` or `name`, or anything throws during the lookup.
 */
async function fetchPublication(
	db: D1Database,
	siteUri: string,
): Promise<ResolvedPublication | null> {
	const target = parseAtUri(siteUri);
	if (!target) {
		return null;
	}

	try {
		const endpoint = await resolvePds(db, target.did);
		if (!endpoint) {
			return null;
		}

		// Assemble the getRecord query string piece by piece.
		const query = [
			`repo=${encodeURIComponent(target.did)}`,
			`collection=${encodeURIComponent(target.collection)}`,
			`rkey=${encodeURIComponent(target.rkey)}`,
		].join("&");

		const reply = await fetch(
			`${endpoint}/xrpc/com.atproto.repo.getRecord?${query}`,
		);
		if (!reply.ok) {
			return null;
		}

		const payload = (await reply.json()) as { value?: PublicationRecord };
		const record = payload.value;
		// A publication is only usable when both url and name are present.
		if (!record?.url || !record?.name) {
			return null;
		}

		const iconCid = extractBlobCid(record.icon);
		let iconUrl: string | null = null;
		if (iconCid) {
			iconUrl = buildBlobUrl(endpoint, target.did, iconCid);
		}

		const resolved: ResolvedPublication = {
			url: record.url,
			name: record.name,
			description: record.description || null,
			iconCid,
			iconUrl,
		};
		return resolved;
	} catch {
		// Best-effort lookup: any network/parse failure means "no publication".
		return null;
	}
}
79
80
/**
 * Resolves `path` against a site base URL.
 *
 * @param base - Site or publication URL, with or without an http(s) scheme.
 * @param path - Document path to resolve against the base.
 * @returns The absolute URL, or `null` when base/path cannot form a valid URL.
 */
function joinViewUrl(base: string, path: string): string | null {
	// Only an explicit http:// or https:// prefix counts as a scheme. A bare
	// host such as "httpbin.org" also starts with "http" but still needs one.
	const absoluteBase = /^https?:\/\//i.test(base) ? base : `https://${base}`;
	try {
		return new URL(path, absoluteBase).toString();
	} catch {
		// `new URL` throws a TypeError on malformed input; report "no URL"
		// through the existing `null` result instead of rejecting.
		return null;
	}
}

/**
 * Resolves the view URL for a document.
 * If site is an at:// URI, fetches the publication to get the base URL.
 * Otherwise the site is treated as a direct URL (https:// is prepended
 * when it carries no http(s) scheme).
 *
 * @param db - Database handle used when a publication has to be fetched.
 * @param siteUri - at:// URI of a publication record, or a direct site URL.
 * @param path - Document path to resolve against the base URL.
 * @returns The absolute view URL, or `null` when the publication cannot be
 *   resolved or the base/path do not form a valid URL.
 */
export async function resolveViewUrl(
	db: D1Database,
	siteUri: string,
	path: string,
): Promise<string | null> {
	if (siteUri.startsWith("at://")) {
		const pub = await fetchPublication(db, siteUri);
		return pub?.url ? joinViewUrl(pub.url, path) : null;
	}

	// Direct URL
	return joinViewUrl(siteUri, path);
}
102
103
/**
 * Resolves a document `path` against its site/publication base URL.
 *
 * @param base - Site or publication URL, with or without an http(s) scheme.
 * @param path - Document path to resolve against the base.
 * @returns The absolute URL, or `null` when base/path cannot form a valid URL.
 */
function buildDocumentViewUrl(base: string, path: string): string | null {
	// Only an explicit http:// or https:// prefix counts as a scheme. A bare
	// host such as "httpbin.org" also starts with "http" but still needs one.
	const absoluteBase = /^https?:\/\//i.test(base) ? base : `https://${base}`;
	try {
		return new URL(path, absoluteBase).toString();
	} catch {
		// A malformed site/path is a property of the record, not a transient
		// failure. Store the document without a view URL rather than throwing
		// and having the queue retry it forever.
		return null;
	}
}

/**
 * Decides whether a non-OK getRecord response means the record is gone.
 *
 * Besides a plain HTTP 404, this also accepts an HTTP 400 whose JSON body
 * carries `error: "RecordNotFound"`.
 * NOTE(review): the 400/RecordNotFound case follows the XRPC error
 * convention for `com.atproto.repo.getRecord` — confirm against the PDS
 * implementations in use.
 *
 * @param response - The non-OK response from the PDS.
 * @returns `true` when the record should be treated as deleted.
 */
async function isRecordGoneResponse(response: Response): Promise<boolean> {
	if (response.status === 404) return true;
	if (response.status !== 400) return false;
	try {
		const body = (await response.json()) as { error?: unknown } | null;
		return body?.error === "RecordNotFound";
	} catch {
		// Non-JSON error body: not a recognizable "not found" signal.
		return false;
	}
}

/**
 * Processes a document record: fetches from PDS, resolves publication,
 * and stores all fields in resolved_documents table.
 *
 * Flow:
 * 1. Resolve the PDS for `did` (warn and return if it cannot be resolved).
 * 2. Fetch the record. If the PDS reports it as missing, delete the local
 *    rows from `resolved_documents` and `repo_records` and return.
 * 3. Upsert the record's CID into `repo_records`.
 * 4. Extract fields, resolve the publication / view URL, run verification.
 * 5. Upsert the result into `resolved_documents`.
 *
 * @param db - D1 database handle.
 * @param did - DID of the repository that owns the record.
 * @param collection - Collection (NSID) of the record.
 * @param rkey - Record key.
 * @throws Re-throws any error (including non-"not found" HTTP failures)
 *   after logging it, so the queue handler can retry.
 */
export async function processDocument(
	db: D1Database,
	did: string,
	collection: string,
	rkey: string,
): Promise<void> {
	try {
		// 1. Resolve PDS
		const pds = await resolvePds(db, did);
		if (!pds) {
			console.warn(`Could not resolve PDS for ${did}`);
			return;
		}

		const uri = `at://${did}/${collection}/${rkey}`;

		// 2. Fetch Document Record
		const url = `${pds}/xrpc/com.atproto.repo.getRecord?repo=${encodeURIComponent(
			did,
		)}&collection=${encodeURIComponent(collection)}&rkey=${encodeURIComponent(rkey)}`;

		const response = await fetch(url);
		if (!response.ok) {
			if (await isRecordGoneResponse(response)) {
				// Record was deleted from PDS - clean up our local copy
				console.warn(
					`Record not found (deleted): ${did}/${collection}/${rkey}`,
				);
				await db
					.prepare("DELETE FROM resolved_documents WHERE uri = ?")
					.bind(uri)
					.run();
				await db
					.prepare(
						"DELETE FROM repo_records WHERE did = ? AND collection = ? AND rkey = ?",
					)
					.bind(did, collection, rkey)
					.run();
				return; // Not an error, just cleanup
			}
			// Other errors (5xx, rate limits, etc.) should be retried
			throw new Error(
				`Failed to fetch record: ${response.status} ${response.statusText}`,
			);
		}

		const data = (await response.json()) as {
			uri: string;
			cid?: string;
			value: DocumentRecord;
		};

		const { value, cid } = data;

		// 3. Update repo_records (`excluded` refers to the row that the
		// INSERT tried to write, so each value is bound only once).
		await db
			.prepare(
				`INSERT INTO repo_records (did, rkey, collection, cid, synced_at)
				 VALUES (?, ?, ?, ?, datetime('now'))
				 ON CONFLICT(did, collection, rkey) DO UPDATE SET
				   cid = excluded.cid,
				   synced_at = excluded.synced_at`,
			)
			.bind(did, rkey, collection, cid || null)
			.run();

		// 4. Extract document fields (empty strings are stored as NULL)
		const title = value.title || null;
		const description = value.description || null;
		const path = value.path || null;
		const site = value.site || null;
		const content = value.content ? JSON.stringify(value.content) : null;
		const textContent = value.textContent || null;
		const coverImageCid = extractBlobCid(value.coverImage);
		const coverImageUrl = coverImageCid
			? buildBlobUrl(pds, did, coverImageCid)
			: null;
		const bskyPostRef = value.bskyPostRef
			? JSON.stringify(value.bskyPostRef)
			: null;
		const tags = value.tags ? JSON.stringify(value.tags) : null;
		const publishedAt = value.publishedAt || null;
		const updatedAt = value.updatedAt || null;

		// 5. Resolve publication if site is at:// URI
		let pubUrl: string | null = null;
		let pubName: string | null = null;
		let pubDescription: string | null = null;
		let pubIconCid: string | null = null;
		let pubIconUrl: string | null = null;
		let viewUrl: string | null = null;

		if (site) {
			if (site.startsWith("at://")) {
				// Fetch publication record
				const pub = await fetchPublication(db, site);
				if (pub) {
					pubUrl = pub.url;
					pubName = pub.name;
					pubDescription = pub.description;
					pubIconCid = pub.iconCid;
					pubIconUrl = pub.iconUrl;
					if (pubUrl && path) {
						viewUrl = buildDocumentViewUrl(pubUrl, path);
					}
				}
			} else {
				// Site is a direct URL (loose document)
				pubUrl = site;
				if (path) {
					viewUrl = buildDocumentViewUrl(site, path);
				}
			}
		}

		// 6. Verify the document
		const verified = await verifyDocumentRecord(pubUrl, site, viewUrl, uri);

		// 7. Insert/update resolved_documents
		const STALE_OFFSET_HOURS = 24;

		await db
			.prepare(
				`INSERT INTO resolved_documents (
				   uri, did, rkey, title, description, path, site, content, text_content,
				   cover_image_cid, cover_image_url, bsky_post_ref, tags,
				   published_at, updated_at, pub_url, pub_name, pub_description,
				   pub_icon_cid, pub_icon_url, view_url, pds_endpoint,
				   resolved_at, stale_at, verified
				 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'), datetime('now', '+${STALE_OFFSET_HOURS} hours'), ?)
				 ON CONFLICT(uri) DO UPDATE SET
				   title = excluded.title,
				   description = excluded.description,
				   path = excluded.path,
				   site = excluded.site,
				   content = excluded.content,
				   text_content = excluded.text_content,
				   cover_image_cid = excluded.cover_image_cid,
				   cover_image_url = excluded.cover_image_url,
				   bsky_post_ref = excluded.bsky_post_ref,
				   tags = excluded.tags,
				   published_at = excluded.published_at,
				   updated_at = excluded.updated_at,
				   pub_url = excluded.pub_url,
				   pub_name = excluded.pub_name,
				   pub_description = excluded.pub_description,
				   pub_icon_cid = excluded.pub_icon_cid,
				   pub_icon_url = excluded.pub_icon_url,
				   view_url = excluded.view_url,
				   pds_endpoint = excluded.pds_endpoint,
				   resolved_at = excluded.resolved_at,
				   stale_at = excluded.stale_at,
				   verified = excluded.verified`,
			)
			.bind(
				uri,
				did,
				rkey,
				title,
				description,
				path,
				site,
				content,
				textContent,
				coverImageCid,
				coverImageUrl,
				bskyPostRef,
				tags,
				publishedAt,
				updatedAt,
				pubUrl,
				pubName,
				pubDescription,
				pubIconCid,
				pubIconUrl,
				viewUrl,
				pds,
				verified ? 1 : 0,
			)
			.run();

		console.log(`Processed document: ${uri}`);
	} catch (error) {
		console.error(
			`Error processing document ${did}/${collection}/${rkey}:`,
			error,
		);
		// Re-throw so the queue handler can retry
		throw error;
	}
}
}