chore: added verification to data flow 6ce0020e
Steve · 2026-01-13 23:15 10 file(s) · +176 −119
packages/server/schema.sql +3 −1
55 55
  view_url TEXT,  -- Constructed canonical URL (pub_url + path)
56 56
  pds_endpoint TEXT,  -- Cached PDS endpoint for this DID
57 57
  resolved_at TEXT DEFAULT (datetime('now')),
58 -
  stale_at TEXT  -- When this record should be re-resolved
58 +
  stale_at TEXT,  -- When this record should be re-resolved
59 +
  verified INTEGER DEFAULT 0  -- Whether the record has been verified via .well-known or link tag
59 60
);
60 61
61 62
CREATE INDEX IF NOT EXISTS idx_resolved_documents_rkey ON resolved_documents(rkey DESC);
62 63
CREATE INDEX IF NOT EXISTS idx_resolved_documents_stale ON resolved_documents(stale_at);
63 64
CREATE INDEX IF NOT EXISTS idx_resolved_documents_pub_url ON resolved_documents(pub_url);
65 +
CREATE INDEX IF NOT EXISTS idx_resolved_documents_verified ON resolved_documents(verified);
packages/server/src/index.ts +2 −2
1 1
import { Hono } from "hono";
2 2
import { cors } from "hono/cors";
3 3
import type { Bindings } from "./types";
4 -
import { health, webhook, feed, stats, records } from "./routes";
4 +
import { health, webhook, feed, stats, records, admin } from "./routes";
5 5
import { processDocument } from "./utils";
6 6
7 7
const app = new Hono<{ Bindings: Bindings }>();
15 15
app.route("/feed", feed);
16 16
app.route("/stats", stats);
17 17
app.route("/records", records);
18 -
//app.route("/admin", admin);
18 +
app.route("/admin", admin);
19 19
20 20
// Legacy alias: /feed-raw -> /feed/raw
21 21
app.get("/feed-raw", async (c) => {
packages/server/src/routes/admin.ts +55 −55
5 5
6 6
// Queue all documents for re-processing
7 7
admin.post("/resolve-all", async (c) => {
8 -
  try {
9 -
    const db = c.env.DB;
10 -
    const queue = c.env.RESOLUTION_QUEUE;
8 +
	try {
9 +
		const db = c.env.DB;
10 +
		const queue = c.env.RESOLUTION_QUEUE;
11 11
12 -
    // Get all records from repo_records
13 -
    const { results } = await db
14 -
      .prepare(
15 -
        `SELECT did, rkey FROM repo_records
16 -
         WHERE collection = 'site.standard.document'`
17 -
      )
18 -
      .all<{ did: string; rkey: string }>();
12 +
		// Get all records from repo_records
13 +
		const { results } = await db
14 +
			.prepare(
15 +
				`SELECT did, rkey FROM repo_records
16 +
         WHERE collection = 'site.standard.document'`,
17 +
			)
18 +
			.all<{ did: string; rkey: string }>();
19 19
20 -
    if (!results || results.length === 0) {
21 -
      return c.json({ message: "No documents to process", queued: 0 });
22 -
    }
20 +
		if (!results || results.length === 0) {
21 +
			return c.json({ message: "No documents to process", queued: 0 });
22 +
		}
23 23
24 -
    // Queue in batches of 100 (Cloudflare Queue limit)
25 -
    const batchSize = 100;
26 -
    let queued = 0;
24 +
		// Queue in batches of 100 (Cloudflare Queue limit)
25 +
		const batchSize = 100;
26 +
		let queued = 0;
27 27
28 -
    for (let i = 0; i < results.length; i += batchSize) {
29 -
      const batch = results.slice(i, i + batchSize);
30 -
      const messages = batch.map((row) => ({
31 -
        body: {
32 -
          did: row.did,
33 -
          collection: "site.standard.document",
34 -
          rkey: row.rkey,
35 -
        },
36 -
      }));
28 +
		for (let i = 0; i < results.length; i += batchSize) {
29 +
			const batch = results.slice(i, i + batchSize);
30 +
			const messages = batch.map((row) => ({
31 +
				body: {
32 +
					did: row.did,
33 +
					collection: "site.standard.document",
34 +
					rkey: row.rkey,
35 +
				},
36 +
			}));
37 37
38 -
      await queue.sendBatch(messages);
39 -
      queued += messages.length;
40 -
    }
38 +
			await queue.sendBatch(messages);
39 +
			queued += messages.length;
40 +
		}
41 41
42 -
    return c.json({
43 -
      message: "Documents queued for re-processing",
44 -
      queued,
45 -
    });
46 -
  } catch (error) {
47 -
    return c.json(
48 -
      { error: "Failed to queue documents", details: String(error) },
49 -
      500
50 -
    );
51 -
  }
42 +
		return c.json({
43 +
			message: "Documents queued for re-processing",
44 +
			queued,
45 +
		});
46 +
	} catch (error) {
47 +
		return c.json(
48 +
			{ error: "Failed to queue documents", details: String(error) },
49 +
			500,
50 +
		);
51 +
	}
52 52
});
53 53
54 54
// Mark all documents as stale (alternative - lets cron handle it)
55 55
admin.post("/mark-stale", async (c) => {
56 -
  try {
57 -
    const db = c.env.DB;
56 +
	try {
57 +
		const db = c.env.DB;
58 58
59 -
    const result = await db
60 -
      .prepare(
61 -
        `UPDATE resolved_documents SET stale_at = datetime('now', '-1 hour')`
62 -
      )
63 -
      .run();
59 +
		const result = await db
60 +
			.prepare(
61 +
				`UPDATE resolved_documents SET stale_at = datetime('now', '-1 hour')`,
62 +
			)
63 +
			.run();
64 64
65 -
    return c.json({
66 -
      message: "All documents marked as stale",
67 -
      affected: result.meta.changes,
68 -
    });
69 -
  } catch (error) {
70 -
    return c.json(
71 -
      { error: "Failed to mark documents as stale", details: String(error) },
72 -
      500
73 -
    );
74 -
  }
65 +
		return c.json({
66 +
			message: "All documents marked as stale",
67 +
			affected: result.meta.changes,
68 +
		});
69 +
	} catch (error) {
70 +
		return c.json(
71 +
			{ error: "Failed to mark documents as stale", details: String(error) },
72 +
			500,
73 +
		);
74 +
	}
75 75
});
76 76
77 77
export default admin;
packages/server/src/routes/feed.ts +4 −3
83 83
      .prepare(
84 84
        `SELECT did, rkey FROM repo_records
85 85
         WHERE collection = 'site.standard.document'
86 -
         ORDER BY rkey DESC
86 +
         ORDER BY published_at DESC
87 87
         LIMIT ? OFFSET ?`
88 88
      )
89 89
      .bind(limit, offset)
116 116
                cover_image_cid, cover_image_url, bsky_post_ref, tags,
117 117
                published_at, updated_at, pub_url, pub_name, pub_description,
118 118
                pub_icon_cid, pub_icon_url, view_url, pds_endpoint,
119 -
                resolved_at, stale_at
119 +
                resolved_at, stale_at, verified
120 120
         FROM resolved_documents
121 -
         ORDER BY rkey DESC
121 +
         WHERE verified = 1
122 +
         ORDER BY published_at DESC
122 123
         LIMIT ? OFFSET ?`
123 124
      )
124 125
      .bind(limit, offset)
packages/server/src/routes/index.ts +1 −1
3 3
export { default as feed } from "./feed";
4 4
export { default as stats } from "./stats";
5 5
export { default as records } from "./records";
6 -
//export { default as admin } from "./admin";
6 +
export { default as admin } from "./admin";
packages/server/src/types/index.ts +1 −0
98 98
  pds_endpoint: string | null;
99 99
  resolved_at: string | null;
100 100
  stale_at: string | null;
101 +
  verified: number | null;
101 102
}
packages/server/src/utils/document.ts +10 −6
1 1
import { resolvePds } from "./resolver";
2 2
import { parseAtUri } from "./at-uri";
3 3
import { buildBlobUrl, extractBlobCid } from "./blob";
4 +
import { verifyDocumentRecord } from "./verification";
4 5
5 6
// Raw document record from PDS
6 7
interface DocumentRecord {
198 199
      }
199 200
    }
200 201
201 -
    // 6. Insert/update resolved_documents
202 +
    // 6. Verify the document
202 203
    const uri = `at://${did}/${collection}/${rkey}`;
204 +
    const verified = await verifyDocumentRecord(pubUrl, site, viewUrl, uri);
205 +
206 +
    // 7. Insert/update resolved_documents
203 207
    const STALE_OFFSET_HOURS = 12;
204 208
205 209
    await db
209 213
          cover_image_cid, cover_image_url, bsky_post_ref, tags,
210 214
          published_at, updated_at, pub_url, pub_name, pub_description,
211 215
          pub_icon_cid, pub_icon_url, view_url, pds_endpoint,
212 -
          resolved_at, stale_at
213 -
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'), datetime('now', '+${STALE_OFFSET_HOURS} hours'))
216 +
          resolved_at, stale_at, verified
217 +
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'), datetime('now', '+${STALE_OFFSET_HOURS} hours'), ?)
214 218
        ON CONFLICT(uri) DO UPDATE SET
215 219
          title = ?, description = ?, path = ?, site = ?, content = ?, text_content = ?,
216 220
          cover_image_cid = ?, cover_image_url = ?, bsky_post_ref = ?, tags = ?,
217 221
          published_at = ?, updated_at = ?, pub_url = ?, pub_name = ?, pub_description = ?,
218 222
          pub_icon_cid = ?, pub_icon_url = ?, view_url = ?, pds_endpoint = ?,
219 -
          resolved_at = datetime('now'), stale_at = datetime('now', '+${STALE_OFFSET_HOURS} hours')`
223 +
          resolved_at = datetime('now'), stale_at = datetime('now', '+${STALE_OFFSET_HOURS} hours'), verified = ?`
220 224
      )
221 225
      .bind(
222 226
        // INSERT values
223 227
        uri, did, rkey, title, description, path, site, content, textContent,
224 228
        coverImageCid, coverImageUrl, bskyPostRef, tags,
225 229
        publishedAt, updatedAt, pubUrl, pubName, pubDescription,
226 -
        pubIconCid, pubIconUrl, viewUrl, pds,
230 +
        pubIconCid, pubIconUrl, viewUrl, pds, verified ? 1 : 0,
227 231
        // UPDATE values
228 232
        title, description, path, site, content, textContent,
229 233
        coverImageCid, coverImageUrl, bskyPostRef, tags,
230 234
        publishedAt, updatedAt, pubUrl, pubName, pubDescription,
231 -
        pubIconCid, pubIconUrl, viewUrl, pds
235 +
        pubIconCid, pubIconUrl, viewUrl, pds, verified ? 1 : 0
232 236
      )
233 237
      .run();
234 238
packages/server/src/utils/index.ts +1 −0
2 2
export { resolvePds } from "./resolver";
3 3
export { resolveViewUrl, processDocument } from "./document";
4 4
export { buildBlobUrl, extractBlobCid } from "./blob";
5 +
export { verifyPublication, verifyDocument, verifyDocumentRecord } from "./verification";
packages/server/src/utils/verification.ts (added) +99 −0
1 +
/**
2 +
 * Verification utilities for standard.site records.
3 +
 *
4 +
 * Publications are verified via /.well-known/site.standard.publication
5 +
 * Documents are verified via <link rel="site.standard.document"> in HTML
6 +
 */
7 +
8 +
/**
9 +
 * Verifies a publication by checking /.well-known/site.standard.publication
10 +
 * @param pubUrl The publication's base URL (e.g., "https://example.com")
11 +
 * @param siteUri The expected AT-URI of the publication (e.g., "at://did:plc:abc/site.standard.publication/rkey")
12 +
 * @returns true if the .well-known endpoint returns the matching AT-URI
13 +
 */
14 +
export async function verifyPublication(
15 +
  pubUrl: string,
16 +
  siteUri: string
17 +
): Promise<boolean> {
18 +
  try {
19 +
    const baseUrl = pubUrl.startsWith("http") ? pubUrl : `https://${pubUrl}`;
20 +
    const wellKnownUrl = `${baseUrl.replace(/\/$/, "")}/.well-known/site.standard.publication`;
21 +
22 +
    const response = await fetch(wellKnownUrl, {
23 +
      headers: { Accept: "text/plain" },
24 +
    });
25 +
26 +
    if (!response.ok) return false;
27 +
28 +
    const body = await response.text();
29 +
    return body.trim() === siteUri.trim();
30 +
  } catch {
31 +
    return false;
32 +
  }
33 +
}
34 +
35 +
/**
36 +
 * Verifies a document by checking for a matching <link rel="site.standard.document"> tag
37 +
 * @param viewUrl The document's canonical URL (e.g., "https://example.com/blog/post")
38 +
 * @param documentUri The expected AT-URI of the document (e.g., "at://did:plc:abc/site.standard.document/rkey")
39 +
 * @returns true if the HTML contains a matching link tag
40 +
 */
41 +
export async function verifyDocument(
42 +
  viewUrl: string,
43 +
  documentUri: string
44 +
): Promise<boolean> {
45 +
  try {
46 +
    const response = await fetch(viewUrl, {
47 +
      headers: { Accept: "text/html" },
48 +
    });
49 +
50 +
    if (!response.ok) return false;
51 +
52 +
    const html = await response.text();
53 +
54 +
    // Look for <link rel="site.standard.document" href="at://...">
55 +
    // Using regex to avoid heavy HTML parser dependency
56 +
    const linkPattern =
57 +
      /<link[^>]+rel=["']site\.standard\.document["'][^>]+href=["']([^"']+)["'][^>]*>/i;
58 +
    const altPattern =
59 +
      /<link[^>]+href=["']([^"']+)["'][^>]+rel=["']site\.standard\.document["'][^>]*>/i;
60 +
61 +
    const match = html.match(linkPattern) || html.match(altPattern);
62 +
    if (!match) return false;
63 +
64 +
    return match[1].trim() === documentUri.trim();
65 +
  } catch {
66 +
    return false;
67 +
  }
68 +
}
69 +
70 +
/**
71 +
 * Combined verification for a document record.
72 +
 * Checks publication verification first (if applicable), then document verification.
73 +
 *
74 +
 * @param pubUrl The publication's base URL
75 +
 * @param siteUri The AT-URI of the publication (from document's site field)
76 +
 * @param viewUrl The document's canonical URL
77 +
 * @param documentUri The AT-URI of the document
78 +
 * @returns true if either publication or document verification passes
79 +
 */
80 +
export async function verifyDocumentRecord(
81 +
  pubUrl: string | null,
82 +
  siteUri: string | null,
83 +
  viewUrl: string | null,
84 +
  documentUri: string
85 +
): Promise<boolean> {
86 +
  // Try publication verification first (if we have a publication AT-URI)
87 +
  if (pubUrl && siteUri && siteUri.startsWith("at://")) {
88 +
    const pubVerified = await verifyPublication(pubUrl, siteUri);
89 +
    if (pubVerified) return true;
90 +
  }
91 +
92 +
  // Fall back to document verification (if we have a view URL)
93 +
  if (viewUrl) {
94 +
    const docVerified = await verifyDocument(viewUrl, documentUri);
95 +
    if (docVerified) return true;
96 +
  }
97 +
98 +
  return false;
99 +
}
packages/server/tables.csv (deleted) +0 −51
1 -
name,sql
2 -
_cf_KV,"CREATE TABLE _cf_KV (
3 -
        key TEXT PRIMARY KEY,
4 -
        value BLOB
5 -
      ) WITHOUT ROWID"
6 -
repo_records,"CREATE TABLE repo_records (
7 -
  id INTEGER PRIMARY KEY AUTOINCREMENT,
8 -
  did TEXT NOT NULL,
9 -
  rkey TEXT NOT NULL,
10 -
  collection TEXT NOT NULL,
11 -
  cid TEXT,
12 -
  synced_at TEXT DEFAULT (datetime('now')),
13 -
  UNIQUE(did, collection, rkey)
14 -
)"
15 -
pds_cache,"CREATE TABLE pds_cache (
16 -
  did TEXT PRIMARY KEY,
17 -
  pds_endpoint TEXT NOT NULL,
18 -
  cached_at TEXT DEFAULT (datetime('now'))
19 -
)"
20 -
record_cache,"CREATE TABLE record_cache (
21 -
  uri TEXT PRIMARY KEY,
22 -
  did TEXT NOT NULL,
23 -
  collection TEXT NOT NULL,
24 -
  rkey TEXT NOT NULL,
25 -
  record_data TEXT NOT NULL,  -- JSON blob
26 -
  cached_at TEXT DEFAULT (datetime('now'))
27 -
)"
28 -
publication_cache,"CREATE TABLE publication_cache (
29 -
  at_uri TEXT PRIMARY KEY,
30 -
  base_url TEXT NOT NULL,
31 -
  cached_at TEXT DEFAULT (datetime('now'))
32 -
)"
33 -
sync_metadata,"CREATE TABLE sync_metadata (
34 -
  key TEXT PRIMARY KEY,
35 -
  value TEXT NOT NULL,
36 -
  updated_at TEXT DEFAULT (datetime('now'))
37 -
)"
38 -
resolved_documents,"CREATE TABLE resolved_documents (
39 -
  uri TEXT PRIMARY KEY,
40 -
  did TEXT NOT NULL,
41 -
  rkey TEXT NOT NULL,
42 -
  title TEXT,
43 -
  path TEXT,
44 -
  site TEXT,
45 -
  content TEXT,  -- JSON blob
46 -
  text_content TEXT,
47 -
  published_at TEXT,
48 -
  view_url TEXT,
49 -
  resolved_at TEXT DEFAULT (datetime('now')),
50 -
  stale_at TEXT  -- When this record should be re-resolved
51 -
)"