feat: persist peers to the db

This commit is contained in:
FrenchGithubUser
2025-10-29 18:44:03 +01:00
parent 7b7ffb6fb6
commit bd11a5c9cb
7 changed files with 290 additions and 5 deletions

View File

@@ -0,0 +1,26 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO peers (\n peer_id,\n ip,\n port,\n agent,\n uploaded,\n downloaded,\n \"left\",\n active,\n seeder,\n created_at,\n updated_at,\n torrent_id,\n user_id\n )\n SELECT\n t.peer_id,\n t.ip,\n t.port,\n t.agent,\n t.uploaded,\n t.downloaded,\n t.\"left\",\n t.active,\n t.seeder,\n -- stored as timestamp without time zone in DB\n (t.created_at AT TIME ZONE 'UTC')::timestamp,\n (t.updated_at AT TIME ZONE 'UTC')::timestamp,\n t.torrent_id,\n t.user_id\n FROM (\n SELECT * FROM unnest(\n $1::bytea[],\n $2::inet[],\n $3::int[],\n $4::varchar[],\n $5::bigint[],\n $6::bigint[],\n $7::bigint[],\n $8::boolean[],\n $9::boolean[],\n $10::timestamptz[],\n $11::timestamptz[],\n $12::int[],\n $13::int[]\n ) AS t(\n peer_id,\n ip,\n port,\n agent,\n uploaded,\n downloaded,\n \"left\",\n active,\n seeder,\n created_at,\n updated_at,\n torrent_id,\n user_id\n )\n ) AS t\n ON CONFLICT (user_id, torrent_id, peer_id) DO UPDATE SET\n ip = EXCLUDED.ip,\n port = EXCLUDED.port,\n agent = EXCLUDED.agent,\n uploaded = EXCLUDED.uploaded,\n downloaded = EXCLUDED.downloaded,\n \"left\" = EXCLUDED.\"left\",\n active = EXCLUDED.active,\n seeder = EXCLUDED.seeder,\n updated_at = EXCLUDED.updated_at\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"ByteaArray",
"InetArray",
"Int4Array",
"VarcharArray",
"Int8Array",
"Int8Array",
"Int8Array",
"BoolArray",
"BoolArray",
"TimestamptzArray",
"TimestamptzArray",
"Int4Array",
"Int4Array"
]
},
"nullable": []
},
"hash": "599587c7ce69b090843274603171c411af859ae256fc01eaf66af2aa2a922900"
}

View File

