packages/server/src/routes/webhook.ts 8.9 K raw
1
import { Hono } from "hono";
2
import type { Bindings, TapEvent } from "../types";
3
import { resolveViewUrl } from "../utils";
4
5
// Number of hours added to the current time to compute `stale_at` whenever a
// row is written to `resolved_documents` by the handlers in this file.
const STALE_OFFSET_HOURS = 24;
6
7
// Router for the Tap webhook endpoints. Handlers read `DB`,
// `TAP_WEBHOOK_SECRET` and `RESOLUTION_QUEUE` from `c.env`.
const webhook = new Hono<{ Bindings: Bindings }>();
8
9
/**
 * POST /tap — ingest a single Tap event.
 *
 * Behavior:
 * - When `TAP_WEBHOOK_SECRET` is set, the `Authorization` header must equal
 *   either `Basic base64("admin:<secret>")` or `Bearer <secret>`; otherwise
 *   the request is rejected with 401. When the secret is unset, no
 *   authentication is performed.
 * - A body that is not valid JSON, is not an object, or is a `record` event
 *   without a `record` payload is rejected with 400.
 * - Only `record` events for the `site.standard.document` collection are
 *   acted on; everything else is acknowledged with `{ ok: true }`.
 * - `create` / `update` upsert `repo_records`, upsert `resolved_documents`
 *   when the event carries the record body, and enqueue the record on
 *   `RESOLUTION_QUEUE`.
 * - `delete` removes the matching rows from both tables.
 * - Any unexpected failure is logged and returned as a 500 with `details`.
 */
