feat: added queues and updated auth 2f03c5c3
Steve · 2026-01-12 22:57 7 file(s) · +264 −45
packages/server/src/index.ts +42 −0
2 2
import { cors } from "hono/cors";
3 3
import type { Bindings } from "./types";
4 4
import { health, webhook, feed, stats, records } from "./routes";
5 +
import { processDocument } from "./utils";
5 6
6 7
const app = new Hono<{ Bindings: Bindings }>();
7 8
47 48
// Export for Cloudflare Workers
48 49
export default {
	fetch: app.fetch,

	/**
	 * Cron entry point: selects up to `batchSize` rows from `resolved_documents`
	 * whose `stale_at` is in the past (or was never set) and enqueues one
	 * resolution message per row on `RESOLUTION_QUEUE`.
	 *
	 * `event` and `ctx` are part of the Workers handler signature and are not
	 * used here.
	 */
	async scheduled(
		event: ScheduledEvent,
		env: Bindings,
		ctx: ExecutionContext,
	) {
		const batchSize = 50;

		// Select stale documents
		const { results } = await env.DB.prepare(
			`SELECT did, rkey FROM resolved_documents
       WHERE stale_at < datetime('now') OR stale_at IS NULL
       LIMIT ?`,
		)
			.bind(batchSize)
			.all<{ did: string; rkey: string }>();

		// Nothing is stale: do not call sendBatch with an empty list.
		if (!results || results.length === 0) {
			return;
		}

		// The SELECT only reads this one table, so the collection is fixed here.
		const messages = results.map((row) => {
			const body: ResolutionMessage = {
				did: row.did,
				collection: "site.standard.document",
				rkey: row.rkey,
			};
			return { body };
		});

		// Send to queue
		await env.RESOLUTION_QUEUE.sendBatch(messages);
		console.log(`Queued ${messages.length} documents for resolution`);
	},

	/**
	 * Queue consumer: re-resolves each document named in the batch.
	 *
	 * Messages are settled individually so one failure does not force the whole
	 * batch to be redelivered. A payload that does not have the expected shape is
	 * logged and acknowledged, because redelivering it can never succeed; a
	 * failure thrown by `processDocument` marks the message for retry.
	 */
	async queue(batch: MessageBatch<unknown>, env: Bindings) {
		for (const message of batch.messages) {
			const body = message.body;
			if (!isResolutionMessage(body)) {
				console.error("Dropping malformed queue message:", body);
				message.ack();
				continue;
			}

			try {
				await processDocument(env.DB, body.did, body.collection, body.rkey);
				message.ack();
			} catch (error) {
				console.error("Queue processing error:", error);
				message.retry();
			}
		}
	},
};

/** Payload produced by `scheduled` and consumed by `queue`. */
interface ResolutionMessage {
	did: string;
	collection: string;
	rkey: string;
}

/** Runtime check that a queue payload carries the three string fields `queue` needs. */
function isResolutionMessage(body: unknown): body is ResolutionMessage {
	if (typeof body !== "object" || body === null) {
		return false;
	}
	const candidate = body as Record<string, unknown>;
	return (
		typeof candidate.did === "string" &&
		typeof candidate.collection === "string" &&
		typeof candidate.rkey === "string"
	);
}
packages/server/src/routes/webhook.ts +19 −38
1 1
import { Hono } from "hono";
2 2
import type { Bindings, TapEvent } from "../types";
3 -
import { resolvePds, parseAtUri } from "../utils";
3 +
import { resolvePds, parseAtUri, resolveViewUrl } from "../utils";
4 4
5 5
const webhook = new Hono<{ Bindings: Bindings }>();
6 6
7 -
/**
 * Builds the public URL of a document from its site record.
 *
 * Parses `siteUri` as an AT URI, resolves the owning repo's PDS, fetches the
 * site record through `com.atproto.repo.getRecord`, and appends `path` to the
 * record's `url` (or, failing that, its `domain`).
 *
 * This is the removed side of the diff: the same function is now imported
 * from "../utils" at the top of this file.
 *
 * @param db - D1 database handed to `resolvePds`.
 * @param siteUri - AT URI of the site record.
 * @param path - Document path appended verbatim to the site's base URL.
 * @returns The combined URL, or `null` when the URI does not parse, no PDS is
 *   found, the request is not OK or throws, or the record has neither `url`
 *   nor `domain`.
 */
