From 7d7ba2fbab641b84d320bda83ffacd9c8d4e8a83 Mon Sep 17 00:00:00 2001 From: Maya Date: Sun, 16 Nov 2025 20:37:29 -0500 Subject: [PATCH] feat: daemon mode foundations --- backend/src/bin/daemon.rs | 43 ++++++++++++++------ backend/src/daemon/runtime/service.rs | 2 + backend/src/daemon/shared/storage.rs | 13 ++++++ backend/src/server/daemons/handlers.rs | 1 + backend/src/server/daemons/impl/api.rs | 6 ++- backend/src/server/daemons/impl/base.rs | 12 ++++++ backend/src/server/daemons/impl/storage.rs | 11 ++++- backend/src/server/shared/storage/generic.rs | 1 + backend/src/server/shared/storage/traits.rs | 3 +- backend/src/tests/mod.rs | 3 +- 10 files changed, 78 insertions(+), 17 deletions(-) diff --git a/backend/src/bin/daemon.rs b/backend/src/bin/daemon.rs index 3dd4fa07..fcce664f 100644 --- a/backend/src/bin/daemon.rs +++ b/backend/src/bin/daemon.rs @@ -1,12 +1,15 @@ use axum::{Router, http::Method}; use clap::Parser; -use netvisor::daemon::{ - runtime::types::DaemonAppState, - shared::{ - handlers::create_router, - storage::{AppConfig, CliArgs, ConfigStore}, +use netvisor::{ + daemon::{ + runtime::types::DaemonAppState, + shared::{ + handlers::create_router, + storage::{AppConfig, CliArgs, ConfigStore}, + }, + utils::base::{DaemonUtils, PlatformDaemonUtils}, }, - utils::base::{DaemonUtils, PlatformDaemonUtils}, + server::daemons::r#impl::base::DaemonMode, }; use std::sync::Arc; use tower::ServiceBuilder; @@ -68,6 +71,9 @@ struct Cli { /// Docker socket proxy #[arg(long)] docker_proxy: Option, + + #[arg(long)] + mode: Option, } impl From for CliArgs { @@ -84,6 +90,7 @@ impl From for CliArgs { concurrent_scans: cli.concurrent_scans, daemon_api_key: cli.daemon_api_key, docker_proxy: cli.docker_proxy, + mode: cli.mode, } } } @@ -118,6 +125,7 @@ async fn main() -> anyhow::Result<()> { let server_addr = &config_store.get_server_endpoint().await?; let network_id = &config_store.get_network_id().await?; let api_key = &config_store.get_api_key().await?; + let mode = &config_store.get_mode().await?; let state = DaemonAppState::new(config_store, utils).await?; let runtime_service = state.services.runtime_service.clone(); @@ -164,12 +172,23 @@ async fn main() -> anyhow::Result<()> { tracing::info!("Missing network ID - waiting for server to hit /api/initialize..."); } - // Spawn heartbeat task in background - tokio::spawn(async move { - if let Err(e) = runtime_service.heartbeat().await { - tracing::warn!("Failed to update heartbeat timestamp: {}", e); - } - }); + if *mode == DaemonMode::Push { + tracing::info!("Daemon running in Push mode"); + // Spawn heartbeat task in background + tokio::spawn(async move { + if let Err(e) = runtime_service.heartbeat().await { + tracing::warn!("Failed to update heartbeat timestamp: {}", e); + } + }); + } else { + tracing::info!("Daemon running in Pull mode"); + // Spawn request work in background + tokio::spawn(async move { + if let Err(e) = runtime_service.heartbeat().await { + tracing::warn!("Failed to update heartbeat timestamp: {}", e); + } + }); + } // 7. Keep process alive tokio::signal::ctrl_c().await?; diff --git a/backend/src/daemon/runtime/service.rs b/backend/src/daemon/runtime/service.rs index 1de9eb3b..97ed1187 100644 --- a/backend/src/daemon/runtime/service.rs +++ b/backend/src/daemon/runtime/service.rs @@ -173,6 +173,7 @@ impl DaemonRuntimeService { has_docker_socket: bool, ) -> Result<()> { let bind_address = self.config_store.get_bind_address().await?; + let mode = self.config_store.get_mode().await?; let daemon_ip = if bind_address == "0.0.0.0" || bind_address == "::" { // If binding to all interfaces, auto-detect the primary IP @@ -192,6 +193,7 @@ impl DaemonRuntimeService { network_id, daemon_ip, daemon_port, + mode, capabilities: DaemonCapabilities { has_docker_socket, interfaced_subnet_ids: Vec::new(), diff --git a/backend/src/daemon/shared/storage.rs b/backend/src/daemon/shared/storage.rs index 58667acd..6c95a94a 100644 --- a/backend/src/daemon/shared/storage.rs +++ b/backend/src/daemon/shared/storage.rs @@ -10,6 +10,8 @@ use std::{path::PathBuf, sync::Arc}; use tokio::sync::RwLock; use uuid::Uuid; +use crate::server::daemons::r#impl::base::DaemonMode; + /// CLI arguments structure (for figment integration) #[derive(Debug)] pub struct CliArgs { @@ -24,6 +26,7 @@ pub struct CliArgs { pub concurrent_scans: Option, pub daemon_api_key: Option, pub docker_proxy: Option, + pub mode: Option, } /// Unified configuration struct that handles both startup and runtime config @@ -48,6 +51,7 @@ pub struct AppConfig { pub host_id: Option, pub daemon_api_key: Option, pub docker_proxy: Option, + pub mode: DaemonMode, } impl Default for AppConfig { @@ -67,6 +71,7 @@ impl Default for AppConfig { daemon_api_key: None, concurrent_scans: 15, docker_proxy: None, + mode: DaemonMode::Push, } } } @@ -127,6 +132,9 @@ impl AppConfig { if let Some(docker_proxy) = cli_args.docker_proxy { figment = figment.merge(("docker_proxy", docker_proxy)); } + if let Some(mode) = cli_args.mode { + figment = figment.merge(("mode", mode)); + } let config: AppConfig = figment .extract() @@ -250,6 +258,11 @@ impl ConfigStore { Ok(config.bind_address.clone()) } + pub async fn get_mode(&self) -> Result { + let config = self.config.read().await; + Ok(config.mode) + } + pub async fn set_network_id(&self, network_id: Uuid) -> Result<()> { let mut config = self.config.write().await; config.network_id = Some(network_id); diff --git a/backend/src/server/daemons/handlers.rs b/backend/src/server/daemons/handlers.rs index 2649af28..9a1bd9e7 100644 --- a/backend/src/server/daemons/handlers.rs +++ b/backend/src/server/daemons/handlers.rs @@ -69,6 +69,7 @@ async fn register_daemon( port: request.daemon_port, capabilities: request.capabilities.clone(), last_seen: Utc::now(), + mode: request.mode, }); daemon.id = request.daemon_id; diff --git a/backend/src/server/daemons/impl/api.rs b/backend/src/server/daemons/impl/api.rs index c1000890..15685262 100644 --- a/backend/src/server/daemons/impl/api.rs +++ b/backend/src/server/daemons/impl/api.rs @@ -4,7 +4,10 @@ use crate::{ daemon::discovery::types::base::{ DiscoveryPhase, DiscoverySessionInfo, DiscoverySessionUpdate, }, - server::{daemons::r#impl::base::Daemon, discovery::r#impl::types::DiscoveryType}, + server::{ + daemons::r#impl::base::{Daemon, DaemonMode}, + discovery::r#impl::types::DiscoveryType, + }, }; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; @@ -26,6 +29,7 @@ pub struct DaemonRegistrationRequest { pub network_id: Uuid, pub daemon_ip: IpAddr, pub daemon_port: u16, + pub mode: DaemonMode, pub capabilities: DaemonCapabilities, } diff --git a/backend/src/server/daemons/impl/base.rs b/backend/src/server/daemons/impl/base.rs index d287a272..72a1374e 100644 --- a/backend/src/server/daemons/impl/base.rs +++ b/backend/src/server/daemons/impl/base.rs @@ -1,7 +1,9 @@ use std::{fmt::Display, net::IpAddr}; use chrono::{DateTime, Utc}; +use clap::ValueEnum; use serde::{Deserialize, Serialize}; +use strum::Display; use uuid::Uuid; use crate::server::daemons::r#impl::api::DaemonCapabilities; @@ -15,6 +17,7 @@ pub struct DaemonBase { pub port: u16, #[serde(default)] pub capabilities: DaemonCapabilities, + pub mode: DaemonMode, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -31,3 +34,12 @@ impl Display for Daemon { write!(f, "{}: {}", self.base.ip, self.id) } } + +#[derive( + Debug, Display, Copy, Clone, Serialize, Deserialize, Default, PartialEq, Eq, ValueEnum, +)] +pub enum DaemonMode { + #[default] + Push, + Pull, +} diff --git a/backend/src/server/daemons/impl/storage.rs b/backend/src/server/daemons/impl/storage.rs index 8d05c1f4..dc8b8f43 100644 --- a/backend/src/server/daemons/impl/storage.rs +++ b/backend/src/server/daemons/impl/storage.rs @@ -7,7 +7,7 @@ use uuid::Uuid; use crate::server::{ daemons::r#impl::{ api::DaemonCapabilities, - base::{Daemon, DaemonBase}, + base::{Daemon, DaemonBase, DaemonMode}, }, shared::storage::traits::{SqlValue, StorableEntity}, }; @@ -63,6 +63,7 @@ impl StorableEntity for Daemon { port, capabilities, last_seen, + mode, }, } = self.clone(); @@ -77,6 +78,7 @@ impl StorableEntity for Daemon { "capabilities", "port", "ip", + "mode", ], vec![ SqlValue::Uuid(id), @@ -88,13 +90,17 @@ impl StorableEntity for Daemon { SqlValue::DaemonCapabilities(capabilities), SqlValue::U16(port), SqlValue::IpAddr(ip), + SqlValue::DaemonMode(mode), ], )) } fn from_row(row: &PgRow) -> Result { let ip: IpAddr = serde_json::from_str(&row.get::("ip")) - .map_err(|e| anyhow::anyhow!("Failed to deserialize IP: {}", e))?; + .map_err(|e| anyhow::anyhow!("Failed to deserialize ip: {}", e))?; + + let mode: DaemonMode = serde_json::from_str(&row.get::("mode")) + .map_err(|e| anyhow::anyhow!("Failed to deserialize mode: {}", e))?; let capabilities: DaemonCapabilities = serde_json::from_value(row.get::("capabilities")) @@ -110,6 +116,7 @@ impl StorableEntity for Daemon { last_seen: row.get("last_seen"), host_id: row.get("host_id"), network_id: row.get("network_id"), + mode, capabilities, }, }) diff --git a/backend/src/server/shared/storage/generic.rs b/backend/src/server/shared/storage/generic.rs index 502c5e27..e20c713d 100644 --- a/backend/src/server/shared/storage/generic.rs +++ b/backend/src/server/shared/storage/generic.rs @@ -86,6 +86,7 @@ where SqlValue::DiscoveryType(v) => query.bind(serde_json::to_value(v)?), SqlValue::Email(v) => query.bind(v.as_str()), SqlValue::UserOrgPermissions(v) => query.bind(serde_json::to_string(v)?), + SqlValue::DaemonMode(v) => query.bind(serde_json::to_string(v)?), SqlValue::OptionBillingPlan(v) => query.bind(serde_json::to_value(v)?), SqlValue::OptionBillingPlanStatus(v) => query.bind(serde_json::to_string(v)?), SqlValue::EdgeStyle(v) => query.bind(v.to_string()), diff --git a/backend/src/server/shared/storage/traits.rs b/backend/src/server/shared/storage/traits.rs index 13c14949..2ee617e8 100644 --- a/backend/src/server/shared/storage/traits.rs +++ b/backend/src/server/shared/storage/traits.rs @@ -10,7 +10,7 @@ use uuid::Uuid; use crate::server::{ billing::types::base::BillingPlan, - daemons::r#impl::api::DaemonCapabilities, + daemons::r#impl::{api::DaemonCapabilities, base::DaemonMode}, discovery::r#impl::types::{DiscoveryType, RunType}, groups::r#impl::types::GroupType, hosts::r#impl::{ @@ -93,4 +93,5 @@ pub enum SqlValue { OptionBillingPlan(Option), OptionBillingPlanStatus(Option), EdgeStyle(EdgeStyle), + DaemonMode(DaemonMode), } diff --git a/backend/src/tests/mod.rs b/backend/src/tests/mod.rs index 5abc1647..647c0a5b 100644 --- a/backend/src/tests/mod.rs +++ b/backend/src/tests/mod.rs @@ -2,7 +2,7 @@ use crate::server::{ config::{AppState, ServerConfig}, daemons::r#impl::{ api::DaemonCapabilities, - base::{Daemon, DaemonBase}, + base::{Daemon, DaemonBase, DaemonMode}, }, groups::r#impl::{ base::{Group, GroupBase}, @@ -159,6 +159,7 @@ pub fn daemon(network_id: &Uuid, host_id: &Uuid) -> Daemon { ip: IpAddr::V4(Ipv4Addr::new(192, 168, 1, 50)), port: 60073, last_seen: Utc::now(), + mode: DaemonMode::Push, capabilities: DaemonCapabilities { has_docker_socket: false, interfaced_subnet_ids: Vec::new(),