webhook.post("/tap", async (c) => {
  try {
    const db = c.env.DB;

    const secret = c.env.TAP_WEBHOOK_SECRET;
    if (secret) {
      const auth = c.req.header("Authorization") ?? "";
      // Support both Bearer token (legacy) and Basic Auth (Tap default)
      // Tap sends Basic Auth as base64("admin:password")
      const expectedBasic = `Basic ${btoa(`admin:${secret}`)}`;
      const expectedBearer = `Bearer ${secret}`;

      // Compare every character instead of stopping at the first mismatch so
      // the response time does not reveal how much of a guess was correct.
      const matchesAuth = (expected: string): boolean => {
        let diff = auth.length ^ expected.length;
        for (let i = 0; i < expected.length; i++) {
          // charCodeAt past the end yields NaN, which `^` treats as 0.
          diff |= auth.charCodeAt(i) ^ expected.charCodeAt(i);
        }
        return diff === 0;
      };

      // Evaluate both candidates unconditionally (no short-circuit).
      const basicOk = matchesAuth(expectedBasic);
      const bearerOk = matchesAuth(expectedBearer);
      if (!basicOk && !bearerOk) {
        return c.json({ error: "Unauthorized" }, 401);
      }
    }

    // A malformed body is a client error, not a server failure.
    let event: TapEvent;
    try {
      event = (await c.req.json()) as TapEvent;
    } catch {
      return c.json({ error: "Invalid JSON body" }, 400);
    }
    if (event === null || typeof event !== "object") {
      return c.json({ error: "Invalid event payload" }, 400);
    }

    if (event.type === "record") {
      const { record } = event;

      if (!record) {
        return c.json({ error: "Missing record" }, 400);
      }

      // Events explicitly flagged `live: false` are acknowledged but skipped.
      // NOTE(review): presumably these are backfill/replay events — confirm.
      if (record.live === false) {
        return c.json({ ok: true });
      }

      if (record.collection === "site.standard.document") {
        if (record.action === "create" || record.action === "update") {
          // `excluded.*` refers to the row that the INSERT attempted, so each
          // value is bound exactly once.
          await db
            .prepare(
              `INSERT INTO repo_records (did, rkey, collection, cid, synced_at)
               VALUES (?, ?, ?, ?, datetime('now'))
               ON CONFLICT(did, collection, rkey) DO UPDATE SET
                 cid = excluded.cid,
                 synced_at = excluded.synced_at`
            )
            .bind(
              record.did,
              record.rkey,
              record.collection,
              record.cid || null
            )
            .run();

          if (record.record) {
            const uri = `at://${record.did}/${record.collection}/${record.rkey}`;
            const doc = record.record as {
              title?: string;
              path?: string;
              site?: string;
              content?: unknown;
              textContent?: string;
              publishedAt?: string;
            };

            // A view URL can only be resolved when both site and path exist.
            let viewUrl: string | null = null;
            if (doc.site && doc.path) {
              viewUrl = await resolveViewUrl(db, doc.site, doc.path);
            }

            // On conflict every column except uri/did/rkey is refreshed from
            // the incoming row.
            await db
              .prepare(
                `INSERT INTO resolved_documents (uri, did, rkey, title, path, site, content, text_content, published_at, view_url, resolved_at, stale_at)
                 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'), datetime('now', '+${STALE_OFFSET_HOURS} hours'))
                 ON CONFLICT(uri) DO UPDATE SET
                   title = excluded.title,
                   path = excluded.path,
                   site = excluded.site,
                   content = excluded.content,
                   text_content = excluded.text_content,
                   published_at = excluded.published_at,
                   view_url = excluded.view_url,
                   resolved_at = excluded.resolved_at,
                   stale_at = excluded.stale_at`
              )
              .bind(
                uri,
                record.did,
                record.rkey,
                doc.title || null,
                doc.path || null,
                doc.site || null,
                doc.content ? JSON.stringify(doc.content) : null,
                doc.textContent || null,
                doc.publishedAt || null,
                viewUrl
              )
              .run();
          }

          // Queue for immediate full processing (verification, publication resolution, etc.)
          await c.env.RESOLUTION_QUEUE.send({
            did: record.did,
            collection: record.collection,
            rkey: record.rkey,
          });
        } else if (record.action === "delete") {
          await db
            .prepare(
              "DELETE FROM repo_records WHERE did = ? AND collection = ? AND rkey = ?"
            )
            .bind(record.did, record.collection, record.rkey)
            .run();

          const uri = `at://${record.did}/${record.collection}/${record.rkey}`;
          await db
            .prepare("DELETE FROM resolved_documents WHERE uri = ?")
            .bind(uri)
            .run();
        }
      }
    }

    return c.json({ ok: true });
  } catch (error) {
    console.error("Webhook error:", error);
    return c.json(
      { error: "Failed to process webhook", details: String(error) },
      500
    );
  }
});
131
132
/**
 * POST /tap/batch — ingest an array of Tap events in one request.
 *
 * Behavior:
 * - When `TAP_WEBHOOK_SECRET` is set, the `Authorization` header must equal
 *   either `Basic base64("admin:<secret>")` or `Bearer <secret>`; otherwise
 *   the request is rejected with 401. When the secret is unset, no
 *   authentication is performed.
 * - A body that is not valid JSON or is not a JSON array is rejected with 400.
 * - Events with `live: false` are skipped. Events of type `commit`, `create`
 *   or `update` for `site.standard.document` (with `did` and `rkey`) are
 *   upserted and enqueued on `RESOLUTION_QUEUE`; `delete` events remove the
 *   matching rows. All other events are ignored.
 * - Events are processed sequentially and independently: a failure on one
 *   event is logged, counted in `errors`, and does not stop the batch.
 * - Responds with `{ ok: true, processed, errors }`; a failure outside the
 *   per-event loop is logged and returned as a 500 with `details`.
 */