async function resolveViewUrl(
8 -
  db: D1Database,
9 -
  siteUri: string,
10 -
  path: string
11 -
): Promise<string | null> {
12 -
  const parsed = parseAtUri(siteUri);
13 -
  if (!parsed) return null;
14 -
15 -
  try {
16 -
    const pds = await resolvePds(db, parsed.did);
17 -
    if (!pds) return null;
18 -
19 -
    // Each AT URI component is percent-encoded before going into the query string.
    const url = `${pds}/xrpc/com.atproto.repo.getRecord?repo=${encodeURIComponent(parsed.did)}&collection=${encodeURIComponent(parsed.collection)}&rkey=${encodeURIComponent(parsed.rkey)}`;
20 -
    const response = await fetch(url);
21 -
    if (!response.ok) return null;
22 -
23 -
    const data = (await response.json()) as {
24 -
      value?: { url?: string; domain?: string };
25 -
    };
26 -
    // `url` wins over `domain`; an empty string falls through to `domain`.
    const siteUrl = data.value?.url || data.value?.domain;
27 -
    if (!siteUrl) return null;
28 -
29 -
    // NOTE(review): this prefix test also matches bare domains that merely
    // begin with "http" (e.g. "httpbin.org"), which then get no scheme added.
    const baseUrl = siteUrl.startsWith("http") ? siteUrl : `https://${siteUrl}`;
30 -
    // Plain concatenation: no slash is added or removed between base and path.
    return `${baseUrl}${path}`;
31 -
  } catch {
32 -
    // Best effort: any thrown error (network, JSON parsing, ...) yields null.
    return null;
33 -
  }
34 -
}
35 -
36 7
webhook.post("/tap", async (c) => {
37 8
  try {
38 9
    const db = c.env.DB;
40 11
    const secret = c.env.TAP_WEBHOOK_SECRET;
41 12
    if (secret) {
42 13
      const auth = c.req.header("Authorization");
43 -
      if (auth !== `Bearer ${secret}`) {
14 +
      // Support both Bearer token (legacy) and Basic Auth (Tap default)
15 +
      // Tap sends Basic Auth as base64("admin:password")
16 +
      const expectedBasic = `Basic ${btoa(`admin:${secret}`)}`;
17 +
      const expectedBearer = `Bearer ${secret}`;
18 +
19 +
      if (auth !== expectedBasic && auth !== expectedBearer) {
44 20
        return c.json({ error: "Unauthorized" }, 401);
45 21
      }
46 22
    }
87 63
88 64
            await db
89 65
              .prepare(
90 -
                `INSERT INTO resolved_documents (uri, did, rkey, title, path, site, content, text_content, published_at, view_url, resolved_at)
91 -
                 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'))
66 +
                `INSERT INTO resolved_documents (uri, did, rkey, title, path, site, content, text_content, published_at, view_url, resolved_at, stale_at)
67 +
                 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'), datetime('now', '+12 hours'))
92 68
                 ON CONFLICT(uri) DO UPDATE SET
93 -
                   title = ?, path = ?, site = ?, content = ?, text_content = ?, published_at = ?, view_url = ?, resolved_at = datetime('now')`
69 +
                   title = ?, path = ?, site = ?, content = ?, text_content = ?, published_at = ?, view_url = ?, resolved_at = datetime('now'), stale_at = datetime('now', '+12 hours')`
94 70
              )
95 71
              .bind(
96 72
                uri,
147 123
    const secret = c.env.TAP_WEBHOOK_SECRET;
148 124
    if (secret) {
149 125
      const auth = c.req.header("Authorization");
150 -
      if (auth !== `Bearer ${secret}`) {
126 +
      // Support both Bearer token (legacy) and Basic Auth (Tap default)
127 +
      // Tap sends Basic Auth as base64("admin:password")
128 +
      const expectedBasic = `Basic ${btoa(`admin:${secret}`)}`;
129 +
      const expectedBearer = `Bearer ${secret}`;
130 +
131 +
      if (auth !== expectedBasic && auth !== expectedBearer) {
151 132
        return c.json({ error: "Unauthorized" }, 401);
152 133
      }
153 134
    }
207 188
208 189
            await db
209 190
              .prepare(
210 -
                `INSERT INTO resolved_documents (uri, did, rkey, title, path, site, content, text_content, published_at, view_url, resolved_at)
211 -
                 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'))
191 +
                `INSERT INTO resolved_documents (uri, did, rkey, title, path, site, content, text_content, published_at, view_url, resolved_at, stale_at)
192 +
                 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'), datetime('now', '+12 hours'))
212 193
                 ON CONFLICT(uri) DO UPDATE SET
213 -
                   title = ?, path = ?, site = ?, content = ?, text_content = ?, published_at = ?, view_url = ?, resolved_at = datetime('now')`
194 +
                   title = ?, path = ?, site = ?, content = ?, text_content = ?, published_at = ?, view_url = ?, resolved_at = datetime('now'), stale_at = datetime('now', '+12 hours')`
214 195
              )
215 196
              .bind(
216 197
                uri,
packages/server/src/types/index.ts +1 −0
1 1
/** Environment bindings available to the Worker's handlers and routes. */
export type Bindings = {
2 2
  /** D1 database read and written by the routes and the queue consumer. */
  DB: D1Database;
3 +
  /** Queue producer used by the scheduled handler to enqueue documents for re-resolution. */
  RESOLUTION_QUEUE: Queue;
3 4
  /**
   * Shared secret checked against the webhook's Authorization header.
   * Optional: the webhook skips the check when it is not set.
   */
  TAP_WEBHOOK_SECRET?: string;
4 5
};
5 6
packages/server/src/utils/document.ts (added) +142 −0
1 +
import { resolvePds } from "./resolver";
2 +
import { parseAtUri } from "./at-uri";
3 +
4 +
/**
 * Builds the public URL of a document from its site record.
 *
 * Parses `siteUri` as an AT URI, resolves the owning repo's PDS, fetches the
 * site record through `com.atproto.repo.getRecord`, and joins `path` onto the
 * record's `url` (or, failing that, its `domain`).
 *
 * @param db - D1 database handed to `resolvePds`.
 * @param siteUri - AT URI of the site record.
 * @param path - Document path to append to the site's base URL.
 * @returns The combined URL, or `null` when the URI does not parse, no PDS is
 *   found, the request is not OK or throws, or the record has neither `url`
 *   nor `domain`.
 */
export async function resolveViewUrl(
  db: D1Database,
  siteUri: string,
  path: string
): Promise<string | null> {
  const parsed = parseAtUri(siteUri);
  if (!parsed) return null;

  try {
    const pds = await resolvePds(db, parsed.did);
    if (!pds) return null;

    const url = `${pds}/xrpc/com.atproto.repo.getRecord?repo=${encodeURIComponent(
      parsed.did
    )}&collection=${encodeURIComponent(parsed.collection)}&rkey=${encodeURIComponent(
      parsed.rkey
    )}`;
    const response = await fetch(url);
    if (!response.ok) return null;

    const data = (await response.json()) as {
      value?: { url?: string; domain?: string };
    };
    // `url` wins over `domain`; an empty string falls through to `domain`.
    const siteUrl = data.value?.url || data.value?.domain;
    if (!siteUrl) return null;

    // Require a real http(s) scheme. A plain prefix test on "http" would also
    // accept bare domains such as "httpbin.org" and leave them without one.
    const hasScheme = /^https?:\/\//i.test(siteUrl);
    const baseUrl = hasScheme ? siteUrl : `https://${siteUrl}`;

    // With no path there is nothing to join; return the base untouched.
    if (path === "") return baseUrl;

    // Join with exactly one slash so "https://a.com/" + "/p" does not become
    // "https://a.com//p" and "a.com" + "p" does not become "a.comp".
    const trimmedBase = baseUrl.replace(/\/+$/, "");
    const suffix = path.startsWith("/") ? path : `/${path}`;
    return `${trimmedBase}${suffix}`;
  } catch {
    // Best effort: any thrown error (network, JSON parsing, ...) yields null.
    return null;
  }
}
36 +
37 +
/**
 * Refreshes one record in the local database from its owner's PDS.
 *
 * Steps: resolve the repo's PDS, fetch the record through
 * `com.atproto.repo.getRecord`, upsert its CID into `repo_records`, resolve
 * the view URL when the record has both `site` and `path`, then upsert the
 * document into `resolved_documents` with a fresh `stale_at`.
 *
 * When the record cannot be refreshed and retrying would not help (no PDS,
 * a 4xx response other than 429, or a payload without `value`), the existing
 * row's `stale_at` is pushed forward and the function returns normally, so
 * the row is not re-selected as stale on every run.
 *
 * @param db - D1 database holding `repo_records` and `resolved_documents`.
 * @param did - DID of the repo that owns the record.
 * @param collection - Collection the record lives in.
 * @param rkey - Record key within the collection.
 * @returns Resolves once the row has been upserted or its refresh deferred.
 * @throws Rethrows, after logging, any network/database error and any 5xx or
 *   429 response, so a caller such as a queue consumer can retry.
 */
export async function processDocument(
  db: D1Database,
  did: string,
  collection: string,
  rkey: string
): Promise<void> {
  // SQLite datetime() modifier: how long a row counts as fresh.
  const staleModifier = "+12 hours";
  const uri = `at://${did}/${collection}/${rkey}`;

  // Pushes stale_at forward on the existing row (a no-op if there is none).
  const deferRefresh = async (): Promise<void> => {
    await db
      .prepare(
        `UPDATE resolved_documents SET stale_at = datetime('now', ?) WHERE uri = ?`
      )
      .bind(staleModifier, uri)
      .run();
  };

  try {
    // 1. Resolve PDS
    const pds = await resolvePds(db, did);
    if (!pds) {
      console.warn(`Could not resolve PDS for ${did}`);
      await deferRefresh();
      return;
    }

    // 2. Fetch Record
    const url = `${pds}/xrpc/com.atproto.repo.getRecord?repo=${encodeURIComponent(
      did
    )}&collection=${encodeURIComponent(collection)}&rkey=${encodeURIComponent(rkey)}`;

    const response = await fetch(url);
    if (!response.ok) {
      // Server-side and rate-limit failures are worth another attempt.
      if (response.status >= 500 || response.status === 429) {
        throw new Error(`getRecord for ${uri} failed with HTTP ${response.status}`);
      }
      if (response.status === 404) {
        // Record deleted?
        console.warn(`Record not found: ${did}/${collection}/${rkey}`);
      } else {
        console.warn(`getRecord for ${uri} was rejected with HTTP ${response.status}`);
      }
      await deferRefresh();
      return;
    }

    const data = (await response.json()) as {
      uri: string;
      cid?: string;
      value?: {
        title?: string;
        path?: string;
        site?: string;
        content?: unknown;
        textContent?: string;
        publishedAt?: string;
        [key: string]: unknown;
      };
    };

    const { value, cid } = data;
    if (!value) {
      // A 2xx body without `value` has nothing to store.
      console.warn(`getRecord for ${uri} returned no record value`);
      await deferRefresh();
      return;
    }

    // 3. Update repo_records
    // `excluded` is the row the INSERT tried to write, so each value is bound once.
    await db
      .prepare(
        `INSERT INTO repo_records (did, rkey, collection, cid, synced_at)
         VALUES (?, ?, ?, ?, datetime('now'))
         ON CONFLICT(did, collection, rkey) DO UPDATE SET
           cid = excluded.cid,
           synced_at = excluded.synced_at`
      )
      .bind(did, rkey, collection, cid || null)
      .run();

    // 4. Resolve View URL and Update resolved_documents
    let viewUrl: string | null = null;
    if (value.site && value.path) {
      viewUrl = await resolveViewUrl(db, value.site, value.path);
    }

    await db
      .prepare(
        `INSERT INTO resolved_documents (uri, did, rkey, title, path, site, content, text_content, published_at, view_url, resolved_at, stale_at)
         VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'), datetime('now', ?))
         ON CONFLICT(uri) DO UPDATE SET
           title = excluded.title,
           path = excluded.path,
           site = excluded.site,
           content = excluded.content,
           text_content = excluded.text_content,
           published_at = excluded.published_at,
           view_url = excluded.view_url,
           resolved_at = excluded.resolved_at,
           stale_at = excluded.stale_at`
      )
      .bind(
        uri,
        did,
        rkey,
        value.title || null,
        value.path || null,
        value.site || null,
        value.content ? JSON.stringify(value.content) : null,
        value.textContent || null,
        value.publishedAt || null,
        viewUrl,
        staleModifier
      )
      .run();
  } catch (error) {
    console.error(`Error processing document ${did}/${collection}/${rkey}:`, error);
    // Propagate so the caller can decide to retry; swallowing here would make
    // every failure look like a success.
    throw error;
  }
}
packages/server/src/utils/index.ts +1 −0
1 1
export { parseAtUri, buildAtUri, type AtUriComponents } from "./at-uri";
2 2
export { resolvePds } from "./resolver";
3 +
export { resolveViewUrl, processDocument } from "./document";
packages/server/tables.csv (added) +51 −0
1 +
name,sql
2 +
_cf_KV,"CREATE TABLE _cf_KV (
3 +
        key TEXT PRIMARY KEY,
4 +
        value BLOB
5 +
      ) WITHOUT ROWID"
6 +
repo_records,"CREATE TABLE repo_records (
7 +
  id INTEGER PRIMARY KEY AUTOINCREMENT,
8 +
  did TEXT NOT NULL,
9 +
  rkey TEXT NOT NULL,
10 +
  collection TEXT NOT NULL,
11 +
  cid TEXT,
12 +
  synced_at TEXT DEFAULT (datetime('now')),
13 +
  UNIQUE(did, collection, rkey)
14 +
)"
15 +
pds_cache,"CREATE TABLE pds_cache (
16 +
  did TEXT PRIMARY KEY,
17 +
  pds_endpoint TEXT NOT NULL,
18 +
  cached_at TEXT DEFAULT (datetime('now'))
19 +
)"
20 +
record_cache,"CREATE TABLE record_cache (
21 +
  uri TEXT PRIMARY KEY,
22 +
  did TEXT NOT NULL,
23 +
  collection TEXT NOT NULL,
24 +
  rkey TEXT NOT NULL,
25 +
  record_data TEXT NOT NULL,  -- JSON blob
26 +
  cached_at TEXT DEFAULT (datetime('now'))
27 +
)"
28 +
publication_cache,"CREATE TABLE publication_cache (
29 +
  at_uri TEXT PRIMARY KEY,
30 +
  base_url TEXT NOT NULL,
31 +
  cached_at TEXT DEFAULT (datetime('now'))
32 +
)"
33 +
sync_metadata,"CREATE TABLE sync_metadata (
34 +
  key TEXT PRIMARY KEY,
35 +
  value TEXT NOT NULL,
36 +
  updated_at TEXT DEFAULT (datetime('now'))
37 +
)"
38 +
resolved_documents,"CREATE TABLE resolved_documents (
39 +
  uri TEXT PRIMARY KEY,
40 +
  did TEXT NOT NULL,
41 +
  rkey TEXT NOT NULL,
42 +
  title TEXT,
43 +
  path TEXT,
44 +
  site TEXT,
45 +
  content TEXT,  -- JSON blob
46 +
  text_content TEXT,
47 +
  published_at TEXT,
48 +
  view_url TEXT,
49 +
  resolved_at TEXT DEFAULT (datetime('now')),
50 +
  stale_at TEXT  -- When this record should be re-resolved
51 +
)"
packages/server/wrangler.toml +8 −7
9 9
database_id = "bfbb9955-1496-47e9-9602-e32c9b1fa7b2"
10 10
11 11
# Queue for processing document resolution
12 -
# [[queues.producers]]
13 -
# queue = "document-resolution"
14 -
# binding = "RESOLUTION_QUEUE"
12 +
[[queues.producers]]
13 +
queue = "document-resolution"
14 +
binding = "RESOLUTION_QUEUE"
15 +
16 +
[[queues.consumers]]
17 +
queue = "document-resolution"
18 +
max_batch_size = 10
19 +
max_batch_timeout = 30
15 20
16 -
# [[queues.consumers]]
17 -
# queue = "document-resolution"
18 -
# max_batch_size = 10
19 -
# max_batch_timeout = 30
20 21
21 22
# Cron trigger to refresh stale documents
22 23
[triggers]