From bd11a5c9cbfb2ff58df382d477ec2d36b333bb6e Mon Sep 17 00:00:00 2001
From: FrenchGithubUser
Date: Wed, 29 Oct 2025 18:44:03 +0100
Subject: [PATCH] feat: persist peers to the db

---
 ...1c411af859ae256fc01eaf66af2aa2a922900.json |  26 ++++
 .../src/repositories/tracker_repository.rs    | 135 ++++++++++++++++++
 shared/src/tracker/models/mod.rs              |   1 +
 shared/src/tracker/models/peer_update.rs      |  98 +++++++++++++
 .../handlers/announce/handle_announce.rs      |  29 +++-
 tracker/arcadia_tracker/src/lib.rs            |   3 +
 tracker/arcadia_tracker/src/scheduler.rs      |   3 +-
 7 files changed, 290 insertions(+), 5 deletions(-)
 create mode 100644 backend/storage/.sqlx/query-599587c7ce69b090843274603171c411af859ae256fc01eaf66af2aa2a922900.json
 create mode 100644 shared/src/tracker/models/peer_update.rs

diff --git a/backend/storage/.sqlx/query-599587c7ce69b090843274603171c411af859ae256fc01eaf66af2aa2a922900.json b/backend/storage/.sqlx/query-599587c7ce69b090843274603171c411af859ae256fc01eaf66af2aa2a922900.json
new file mode 100644
index 00000000..81bcb3eb
--- /dev/null
+++ b/backend/storage/.sqlx/query-599587c7ce69b090843274603171c411af859ae256fc01eaf66af2aa2a922900.json
@@ -0,0 +1,26 @@
+{
+  "db_name": "PostgreSQL",
+  "query": "\n INSERT INTO peers (\n peer_id,\n ip,\n port,\n agent,\n uploaded,\n downloaded,\n \"left\",\n active,\n seeder,\n created_at,\n updated_at,\n torrent_id,\n user_id\n )\n SELECT\n t.peer_id,\n t.ip,\n t.port,\n t.agent,\n t.uploaded,\n t.downloaded,\n t.\"left\",\n t.active,\n t.seeder,\n -- stored as timestamp without time zone in DB\n (t.created_at AT TIME ZONE 'UTC')::timestamp,\n (t.updated_at AT TIME ZONE 'UTC')::timestamp,\n t.torrent_id,\n t.user_id\n FROM (\n SELECT * FROM unnest(\n $1::bytea[],\n $2::inet[],\n $3::int[],\n $4::varchar[],\n $5::bigint[],\n $6::bigint[],\n $7::bigint[],\n $8::boolean[],\n $9::boolean[],\n $10::timestamptz[],\n $11::timestamptz[],\n $12::int[],\n $13::int[]\n ) AS t(\n peer_id,\n ip,\n port,\n agent,\n uploaded,\n downloaded,\n \"left\",\n active,\n seeder,\n created_at,\n updated_at,\n torrent_id,\n user_id\n )\n ) AS t\n ON CONFLICT (user_id, torrent_id, peer_id) DO UPDATE SET\n ip = EXCLUDED.ip,\n port = EXCLUDED.port,\n agent = EXCLUDED.agent,\n uploaded = EXCLUDED.uploaded,\n downloaded = EXCLUDED.downloaded,\n \"left\" = EXCLUDED.\"left\",\n active = EXCLUDED.active,\n seeder = EXCLUDED.seeder,\n updated_at = EXCLUDED.updated_at\n ",
+  "describe": {
+    "columns": [],
+    "parameters": {
+      "Left": [
+        "ByteaArray",
+        "InetArray",
+        "Int4Array",
+        "VarcharArray",
+        "Int8Array",
+        "Int8Array",
+        "Int8Array",
+        "BoolArray",
+        "BoolArray",
+        "TimestamptzArray",
+        "TimestamptzArray",
+        "Int4Array",
+        "Int4Array"
+      ]
+    },
+    "nullable": []
+  },
+  "hash": "599587c7ce69b090843274603171c411af859ae256fc01eaf66af2aa2a922900"
+}
diff --git a/backend/storage/src/repositories/tracker_repository.rs b/backend/storage/src/repositories/tracker_repository.rs
index 89919d8e..a656825d 100644
--- a/backend/storage/src/repositories/tracker_repository.rs
+++ b/backend/storage/src/repositories/tracker_repository.rs
@@ -4,6 +4,7 @@ use arcadia_shared::tracker::models::{
     infohash_2_id, passkey_2_id,
     peer::{self, Peer},
     peer_id::PeerId,
+    peer_update::{self, PeerUpdate},
     torrent::{self, InfoHash, Torrent},
     torrent_update::{self, TorrentUpdate},
     user::{self, Passkey, User},
@@ -12,6 +13,7 @@ use arcadia_shared::tracker::models::{
 use chrono::{DateTime, Utc};
 use futures_util::TryStreamExt;
 use indexmap::IndexMap;
+use sqlx::types::ipnetwork::IpNetwork;
 use std::{borrow::Borrow, net::IpAddr};
 
 // This file contains functions for Arcadia's tracker
@@ -296,4 +298,137 @@ impl ConnectionPool {
 
         Ok(())
     }
+
+    pub async fn bulk_upsert_peers(
+        &self,
+        updates: &Vec<(peer_update::Index, PeerUpdate)>,
+    ) -> arcadia_shared::error::Result<u64> {
+        if updates.is_empty() {
+            return Ok(0);
+        }
+
+        let mut user_ids: Vec<i32> = Vec::with_capacity(updates.len());
+        let mut torrent_ids: Vec<i32> = Vec::with_capacity(updates.len());
+        let mut peer_ids: Vec<Vec<u8>> = Vec::with_capacity(updates.len());
+        let mut ips: Vec<IpNetwork> = Vec::with_capacity(updates.len());
+        let mut ports: Vec<i32> = Vec::with_capacity(updates.len());
+        let mut agents: Vec<String> = Vec::with_capacity(updates.len());
+        let mut uploadeds: Vec<i64> = Vec::with_capacity(updates.len());
+        let mut downloadeds: Vec<i64> = Vec::with_capacity(updates.len());
+        let mut lefts: Vec<i64> = Vec::with_capacity(updates.len());
+        let mut actives: Vec<bool> = Vec::with_capacity(updates.len());
+        let mut seeders: Vec<bool> = Vec::with_capacity(updates.len());
+        let mut created_ats: Vec<DateTime<Utc>> = Vec::with_capacity(updates.len());
+        let mut updated_ats: Vec<DateTime<Utc>> = Vec::with_capacity(updates.len());
+
+        for (index, update) in updates {
+            user_ids.push(index.user_id as i32);
+            torrent_ids.push(index.torrent_id as i32);
+            peer_ids.push(index.peer_id.to_vec());
+            ips.push(IpNetwork::from(update.ip));
+            ports.push(update.port as i32);
+            agents.push(update.agent.clone());
+            uploadeds.push(update.uploaded as i64);
+            downloadeds.push(update.downloaded as i64);
+            lefts.push(update.left as i64);
+            actives.push(update.is_active);
+            seeders.push(update.is_seeder);
+            created_ats.push(update.created_at);
+            updated_ats.push(update.updated_at);
+        }
+
+        let result = sqlx::query!(
+            r#"
+            INSERT INTO peers (
+                peer_id,
+                ip,
+                port,
+                agent,
+                uploaded,
+                downloaded,
+                "left",
+                active,
+                seeder,
+                created_at,
+                updated_at,
+                torrent_id,
+                user_id
+            )
+            SELECT
+                t.peer_id,
+                t.ip,
+                t.port,
+                t.agent,
+                t.uploaded,
+                t.downloaded,
+                t."left",
+                t.active,
+                t.seeder,
+                -- stored as timestamp without time zone in DB
+                (t.created_at AT TIME ZONE 'UTC')::timestamp,
+                (t.updated_at AT TIME ZONE 'UTC')::timestamp,
+                t.torrent_id,
+                t.user_id
+            FROM (
+                SELECT * FROM unnest(
+                    $1::bytea[],
+                    $2::inet[],
+                    $3::int[],
+                    $4::varchar[],
+                    $5::bigint[],
+                    $6::bigint[],
+                    $7::bigint[],
+                    $8::boolean[],
+                    $9::boolean[],
+                    $10::timestamptz[],
+                    $11::timestamptz[],
+                    $12::int[],
+                    $13::int[]
+                ) AS t(
+                    peer_id,
+                    ip,
+                    port,
+                    agent,
+                    uploaded,
+                    downloaded,
+                    "left",
+                    active,
+                    seeder,
+                    created_at,
+                    updated_at,
+                    torrent_id,
+                    user_id
+                )
+            ) AS t
+            ON CONFLICT (user_id, torrent_id, peer_id) DO UPDATE SET
+                ip = EXCLUDED.ip,
+                port = EXCLUDED.port,
+                agent = EXCLUDED.agent,
+                uploaded = EXCLUDED.uploaded,
+                downloaded = EXCLUDED.downloaded,
+                "left" = EXCLUDED."left",
+                active = EXCLUDED.active,
+                seeder = EXCLUDED.seeder,
+                updated_at = EXCLUDED.updated_at
+            "#,
+            &peer_ids,
+            &ips,
+            &ports,
+            &agents,
+            &uploadeds,
+            &downloadeds,
+            &lefts,
+            &actives,
+            &seeders,
+            &created_ats,
+            &updated_ats,
+            &torrent_ids,
+            &user_ids
+        )
+        .execute(self.borrow())
+        .await
+        .map_err(|e| arcadia_shared::error::BackendError::DatabseError(e.to_string()))?;
+
+        Ok(result.rows_affected())
+    }
 }
diff --git a/shared/src/tracker/models/mod.rs b/shared/src/tracker/models/mod.rs
index 9a977168..3a69f65b 100644
--- a/shared/src/tracker/models/mod.rs
+++ b/shared/src/tracker/models/mod.rs
@@ -7,6 +7,7 @@ pub mod infohash_2_id;
 pub mod passkey_2_id;
 pub mod peer;
 pub mod peer_id;
+pub mod peer_update;
 pub mod torrent;
 pub mod torrent_activity_update;
 pub mod torrent_update;
diff --git a/shared/src/tracker/models/peer_update.rs b/shared/src/tracker/models/peer_update.rs
new file mode 100644
index 00000000..18f91b5b
--- /dev/null
+++ b/shared/src/tracker/models/peer_update.rs
@@ -0,0 +1,98 @@
+use chrono::{DateTime, Utc};
+use parking_lot::Mutex;
+use serde::{Deserialize, Serialize};
+
+use crate::tracker::models::{peer_id::PeerId, Flushable, Mergeable, Queue};
+
+// Fields must be in the same order as the database primary key
+#[derive(
+    Debug,
+    Clone,
+    Serialize,
+    Deserialize,
+    Eq,
+    Hash,
+    PartialEq,
+    PartialOrd,
+    Ord,
+    bincode::Encode,
+    bincode::Decode,
+)]
+pub struct Index {
+    pub user_id: u32,
+    pub torrent_id: u32,
+    pub peer_id: PeerId,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, bincode::Encode, bincode::Decode)]
+pub struct PeerUpdate {
+    pub ip: std::net::IpAddr,
+    pub port: u16,
+    pub agent: String,
+    pub uploaded: u64,
+    pub downloaded: u64,
+    pub is_active: bool,
+    pub is_seeder: bool,
+    pub left: u64,
+    #[bincode(with_serde)]
+    pub created_at: DateTime<Utc>,
+    #[bincode(with_serde)]
+    pub updated_at: DateTime<Utc>,
+}
+
+impl Mergeable for PeerUpdate {
+    fn merge(&mut self, new: &Self) {
+        if new.updated_at > self.updated_at {
+            self.ip = new.ip;
+            self.port = new.port;
+            self.agent = new.agent.clone();
+            self.uploaded = new.uploaded;
+            self.downloaded = new.downloaded;
+            self.is_active = new.is_active;
+            self.is_seeder = new.is_seeder;
+            self.left = new.left;
+            self.updated_at = new.updated_at;
+        }
+
+        self.created_at = std::cmp::min(self.created_at, new.created_at);
+    }
+}
+
+impl Flushable for Mutex<Queue<Index, PeerUpdate>> {
+    async fn flush_to_backend(&self) {
+        let amount_of_updates = self.lock().records.len();
+        let updates = self
+            .lock()
+            .records
+            .drain(0..amount_of_updates)
+            .collect::<Vec<_>>();
+        if updates.is_empty() {
+            return;
+        }
+        let base_url =
+            std::env::var("ARCADIA_API_BASE_URL").expect("env var ARCADIA_API_BASE_URL not set");
+        let url = format!("{}/api/tracker/peer-updates", base_url);
+
+        let client = reqwest::Client::new();
+        let api_key = std::env::var("API_KEY").expect("env var API_KEY not set");
+
+        let config = bincode::config::standard();
+        let bytes = bincode::encode_to_vec(updates, config).expect("error encoding to bincode");
+
+        let response = client
+            .post(url)
+            .header("api_key", api_key)
+            .header("Content-Type", "application/octet-stream")
+            .body(bytes)
+            .send()
+            .await
+            .expect("failed to send peer updates to backend");
+
+        if !response.status().is_success() {
+            // TODO: reinsert the updates that failed and retry
+            panic!("Backend returned error: {}", response.text().await.unwrap());
+        } else {
+            log::info!("Inserted {amount_of_updates} peer updates");
+        }
+    }
+}
diff --git a/tracker/arcadia_tracker/src/announce/handlers/announce/handle_announce.rs b/tracker/arcadia_tracker/src/announce/handlers/announce/handle_announce.rs
index 7e5cca13..a909ea5f 100644
--- a/tracker/arcadia_tracker/src/announce/handlers/announce/handle_announce.rs
+++ b/tracker/arcadia_tracker/src/announce/handlers/announce/handle_announce.rs
@@ -19,6 +19,7 @@ use actix_web::{
 };
 use arcadia_shared::tracker::models::{
     peer::{self, Peer},
+    peer_update::{self, PeerUpdate},
     torrent_update::{self, TorrentUpdate},
     user::Passkey,
     user_update::{self, UserUpdate},
@@ -104,9 +105,9 @@ pub async fn exec(
     arc: Data,
     passkey: Path,
-    // agent: UserAgent,
+    user_agent: UserAgent,
     ann: Announce,
-    ClientIp(ip): ClientIp,
+    ClientIp(client_ip): ClientIp,
 ) -> Result {
     // let headers = req.headers();
     // if headers.contains_key(ACCEPT_LANGUAGE)
@@ -257,7 +258,7 @@ pub async fn exec(
         .and_modify(|peer| {
             old_peer = Some(*peer);
 
-            peer.ip_address = ip;
+            peer.ip_address = client_ip;
             peer.port = ann.port;
             peer.is_seeder = ann.left == 0;
             // peer.is_visible = peer.is_included_in_leech_list();
@@ -269,7 +270,7 @@ pub async fn exec(
             peer.downloaded = ann.downloaded;
         })
         .or_insert(peer::Peer {
-            ip_address: ip,
+            ip_address: client_ip,
             port: ann.port,
             is_seeder: ann.left == 0,
             is_active: true,
@@ -520,6 +521,26 @@ pub async fn exec(
         });
     }
 
+    arc.peer_updates.lock().upsert(
+        peer_update::Index {
+            peer_id: ann.peer_id,
+            torrent_id,
+            user_id,
+        },
+        PeerUpdate {
+            ip: client_ip,
+            port: ann.port,
+            agent: user_agent.0,
+            uploaded: ann.uploaded,
+            downloaded: ann.downloaded,
+            is_active: ann.event != AnnounceEvent::Stopped,
+            is_seeder: ann.left == 0,
+            left: ann.left,
+            created_at: now,
+            updated_at: now,
+        },
+    );
+
     if credited_uploaded_delta != 0 || credited_downloaded_delta != 0 {
         arc.user_updates.lock().upsert(
             user_update::Index { user_id },
diff --git a/tracker/arcadia_tracker/src/lib.rs b/tracker/arcadia_tracker/src/lib.rs
index 1180d9d1..7cc63051 100644
--- a/tracker/arcadia_tracker/src/lib.rs
+++ b/tracker/arcadia_tracker/src/lib.rs
@@ -1,4 +1,5 @@
 use arcadia_shared::tracker::models::{
+    peer_update::{self, PeerUpdate},
     torrent_update::{self, TorrentUpdate},
     user_update::{self, UserUpdate},
     Queue,
@@ -26,6 +27,7 @@ pub struct Tracker {
     pub torrents: Mutex,
     pub user_updates: Mutex<Queue<user_update::Index, UserUpdate>>,
     pub torrent_updates: Mutex<Queue<torrent_update::Index, TorrentUpdate>>,
+    pub peer_updates: Mutex<Queue<peer_update::Index, PeerUpdate>>,
 }
 
 impl Deref for Tracker {
@@ -74,6 +76,7 @@ impl Tracker {
             torrents: Mutex::new(torrents),
             user_updates: Mutex::new(Queue::<user_update::Index, UserUpdate>::default()),
             torrent_updates: Mutex::new(Queue::<torrent_update::Index, TorrentUpdate>::default()),
+            peer_updates: Mutex::new(Queue::<peer_update::Index, PeerUpdate>::default()),
         }
     }
 }
diff --git a/tracker/arcadia_tracker/src/scheduler.rs b/tracker/arcadia_tracker/src/scheduler.rs
index d2bce214..2fb9e062 100644
--- a/tracker/arcadia_tracker/src/scheduler.rs
+++ b/tracker/arcadia_tracker/src/scheduler.rs
@@ -30,7 +30,8 @@ pub async fn handle(arc: &Arc<Tracker>) {
 
 pub async fn flush(arc: &Arc<Tracker>) {
     join!(
         arc.user_updates.flush_to_backend(),
-        arc.torrent_updates.flush_to_backend()
+        arc.torrent_updates.flush_to_backend(),
+        arc.peer_updates.flush_to_backend()
     );
 }
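
Note on the coalescing behaviour added in peer_update.rs: repeated announces for the same (user_id, torrent_id, peer_id) key are merged in memory before they ever reach bulk_upsert_peers, so each flush writes at most one row per key. The sketch below is illustrative and not part of the patch; the example function and its values are made up, and it assumes Mergeable is re-exported at the models root, as the new module's own use statement suggests. It shows what Mergeable::merge does with two announces for the same peer: the newer counters win, while the earliest created_at is preserved.

use arcadia_shared::tracker::models::{peer_update::PeerUpdate, Mergeable};
use chrono::{TimeZone, Utc};

fn example() {
    let first_announce = Utc.with_ymd_and_hms(2025, 10, 29, 18, 0, 0).unwrap();
    let second_announce = Utc.with_ymd_and_hms(2025, 10, 29, 18, 30, 0).unwrap();

    // State recorded at the peer's first announce.
    let mut merged = PeerUpdate {
        ip: "10.0.0.1".parse().unwrap(),
        port: 6881,
        agent: "ExampleClient/1.0".to_string(),
        uploaded: 0,
        downloaded: 2048,
        is_active: true,
        is_seeder: false,
        left: 4096,
        created_at: first_announce,
        updated_at: first_announce,
    };

    // Thirty minutes later the same peer announces again, now seeding.
    let newer = PeerUpdate {
        uploaded: 1024,
        downloaded: 4096,
        is_seeder: true,
        left: 0,
        updated_at: second_announce,
        ..merged.clone()
    };

    merged.merge(&newer);

    assert_eq!(merged.uploaded, 1024); // newer counters replace older ones
    assert!(merged.is_seeder);
    assert_eq!(merged.created_at, first_announce); // earliest created_at is kept
}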
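
The flush path posts a bincode-encoded Vec<(Index, PeerUpdate)> to /api/tracker/peer-updates, but the receiving side is not part of this diff. Below is a minimal sketch of what such a backend handler could look like, assuming the ConnectionPool from tracker_repository.rs is exposed as actix-web app data and that the api_key header is validated elsewhere; the route attribute, import paths, and error mapping are assumptions for illustration, not the project's actual API.

use actix_web::{post, web, HttpResponse};
use arcadia_shared::tracker::models::peer_update::{Index, PeerUpdate};
// Assumed import path; adjust to wherever the storage crate exports ConnectionPool.
use arcadia_storage::repositories::tracker_repository::ConnectionPool;

#[post("/api/tracker/peer-updates")]
async fn receive_peer_updates(
    pool: web::Data<ConnectionPool>,
    body: web::Bytes,
) -> HttpResponse {
    // Decode the same shape the tracker encodes in flush_to_backend: Vec<(Index, PeerUpdate)>.
    let config = bincode::config::standard();
    let (updates, _read): (Vec<(Index, PeerUpdate)>, usize) =
        match bincode::decode_from_slice(&body, config) {
            Ok(decoded) => decoded,
            Err(e) => return HttpResponse::BadRequest().body(e.to_string()),
        };

    // Hand the whole batch to the array-based upsert added in this patch.
    match pool.bulk_upsert_peers(&updates).await {
        Ok(rows) => HttpResponse::Ok().body(format!("upserted {rows} peers")),
        Err(e) => HttpResponse::InternalServerError().body(format!("{e:?}")),
    }
}

The unnest-based statement in bulk_upsert_peers then applies the whole batch in a single round trip, zipping the thirteen parallel arrays into rows; the timestamptz parameters are cast to timestamp because, as the SQL comment notes, the peers table stores its timestamps without a time zone.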