fix: fixed race condition for seeding feeds
59c1b2e9
1 file(s) · +156 −26
| 9 | 9 | }; |
|
| 10 | 10 | use serde::Deserialize; |
|
| 11 | 11 | ||
| 12 | + | use andromeda_db::Db; |
|
| 13 | + | ||
| 12 | 14 | use crate::auth::ApiAuth; |
|
| 13 | - | use crate::feeds::{discover_feeds, fetch_feed, parse_opml}; |
|
| 15 | + | use crate::feeds::{discover_feeds, fetch_feed, parse_opml, ParsedEntry}; |
|
| 14 | 16 | use crate::poller::POLL_INTERVAL_KEY; |
|
| 15 | 17 | use crate::AppState; |
|
| 16 | 18 | ||
| 122 | 124 | .into_response(); |
|
| 123 | 125 | } |
|
| 124 | 126 | ||
| 125 | - | // Probe once to resolve title + site_url. |
|
| 127 | + | // Probe once to resolve title + site_url + initial entries. |
|
| 126 | 128 | let probed = fetch_feed(feed_url, None, None).await; |
|
| 127 | - | let (title, site_url, etag, last_modified) = match probed { |
|
| 129 | + | let (title, site_url, etag, last_modified, entries) = match probed { |
|
| 128 | 130 | Ok(r) => ( |
|
| 129 | 131 | body.title |
|
| 130 | 132 | .clone() |
|
| 133 | 135 | r.site_url, |
|
| 134 | 136 | r.etag, |
|
| 135 | 137 | r.last_modified, |
|
| 138 | + | r.entries, |
|
| 136 | 139 | ), |
|
| 137 | 140 | Err(e) => { |
|
| 138 | 141 | return err_json( |
|
| 159 | 162 | Err(e) => return err_json(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()), |
|
| 160 | 163 | }; |
|
| 161 | 164 | ||
| 162 | - | // Seed items immediately so the UI isn't empty until the next poll. |
|
| 163 | - | let _ = fdb::update_subscription_meta( |
|
| 165 | + | seed_subscription( |
|
| 164 | 166 | &state.db, |
|
| 165 | 167 | sub.id, |
|
| 168 | + | &entries, |
|
| 166 | 169 | etag.as_deref(), |
|
| 167 | 170 | last_modified.as_deref(), |
|
| 168 | - | &chrono::Utc::now().format("%Y-%m-%d %H:%M:%S").to_string(), |
|
| 169 | - | None, |
|
| 171 | + | state.item_cap, |
|
| 170 | 172 | ); |
|
| 171 | - | if let Ok(result) = fetch_feed(feed_url, None, None).await { |
|
| 172 | - | for entry in &result.entries { |
|
| 173 | - | if entry.link.is_empty() { |
|
| 174 | - | continue; |
|
| 175 | - | } |
|
| 176 | - | let _ = fdb::insert_item_ignore_dup( |
|
| 177 | - | &state.db, |
|
| 178 | - | &fdb::NewItem { |
|
| 179 | - | subscription_id: sub.id, |
|
| 180 | - | guid: &entry.guid, |
|
| 181 | - | title: &entry.title, |
|
| 182 | - | link: &entry.link, |
|
| 183 | - | author: entry.author.as_deref(), |
|
| 184 | - | published_at: entry.published_at, |
|
| 185 | - | }, |
|
| 186 | - | ); |
|
| 187 | - | } |
|
| 188 | - | let _ = fdb::prune_subscription(&state.db, sub.id, state.item_cap as i64); |
|
| 189 | - | } |
|
| 190 | 173 | ||
| 191 | 174 | (StatusCode::CREATED, Json(serde_json::json!({ "subscription": sub }))).into_response() |
|
| 175 | + | } |
|
| 176 | + | ||
| 177 | + | /// Insert probe entries into the new subscription, prune to the item cap, then |
|
| 178 | + | /// persist etag/last_modified. The order matters: persisting the conditional-fetch |
|
| 179 | + | /// metadata before seeding would let the next poller pass receive a 304 against an |
|
| 180 | + | /// empty subscription, leaving it permanently dry until upstream changes. |
|
| 181 | + | pub(crate) fn seed_subscription( |
|
| 182 | + | db: &Db, |
|
| 183 | + | sub_id: i64, |
|
| 184 | + | entries: &[ParsedEntry], |
|
| 185 | + | etag: Option<&str>, |
|
| 186 | + | last_modified: Option<&str>, |
|
| 187 | + | item_cap: usize, |
|
| 188 | + | ) -> usize { |
|
| 189 | + | let mut inserted = 0usize; |
|
| 190 | + | for entry in entries { |
|
| 191 | + | if entry.link.is_empty() { |
|
| 192 | + | continue; |
|
| 193 | + | } |
|
| 194 | + | match fdb::insert_item_ignore_dup( |
|
| 195 | + | db, |
|
| 196 | + | &fdb::NewItem { |
|
| 197 | + | subscription_id: sub_id, |
|
| 198 | + | guid: &entry.guid, |
|
| 199 | + | title: &entry.title, |
|
| 200 | + | link: &entry.link, |
|
| 201 | + | author: entry.author.as_deref(), |
|
| 202 | + | published_at: entry.published_at, |
|
| 203 | + | }, |
|
| 204 | + | ) { |
|
| 205 | + | Ok(true) => inserted += 1, |
|
| 206 | + | Ok(false) => {} |
|
| 207 | + | Err(e) => tracing::warn!("seed insert failed for sub {sub_id}: {e}"), |
|
| 208 | + | } |
|
| 209 | + | } |
|
| 210 | + | let _ = fdb::prune_subscription(db, sub_id, item_cap as i64); |
|
| 211 | + | let _ = fdb::update_subscription_meta( |
|
| 212 | + | db, |
|
| 213 | + | sub_id, |
|
| 214 | + | etag, |
|
| 215 | + | last_modified, |
|
| 216 | + | &chrono::Utc::now().format("%Y-%m-%d %H:%M:%S").to_string(), |
|
| 217 | + | None, |
|
| 218 | + | ); |
|
| 219 | + | inserted |
|
| 192 | 220 | } |
|
| 193 | 221 | ||
| 194 | 222 | fn resolve_category( |
|
| 464 | 492 | Err(e) => err_json(StatusCode::BAD_REQUEST, e), |
|
| 465 | 493 | } |
|
| 466 | 494 | } |
|
| 495 | + | ||
| 496 | + | #[cfg(test)] |
|
| 497 | + | mod tests { |
|
| 498 | + | use super::*; |
|
| 499 | + | use andromeda_db::feeds::{ |
|
| 500 | + | get_subscription, insert_subscription, list_items, ListItemsFilter, FEEDS_SCHEMA, |
|
| 501 | + | }; |
|
| 502 | + | use rusqlite::Connection; |
|
| 503 | + | use std::sync::Mutex; |
|
| 504 | + | ||
| 505 | + | fn test_db() -> Db { |
|
| 506 | + | let conn = Connection::open_in_memory().unwrap(); |
|
| 507 | + | conn.execute_batch(FEEDS_SCHEMA).unwrap(); |
|
| 508 | + | Arc::new(Mutex::new(conn)) |
|
| 509 | + | } |
|
| 510 | + | ||
| 511 | + | fn entry(guid: &str, link: &str, ts: i64) -> ParsedEntry { |
|
| 512 | + | ParsedEntry { |
|
| 513 | + | guid: guid.into(), |
|
| 514 | + | title: format!("post {guid}"), |
|
| 515 | + | link: link.into(), |
|
| 516 | + | author: None, |
|
| 517 | + | published_at: ts, |
|
| 518 | + | } |
|
| 519 | + | } |
|
| 520 | + | ||
| 521 | + | #[test] |
|
| 522 | + | fn seed_subscription_inserts_entries_and_persists_meta() { |
|
| 523 | + | let db = test_db(); |
|
| 524 | + | let sub = insert_subscription(&db, "https://x.com/feed", "X", None, None).unwrap(); |
|
| 525 | + | let entries = vec![ |
|
| 526 | + | entry("g1", "https://x.com/1", 100), |
|
| 527 | + | entry("g2", "https://x.com/2", 200), |
|
| 528 | + | ]; |
|
| 529 | + | ||
| 530 | + | let inserted = |
|
| 531 | + | seed_subscription(&db, sub.id, &entries, Some("etag-1"), Some("Sun, 01 Jan"), 50); |
|
| 532 | + | ||
| 533 | + | assert_eq!(inserted, 2); |
|
| 534 | + | let items = list_items(&db, &ListItemsFilter::default()).unwrap(); |
|
| 535 | + | assert_eq!(items.len(), 2); |
|
| 536 | + | let after = get_subscription(&db, sub.id).unwrap().unwrap(); |
|
| 537 | + | assert_eq!(after.etag.as_deref(), Some("etag-1")); |
|
| 538 | + | assert_eq!(after.last_modified.as_deref(), Some("Sun, 01 Jan")); |
|
| 539 | + | assert!(after.last_fetched_at.is_some()); |
|
| 540 | + | assert!(after.last_error.is_none()); |
|
| 541 | + | } |
|
| 542 | + | ||
| 543 | + | #[test] |
|
| 544 | + | fn seed_subscription_skips_empty_links() { |
|
| 545 | + | let db = test_db(); |
|
| 546 | + | let sub = insert_subscription(&db, "https://x.com/feed", "X", None, None).unwrap(); |
|
| 547 | + | let entries = vec![entry("g1", "", 100), entry("g2", "https://x.com/2", 200)]; |
|
| 548 | + | ||
| 549 | + | let inserted = seed_subscription(&db, sub.id, &entries, None, None, 50); |
|
| 550 | + | ||
| 551 | + | assert_eq!(inserted, 1); |
|
| 552 | + | let items = list_items(&db, &ListItemsFilter::default()).unwrap(); |
|
| 553 | + | assert_eq!(items.len(), 1); |
|
| 554 | + | assert_eq!(items[0].guid, "g2"); |
|
| 555 | + | } |
|
| 556 | + | ||
| 557 | + | #[test] |
|
| 558 | + | fn seed_subscription_dedups_on_repeat() { |
|
| 559 | + | let db = test_db(); |
|
| 560 | + | let sub = insert_subscription(&db, "https://x.com/feed", "X", None, None).unwrap(); |
|
| 561 | + | let entries = vec![entry("g1", "https://x.com/1", 100)]; |
|
| 562 | + | ||
| 563 | + | assert_eq!(seed_subscription(&db, sub.id, &entries, None, None, 50), 1); |
|
| 564 | + | assert_eq!(seed_subscription(&db, sub.id, &entries, None, None, 50), 0); |
|
| 565 | + | assert_eq!(list_items(&db, &ListItemsFilter::default()).unwrap().len(), 1); |
|
| 566 | + | } |
|
| 567 | + | ||
| 568 | + | #[test] |
|
| 569 | + | fn seed_subscription_prunes_to_item_cap() { |
|
| 570 | + | let db = test_db(); |
|
| 571 | + | let sub = insert_subscription(&db, "https://x.com/feed", "X", None, None).unwrap(); |
|
| 572 | + | let entries: Vec<_> = (0..10) |
|
| 573 | + | .map(|i| entry(&format!("g{i}"), &format!("https://x.com/{i}"), i as i64)) |
|
| 574 | + | .collect(); |
|
| 575 | + | ||
| 576 | + | seed_subscription(&db, sub.id, &entries, None, None, 3); |
|
| 577 | + | ||
| 578 | + | let items = list_items(&db, &ListItemsFilter::default()).unwrap(); |
|
| 579 | + | assert_eq!(items.len(), 3); |
|
| 580 | + | // newest survive |
|
| 581 | + | assert_eq!(items[0].published_at, 9); |
|
| 582 | + | assert_eq!(items[2].published_at, 7); |
|
| 583 | + | } |
|
| 584 | + | ||
| 585 | + | #[test] |
|
| 586 | + | fn seed_subscription_with_no_entries_still_persists_meta() { |
|
| 587 | + | let db = test_db(); |
|
| 588 | + | let sub = insert_subscription(&db, "https://x.com/feed", "X", None, None).unwrap(); |
|
| 589 | + | ||
| 590 | + | let inserted = seed_subscription(&db, sub.id, &[], Some("etag-empty"), None, 50); |
|
| 591 | + | ||
| 592 | + | assert_eq!(inserted, 0); |
|
| 593 | + | let after = get_subscription(&db, sub.id).unwrap().unwrap(); |
|
| 594 | + | assert_eq!(after.etag.as_deref(), Some("etag-empty")); |
|
| 595 | + | } |
|
| 596 | + | } |
|