mirror of https://github.com/Arcadia-Solutions/arcadia.git (synced 2025-12-16 23:14:15 -06:00)
feat: run sql queries from the tracker directly, not the backend
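In short: before this commit the tracker serialized its queued peer/torrent/user updates with bincode and pushed them to the backend's /api/tracker/* endpoints; after it, the tracker owns its own sqlx Postgres pool and runs the bulk SQL itself (the Flushable trait further down now takes a &PgPool). A rough sketch of the new call pattern on the tracker side, assuming a periodic flush task and a DATABASE_URL like the one added in shared/.env.example — the function name, queue argument, and 3-second interval are illustrative, not taken from this commit:

use std::time::Duration;

use arcadia_shared::tracker::models::{peer_update, Flushable, Queue};
use parking_lot::Mutex;
use sqlx::postgres::PgPoolOptions;

// Hypothetical flush loop: the tracker now talks to Postgres directly instead
// of POSTing bincode payloads to the backend's /api/tracker/* endpoints.
async fn run_flush_loop(
    peer_updates: &Mutex<Queue<peer_update::Index, peer_update::PeerUpdate>>,
) -> Result<(), sqlx::Error> {
    let db = PgPoolOptions::new()
        .max_connections(5)
        .connect(&std::env::var("DATABASE_URL").expect("DATABASE_URL not set"))
        .await?;

    loop {
        // Flushable::flush_to_database replaces the old flush_to_backend HTTP call.
        peer_updates.flush_to_database(&db).await;
        tokio::time::sleep(Duration::from_secs(3)).await;
    }
}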
4  Cargo.lock  generated
@@ -569,6 +569,7 @@ dependencies = [
 "anyhow",
 "bincode",
 "chrono 0.4.41",
 "futures-util",
 "indexmap",
 "log",
 "parking_lot",
@@ -592,7 +593,6 @@ dependencies = [
 "chrono 0.4.41",
 "deadpool",
 "deadpool-redis",
 "futures-util",
 "indexmap",
 "musicbrainz_rs",
 "rand 0.9.2",
@@ -626,6 +626,7 @@ dependencies = [
 "reqwest",
 "serde",
 "serde_bencode",
 "sqlx",
 "strum",
 "thiserror 2.0.16",
 "tokio",
@@ -3619,7 +3620,6 @@ dependencies = [
 "ipnetwork",
 "log",
 "memchr",
 "native-tls",
 "once_cell",
 "percent-encoding",
 "serde",
@@ -46,5 +46,5 @@ arcadia-shared = { path = "../../shared" }
[dev-dependencies]
actix-multipart-rfc7578 = "0.11.0"
bytes = "1.10.1"
sqlx = { version = "0.8", features = [ "runtime-tokio", "tls-native-tls", "postgres", "chrono", "ipnetwork" ] }
sqlx = { version = "0.8", features = [ "runtime-tokio", "postgres", "chrono", "ipnetwork" ] }
redis = "0.32.5"
@@ -1,24 +0,0 @@
use crate::Arcadia;
use actix_web::{
    web::{Bytes, Data},
    HttpResponse,
};
use arcadia_shared::{
    error::{BackendError, Result},
    tracker::models::peer_update::Index,
};
use arcadia_storage::redis::RedisPoolInterface;
use bincode::config;

pub async fn exec<R: RedisPoolInterface + 'static>(
    arc: Data<Arcadia<R>>,
    bytes: Bytes,
) -> Result<HttpResponse> {
    let config = config::standard();
    let (peers, _): (Vec<Index>, usize) = bincode::decode_from_slice(&bytes, config)
        .map_err(|e| BackendError::DecodingError(e.to_string()))?;

    arc.pool.bulk_delete_peers(&peers).await?;

    Ok(HttpResponse::Ok().body(""))
}
@@ -1,9 +0,0 @@
use crate::{handlers::tracker::binary_response, Arcadia};
use actix_web::{web::Data, HttpResponse};
use arcadia_common::error::Result;
use arcadia_storage::redis::RedisPoolInterface;

pub async fn exec<R: RedisPoolInterface + 'static>(arc: Data<Arcadia<R>>) -> Result<HttpResponse> {
    let infohashes_2_ids = arc.pool.find_infohashes_2_ids().await?;
    binary_response(&infohashes_2_ids)
}
@@ -1,9 +0,0 @@
use crate::{handlers::tracker::binary_response, Arcadia};
use actix_web::{web::Data, HttpResponse};
use arcadia_common::error::Result;
use arcadia_storage::redis::RedisPoolInterface;

pub async fn exec<R: RedisPoolInterface + 'static>(arc: Data<Arcadia<R>>) -> Result<HttpResponse> {
    let passkeys_2_ids = arc.pool.find_passkeys_2_ids().await?;
    binary_response(&passkeys_2_ids)
}
@@ -1,9 +0,0 @@
use crate::{handlers::tracker::binary_response, Arcadia};
use actix_web::{web::Data, HttpResponse};
use arcadia_common::error::Result;
use arcadia_storage::redis::RedisPoolInterface;

pub async fn exec<R: RedisPoolInterface + 'static>(arc: Data<Arcadia<R>>) -> Result<HttpResponse> {
    let torrents = arc.pool.find_torrents().await?;
    binary_response(&torrents)
}
@@ -1,9 +0,0 @@
use crate::{handlers::tracker::binary_response, Arcadia};
use actix_web::{web::Data, HttpResponse};
use arcadia_common::error::Result;
use arcadia_storage::redis::RedisPoolInterface;

pub async fn exec<R: RedisPoolInterface + 'static>(arc: Data<Arcadia<R>>) -> Result<HttpResponse> {
    let users = arc.pool.find_users().await?;
    binary_response(&users)
}
@@ -1,15 +1,7 @@
pub mod delete_peers;
pub mod get_env;
pub mod get_infohash_2_id;
pub mod get_passkey_2_id;
pub mod get_torrents;
pub mod get_users;
pub mod post_peer_updates;
pub mod post_torrent_updates;
pub mod post_user_updates;

use actix_web::{
    web::{delete, get, post, resource, ServiceConfig},
    web::{get, resource, ServiceConfig},
    HttpResponse,
};
use arcadia_common::error::Result;
@@ -19,16 +11,6 @@ use bincode::config;
// TODO: protect by only allowing requests from tracker's ip
pub fn config<R: RedisPoolInterface + 'static>(cfg: &mut ServiceConfig) {
    cfg.service(resource("/env").route(get().to(self::get_env::exec::<R>)));
    cfg.service(resource("/users").route(get().to(self::get_users::exec::<R>)));
    cfg.service(resource("/torrents").route(get().to(self::get_torrents::exec::<R>)));
    cfg.service(resource("/passkeys-2-ids").route(get().to(self::get_passkey_2_id::exec::<R>)));
    cfg.service(resource("/infohashes-2-ids").route(get().to(self::get_infohash_2_id::exec::<R>)));
    cfg.service(resource("/user-updates").route(post().to(self::post_user_updates::exec::<R>)));
    cfg.service(
        resource("/torrent-updates").route(post().to(self::post_torrent_updates::exec::<R>)),
    );
    cfg.service(resource("/peer-updates").route(post().to(self::post_peer_updates::exec::<R>)));
    cfg.service(resource("/peers").route(delete().to(self::delete_peers::exec::<R>)));
}

fn binary_response<T: bincode::Encode>(value: &T) -> Result<HttpResponse> {
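The hunk above keeps the bincode-based binary_response helper used by the remaining /env route; its body is cut off in this view. For context, a minimal sketch of what such a helper typically looks like — the body below is an assumption, not copied from the repository:

fn binary_response<T: bincode::Encode>(value: &T) -> Result<HttpResponse> {
    // Encode with bincode's standard configuration and return the raw bytes
    // as an application/octet-stream body; the real error mapping isn't shown
    // in the diff, so this sketch simply panics on failure.
    let config = bincode::config::standard();
    let bytes = bincode::encode_to_vec(value, config).expect("bincode encoding failed");
    Ok(HttpResponse::Ok()
        .content_type("application/octet-stream")
        .body(bytes))
}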
@@ -1,25 +0,0 @@
use crate::Arcadia;
use actix_web::{
    web::{Bytes, Data},
    HttpResponse,
};
use arcadia_shared::{
    error::{BackendError, Result},
    tracker::models::peer_update::{Index, PeerUpdate},
};
use arcadia_storage::redis::RedisPoolInterface;
use bincode::config;

pub async fn exec<R: RedisPoolInterface + 'static>(
    arc: Data<Arcadia<R>>,
    bytes: Bytes,
) -> Result<HttpResponse> {
    let config = config::standard();
    let (updates, _): (Vec<(Index, PeerUpdate)>, usize) =
        bincode::decode_from_slice(&bytes, config)
            .map_err(|e| BackendError::DecodingError(e.to_string()))?;

    arc.pool.bulk_upsert_peers(&updates).await?;

    Ok(HttpResponse::Ok().body(""))
}
@@ -1,25 +0,0 @@
use crate::Arcadia;
use actix_web::{
    web::{Bytes, Data},
    HttpResponse,
};
use arcadia_shared::{
    error::{BackendError, Result},
    tracker::models::torrent_update::{Index, TorrentUpdate},
};
use arcadia_storage::redis::RedisPoolInterface;
use bincode::config;

pub async fn exec<R: RedisPoolInterface + 'static>(
    arc: Data<Arcadia<R>>,
    bytes: Bytes,
) -> Result<HttpResponse> {
    let config = config::standard();
    let (updates, _): (Vec<(Index, TorrentUpdate)>, usize) =
        bincode::decode_from_slice(&bytes, config)
            .map_err(|e| BackendError::DecodingError(e.to_string()))?;

    arc.pool.bulk_update_torrents(&updates).await?;

    Ok(HttpResponse::Ok().body(""))
}
@@ -1,25 +0,0 @@
use crate::Arcadia;
use actix_web::{
    web::{Bytes, Data},
    HttpResponse,
};
use arcadia_shared::{
    error::{BackendError, Result},
    tracker::models::user_update::{Index, UserUpdate},
};
use arcadia_storage::redis::RedisPoolInterface;
use bincode::config;

pub async fn exec<R: RedisPoolInterface + 'static>(
    arc: Data<Arcadia<R>>,
    bytes: Bytes,
) -> Result<HttpResponse> {
    let config = config::standard();
    let (updates, _): (Vec<(Index, UserUpdate)>, usize) =
        bincode::decode_from_slice(&bytes, config)
            .map_err(|e| BackendError::DecodingError(e.to_string()))?;

    arc.pool.bulk_update_users(&updates).await?;

    Ok(HttpResponse::Ok().body(""))
}
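The three deleted handlers above all follow the same pattern: take the raw request body, decode it with bincode's standard configuration, and pass the decoded batch to a bulk query. They are removed because the tracker now runs those bulk queries itself. For reference, a self-contained round-trip of that encode/decode pattern in plain bincode 2, with no Arcadia types (the Update struct and field names are made up for the example):

#[derive(bincode::Encode, bincode::Decode, Debug, PartialEq)]
struct Update {
    user_id: u32,
    uploaded_delta: u64,
}

fn main() {
    let cfg = bincode::config::standard();
    let updates = vec![Update { user_id: 1, uploaded_delta: 42 }];

    // What the tracker used to do before POSTing to /api/tracker/*-updates.
    let bytes = bincode::encode_to_vec(&updates, cfg).expect("encode failed");

    // What the deleted handlers did with the request body.
    let (decoded, _read): (Vec<Update>, usize) =
        bincode::decode_from_slice(&bytes, cfg).expect("decode failed");
    assert_eq!(decoded, updates);
}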
@@ -1,18 +0,0 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n UPDATE users\n SET\n uploaded = uploaded + updates.uploaded_delta,\n downloaded = downloaded + updates.downloaded_delta,\n real_uploaded = real_uploaded + updates.uploaded_delta,\n real_downloaded = real_downloaded + updates.downloaded_delta\n FROM (\n SELECT * FROM unnest($1::int[], $2::bigint[], $3::bigint[], $4::bigint[], $5::bigint[]) AS\n t(user_id, uploaded_delta, downloaded_delta, real_uploaded_delta, real_downloaded_delta)\n ) AS updates\n WHERE users.id = updates.user_id\n ",
|
||||
"describe": {
|
||||
"columns": [],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Int4Array",
|
||||
"Int8Array",
|
||||
"Int8Array",
|
||||
"Int8Array",
|
||||
"Int8Array"
|
||||
]
|
||||
},
|
||||
"nullable": []
|
||||
},
|
||||
"hash": "2eaafbcd2d74898da894128ded1e7b5b80ace3994b244d90b29c24119a9b22aa"
|
||||
}
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n SELECT\n id,\n upload_factor,\n download_factor,\n seeders,\n leechers,\n times_completed,\n CASE\n WHEN deleted_at IS NOT NULL THEN TRUE\n ELSE FALSE\n END AS \"is_deleted!\"\n FROM torrents\n ",
|
||||
"query": "\n SELECT\n id,\n upload_factor,\n download_factor,\n seeders,\n leechers,\n times_completed,\n CASE\n WHEN deleted_at IS NOT NULL THEN TRUE\n ELSE FALSE\n END AS \"is_deleted!\"\n FROM torrents\n ",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
@@ -52,5 +52,5 @@
|
||||
null
|
||||
]
|
||||
},
|
||||
"hash": "f21e6bc7992a3d8eae3921e783dbe037b89b874b99bc1095a35e864f23fb5064"
|
||||
"hash": "46d7eee133e0653d9f11ab67f5d6faec7050c9b4c6a8c78e2097015d3e0fb7fb"
|
||||
}
|
||||
@@ -1,17 +0,0 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n UPDATE torrents\n SET\n seeders = seeders + updates.seeder_delta,\n leechers = leechers + updates.leecher_delta,\n times_completed = times_completed + updates.times_completed_delta\n FROM (\n SELECT * FROM unnest($1::int[], $2::bigint[], $3::bigint[], $4::bigint[]) AS\n t(torrent_id, seeder_delta, leecher_delta, times_completed_delta)\n ) AS updates\n WHERE torrents.id = updates.torrent_id\n ",
|
||||
"describe": {
|
||||
"columns": [],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Int4Array",
|
||||
"Int8Array",
|
||||
"Int8Array",
|
||||
"Int8Array"
|
||||
]
|
||||
},
|
||||
"nullable": []
|
||||
},
|
||||
"hash": "50f86f561f9170195c40f44d1eb289bfd574e8e080b8304ffcc7755dd65becdf"
|
||||
}
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n SELECT\n peers.ip as \"ip_address: IpAddr\",\n peers.user_id as \"user_id\",\n peers.torrent_id as \"torrent_id\",\n peers.port as \"port\",\n peers.seeder as \"is_seeder: bool\",\n peers.active as \"is_active: bool\",\n peers.updated_at as \"updated_at: DateTime<Utc>\",\n peers.uploaded as \"uploaded\",\n peers.downloaded as \"downloaded\",\n peers.peer_id as \"peer_id: PeerId\"\n FROM\n peers\n ",
|
||||
"query": "\n SELECT\n peers.ip as \"ip_address: IpAddr\",\n peers.user_id as \"user_id\",\n peers.torrent_id as \"torrent_id\",\n peers.port as \"port\",\n peers.seeder as \"is_seeder: bool\",\n peers.active as \"is_active: bool\",\n peers.updated_at as \"updated_at: DateTime<Utc>\",\n peers.uploaded as \"uploaded\",\n peers.downloaded as \"downloaded\",\n peers.peer_id as \"peer_id: PeerId\"\n FROM\n peers\n ",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
@@ -70,5 +70,5 @@
|
||||
false
|
||||
]
|
||||
},
|
||||
"hash": "01e23ac5ea61db34da6f3408ac0b2b08cff82af91117d5a961e1cb2b9e14f6d2"
|
||||
"hash": "5f5cc37d639a2f0650ac2afcdc610538f49816480be0f51c40a67bb74d97e874"
|
||||
}
|
||||
18  backend/storage/.sqlx/query-68c566af855b2cb1e46b77cd829934488b1d4da1086f74b7491d7468753f539a.json  generated  Normal file
@@ -0,0 +1,18 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n UPDATE users\n SET\n uploaded = uploaded + updates.uploaded_delta,\n downloaded = downloaded + updates.downloaded_delta,\n real_uploaded = real_uploaded + updates.uploaded_delta,\n real_downloaded = real_downloaded + updates.downloaded_delta\n FROM (\n SELECT * FROM unnest($1::int[], $2::bigint[], $3::bigint[], $4::bigint[], $5::bigint[]) AS\n t(user_id, uploaded_delta, downloaded_delta, real_uploaded_delta, real_downloaded_delta)\n ) AS updates\n WHERE users.id = updates.user_id\n ",
|
||||
"describe": {
|
||||
"columns": [],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Int4Array",
|
||||
"Int8Array",
|
||||
"Int8Array",
|
||||
"Int8Array",
|
||||
"Int8Array"
|
||||
]
|
||||
},
|
||||
"nullable": []
|
||||
},
|
||||
"hash": "68c566af855b2cb1e46b77cd829934488b1d4da1086f74b7491d7468753f539a"
|
||||
}
|
||||
@@ -1,16 +0,0 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n DELETE FROM peers\n WHERE (user_id, torrent_id, peer_id) IN (\n SELECT t.user_id, t.torrent_id, t.peer_id\n FROM (\n SELECT * FROM unnest(\n $1::int[],\n $2::int[],\n $3::bytea[]\n ) AS t(user_id, torrent_id, peer_id)\n ) AS t\n )\n ",
|
||||
"describe": {
|
||||
"columns": [],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Int4Array",
|
||||
"Int4Array",
|
||||
"ByteaArray"
|
||||
]
|
||||
},
|
||||
"nullable": []
|
||||
},
|
||||
"hash": "7bd4e0806009d1a428a7f8f85d7376f41280e43f408f0555792acb276d597396"
|
||||
}
|
||||
16  backend/storage/.sqlx/query-7da73662a96a68e239d011598ace3bc5b287a82c5b0c34ce9543842a1bed0ea4.json  generated  Normal file
@@ -0,0 +1,16 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n DELETE FROM peers\n WHERE (user_id, torrent_id, peer_id) IN (\n SELECT t.user_id, t.torrent_id, t.peer_id\n FROM (\n SELECT * FROM unnest(\n $1::int[],\n $2::int[],\n $3::bytea[]\n ) AS t(user_id, torrent_id, peer_id)\n ) AS t\n )\n ",
|
||||
"describe": {
|
||||
"columns": [],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Int4Array",
|
||||
"Int4Array",
|
||||
"ByteaArray"
|
||||
]
|
||||
},
|
||||
"nullable": []
|
||||
},
|
||||
"hash": "7da73662a96a68e239d011598ace3bc5b287a82c5b0c34ce9543842a1bed0ea4"
|
||||
}
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n SELECT\n id,\n passkey as \"passkey: Passkey\",\n 0::INT AS \"num_seeding!\",\n 0::INT AS \"num_leeching!\"\n FROM users\n ",
|
||||
"query": "\n SELECT\n id,\n passkey as \"passkey: Passkey\",\n 0::INT AS \"num_seeding!\",\n 0::INT AS \"num_leeching!\"\n FROM users\n ",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
@@ -34,5 +34,5 @@
|
||||
null
|
||||
]
|
||||
},
|
||||
"hash": "2e93eb296e0102f4077a7f6eeaa2404c05a6e67325d53b2703e6c0172f2b2da1"
|
||||
"hash": "bb66b1b13123112781db47d98cd02b28d11470b7c84d7aec56ec102fee20264d"
|
||||
}
|
||||
17  backend/storage/.sqlx/query-c45f235654a1b2aa8c849c5644443fe34ea7a4dd976fe6b4405e7b4a585a1325.json  generated  Normal file
@@ -0,0 +1,17 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n UPDATE torrents\n SET\n seeders = seeders + updates.seeder_delta,\n leechers = leechers + updates.leecher_delta,\n times_completed = times_completed + updates.times_completed_delta\n FROM (\n SELECT * FROM unnest($1::int[], $2::bigint[], $3::bigint[], $4::bigint[]) AS\n t(torrent_id, seeder_delta, leecher_delta, times_completed_delta)\n ) AS updates\n WHERE torrents.id = updates.torrent_id\n ",
|
||||
"describe": {
|
||||
"columns": [],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Int4Array",
|
||||
"Int8Array",
|
||||
"Int8Array",
|
||||
"Int8Array"
|
||||
]
|
||||
},
|
||||
"nullable": []
|
||||
},
|
||||
"hash": "c45f235654a1b2aa8c849c5644443fe34ea7a4dd976fe6b4405e7b4a585a1325"
|
||||
}
|
||||
@@ -10,7 +10,7 @@ arcadia-common = { path = "../common"}
argon2 = "0.5.3"
bip_metainfo = "0.12.0"
chrono = { version = "0.4", features = ["serde"] }
sqlx = { version = "0.8", features = [ "runtime-tokio", "tls-native-tls", "postgres", "chrono", "ipnetwork" ] }
sqlx = { version = "0.8", features = [ "runtime-tokio", "postgres", "chrono", "ipnetwork" ] }
thiserror = "2.0.12"
redis = { version = "0.32.5", features = ["tokio-comp"] }
deadpool = { version = "0.12.2", features = ["rt_tokio_1"] }
@@ -23,4 +23,3 @@ rand = "0.9.0"
utoipa = { version = "5.3.1", features = ["actix_extras"] }
indexmap = { version = "2.11.0", default-features = false, features = ["std", "serde"] }
arcadia-shared = { path = "../../shared" }
futures-util = "0.3.31"
@@ -20,7 +20,6 @@ pub mod torrent_repository;
pub mod torrent_request_comment_repository;
pub mod torrent_request_repository;
pub mod torrent_request_vote_repository;
pub mod tracker_repository;
pub mod user_application_repository;
pub mod user_repository;
pub mod wiki_repository;
@@ -1,477 +0,0 @@
|
||||
use crate::connection_pool::ConnectionPool;
|
||||
use arcadia_common::error::Result;
|
||||
use arcadia_shared::tracker::models::{
|
||||
infohash_2_id, passkey_2_id,
|
||||
peer::{self, Peer},
|
||||
peer_id::PeerId,
|
||||
peer_update::{self, PeerUpdate},
|
||||
torrent::{self, InfoHash, Torrent},
|
||||
torrent_update::{self, TorrentUpdate},
|
||||
user::{self, Passkey, User},
|
||||
user_update::{self, UserUpdate},
|
||||
};
|
||||
use chrono::{DateTime, Utc};
|
||||
use futures_util::TryStreamExt;
|
||||
use indexmap::IndexMap;
|
||||
use sqlx::types::ipnetwork::IpNetwork;
|
||||
use std::{borrow::Borrow, net::IpAddr};
|
||||
|
||||
// This file contains functions for Arcadia's tracker
|
||||
// but not necessarily related to the tracker itself directly
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct DBImportTorrent {
|
||||
pub id: i32,
|
||||
pub upload_factor: i16,
|
||||
pub download_factor: i16,
|
||||
pub seeders: i64,
|
||||
pub leechers: i64,
|
||||
pub times_completed: i32,
|
||||
pub is_deleted: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct DBImportUser {
|
||||
pub id: i32,
|
||||
pub passkey: Passkey,
|
||||
pub num_seeding: i32,
|
||||
pub num_leeching: i32,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct DBImportPasskey2Id {
|
||||
pub id: i32,
|
||||
pub passkey: Passkey,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct DBImportInfohash2Id {
|
||||
pub id: i32,
|
||||
pub info_hash: InfoHash,
|
||||
}
|
||||
|
||||
impl ConnectionPool {
|
||||
pub async fn find_users(&self) -> Result<user::Map> {
|
||||
let rows = sqlx::query_as!(
|
||||
DBImportUser,
|
||||
r#"
|
||||
SELECT
|
||||
id,
|
||||
passkey as "passkey: Passkey",
|
||||
0::INT AS "num_seeding!",
|
||||
0::INT AS "num_leeching!"
|
||||
FROM users
|
||||
"#
|
||||
)
|
||||
.fetch_all(self.borrow())
|
||||
.await
|
||||
.expect("could not get users");
|
||||
|
||||
let mut map: user::Map = user::Map(IndexMap::with_capacity(rows.len()));
|
||||
for r in rows {
|
||||
let user = User {
|
||||
num_seeding: r.num_seeding as u32,
|
||||
num_leeching: r.num_leeching as u32,
|
||||
};
|
||||
map.insert(r.id as u32, user);
|
||||
}
|
||||
|
||||
Ok(map)
|
||||
}
|
||||
|
||||
pub async fn find_torrents(&self) -> Result<torrent::Map> {
|
||||
let rows = sqlx::query_as!(
|
||||
DBImportTorrent,
|
||||
r#"
|
||||
SELECT
|
||||
id,
|
||||
upload_factor,
|
||||
download_factor,
|
||||
seeders,
|
||||
leechers,
|
||||
times_completed,
|
||||
CASE
|
||||
WHEN deleted_at IS NOT NULL THEN TRUE
|
||||
ELSE FALSE
|
||||
END AS "is_deleted!"
|
||||
FROM torrents
|
||||
"#
|
||||
)
|
||||
.fetch_all(self.borrow())
|
||||
.await
|
||||
.expect("could not get torrents");
|
||||
|
||||
let mut map: torrent::Map = torrent::Map(IndexMap::with_capacity(rows.len()));
|
||||
for r in rows {
|
||||
let torrent = Torrent {
|
||||
upload_factor: r.upload_factor,
|
||||
download_factor: r.download_factor,
|
||||
seeders: r.seeders as u32,
|
||||
leechers: r.leechers as u32,
|
||||
times_completed: r.times_completed as u32,
|
||||
is_deleted: r.is_deleted,
|
||||
peers: peer::Map::new(),
|
||||
};
|
||||
map.insert(r.id as u32, torrent);
|
||||
}
|
||||
|
||||
// Load peers into each torrent
|
||||
let mut peers = sqlx::query!(
|
||||
r#"
|
||||
SELECT
|
||||
peers.ip as "ip_address: IpAddr",
|
||||
peers.user_id as "user_id",
|
||||
peers.torrent_id as "torrent_id",
|
||||
peers.port as "port",
|
||||
peers.seeder as "is_seeder: bool",
|
||||
peers.active as "is_active: bool",
|
||||
peers.updated_at as "updated_at: DateTime<Utc>",
|
||||
peers.uploaded as "uploaded",
|
||||
peers.downloaded as "downloaded",
|
||||
peers.peer_id as "peer_id: PeerId"
|
||||
FROM
|
||||
peers
|
||||
"#
|
||||
)
|
||||
.fetch(self.borrow());
|
||||
|
||||
while let Some(peer) = peers.try_next().await.expect("Failed loading peers.") {
|
||||
map.entry(peer.torrent_id as u32).and_modify(|torrent| {
|
||||
torrent.peers.insert(
|
||||
peer::Index {
|
||||
user_id: peer.user_id as u32,
|
||||
peer_id: peer.peer_id,
|
||||
},
|
||||
Peer {
|
||||
ip_address: peer.ip_address,
|
||||
port: peer.port as u16,
|
||||
is_seeder: peer.is_seeder,
|
||||
is_active: peer.is_active,
|
||||
has_sent_completed: false,
|
||||
updated_at: peer
|
||||
.updated_at
|
||||
.expect("Peer with a null updated_at found in database."),
|
||||
uploaded: peer.uploaded as u64,
|
||||
downloaded: peer.downloaded as u64,
|
||||
},
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
Ok(map)
|
||||
}
|
||||
|
||||
pub async fn find_passkeys_2_ids(&self) -> Result<passkey_2_id::Map> {
|
||||
let rows = sqlx::query_as!(
|
||||
DBImportPasskey2Id,
|
||||
r#"
|
||||
SELECT
|
||||
id,
|
||||
passkey as "passkey: Passkey"
|
||||
FROM users
|
||||
WHERE banned = FALSE
|
||||
"#
|
||||
)
|
||||
.fetch_all(self.borrow())
|
||||
.await
|
||||
.expect("could not get passkeys2ids");
|
||||
|
||||
let mut map: passkey_2_id::Map = passkey_2_id::Map(IndexMap::with_capacity(rows.len()));
|
||||
for r in rows {
|
||||
map.insert(r.passkey, r.id as u32);
|
||||
}
|
||||
|
||||
Ok(map)
|
||||
}
|
||||
|
||||
pub async fn find_infohashes_2_ids(&self) -> Result<infohash_2_id::Map> {
|
||||
let rows = sqlx::query_as!(
|
||||
DBImportInfohash2Id,
|
||||
r#"
|
||||
SELECT
|
||||
id,
|
||||
info_hash as "info_hash: InfoHash"
|
||||
FROM torrents
|
||||
"#
|
||||
)
|
||||
.fetch_all(self.borrow())
|
||||
.await
|
||||
.expect("could not get infohashes2ids");
|
||||
|
||||
let mut map: infohash_2_id::Map = infohash_2_id::Map(IndexMap::with_capacity(rows.len()));
|
||||
for r in rows {
|
||||
map.insert(r.info_hash, r.id as u32);
|
||||
}
|
||||
|
||||
Ok(map)
|
||||
}
|
||||
|
||||
pub async fn bulk_update_users(
|
||||
&self,
|
||||
updates: &Vec<(user_update::Index, UserUpdate)>,
|
||||
) -> arcadia_shared::error::Result<()> {
|
||||
if updates.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let mut user_ids = Vec::new();
|
||||
let mut uploaded_deltas = Vec::new();
|
||||
let mut downloaded_deltas = Vec::new();
|
||||
let mut real_uploaded_deltas = Vec::new();
|
||||
let mut real_downloaded_deltas = Vec::new();
|
||||
|
||||
for (index, update) in updates {
|
||||
user_ids.push(index.user_id as i32);
|
||||
uploaded_deltas.push(update.uploaded_delta as i64);
|
||||
downloaded_deltas.push(update.downloaded_delta as i64);
|
||||
real_uploaded_deltas.push(update.real_uploaded_delta as i64);
|
||||
real_downloaded_deltas.push(update.real_downloaded_delta as i64);
|
||||
}
|
||||
|
||||
let _ = sqlx::query!(
|
||||
r#"
|
||||
UPDATE users
|
||||
SET
|
||||
uploaded = uploaded + updates.uploaded_delta,
|
||||
downloaded = downloaded + updates.downloaded_delta,
|
||||
real_uploaded = real_uploaded + updates.uploaded_delta,
|
||||
real_downloaded = real_downloaded + updates.downloaded_delta
|
||||
FROM (
|
||||
SELECT * FROM unnest($1::int[], $2::bigint[], $3::bigint[], $4::bigint[], $5::bigint[]) AS
|
||||
t(user_id, uploaded_delta, downloaded_delta, real_uploaded_delta, real_downloaded_delta)
|
||||
) AS updates
|
||||
WHERE users.id = updates.user_id
|
||||
"#,
|
||||
&user_ids,
|
||||
&uploaded_deltas,
|
||||
&downloaded_deltas,
|
||||
&real_uploaded_deltas,
|
||||
&real_downloaded_deltas
|
||||
)
|
||||
.execute(self.borrow())
|
||||
.await.map_err(|e|arcadia_shared::error::BackendError::DatabseError(e.to_string()))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn bulk_update_torrents(
|
||||
&self,
|
||||
updates: &Vec<(torrent_update::Index, TorrentUpdate)>,
|
||||
) -> arcadia_shared::error::Result<()> {
|
||||
if updates.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let mut torrent_ids = Vec::new();
|
||||
let mut seeder_deltas = Vec::new();
|
||||
let mut leecher_deltas = Vec::new();
|
||||
let mut times_completed_deltas = Vec::new();
|
||||
|
||||
for (index, update) in updates {
|
||||
torrent_ids.push(index.torrent_id as i32);
|
||||
seeder_deltas.push(update.seeder_delta as i64);
|
||||
leecher_deltas.push(update.leecher_delta as i64);
|
||||
times_completed_deltas.push(update.times_completed_delta as i64);
|
||||
}
|
||||
|
||||
let _ = sqlx::query!(
|
||||
r#"
|
||||
UPDATE torrents
|
||||
SET
|
||||
seeders = seeders + updates.seeder_delta,
|
||||
leechers = leechers + updates.leecher_delta,
|
||||
times_completed = times_completed + updates.times_completed_delta
|
||||
FROM (
|
||||
SELECT * FROM unnest($1::int[], $2::bigint[], $3::bigint[], $4::bigint[]) AS
|
||||
t(torrent_id, seeder_delta, leecher_delta, times_completed_delta)
|
||||
) AS updates
|
||||
WHERE torrents.id = updates.torrent_id
|
||||
"#,
|
||||
&torrent_ids,
|
||||
&seeder_deltas,
|
||||
&leecher_deltas,
|
||||
            &times_completed_deltas,
|
||||
)
|
||||
.execute(self.borrow())
|
||||
.await
|
||||
.map_err(|e| arcadia_shared::error::BackendError::DatabseError(e.to_string()))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn bulk_upsert_peers(
|
||||
&self,
|
||||
updates: &Vec<(peer_update::Index, PeerUpdate)>,
|
||||
) -> arcadia_shared::error::Result<u64> {
|
||||
if updates.is_empty() {
|
||||
return Ok(0);
|
||||
}
|
||||
|
||||
let mut user_ids: Vec<i32> = Vec::with_capacity(updates.len());
|
||||
let mut torrent_ids: Vec<i32> = Vec::with_capacity(updates.len());
|
||||
let mut peer_ids: Vec<Vec<u8>> = Vec::with_capacity(updates.len());
|
||||
let mut ips: Vec<IpNetwork> = Vec::with_capacity(updates.len());
|
||||
let mut ports: Vec<i32> = Vec::with_capacity(updates.len());
|
||||
let mut agents: Vec<String> = Vec::with_capacity(updates.len());
|
||||
let mut uploadeds: Vec<i64> = Vec::with_capacity(updates.len());
|
||||
let mut downloadeds: Vec<i64> = Vec::with_capacity(updates.len());
|
||||
let mut lefts: Vec<i64> = Vec::with_capacity(updates.len());
|
||||
let mut actives: Vec<bool> = Vec::with_capacity(updates.len());
|
||||
let mut seeders: Vec<bool> = Vec::with_capacity(updates.len());
|
||||
let mut created_ats: Vec<DateTime<Utc>> = Vec::with_capacity(updates.len());
|
||||
let mut updated_ats: Vec<DateTime<Utc>> = Vec::with_capacity(updates.len());
|
||||
|
||||
for (index, update) in updates {
|
||||
user_ids.push(index.user_id as i32);
|
||||
torrent_ids.push(index.torrent_id as i32);
|
||||
peer_ids.push(index.peer_id.to_vec());
|
||||
ips.push(IpNetwork::from(update.ip));
|
||||
ports.push(update.port as i32);
|
||||
agents.push(update.agent.clone());
|
||||
uploadeds.push(update.uploaded as i64);
|
||||
downloadeds.push(update.downloaded as i64);
|
||||
lefts.push(update.left as i64);
|
||||
actives.push(update.is_active);
|
||||
seeders.push(update.is_seeder);
|
||||
created_ats.push(update.created_at);
|
||||
updated_ats.push(update.updated_at);
|
||||
}
|
||||
|
||||
let result = sqlx::query!(
|
||||
r#"
|
||||
INSERT INTO peers (
|
||||
peer_id,
|
||||
ip,
|
||||
port,
|
||||
agent,
|
||||
uploaded,
|
||||
downloaded,
|
||||
"left",
|
||||
active,
|
||||
seeder,
|
||||
created_at,
|
||||
updated_at,
|
||||
torrent_id,
|
||||
user_id
|
||||
)
|
||||
SELECT
|
||||
t.peer_id,
|
||||
t.ip,
|
||||
t.port,
|
||||
t.agent,
|
||||
t.uploaded,
|
||||
t.downloaded,
|
||||
t."left",
|
||||
t.active,
|
||||
t.seeder,
|
||||
-- stored as timestamp without time zone in DB
|
||||
(t.created_at AT TIME ZONE 'UTC')::timestamp,
|
||||
(t.updated_at AT TIME ZONE 'UTC')::timestamp,
|
||||
t.torrent_id,
|
||||
t.user_id
|
||||
FROM (
|
||||
SELECT * FROM unnest(
|
||||
$1::bytea[],
|
||||
$2::inet[],
|
||||
$3::int[],
|
||||
$4::varchar[],
|
||||
$5::bigint[],
|
||||
$6::bigint[],
|
||||
$7::bigint[],
|
||||
$8::boolean[],
|
||||
$9::boolean[],
|
||||
$10::timestamptz[],
|
||||
$11::timestamptz[],
|
||||
$12::int[],
|
||||
$13::int[]
|
||||
) AS t(
|
||||
peer_id,
|
||||
ip,
|
||||
port,
|
||||
agent,
|
||||
uploaded,
|
||||
downloaded,
|
||||
"left",
|
||||
active,
|
||||
seeder,
|
||||
created_at,
|
||||
updated_at,
|
||||
torrent_id,
|
||||
user_id
|
||||
)
|
||||
) AS t
|
||||
ON CONFLICT (user_id, torrent_id, peer_id) DO UPDATE SET
|
||||
ip = EXCLUDED.ip,
|
||||
port = EXCLUDED.port,
|
||||
agent = EXCLUDED.agent,
|
||||
uploaded = EXCLUDED.uploaded,
|
||||
downloaded = EXCLUDED.downloaded,
|
||||
"left" = EXCLUDED."left",
|
||||
active = EXCLUDED.active,
|
||||
seeder = EXCLUDED.seeder,
|
||||
updated_at = EXCLUDED.updated_at
|
||||
"#,
|
||||
&peer_ids,
|
||||
&ips,
|
||||
&ports,
|
||||
&agents,
|
||||
&uploadeds,
|
||||
&downloadeds,
|
||||
&lefts,
|
||||
&actives,
|
||||
&seeders,
|
||||
&created_ats,
|
||||
&updated_ats,
|
||||
&torrent_ids,
|
||||
&user_ids
|
||||
)
|
||||
.execute(self.borrow())
|
||||
.await
|
||||
.map_err(|e| arcadia_shared::error::BackendError::DatabseError(e.to_string()))?;
|
||||
|
||||
Ok(result.rows_affected())
|
||||
}
|
||||
|
||||
pub async fn bulk_delete_peers(
|
||||
&self,
|
||||
peers: &Vec<peer_update::Index>,
|
||||
) -> arcadia_shared::error::Result<()> {
|
||||
if peers.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let mut user_ids: Vec<i32> = Vec::with_capacity(peers.len());
|
||||
let mut torrent_ids: Vec<i32> = Vec::with_capacity(peers.len());
|
||||
let mut peer_ids: Vec<Vec<u8>> = Vec::with_capacity(peers.len());
|
||||
|
||||
for index in peers {
|
||||
user_ids.push(index.user_id as i32);
|
||||
torrent_ids.push(index.torrent_id as i32);
|
||||
peer_ids.push(index.peer_id.to_vec());
|
||||
}
|
||||
|
||||
let _ = sqlx::query!(
|
||||
r#"
|
||||
DELETE FROM peers
|
||||
WHERE (user_id, torrent_id, peer_id) IN (
|
||||
SELECT t.user_id, t.torrent_id, t.peer_id
|
||||
FROM (
|
||||
SELECT * FROM unnest(
|
||||
$1::int[],
|
||||
$2::int[],
|
||||
$3::bytea[]
|
||||
) AS t(user_id, torrent_id, peer_id)
|
||||
) AS t
|
||||
)
|
||||
"#,
|
||||
&user_ids,
|
||||
&torrent_ids,
|
||||
&peer_ids
|
||||
)
|
||||
.execute(self.borrow())
|
||||
.await
|
||||
.map_err(|e| arcadia_shared::error::BackendError::DatabseError(e.to_string()))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
1  shared/.env.example  Normal file
@@ -0,0 +1 @@
DATABASE_URL=postgresql://arcadia:password@localhost:4321/arcadia
56  shared/.sqlx/query-46d7eee133e0653d9f11ab67f5d6faec7050c9b4c6a8c78e2097015d3e0fb7fb.json  generated  Normal file
@@ -0,0 +1,56 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n SELECT\n id,\n upload_factor,\n download_factor,\n seeders,\n leechers,\n times_completed,\n CASE\n WHEN deleted_at IS NOT NULL THEN TRUE\n ELSE FALSE\n END AS \"is_deleted!\"\n FROM torrents\n ",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"ordinal": 0,
|
||||
"name": "id",
|
||||
"type_info": "Int4"
|
||||
},
|
||||
{
|
||||
"ordinal": 1,
|
||||
"name": "upload_factor",
|
||||
"type_info": "Int2"
|
||||
},
|
||||
{
|
||||
"ordinal": 2,
|
||||
"name": "download_factor",
|
||||
"type_info": "Int2"
|
||||
},
|
||||
{
|
||||
"ordinal": 3,
|
||||
"name": "seeders",
|
||||
"type_info": "Int8"
|
||||
},
|
||||
{
|
||||
"ordinal": 4,
|
||||
"name": "leechers",
|
||||
"type_info": "Int8"
|
||||
},
|
||||
{
|
||||
"ordinal": 5,
|
||||
"name": "times_completed",
|
||||
"type_info": "Int4"
|
||||
},
|
||||
{
|
||||
"ordinal": 6,
|
||||
"name": "is_deleted!",
|
||||
"type_info": "Bool"
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Left": []
|
||||
},
|
||||
"nullable": [
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
null
|
||||
]
|
||||
},
|
||||
"hash": "46d7eee133e0653d9f11ab67f5d6faec7050c9b4c6a8c78e2097015d3e0fb7fb"
|
||||
}
|
||||
26  shared/.sqlx/query-4edda78ffd766d9ec15eb015fe5b985755924b0f0b44d5cf9411059cfbc5c757.json  generated  Normal file
@@ -0,0 +1,26 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n SELECT\n id,\n passkey as \"passkey: Passkey\"\n FROM users\n WHERE banned = FALSE\n ",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"ordinal": 0,
|
||||
"name": "id",
|
||||
"type_info": "Int4"
|
||||
},
|
||||
{
|
||||
"ordinal": 1,
|
||||
"name": "passkey: Passkey",
|
||||
"type_info": "Varchar"
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Left": []
|
||||
},
|
||||
"nullable": [
|
||||
false,
|
||||
false
|
||||
]
|
||||
},
|
||||
"hash": "4edda78ffd766d9ec15eb015fe5b985755924b0f0b44d5cf9411059cfbc5c757"
|
||||
}
|
||||
26  shared/.sqlx/query-599587c7ce69b090843274603171c411af859ae256fc01eaf66af2aa2a922900.json  generated  Normal file
@@ -0,0 +1,26 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n INSERT INTO peers (\n peer_id,\n ip,\n port,\n agent,\n uploaded,\n downloaded,\n \"left\",\n active,\n seeder,\n created_at,\n updated_at,\n torrent_id,\n user_id\n )\n SELECT\n t.peer_id,\n t.ip,\n t.port,\n t.agent,\n t.uploaded,\n t.downloaded,\n t.\"left\",\n t.active,\n t.seeder,\n -- stored as timestamp without time zone in DB\n (t.created_at AT TIME ZONE 'UTC')::timestamp,\n (t.updated_at AT TIME ZONE 'UTC')::timestamp,\n t.torrent_id,\n t.user_id\n FROM (\n SELECT * FROM unnest(\n $1::bytea[],\n $2::inet[],\n $3::int[],\n $4::varchar[],\n $5::bigint[],\n $6::bigint[],\n $7::bigint[],\n $8::boolean[],\n $9::boolean[],\n $10::timestamptz[],\n $11::timestamptz[],\n $12::int[],\n $13::int[]\n ) AS t(\n peer_id,\n ip,\n port,\n agent,\n uploaded,\n downloaded,\n \"left\",\n active,\n seeder,\n created_at,\n updated_at,\n torrent_id,\n user_id\n )\n ) AS t\n ON CONFLICT (user_id, torrent_id, peer_id) DO UPDATE SET\n ip = EXCLUDED.ip,\n port = EXCLUDED.port,\n agent = EXCLUDED.agent,\n uploaded = EXCLUDED.uploaded,\n downloaded = EXCLUDED.downloaded,\n \"left\" = EXCLUDED.\"left\",\n active = EXCLUDED.active,\n seeder = EXCLUDED.seeder,\n updated_at = EXCLUDED.updated_at\n ",
|
||||
"describe": {
|
||||
"columns": [],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"ByteaArray",
|
||||
"InetArray",
|
||||
"Int4Array",
|
||||
"VarcharArray",
|
||||
"Int8Array",
|
||||
"Int8Array",
|
||||
"Int8Array",
|
||||
"BoolArray",
|
||||
"BoolArray",
|
||||
"TimestamptzArray",
|
||||
"TimestamptzArray",
|
||||
"Int4Array",
|
||||
"Int4Array"
|
||||
]
|
||||
},
|
||||
"nullable": []
|
||||
},
|
||||
"hash": "599587c7ce69b090843274603171c411af859ae256fc01eaf66af2aa2a922900"
|
||||
}
|
||||
74  shared/.sqlx/query-5f5cc37d639a2f0650ac2afcdc610538f49816480be0f51c40a67bb74d97e874.json  generated  Normal file
@@ -0,0 +1,74 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n SELECT\n peers.ip as \"ip_address: IpAddr\",\n peers.user_id as \"user_id\",\n peers.torrent_id as \"torrent_id\",\n peers.port as \"port\",\n peers.seeder as \"is_seeder: bool\",\n peers.active as \"is_active: bool\",\n peers.updated_at as \"updated_at: DateTime<Utc>\",\n peers.uploaded as \"uploaded\",\n peers.downloaded as \"downloaded\",\n peers.peer_id as \"peer_id: PeerId\"\n FROM\n peers\n ",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"ordinal": 0,
|
||||
"name": "ip_address: IpAddr",
|
||||
"type_info": "Inet"
|
||||
},
|
||||
{
|
||||
"ordinal": 1,
|
||||
"name": "user_id",
|
||||
"type_info": "Int4"
|
||||
},
|
||||
{
|
||||
"ordinal": 2,
|
||||
"name": "torrent_id",
|
||||
"type_info": "Int4"
|
||||
},
|
||||
{
|
||||
"ordinal": 3,
|
||||
"name": "port",
|
||||
"type_info": "Int2"
|
||||
},
|
||||
{
|
||||
"ordinal": 4,
|
||||
"name": "is_seeder: bool",
|
||||
"type_info": "Bool"
|
||||
},
|
||||
{
|
||||
"ordinal": 5,
|
||||
"name": "is_active: bool",
|
||||
"type_info": "Bool"
|
||||
},
|
||||
{
|
||||
"ordinal": 6,
|
||||
"name": "updated_at: DateTime<Utc>",
|
||||
"type_info": "Timestamp"
|
||||
},
|
||||
{
|
||||
"ordinal": 7,
|
||||
"name": "uploaded",
|
||||
"type_info": "Int8"
|
||||
},
|
||||
{
|
||||
"ordinal": 8,
|
||||
"name": "downloaded",
|
||||
"type_info": "Int8"
|
||||
},
|
||||
{
|
||||
"ordinal": 9,
|
||||
"name": "peer_id: PeerId",
|
||||
"type_info": "Bytea"
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Left": []
|
||||
},
|
||||
"nullable": [
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
true,
|
||||
false,
|
||||
false,
|
||||
false
|
||||
]
|
||||
},
|
||||
"hash": "5f5cc37d639a2f0650ac2afcdc610538f49816480be0f51c40a67bb74d97e874"
|
||||
}
|
||||
18  shared/.sqlx/query-68c566af855b2cb1e46b77cd829934488b1d4da1086f74b7491d7468753f539a.json  generated  Normal file
@@ -0,0 +1,18 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n UPDATE users\n SET\n uploaded = uploaded + updates.uploaded_delta,\n downloaded = downloaded + updates.downloaded_delta,\n real_uploaded = real_uploaded + updates.uploaded_delta,\n real_downloaded = real_downloaded + updates.downloaded_delta\n FROM (\n SELECT * FROM unnest($1::int[], $2::bigint[], $3::bigint[], $4::bigint[], $5::bigint[]) AS\n t(user_id, uploaded_delta, downloaded_delta, real_uploaded_delta, real_downloaded_delta)\n ) AS updates\n WHERE users.id = updates.user_id\n ",
|
||||
"describe": {
|
||||
"columns": [],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Int4Array",
|
||||
"Int8Array",
|
||||
"Int8Array",
|
||||
"Int8Array",
|
||||
"Int8Array"
|
||||
]
|
||||
},
|
||||
"nullable": []
|
||||
},
|
||||
"hash": "68c566af855b2cb1e46b77cd829934488b1d4da1086f74b7491d7468753f539a"
|
||||
}
|
||||
16  shared/.sqlx/query-7da73662a96a68e239d011598ace3bc5b287a82c5b0c34ce9543842a1bed0ea4.json  generated  Normal file
@@ -0,0 +1,16 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n DELETE FROM peers\n WHERE (user_id, torrent_id, peer_id) IN (\n SELECT t.user_id, t.torrent_id, t.peer_id\n FROM (\n SELECT * FROM unnest(\n $1::int[],\n $2::int[],\n $3::bytea[]\n ) AS t(user_id, torrent_id, peer_id)\n ) AS t\n )\n ",
|
||||
"describe": {
|
||||
"columns": [],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Int4Array",
|
||||
"Int4Array",
|
||||
"ByteaArray"
|
||||
]
|
||||
},
|
||||
"nullable": []
|
||||
},
|
||||
"hash": "7da73662a96a68e239d011598ace3bc5b287a82c5b0c34ce9543842a1bed0ea4"
|
||||
}
|
||||
38  shared/.sqlx/query-bb66b1b13123112781db47d98cd02b28d11470b7c84d7aec56ec102fee20264d.json  generated  Normal file
@@ -0,0 +1,38 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n SELECT\n id,\n passkey as \"passkey: Passkey\",\n 0::INT AS \"num_seeding!\",\n 0::INT AS \"num_leeching!\"\n FROM users\n ",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"ordinal": 0,
|
||||
"name": "id",
|
||||
"type_info": "Int4"
|
||||
},
|
||||
{
|
||||
"ordinal": 1,
|
||||
"name": "passkey: Passkey",
|
||||
"type_info": "Varchar"
|
||||
},
|
||||
{
|
||||
"ordinal": 2,
|
||||
"name": "num_seeding!",
|
||||
"type_info": "Int4"
|
||||
},
|
||||
{
|
||||
"ordinal": 3,
|
||||
"name": "num_leeching!",
|
||||
"type_info": "Int4"
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Left": []
|
||||
},
|
||||
"nullable": [
|
||||
false,
|
||||
false,
|
||||
null,
|
||||
null
|
||||
]
|
||||
},
|
||||
"hash": "bb66b1b13123112781db47d98cd02b28d11470b7c84d7aec56ec102fee20264d"
|
||||
}
|
||||
17  shared/.sqlx/query-c45f235654a1b2aa8c849c5644443fe34ea7a4dd976fe6b4405e7b4a585a1325.json  generated  Normal file
@@ -0,0 +1,17 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n UPDATE torrents\n SET\n seeders = seeders + updates.seeder_delta,\n leechers = leechers + updates.leecher_delta,\n times_completed = times_completed + updates.times_completed_delta\n FROM (\n SELECT * FROM unnest($1::int[], $2::bigint[], $3::bigint[], $4::bigint[]) AS\n t(torrent_id, seeder_delta, leecher_delta, times_completed_delta)\n ) AS updates\n WHERE torrents.id = updates.torrent_id\n ",
|
||||
"describe": {
|
||||
"columns": [],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Int4Array",
|
||||
"Int8Array",
|
||||
"Int8Array",
|
||||
"Int8Array"
|
||||
]
|
||||
},
|
||||
"nullable": []
|
||||
},
|
||||
"hash": "c45f235654a1b2aa8c849c5644443fe34ea7a4dd976fe6b4405e7b4a585a1325"
|
||||
}
|
||||
26  shared/.sqlx/query-d94c7cf9c02a4f060345d02ac4bd2434069fc46d43e6f3e7e3618737c2dcd547.json  generated  Normal file
@@ -0,0 +1,26 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n SELECT\n id,\n info_hash as \"info_hash: InfoHash\"\n FROM torrents\n ",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"ordinal": 0,
|
||||
"name": "id",
|
||||
"type_info": "Int4"
|
||||
},
|
||||
{
|
||||
"ordinal": 1,
|
||||
"name": "info_hash: InfoHash",
|
||||
"type_info": "Bytea"
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Left": []
|
||||
},
|
||||
"nullable": [
|
||||
false,
|
||||
false
|
||||
]
|
||||
},
|
||||
"hash": "d94c7cf9c02a4f060345d02ac4bd2434069fc46d43e6f3e7e3618737c2dcd547"
|
||||
}
|
||||
@@ -7,7 +7,7 @@ edition = "2024"
serde = { version = "1.0", features = ["derive"] }
indexmap = { version = "2.11.0", default-features = false, features = ["std", "serde"] }
anyhow = { version = "1.0.99", default-features = true, features = ["std"] }
sqlx = { version = "0.8", features = [ "runtime-tokio", "tls-native-tls", "postgres", "chrono", "ipnetwork" ] }
sqlx = { version = "0.8", features = [ "runtime-tokio", "postgres", "chrono", "ipnetwork" ] }
bincode = { version = "2.0.1", features = ["serde"] }
reqwest = { version = "0.12", default-features = false, features = ["blocking", "rustls-tls"] }
thiserror = { version = "2.0.16", default-features = false }
@@ -16,3 +16,4 @@ ringmap = { version = "0.2.0", features = ["serde"] }
actix-web = "4"
log = "0.4"
parking_lot = "0.12.4"
futures-util = "0.3.31"
@@ -7,16 +7,16 @@ pub enum DecodeError {
}

#[derive(Error, Debug)]
pub enum BackendError {
pub enum Error {
    #[error("Database error: {0}")]
    DatabseError(String),
    #[error("Decoding error: {0}")]
    DecodingError(String),
}

pub type Result<T> = std::result::Result<T, BackendError>;
pub type Result<T> = std::result::Result<T, Error>;

impl actix_web::ResponseError for BackendError {
impl actix_web::ResponseError for Error {
    #[inline]
    fn status_code(&self) -> actix_web::http::StatusCode {
        use actix_web::http::StatusCode;
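With the rename from BackendError to Error, call sites in the shared crate fold sqlx failures into the same two variants. A minimal sketch of how a query helper might surface them — the function, query, and table are illustrative only, not part of this commit:

use arcadia_shared::error::{Error, Result};

// Illustrative helper: any sqlx failure is mapped into Error::DatabseError
// (the variant name is spelled this way in the crate).
async fn count_torrents(db: &sqlx::PgPool) -> Result<i64> {
    let row: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM torrents")
        .fetch_one(db)
        .await
        .map_err(|e| Error::DatabseError(e.to_string()))?;
    Ok(row.0)
}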
@@ -1,11 +1,10 @@
use crate::tracker::models::torrent::InfoHash;
use bincode::config;
use indexmap::IndexMap;
use reqwest::Client;
use sqlx::PgPool;
use std::ops::{Deref, DerefMut};

#[derive(Debug, bincode::Encode, bincode::Decode)]
pub struct Map(#[bincode(with_serde)] pub IndexMap<InfoHash, u32>);
#[derive(Debug)]
pub struct Map(pub IndexMap<InfoHash, u32>);

impl Deref for Map {
    type Target = IndexMap<InfoHash, u32>;
@@ -21,27 +20,31 @@ impl DerefMut for Map {
    }
}

#[derive(Debug)]
pub struct DBImportInfohash2Id {
    pub id: i32,
    pub info_hash: InfoHash,
}

impl Map {
    pub async fn from_backend() -> Self {
        let base_url =
            std::env::var("ARCADIA_API_BASE_URL").expect("env var ARCADIA_API_BASE_URL not set");
        let url = format!("{}/api/tracker/infohashes-2-ids", base_url);
    pub async fn from_database(db: &PgPool) -> Self {
        let rows = sqlx::query_as!(
            DBImportInfohash2Id,
            r#"
            SELECT
                id,
                info_hash as "info_hash: InfoHash"
            FROM torrents
            "#
        )
        .fetch_all(db)
        .await
        .expect("could not get infohashes2ids");

        let client = Client::new();
        let api_key = std::env::var("API_KEY").expect("env var API_KEY not set");
        let resp = client
            .get(url)
            .header("api_key", api_key)
            .send()
            .await
            .expect("failed to fetch infohashes to ids");
        let bytes = resp
            .bytes()
            .await
            .expect("failed to read infohashes to ids response body");

        let config = config::standard();
        let (map, _): (Map, usize) = bincode::decode_from_slice(&bytes[..], config).unwrap();
        let mut map: Map = Map(IndexMap::with_capacity(rows.len()));
        for r in rows {
            map.insert(r.info_hash, r.id as u32);
        }

        map
    }
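Callers of the new constructor are not shown in this hunk; a minimal sketch of how a tracker startup path might build the map, assuming the pool setup shown here (only the helper function name and pool options are invented):

use arcadia_shared::tracker::models::infohash_2_id;

async fn load_infohash_map() -> infohash_2_id::Map {
    // The tracker connects to the same Postgres database the backend uses
    // (see the DATABASE_URL added in shared/.env.example by this commit).
    let db = sqlx::PgPool::connect(
        &std::env::var("DATABASE_URL").expect("DATABASE_URL not set"),
    )
    .await
    .expect("could not connect to postgres");

    // Replaces the old Map::from_backend() HTTP fetch + bincode decode.
    infohash_2_id::Map::from_database(&db).await
}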
@@ -1,5 +1,6 @@
use ringmap::RingMap;
use serde::{de::DeserializeOwned, Serialize};
use sqlx::PgPool;
use std::hash::Hash;

pub mod env;
@@ -32,7 +33,7 @@ pub trait Mergeable {
#[allow(async_fn_in_trait)]
pub trait Flushable<T> {
    /// Flushes updates to postgresql database
    async fn flush_to_backend(&self);
    async fn flush_to_database(&self, db: &PgPool);
}

impl<K, V> Queue<K, V>
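The Flushable trait now takes the pool explicitly, so anything that wants to be flushed implements it against a &PgPool instead of reaching the backend over HTTP. A hedged sketch of a trivial implementor — the struct, SQL, and table are invented for illustration and do not exist in the repository:

use arcadia_shared::tracker::models::Flushable;
use sqlx::PgPool;

struct VisitCounter {
    pending: u64,
}

// Illustrative only: shows the shape of the reworked trait, with the database
// handle passed in by the caller.
impl Flushable<u64> for VisitCounter {
    async fn flush_to_database(&self, db: &PgPool) {
        let _ = sqlx::query("UPDATE stats SET visits = visits + $1")
            .bind(self.pending as i64)
            .execute(db)
            .await;
    }
}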
@@ -1,11 +1,9 @@
use crate::tracker::models::user::Passkey;
use bincode::config;
use indexmap::IndexMap;
use reqwest::Client;
use std::ops::{Deref, DerefMut};

#[derive(Debug, bincode::Encode, bincode::Decode)]
pub struct Map(#[bincode(with_serde)] pub IndexMap<Passkey, u32>);
#[derive(Debug)]
pub struct Map(pub IndexMap<Passkey, u32>);

impl Deref for Map {
    type Target = IndexMap<Passkey, u32>;
@@ -21,27 +19,32 @@ impl DerefMut for Map {
    }
}

#[derive(Debug)]
pub struct DBImportPasskey2Id {
    pub id: i32,
    pub passkey: Passkey,
}

impl Map {
    pub async fn from_backend() -> Self {
        let base_url =
            std::env::var("ARCADIA_API_BASE_URL").expect("env var ARCADIA_API_BASE_URL not set");
        let url = format!("{}/api/tracker/passkeys-2-ids", base_url);
    pub async fn from_database(db: &sqlx::PgPool) -> Self {
        let rows = sqlx::query_as!(
            DBImportPasskey2Id,
            r#"
            SELECT
                id,
                passkey as "passkey: Passkey"
            FROM users
            WHERE banned = FALSE
            "#
        )
        .fetch_all(db)
        .await
        .expect("could not get passkeys2ids");

        let client = Client::new();
        let api_key = std::env::var("API_KEY").expect("env var API_KEY not set");
        let resp = client
            .get(url)
            .header("api_key", api_key)
            .send()
            .await
            .expect("failed to fetch passkeys to ids");
        let bytes = resp
            .bytes()
            .await
            .expect("failed to read users response body");

        let config = config::standard();
        let (map, _): (Map, usize) = bincode::decode_from_slice(&bytes[..], config).unwrap();
        let mut map: Map = Map(IndexMap::with_capacity(rows.len()));
        for r in rows {
            map.insert(r.passkey, r.id as u32);
        }

        map
    }
@@ -1,28 +1,24 @@
use std::fmt::Display;
use std::ops::{Deref, DerefMut};

use crate::error::Error;
use crate::tracker::models::peer_id::PeerId;
use crate::tracker::models::peer_update;
use chrono::serde::ts_seconds;
use indexmap::IndexMap;
use serde::{Deserialize, Serialize, Serializer};
use sqlx::types::chrono::{DateTime, Utc};
use sqlx::PgPool;
use std::fmt::Display;
use std::ops::{Deref, DerefMut};

use crate::tracker::models::peer_id::PeerId;
use crate::tracker::models::peer_update;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Map(IndexMap<Index, Peer>);

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, bincode::Encode, bincode::Decode)]
pub struct Map(#[bincode(with_serde)] IndexMap<Index, Peer>);

#[derive(
    Debug, Clone, Copy, Deserialize, PartialEq, Eq, Hash, bincode::Encode, bincode::Decode,
)]
#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq, Hash)]
pub struct Index {
    pub user_id: u32,
    pub peer_id: PeerId,
}

#[derive(
    Clone, Copy, Debug, Serialize, Deserialize, PartialEq, bincode::Encode, bincode::Decode,
)]
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
pub struct Peer {
    pub ip_address: std::net::IpAddr,
    pub port: u16,
@@ -31,7 +27,6 @@ pub struct Peer {
    // pub is_visible: bool,
    pub has_sent_completed: bool,
    #[serde(with = "ts_seconds")]
    #[bincode(with_serde)]
    pub updated_at: DateTime<Utc>,
    pub uploaded: u64,
    pub downloaded: u64,
@@ -99,32 +94,46 @@ impl Serialize for Index {
}

// we use peer_update::Index because we also need the torrent_id, which isn't in peer::Index
pub async fn remove_peers_from_backend(peers: &Vec<peer_update::Index>) {
pub async fn remove_peers_from_database(db: &PgPool, peers: &Vec<peer_update::Index>) {
    if peers.is_empty() {
        return;
    }
    let base_url =
        std::env::var("ARCADIA_API_BASE_URL").expect("env var ARCADIA_API_BASE_URL not set");
    let url = format!("{}/api/tracker/peers", base_url);

    let client = reqwest::Client::new();
    let api_key = std::env::var("API_KEY").expect("env var API_KEY not set");
    let mut user_ids: Vec<i32> = Vec::with_capacity(peers.len());
    let mut torrent_ids: Vec<i32> = Vec::with_capacity(peers.len());
    let mut peer_ids: Vec<Vec<u8>> = Vec::with_capacity(peers.len());

    let config = bincode::config::standard();
    let bytes = bincode::encode_to_vec(peers, config).expect("error encoding to bincode");
    for index in peers {
        user_ids.push(index.user_id as i32);
        torrent_ids.push(index.torrent_id as i32);
        peer_ids.push(index.peer_id.to_vec());
    }

    let response = client
        .delete(url)
        .header("api_key", api_key)
        .header("Content-Type", "application/octet-stream")
        .body(bytes)
        .send()
        .await
        .expect("failed to send peer removals to backend");
    let result = sqlx::query!(
        r#"
        DELETE FROM peers
        WHERE (user_id, torrent_id, peer_id) IN (
            SELECT t.user_id, t.torrent_id, t.peer_id
            FROM (
                SELECT * FROM unnest(
                    $1::int[],
                    $2::int[],
                    $3::bytea[]
                ) AS t(user_id, torrent_id, peer_id)
            ) AS t
        )
        "#,
        &user_ids,
        &torrent_ids,
        &peer_ids
    )
    .execute(db)
    .await
    .map_err(|e| Error::DatabseError(e.to_string()));

    if !response.status().is_success() {
    if result.is_err() {
        // TODO: reinsert the updates that failed and retry
        panic!("Backend returned error: {}", response.text().await.unwrap());
        panic!("Failed removing peers from db: {}", result.err().unwrap());
    } else {
        log::info!("Removed {} peers", peers.len());
    }
@@ -8,9 +8,7 @@ use sqlx::{Database, Decode};

use crate::utils::hex_encode;

#[derive(
    Clone, Copy, Eq, Deserialize, Hash, PartialEq, PartialOrd, Ord, bincode::Encode, bincode::Decode,
)]
#[derive(Clone, Copy, Eq, Deserialize, Hash, PartialEq, PartialOrd, Ord)]
pub struct PeerId(pub [u8; 20]);

impl Deref for PeerId {
@@ -1,30 +1,22 @@
use chrono::{DateTime, Utc};
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use sqlx::{types::ipnetwork::IpNetwork, PgPool};

use crate::tracker::models::{peer_id::PeerId, Flushable, Mergeable, Queue};
use crate::{
    error::Error,
    tracker::models::{peer_id::PeerId, Flushable, Mergeable, Queue},
};

// Fields must be in same order as database primary key
#[derive(
    Debug,
    Clone,
    Serialize,
    Deserialize,
    Eq,
    Hash,
    PartialEq,
    PartialOrd,
    Ord,
    bincode::Encode,
    bincode::Decode,
)]
#[derive(Debug, Clone, Serialize, Deserialize, Eq, Hash, PartialEq, PartialOrd, Ord)]
pub struct Index {
    pub user_id: u32,
    pub torrent_id: u32,
    pub peer_id: PeerId,
}

#[derive(Debug, Clone, Serialize, Deserialize, bincode::Encode, bincode::Decode)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerUpdate {
    pub ip: std::net::IpAddr,
    pub port: u16,
@@ -34,9 +26,7 @@ pub struct PeerUpdate {
    pub is_active: bool,
    pub is_seeder: bool,
    pub left: u64,
    #[bincode(with_serde)]
    pub created_at: DateTime<Utc>,
    #[bincode(with_serde)]
    pub updated_at: DateTime<Utc>,
}

@@ -59,7 +49,7 @@ impl Mergeable for PeerUpdate {
    }
}

impl Flushable<PeerUpdate> for Mutex<Queue<Index, PeerUpdate>> {
    async fn flush_to_backend(&self) {
    async fn flush_to_database(&self, db: &PgPool) {
        let amount_of_updates = self.lock().records.len();
        let updates = self
            .lock()
@@ -69,28 +59,132 @@ impl Flushable<PeerUpdate> for Mutex<Queue<Index, PeerUpdate>> {
        if updates.is_empty() {
            return;
        }
        let base_url =
            std::env::var("ARCADIA_API_BASE_URL").expect("env var ARCADIA_API_BASE_URL not set");
        let url = format!("{}/api/tracker/peer-updates", base_url);

        let client = reqwest::Client::new();
        let api_key = std::env::var("API_KEY").expect("env var API_KEY not set");
        let mut user_ids: Vec<i32> = Vec::with_capacity(updates.len());
        let mut torrent_ids: Vec<i32> = Vec::with_capacity(updates.len());
        let mut peer_ids: Vec<Vec<u8>> = Vec::with_capacity(updates.len());
        let mut ips: Vec<IpNetwork> = Vec::with_capacity(updates.len());
        let mut ports: Vec<i32> = Vec::with_capacity(updates.len());
        let mut agents: Vec<String> = Vec::with_capacity(updates.len());
        let mut uploadeds: Vec<i64> = Vec::with_capacity(updates.len());
        let mut downloadeds: Vec<i64> = Vec::with_capacity(updates.len());
        let mut lefts: Vec<i64> = Vec::with_capacity(updates.len());
        let mut actives: Vec<bool> = Vec::with_capacity(updates.len());
        let mut seeders: Vec<bool> = Vec::with_capacity(updates.len());
        let mut created_ats: Vec<DateTime<Utc>> = Vec::with_capacity(updates.len());
        let mut updated_ats: Vec<DateTime<Utc>> = Vec::with_capacity(updates.len());

        let config = bincode::config::standard();
        let bytes = bincode::encode_to_vec(updates, config).expect("error encoding to bincode");
        for (index, update) in updates {
            user_ids.push(index.user_id as i32);
            torrent_ids.push(index.torrent_id as i32);
            peer_ids.push(index.peer_id.to_vec());
            ips.push(IpNetwork::from(update.ip));
            ports.push(update.port as i32);
            agents.push(update.agent.clone());
            uploadeds.push(update.uploaded as i64);
            downloadeds.push(update.downloaded as i64);
            lefts.push(update.left as i64);
            actives.push(update.is_active);
            seeders.push(update.is_seeder);
            created_ats.push(update.created_at);
            updated_ats.push(update.updated_at);
        }

        let response = client
            .post(url)
            .header("api_key", api_key)
            .header("Content-Type", "application/octet-stream")
            .body(bytes)
            .send()
            .await
            .expect("failed to send peer updates to backend");
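        // The queued updates were flattened into parallel column vectors above; they
        // are written in a single statement by unnest()ing the arrays back into rows
        // and upserting on the (user_id, torrent_id, peer_id) primary key.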
        let result = sqlx::query!(
            r#"
            INSERT INTO peers (
                peer_id,
                ip,
                port,
                agent,
                uploaded,
                downloaded,
                "left",
                active,
                seeder,
                created_at,
                updated_at,
                torrent_id,
                user_id
            )
            SELECT
                t.peer_id,
                t.ip,
                t.port,
                t.agent,
                t.uploaded,
                t.downloaded,
                t."left",
                t.active,
                t.seeder,
                -- stored as timestamp without time zone in DB
                (t.created_at AT TIME ZONE 'UTC')::timestamp,
                (t.updated_at AT TIME ZONE 'UTC')::timestamp,
                t.torrent_id,
                t.user_id
            FROM (
                SELECT * FROM unnest(
                    $1::bytea[],
                    $2::inet[],
                    $3::int[],
                    $4::varchar[],
                    $5::bigint[],
                    $6::bigint[],
                    $7::bigint[],
                    $8::boolean[],
                    $9::boolean[],
                    $10::timestamptz[],
                    $11::timestamptz[],
                    $12::int[],
                    $13::int[]
                ) AS t(
                    peer_id,
                    ip,
                    port,
                    agent,
                    uploaded,
                    downloaded,
                    "left",
                    active,
                    seeder,
                    created_at,
                    updated_at,
                    torrent_id,
                    user_id
                )
            ) AS t
            ON CONFLICT (user_id, torrent_id, peer_id) DO UPDATE SET
                ip = EXCLUDED.ip,
                port = EXCLUDED.port,
                agent = EXCLUDED.agent,
                uploaded = EXCLUDED.uploaded,
                downloaded = EXCLUDED.downloaded,
                "left" = EXCLUDED."left",
                active = EXCLUDED.active,
                seeder = EXCLUDED.seeder,
                updated_at = EXCLUDED.updated_at
            "#,
            &peer_ids,
            &ips,
            &ports,
            &agents,
            &uploadeds,
            &downloadeds,
            &lefts,
            &actives,
            &seeders,
            &created_ats,
            &updated_ats,
            &torrent_ids,
            &user_ids
        )
        .execute(db)
        .await
        .map_err(|e| Error::DatabseError(e.to_string()));

        if !response.status().is_success() {
        if result.is_err() {
            // TODO: reinsert the updates that failed and retry
            panic!("Backend returned error: {}", response.text().await.unwrap());
            panic!("Failed to insert peer updates: {}", result.err().unwrap());
        } else {
            log::info!("Inserted {amount_of_updates} peer updates");
        }

@@ -1,27 +1,18 @@
use bincode::config;
use chrono::{DateTime, Utc};
use futures_util::TryStreamExt;
use indexmap::IndexMap;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use sqlx::{Database, Decode};
use sqlx::{Database, Decode, PgPool};
use std::net::IpAddr;
use std::ops::{Deref, DerefMut};

use crate::tracker::models::peer;
use crate::tracker::models::peer::{self, Peer};
use crate::tracker::models::peer_id::PeerId;

#[derive(
    Clone,
    Copy,
    Serialize,
    Deserialize,
    Debug,
    Eq,
    Hash,
    PartialEq,
    bincode::Encode,
    bincode::Decode,
)]
#[derive(Clone, Copy, Serialize, Deserialize, Debug, Eq, Hash, PartialEq)]
pub struct InfoHash(pub [u8; 20]);

#[derive(Debug, Clone, Serialize, Deserialize, bincode::Encode, bincode::Decode, PartialEq)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Torrent {
    pub upload_factor: i16,
    pub download_factor: i16,
@@ -32,30 +23,100 @@ pub struct Torrent {
    pub peers: peer::Map,
}

#[derive(Debug, bincode::Encode, bincode::Decode)]
pub struct Map(#[bincode(with_serde)] pub IndexMap<u32, Torrent>);
#[derive(Debug)]
pub struct Map(pub IndexMap<u32, Torrent>);

#[derive(Debug)]
pub struct DBImportTorrent {
    pub id: i32,
    pub upload_factor: i16,
    pub download_factor: i16,
    pub seeders: i64,
    pub leechers: i64,
    pub times_completed: i32,
    pub is_deleted: bool,
}

impl Map {
    pub async fn from_backend() -> Self {
        let base_url =
            std::env::var("ARCADIA_API_BASE_URL").expect("env var ARCADIA_API_BASE_URL not set");
        let url = format!("{}/api/tracker/torrents", base_url);
    pub async fn from_database(db: &PgPool) -> Self {
        let rows = sqlx::query_as!(
            DBImportTorrent,
            r#"
            SELECT
                id,
                upload_factor,
                download_factor,
                seeders,
                leechers,
                times_completed,
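                -- torrents are soft-deleted: expose deleted_at as a plain boolean flag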
                CASE
                    WHEN deleted_at IS NOT NULL THEN TRUE
                    ELSE FALSE
                END AS "is_deleted!"
            FROM torrents
            "#
        )
        .fetch_all(db)
        .await
        .expect("could not get torrents");

        let client = Client::new();
        let api_key = std::env::var("API_KEY").expect("env var API_KEY not set");
        let resp = client
            .get(url)
            .header("api_key", api_key)
            .send()
            .await
            .expect("failed to fetch torrents");
        let bytes = resp
            .bytes()
            .await
            .expect("failed to read torrents response body");
        let mut map: Map = Map(IndexMap::with_capacity(rows.len()));
        for r in rows {
            let torrent = Torrent {
                upload_factor: r.upload_factor,
                download_factor: r.download_factor,
                seeders: r.seeders as u32,
                leechers: r.leechers as u32,
                times_completed: r.times_completed as u32,
                is_deleted: r.is_deleted,
                peers: peer::Map::new(),
            };
            map.insert(r.id as u32, torrent);
        }

        // Load peers into each torrent
        let mut peers = sqlx::query!(
            r#"
            SELECT
                peers.ip as "ip_address: IpAddr",
                peers.user_id as "user_id",
                peers.torrent_id as "torrent_id",
                peers.port as "port",
                peers.seeder as "is_seeder: bool",
                peers.active as "is_active: bool",
                peers.updated_at as "updated_at: DateTime<Utc>",
                peers.uploaded as "uploaded",
                peers.downloaded as "downloaded",
                peers.peer_id as "peer_id: PeerId"
            FROM
                peers
            "#
        )
        .fetch(db);

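        // fetch() returns a row stream, so peers are attached to their torrent entry
        // one at a time instead of being collected into memory first.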
        while let Some(peer) = peers.try_next().await.expect("Failed loading peers.") {
            map.entry(peer.torrent_id as u32).and_modify(|torrent| {
                torrent.peers.insert(
                    peer::Index {
                        user_id: peer.user_id as u32,
                        peer_id: peer.peer_id,
                    },
                    Peer {
                        ip_address: peer.ip_address,
                        port: peer.port as u16,
                        is_seeder: peer.is_seeder,
                        is_active: peer.is_active,
                        has_sent_completed: false,
                        updated_at: peer
                            .updated_at
                            .expect("Peer with a null updated_at found in database."),
                        uploaded: peer.uploaded as u64,
                        downloaded: peer.downloaded as u64,
                    },
                );
            });
        }

        let config = config::standard();
        let (map, _): (Map, usize) = bincode::decode_from_slice(&bytes[..], config).unwrap();
        map
    }
}

@@ -1,33 +1,20 @@
use crate::tracker::models::{Flushable, Mergeable, Queue};
use crate::tracker::models::Mergeable;
use chrono::{DateTime, Local};
use parking_lot::Mutex;
// use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
// use sqlx::PgPool;

// TODO: use this to populate the torrent activities table
// or something else if we find a better solution
#[derive(
    Debug,
    Clone,
    Serialize,
    Deserialize,
    Eq,
    Hash,
    PartialEq,
    PartialOrd,
    Ord,
    bincode::Encode,
    bincode::Decode,
)]
#[derive(Debug, Clone, Serialize, Deserialize, Eq, Hash, PartialEq, PartialOrd, Ord)]
pub struct Index {
    pub torrent_id: u32,
    pub user_id: u32,
}

#[derive(Debug, Clone, Serialize, Deserialize, bincode::Encode, bincode::Decode)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TorrentActivityUpdate {
    #[bincode(with_serde)]
    pub snatched_at: Option<DateTime<Local>>,
    #[bincode(with_serde)]
    pub last_seen_seeding_at: Option<DateTime<Local>>,
    pub uploaded_delta: u64,
    pub real_uploaded_delta: u64,
@@ -55,41 +42,23 @@ impl Mergeable for TorrentActivityUpdate {
    }
}

impl Flushable<TorrentActivityUpdate> for Mutex<Queue<Index, TorrentActivityUpdate>> {
    async fn flush_to_backend(&self) {
        let amount_of_updates = self.lock().records.len();
        let updates = self
            .lock()
            .records
            .drain(0..amount_of_updates)
            .collect::<Vec<(Index, TorrentActivityUpdate)>>();
        if updates.is_empty() {
            return;
        }
        let base_url =
            std::env::var("ARCADIA_API_BASE_URL").expect("env var ARCADIA_API_BASE_URL not set");
        let url = format!("{}/api/tracker/torrent-activity-updates", base_url);
// impl Flushable<TorrentActivityUpdate> for Mutex<Queue<Index, TorrentActivityUpdate>> {
// async fn flush_to_database(&self, db: &PgPool) {
// let amount_of_updates = self.lock().records.len();
// let updates = self
// .lock()
// .records
// .drain(0..amount_of_updates)
// .collect::<Vec<(Index, TorrentActivityUpdate)>>();
// if updates.is_empty() {
// return;
// }

        let client = reqwest::Client::new();
        let api_key = std::env::var("API_KEY").expect("env var API_KEY not set");

        let config = bincode::config::standard();
        let bytes = bincode::encode_to_vec(updates, config).expect("error encoding to bincode");

        let response = client
            .post(url)
            .header("api_key", api_key)
            .header("Content-Type", "application/octet-stream")
            .body(bytes)
            .send()
            .await
            .expect("failed to send torrent activity updates to backend");

        if !response.status().is_success() {
            // TODO: reinsert the updates that failed and retry
            panic!("Backend returned error: {}", response.text().await.unwrap());
        } else {
            log::info!("Inserted {amount_of_updates} torrent activity updates");
        }
    }
}
// // if !response.status().is_success() {
// // // TODO: reinsert the updates that failed and retry
// // panic!("Backend returned error: {}", response.text().await.unwrap());
// // } else {
// // log::info!("Inserted {amount_of_updates} torrent activity updates");
// // }
// }
// }

@@ -1,26 +1,18 @@
use crate::tracker::models::{Flushable, Mergeable, Queue};
use crate::{
    error::Error,
    tracker::models::{Flushable, Mergeable, Queue},
};
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use sqlx::PgPool;

// Fields must be in same order as database primary key
#[derive(
    Debug,
    Clone,
    Serialize,
    Deserialize,
    Eq,
    Hash,
    PartialEq,
    PartialOrd,
    Ord,
    bincode::Encode,
    bincode::Decode,
)]
#[derive(Debug, Clone, Serialize, Deserialize, Eq, Hash, PartialEq, PartialOrd, Ord)]
pub struct Index {
    pub torrent_id: u32,
}

#[derive(Debug, Clone, Serialize, Deserialize, bincode::Encode, bincode::Decode)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TorrentUpdate {
    pub seeder_delta: i32,
    pub leecher_delta: i32,
@@ -38,7 +30,7 @@ impl Mergeable for TorrentUpdate {
}

impl Flushable<TorrentUpdate> for Mutex<Queue<Index, TorrentUpdate>> {
    async fn flush_to_backend(&self) {
    async fn flush_to_database(&self, db: &PgPool) {
        let amount_of_updates = self.lock().records.len();
        let updates = self
            .lock()
@@ -48,28 +40,46 @@ impl Flushable<TorrentUpdate> for Mutex<Queue<Index, TorrentUpdate>> {
        if updates.is_empty() {
            return;
        }
        let base_url =
            std::env::var("ARCADIA_API_BASE_URL").expect("env var ARCADIA_API_BASE_URL not set");
        let url = format!("{}/api/tracker/torrent-updates", base_url);
        let mut torrent_ids = Vec::new();
        let mut seeder_deltas = Vec::new();
        let mut leecher_deltas = Vec::new();
        let mut times_completed_deltas = Vec::new();

        let client = reqwest::Client::new();
        let api_key = std::env::var("API_KEY").expect("env var API_KEY not set");
        for (index, update) in updates {
            torrent_ids.push(index.torrent_id as i32);
            seeder_deltas.push(update.seeder_delta as i64);
            leecher_deltas.push(update.leecher_delta as i64);
            times_completed_deltas.push(update.times_completed_delta as i64);
        }

        let config = bincode::config::standard();
        let bytes = bincode::encode_to_vec(updates, config).expect("error encoding to bincode");
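        // The per-torrent deltas are applied in a single UPDATE: unnest() rebuilds the
        // queued rows as a derived table that is joined against torrents by id.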
        let result = sqlx::query!(
            r#"
            UPDATE torrents
            SET
                seeders = seeders + updates.seeder_delta,
                leechers = leechers + updates.leecher_delta,
                times_completed = times_completed + updates.times_completed_delta
            FROM (
                SELECT * FROM unnest($1::int[], $2::bigint[], $3::bigint[], $4::bigint[]) AS
                t(torrent_id, seeder_delta, leecher_delta, times_completed_delta)
            ) AS updates
            WHERE torrents.id = updates.torrent_id
            "#,
            &torrent_ids,
            &seeder_deltas,
            &leecher_deltas,
            &times_completed_deltas,
        )
        .execute(db)
        .await
        .map_err(|e| Error::DatabseError(e.to_string()));

        let response = client
            .post(url)
            .header("api_key", api_key)
            .header("Content-Type", "application/octet-stream")
            .body(bytes)
            .send()
            .await
            .expect("failed to send torrent updates to backend");

        if !response.status().is_success() {
        if result.is_err() {
            // TODO: reinsert the updates that failed and retry
            panic!("Backend returned error: {}", response.text().await.unwrap());
            panic!(
                "Failed to insert torrent updates: {}",
                result.err().unwrap()
            );
        } else {
            log::info!("Inserted {amount_of_updates} torrent updates");
        }

@@ -1,26 +1,24 @@
use anyhow::bail;
use bincode::config;
use indexmap::IndexMap;
use reqwest::Client;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use sqlx::{Database, Decode};
use sqlx::{Database, Decode, PgPool};
use std::{
    fmt::Display,
    ops::{Deref, DerefMut},
    str::FromStr,
};

#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, bincode::Encode, bincode::Decode)]
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct Passkey(pub [u8; 32]);

#[derive(Debug, Clone, Deserialize, Serialize, bincode::Encode, bincode::Decode, PartialEq)]
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
pub struct User {
    pub num_seeding: u32,
    pub num_leeching: u32,
}

#[derive(Debug, Serialize, bincode::Encode, bincode::Decode)]
pub struct Map(#[bincode(with_serde)] pub IndexMap<u32, User>);
#[derive(Debug, Serialize)]
pub struct Map(pub IndexMap<u32, User>);

impl FromStr for Passkey {
    type Err = anyhow::Error;
@@ -93,27 +91,39 @@ impl DerefMut for Map {
    }
}

#[derive(Debug)]
pub struct DBImportUser {
    pub id: i32,
    pub passkey: Passkey,
    pub num_seeding: i32,
    pub num_leeching: i32,
}

impl Map {
    pub async fn from_backend() -> Self {
        let base_url =
            std::env::var("ARCADIA_API_BASE_URL").expect("env var ARCADIA_API_BASE_URL not set");
        let url = format!("{}/api/tracker/users", base_url);
    pub async fn from_database(db: &PgPool) -> Self {
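        // Only id and passkey come from the database; the seeding/leeching counters
        // are selected as constant zeros here.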
        let rows = sqlx::query_as!(
            DBImportUser,
            r#"
            SELECT
                id,
                passkey as "passkey: Passkey",
                0::INT AS "num_seeding!",
                0::INT AS "num_leeching!"
            FROM users
            "#
        )
        .fetch_all(db)
        .await
        .expect("could not get users");

        let client = Client::new();
        let api_key = std::env::var("API_KEY").expect("env var API_KEY not set");
        let resp = client
            .get(url)
            .header("api_key", api_key)
            .send()
            .await
            .expect("failed to fetch users");
        let bytes = resp
            .bytes()
            .await
            .expect("failed to read users response body");

        let config = config::standard();
        let (map, _): (Map, usize) = bincode::decode_from_slice(&bytes[..], config).unwrap();
        let mut map: Map = Map(IndexMap::with_capacity(rows.len()));
        for r in rows {
            let user = User {
                num_seeding: r.num_seeding as u32,
                num_leeching: r.num_leeching as u32,
            };
            map.insert(r.id as u32, user);
        }

        map
    }

@@ -1,27 +1,19 @@
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use sqlx::PgPool;

use crate::tracker::models::{Flushable, Mergeable, Queue};
use crate::{
    error::Error,
    tracker::models::{Flushable, Mergeable, Queue},
};

// Fields must be in same order as database primary key
#[derive(
    Debug,
    Clone,
    Serialize,
    Deserialize,
    Eq,
    Hash,
    PartialEq,
    PartialOrd,
    Ord,
    bincode::Encode,
    bincode::Decode,
)]
#[derive(Debug, Clone, Serialize, Deserialize, Eq, Hash, PartialEq, PartialOrd, Ord)]
pub struct Index {
    pub user_id: u32,
}

#[derive(Debug, Clone, Serialize, Deserialize, bincode::Encode, bincode::Decode)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UserUpdate {
    pub uploaded_delta: u64,
    pub downloaded_delta: u64,
@@ -43,7 +35,7 @@ impl Mergeable for UserUpdate {
}

impl Flushable<UserUpdate> for Mutex<Queue<Index, UserUpdate>> {
    async fn flush_to_backend(&self) {
    async fn flush_to_database(&self, db: &PgPool) {
        let amount_of_updates = self.lock().records.len();
        let updates = self
            .lock()
@@ -53,28 +45,47 @@ impl Flushable<UserUpdate> for Mutex<Queue<Index, UserUpdate>> {
        if updates.is_empty() {
            return;
        }
        let base_url =
            std::env::var("ARCADIA_API_BASE_URL").expect("env var ARCADIA_API_BASE_URL not set");
        let url = format!("{}/api/tracker/user-updates", base_url);

        let client = reqwest::Client::new();
        let api_key = std::env::var("API_KEY").expect("env var API_KEY not set");
        let mut user_ids = Vec::new();
        let mut uploaded_deltas = Vec::new();
        let mut downloaded_deltas = Vec::new();
        let mut real_uploaded_deltas = Vec::new();
        let mut real_downloaded_deltas = Vec::new();

        let config = bincode::config::standard();
        let bytes = bincode::encode_to_vec(updates, config).expect("error encoding to bincode");
        for (index, update) in updates {
            user_ids.push(index.user_id as i32);
            uploaded_deltas.push(update.uploaded_delta as i64);
            downloaded_deltas.push(update.downloaded_delta as i64);
            real_uploaded_deltas.push(update.real_uploaded_delta as i64);
            real_downloaded_deltas.push(update.real_downloaded_delta as i64);
        }

        let response = client
            .post(url)
            .header("api_key", api_key)
            .header("Content-Type", "application/octet-stream")
            .body(bytes)
            .send()
            .await
            .expect("failed to send user updates to backend");
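        // Same unnest() pattern as the torrent updates: the queued per-user deltas are
        // joined against users by id and added to the existing counters.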
        let result = sqlx::query!(
            r#"
            UPDATE users
            SET
                uploaded = uploaded + updates.uploaded_delta,
                downloaded = downloaded + updates.downloaded_delta,
                real_uploaded = real_uploaded + updates.real_uploaded_delta,
                real_downloaded = real_downloaded + updates.real_downloaded_delta
            FROM (
                SELECT * FROM unnest($1::int[], $2::bigint[], $3::bigint[], $4::bigint[], $5::bigint[]) AS
                t(user_id, uploaded_delta, downloaded_delta, real_uploaded_delta, real_downloaded_delta)
            ) AS updates
            WHERE users.id = updates.user_id
            "#,
            &user_ids,
            &uploaded_deltas,
            &downloaded_deltas,
            &real_uploaded_deltas,
            &real_downloaded_deltas
        )
        .execute(db)
        .await
        .map_err(|e| Error::DatabseError(e.to_string()));

        if !response.status().is_success() {
        if result.is_err() {
            // TODO: reinsert the updates that failed and retry
            panic!("Backend returned error: {}", response.text().await.unwrap());
            panic!("failed inserting user updates: {}", result.err().unwrap());
        } else {
            log::info!("Inserted {amount_of_updates} user updates");
        }

@@ -92,3 +92,5 @@ INACTIVE_PEER_TTL=1814400

## Backend Configuration
ARCADIA_API_BASE_URL=http://localhost:8080
# connection string for the database.
DATABASE_URL=postgresql://arcadia:password@localhost:4321/arcadia

@@ -27,3 +27,4 @@ reqwest = { version = "0.12", default-features = false, features = ["blocking",
percent-encoding = "2.3.1"
rand = { version = "0.9.2", default-features = false, features = ["alloc", "thread_rng"] }
chrono = { version = "0.4.41", default-features = false, features = ["serde"] }
sqlx = { version = "0.8", features = [ "runtime-tokio", "postgres", "chrono", "ipnetwork" ] }

@@ -5,9 +5,10 @@ use arcadia_shared::tracker::models::{
    Queue,
};
use parking_lot::{Mutex, RwLock};
use sqlx::{postgres::PgPoolOptions, PgPool};

use crate::env::Env;
use std::{io::Write, ops::Deref};
use std::{io::Write, ops::Deref, time::Duration};

pub mod announce;
pub mod api_doc;
@@ -21,6 +22,8 @@ pub mod services;
pub struct Tracker {
    pub env: Env,

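    // Postgres pool the tracker uses directly to load its state and flush queued updates.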
    pub pool: PgPool,

    pub users: RwLock<arcadia_shared::tracker::models::user::Map>,
    pub passkey2id: RwLock<arcadia_shared::tracker::models::passkey_2_id::Map>,
    pub infohash2id: RwLock<arcadia_shared::tracker::models::infohash_2_id::Map>,
@@ -48,28 +51,36 @@ impl Tracker {
        log::info!("[Setup] Got shared env");
        println!("{:?}", env);

        print!("Connecting to database... ");
        std::io::stdout().flush().unwrap();
        let pool = connect_to_database().await;
        println!("[Finished]");

        log::info!("[Setup] Getting users...");
        std::io::stdout().flush().unwrap();
        let users = arcadia_shared::tracker::models::user::Map::from_backend().await;
        let users = arcadia_shared::tracker::models::user::Map::from_database(&pool).await;
        log::info!("[Setup] Got {:?} users", users.len());

        log::info!("[Setup] Getting passkey2id...");
        std::io::stdout().flush().unwrap();
        let passkey2id = arcadia_shared::tracker::models::passkey_2_id::Map::from_backend().await;
        let passkey2id =
            arcadia_shared::tracker::models::passkey_2_id::Map::from_database(&pool).await;
        log::info!("[Setup] Got {:?} passkey2ids", passkey2id.len());

        log::info!("[Setup] Getting infohash2id...");
        std::io::stdout().flush().unwrap();
        let infohash2id = arcadia_shared::tracker::models::infohash_2_id::Map::from_backend().await;
        let infohash2id =
            arcadia_shared::tracker::models::infohash_2_id::Map::from_database(&pool).await;
        log::info!("[Setup] Got {:?} infohash2ids", infohash2id.len());

        log::info!("[Setup] Getting torrents...");
        std::io::stdout().flush().unwrap();
        let torrents = arcadia_shared::tracker::models::torrent::Map::from_backend().await;
        let torrents = arcadia_shared::tracker::models::torrent::Map::from_database(&pool).await;
        log::info!("[Setup] Got {:?} torrents", torrents.len());

        Self {
            env,
            pool,
            users: RwLock::new(users),
            passkey2id: RwLock::new(passkey2id),
            infohash2id: RwLock::new(infohash2id),
@@ -80,3 +91,16 @@ impl Tracker {
        }
    }
}

async fn connect_to_database() -> sqlx::Pool<sqlx::Postgres> {
    // Get pool of database connections.
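    // Up to 60 connections; idle connections are closed after 10 minutes and every
    // connection is recycled after 30 minutes.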
    PgPoolOptions::new()
        .min_connections(0)
        .max_connections(60)
        .max_lifetime(Duration::from_secs(30 * 60))
        .idle_timeout(Duration::from_secs(10 * 60))
        .acquire_timeout(Duration::from_secs(30))
        .connect(&std::env::var("DATABASE_URL").expect("DATABASE_URL not found in .env file. Aborting."))
        .await
        .expect("Could not connect to the database using the DATABASE_URL value in .env file. Aborting.")
}

@@ -1,6 +1,6 @@
use actix_web::web::Data;
use arcadia_shared::tracker::models::{
    peer::remove_peers_from_backend,
    peer::remove_peers_from_database,
    peer_update,
    torrent_update::{self, TorrentUpdate},
    Flushable,
@@ -30,9 +30,9 @@ pub async fn handle(arc: &Data<Tracker>) {

pub async fn flush(arc: &Data<Tracker>) {
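    // All three queues are flushed concurrently against the shared pool.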
    join!(
        arc.user_updates.flush_to_backend(),
        arc.torrent_updates.flush_to_backend(),
        arc.peer_updates.flush_to_backend()
        arc.user_updates.flush_to_database(&arc.pool),
        arc.torrent_updates.flush_to_database(&arc.pool),
        arc.peer_updates.flush_to_database(&arc.pool)
    );
}

@@ -107,5 +107,5 @@ pub async fn reap(arc: &Data<Tracker>) {
        }
    }

    remove_peers_from_backend(&all_removed_peers).await;
    remove_peers_from_database(&arc.pool, &all_removed_peers).await;
}