webhook.post("/tap/batch", async (c) => {
  try {
    const db = c.env.DB;

    const secret = c.env.TAP_WEBHOOK_SECRET;
    if (secret) {
      const auth = c.req.header("Authorization") ?? "";
      // Support both Bearer token (legacy) and Basic Auth (Tap default)
      // Tap sends Basic Auth as base64("admin:password")
      const expectedBasic = `Basic ${btoa(`admin:${secret}`)}`;
      const expectedBearer = `Bearer ${secret}`;

      // Compare every character instead of stopping at the first mismatch so
      // the response time does not reveal how much of a guess was correct.
      const matchesAuth = (expected: string): boolean => {
        let diff = auth.length ^ expected.length;
        for (let i = 0; i < expected.length; i++) {
          // charCodeAt past the end yields NaN, which `^` treats as 0.
          diff |= auth.charCodeAt(i) ^ expected.charCodeAt(i);
        }
        return diff === 0;
      };

      // Evaluate both candidates unconditionally (no short-circuit).
      const basicOk = matchesAuth(expectedBasic);
      const bearerOk = matchesAuth(expectedBearer);
      if (!basicOk && !bearerOk) {
        return c.json({ error: "Unauthorized" }, 401);
      }
    }

    // A malformed or non-array body is a client error, not a server failure.
    let payload: unknown;
    try {
      payload = await c.req.json();
    } catch {
      return c.json({ error: "Invalid JSON body" }, 400);
    }
    if (!Array.isArray(payload)) {
      return c.json({ error: "Expected a JSON array of events" }, 400);
    }

    const events = payload as Array<{
      type: string;
      did: string;
      live?: boolean;
      collection?: string;
      rkey?: string;
      cid?: string;
      record?: Record<string, unknown>;
    }>;

    let processed = 0;
    let errors = 0;

    for (const [index, event] of events.entries()) {
      try {
        // Events explicitly flagged `live: false` are skipped.
        // NOTE(review): presumably these are backfill/replay events — confirm.
        if (event.live === false) {
          continue;
        }

        if (
          (event.type === "commit" ||
            event.type === "create" ||
            event.type === "update") &&
          event.collection === "site.standard.document" &&
          event.did &&
          event.rkey
        ) {
          // `excluded.*` refers to the row that the INSERT attempted, so each
          // value is bound exactly once.
          await db
            .prepare(
              `INSERT INTO repo_records (did, rkey, collection, cid, synced_at)
               VALUES (?, ?, ?, ?, datetime('now'))
               ON CONFLICT(did, collection, rkey) DO UPDATE SET
                 cid = excluded.cid,
                 synced_at = excluded.synced_at`
            )
            .bind(event.did, event.rkey, event.collection, event.cid || null)
            .run();

          if (event.record) {
            const uri = `at://${event.did}/${event.collection}/${event.rkey}`;
            const doc = event.record as {
              title?: string;
              path?: string;
              site?: string;
              content?: unknown;
              textContent?: string;
              publishedAt?: string;
            };

            // A view URL can only be resolved when both site and path exist.
            let viewUrl: string | null = null;
            if (doc.site && doc.path) {
              viewUrl = await resolveViewUrl(db, doc.site, doc.path);
            }

            // On conflict every column except uri/did/rkey is refreshed from
            // the incoming row.
            await db
              .prepare(
                `INSERT INTO resolved_documents (uri, did, rkey, title, path, site, content, text_content, published_at, view_url, resolved_at, stale_at)
                 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'), datetime('now', '+${STALE_OFFSET_HOURS} hours'))
                 ON CONFLICT(uri) DO UPDATE SET
                   title = excluded.title,
                   path = excluded.path,
                   site = excluded.site,
                   content = excluded.content,
                   text_content = excluded.text_content,
                   published_at = excluded.published_at,
                   view_url = excluded.view_url,
                   resolved_at = excluded.resolved_at,
                   stale_at = excluded.stale_at`
              )
              .bind(
                uri,
                event.did,
                event.rkey,
                doc.title || null,
                doc.path || null,
                doc.site || null,
                doc.content ? JSON.stringify(doc.content) : null,
                doc.textContent || null,
                doc.publishedAt || null,
                viewUrl
              )
              .run();
          }

          // Queue for immediate full processing
          await c.env.RESOLUTION_QUEUE.send({
            did: event.did,
            collection: event.collection,
            rkey: event.rkey,
          });

          processed++;
        } else if (
          event.type === "delete" &&
          event.collection === "site.standard.document" &&
          event.did &&
          event.rkey
        ) {
          await db
            .prepare(
              "DELETE FROM repo_records WHERE did = ? AND collection = ? AND rkey = ?"
            )
            .bind(event.did, event.collection, event.rkey)
            .run();

          const uri = `at://${event.did}/${event.collection}/${event.rkey}`;
          await db
            .prepare("DELETE FROM resolved_documents WHERE uri = ?")
            .bind(uri)
            .run();
          processed++;
        }
      } catch (error) {
        // Best-effort per event: keep going, but leave a trace so failures
        // can be diagnosed. Only the index is logged because a malformed
        // entry (e.g. null) may not be safe to dereference here.
        console.error(`Batch webhook error (event index ${index}):`, error);
        errors++;
      }
    }

    return c.json({ ok: true, processed, errors });
  } catch (error) {
    console.error("Batch webhook error:", error);
    return c.json(
      { error: "Failed to process batch webhook", details: String(error) },
      500
    );
  }
});
278
279
// Default export: the router with the `/tap` and `/tap/batch` POST routes
// registered above.
export default webhook;