@@ -4,6 +4,7 @@ use arcadia_shared::tracker::models::{
infohash_2_id, passkey_2_id,
peer::{self, Peer},
peer_id::PeerId,
peer_update::{self, PeerUpdate},
torrent::{self, InfoHash, Torrent},
torrent_update::{self, TorrentUpdate},
user::{self, Passkey, User},
@@ -12,6 +13,7 @@ use arcadia_shared::tracker::models::{
use chrono::{DateTime, Utc};
use futures_util::TryStreamExt;
use indexmap::IndexMap;
use sqlx::types::ipnetwork::IpNetwork;
use std::{borrow::Borrow, net::IpAddr};
// This file contains functions for Arcadia's tracker
@@ -296,4 +298,137 @@ impl ConnectionPool {
Ok(())
}
pub async fn bulk_upsert_peers(
&self,
updates: &Vec<(peer_update::Index, PeerUpdate)>,
) -> arcadia_shared::error::Result<u64> {
if updates.is_empty() {
return Ok(0);
}
let mut user_ids: Vec<i32> = Vec::with_capacity(updates.len());
let mut torrent_ids: Vec<i32> = Vec::with_capacity(updates.len());
let mut peer_ids: Vec<Vec<u8>> = Vec::with_capacity(updates.len());
let mut ips: Vec<IpNetwork> = Vec::with_capacity(updates.len());
let mut ports: Vec<i32> = Vec::with_capacity(updates.len());
let mut agents: Vec<String> = Vec::with_capacity(updates.len());
let mut uploadeds: Vec<i64> = Vec::with_capacity(updates.len());
let mut downloadeds: Vec<i64> = Vec::with_capacity(updates.len());
let mut lefts: Vec<i64> = Vec::with_capacity(updates.len());
let mut actives: Vec<bool> = Vec::with_capacity(updates.len());
let mut seeders: Vec<bool> = Vec::with_capacity(updates.len());
let mut created_ats: Vec<DateTime<Utc>> = Vec::with_capacity(updates.len());
let mut updated_ats: Vec<DateTime<Utc>> = Vec::with_capacity(updates.len());
for (index, update) in updates {
user_ids.push(index.user_id as i32);
torrent_ids.push(index.torrent_id as i32);
peer_ids.push(index.peer_id.to_vec());
ips.push(IpNetwork::from(update.ip));
ports.push(update.port as i32);
agents.push(update.agent.clone());
uploadeds.push(update.uploaded as i64);
downloadeds.push(update.downloaded as i64);
lefts.push(update.left as i64);
actives.push(update.is_active);
seeders.push(update.is_seeder);
created_ats.push(update.created_at);
updated_ats.push(update.updated_at);
}
let result = sqlx::query!(
r#"
INSERT INTO peers (
peer_id,
ip,
port,
agent,
uploaded,
downloaded,
"left",
active,
seeder,
created_at,
updated_at,
torrent_id,
user_id
)
SELECT
t.peer_id,
t.ip,
t.port,
t.agent,
t.uploaded,
t.downloaded,
t."left",
t.active,
t.seeder,
-- stored as timestamp without time zone in DB
(t.created_at AT TIME ZONE 'UTC')::timestamp,
(t.updated_at AT TIME ZONE 'UTC')::timestamp,
t.torrent_id,
t.user_id
FROM (
SELECT * FROM unnest(
$1::bytea[],
$2::inet[],
$3::int[],
$4::varchar[],
$5::bigint[],
$6::bigint[],
$7::bigint[],
$8::boolean[],
$9::boolean[],
$10::timestamptz[],
$11::timestamptz[],
$12::int[],
$13::int[]
) AS t(
peer_id,
ip,
port,
agent,
uploaded,
downloaded,
"left",
active,
seeder,
created_at,
updated_at,
torrent_id,
user_id
)
) AS t
ON CONFLICT (user_id, torrent_id, peer_id) DO UPDATE SET
ip = EXCLUDED.ip,
port = EXCLUDED.port,
agent = EXCLUDED.agent,
uploaded = EXCLUDED.uploaded,
downloaded = EXCLUDED.downloaded,
"left" = EXCLUDED."left",
active = EXCLUDED.active,
seeder = EXCLUDED.seeder,
updated_at = EXCLUDED.updated_at
"#,
&peer_ids,
&ips,
&ports,
&agents,
&uploadeds,
&downloadeds,
&lefts,
&actives,
&seeders,
&created_ats,
&updated_ats,
&torrent_ids,
&user_ids
)
.execute(self.borrow())
.await
.map_err(|e| arcadia_shared::error::BackendError::DatabseError(e.to_string()))?;
Ok(result.rows_affected())
}
}

View File

@@ -7,6 +7,7 @@ pub mod infohash_2_id;
pub mod passkey_2_id;
pub mod peer;
pub mod peer_id;
pub mod peer_update;
pub mod torrent;
pub mod torrent_activity_update;
pub mod torrent_update;

View File

@@ -0,0 +1,98 @@
use chrono::{DateTime, Utc};
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use crate::tracker::models::{peer_id::PeerId, Flushable, Mergeable, Queue};
// Fields must be in same order as database primary key
// (Ord/PartialOrd are derived, so comparison order follows field order;
// keeping it aligned with the composite key on `peers` makes in-memory
// ordering match the DB index).
#[derive(
    Debug,
    Clone,
    Serialize,
    Deserialize,
    Eq,
    Hash,
    PartialEq,
    PartialOrd,
    Ord,
    bincode::Encode,
    bincode::Decode,
)]
/// Composite key identifying one peer entry in the update queue:
/// which user, on which torrent, with which client-supplied peer id.
pub struct Index {
    pub user_id: u32,
    pub torrent_id: u32,
    pub peer_id: PeerId,
}
#[derive(Debug, Clone, Serialize, Deserialize, bincode::Encode, bincode::Decode)]
/// Snapshot of a peer's state taken at announce time, queued for
/// batch-persistence to the backend (see `Flushable::flush_to_backend`).
pub struct PeerUpdate {
    // Client address as seen by the tracker.
    pub ip: std::net::IpAddr,
    pub port: u16,
    // User-agent string reported by the torrent client.
    pub agent: String,
    // Cumulative byte counters from the announce.
    pub uploaded: u64,
    pub downloaded: u64,
    pub is_active: bool,
    pub is_seeder: bool,
    // Bytes the client still has left to download (0 => seeder).
    pub left: u64,
    // DateTime<Utc> has no native bincode impl, so these go through serde.
    #[bincode(with_serde)]
    pub created_at: DateTime<Utc>,
    #[bincode(with_serde)]
    pub updated_at: DateTime<Utc>,
}
impl Mergeable for PeerUpdate {
    /// Folds `new` into `self`: the snapshot with the strictly later
    /// `updated_at` supplies every mutable field, while `created_at` always
    /// keeps the earlier of the two timestamps (first time the peer was seen).
    fn merge(&mut self, new: &Self) {
        // First-seen time is independent of which snapshot is newer.
        if new.created_at < self.created_at {
            self.created_at = new.created_at;
        }
        // Everything else: only a strictly newer snapshot overwrites.
        if new.updated_at <= self.updated_at {
            return;
        }
        self.ip = new.ip;
        self.port = new.port;
        self.agent.clone_from(&new.agent);
        self.uploaded = new.uploaded;
        self.downloaded = new.downloaded;
        self.is_active = new.is_active;
        self.is_seeder = new.is_seeder;
        self.left = new.left;
        self.updated_at = new.updated_at;
    }
}
impl Flushable<PeerUpdate> for Mutex<Queue<Index, PeerUpdate>> {
    /// Drains every queued peer update and ships the batch to the backend
    /// API as a bincode-encoded POST body.
    async fn flush_to_backend(&self) {
        // Drain under a SINGLE lock acquisition. The previous version locked
        // once to read `len()` and again to `drain(0..len)`; another thread
        // could mutate the queue between the two acquisitions, making the
        // drain range stale (and panicking if entries had been removed).
        let updates = {
            let mut queue = self.lock();
            queue.records.drain(..).collect::<Vec<(Index, PeerUpdate)>>()
        };
        if updates.is_empty() {
            return;
        }
        let amount_of_updates = updates.len();
        let base_url =
            std::env::var("ARCADIA_API_BASE_URL").expect("env var ARCADIA_API_BASE_URL not set");
        let url = format!("{}/api/tracker/peer-updates", base_url);
        let client = reqwest::Client::new();
        let api_key = std::env::var("API_KEY").expect("env var API_KEY not set");
        let config = bincode::config::standard();
        let bytes = bincode::encode_to_vec(updates, config).expect("error encoding to bincode");
        let response = client
            .post(url)
            .header("api_key", api_key)
            .header("Content-Type", "application/octet-stream")
            .body(bytes)
            .send()
            .await
            .expect("failed to send peer updates to backend");
        if !response.status().is_success() {
            // TODO: reinsert the updates that failed and retry
            panic!("Backend returned error: {}", response.text().await.unwrap());
        } else {
            log::info!("Inserted {amount_of_updates} peer updates");
        }
    }
}

View File

@@ -19,6 +19,7 @@ use actix_web::{
};
use arcadia_shared::tracker::models::{
peer::{self, Peer},
peer_update::{self, PeerUpdate},
torrent_update::{self, TorrentUpdate},
user::Passkey,
user_update::{self, UserUpdate},
@@ -104,9 +105,9 @@ impl FromRequest for ClientIp {
pub async fn exec(
arc: Data<Tracker>,
passkey: Path<String>,
// agent: UserAgent,
user_agent: UserAgent,
ann: Announce,
ClientIp(ip): ClientIp,
ClientIp(client_ip): ClientIp,
) -> Result<HttpResponse> {
// let headers = req.headers();
// if headers.contains_key(ACCEPT_LANGUAGE)
@@ -257,7 +258,7 @@ pub async fn exec(
.and_modify(|peer| {
old_peer = Some(*peer);
peer.ip_address = ip;
peer.ip_address = client_ip;
peer.port = ann.port;
peer.is_seeder = ann.left == 0;
// peer.is_visible = peer.is_included_in_leech_list();
@@ -269,7 +270,7 @@ pub async fn exec(
peer.downloaded = ann.downloaded;
})
.or_insert(peer::Peer {
ip_address: ip,
ip_address: client_ip,
port: ann.port,
is_seeder: ann.left == 0,
is_active: true,
@@ -520,6 +521,26 @@ pub async fn exec(
});
}
arc.peer_updates.lock().upsert(
peer_update::Index {
peer_id: ann.peer_id,
torrent_id,
user_id,
},
PeerUpdate {
ip: client_ip,
port: ann.port,
agent: user_agent.0,
uploaded: ann.uploaded,
downloaded: ann.downloaded,
is_active: ann.event != AnnounceEvent::Stopped,
is_seeder: ann.left == 0,
left: ann.left,
created_at: now,
updated_at: now,
},
);
if credited_uploaded_delta != 0 || credited_downloaded_delta != 0 {
arc.user_updates.lock().upsert(
user_update::Index { user_id },

View File

@@ -1,4 +1,5 @@
use arcadia_shared::tracker::models::{
peer_update::{self, PeerUpdate},
torrent_update::{self, TorrentUpdate},
user_update::{self, UserUpdate},
Queue,
@@ -26,6 +27,7 @@ pub struct Tracker {
pub torrents: Mutex<arcadia_shared::tracker::models::torrent::Map>,
pub user_updates: Mutex<Queue<user_update::Index, UserUpdate>>,
pub torrent_updates: Mutex<Queue<torrent_update::Index, TorrentUpdate>>,
pub peer_updates: Mutex<Queue<peer_update::Index, PeerUpdate>>,
}
impl Deref for Tracker {
@@ -74,6 +76,7 @@ impl Tracker {
torrents: Mutex::new(torrents),
user_updates: Mutex::new(Queue::<user_update::Index, UserUpdate>::default()),
torrent_updates: Mutex::new(Queue::<torrent_update::Index, TorrentUpdate>::default()),
peer_updates: Mutex::new(Queue::<peer_update::Index, PeerUpdate>::default()),
}
}
}

View File

@@ -30,7 +30,8 @@ pub async fn handle(arc: &Arc<Tracker>) {
/// Flushes all pending update queues (user, torrent, peer) to the backend
/// concurrently; `join!` waits for every flush to complete.
///
/// NOTE(review): the extracted diff showed both the pre- and post-image of
/// the `torrent_updates` line; this is the reconstructed post-commit body
/// with the duplicate removed.
pub async fn flush(arc: &Arc<Tracker>) {
    join!(
        arc.user_updates.flush_to_backend(),
        arc.torrent_updates.flush_to_backend(),
        arc.peer_updates.flush_to_backend()
    );
}