feat: daemon mode foundations

This commit is contained in:
Maya
2025-11-16 20:37:29 -05:00
parent c3ae270abe
commit 7d7ba2fbab
10 changed files with 78 additions and 17 deletions

View File

@@ -1,12 +1,15 @@
use axum::{Router, http::Method};
use clap::Parser;
use netvisor::daemon::{
runtime::types::DaemonAppState,
shared::{
handlers::create_router,
storage::{AppConfig, CliArgs, ConfigStore},
use netvisor::{
daemon::{
runtime::types::DaemonAppState,
shared::{
handlers::create_router,
storage::{AppConfig, CliArgs, ConfigStore},
},
utils::base::{DaemonUtils, PlatformDaemonUtils},
},
utils::base::{DaemonUtils, PlatformDaemonUtils},
server::daemons::r#impl::base::DaemonMode,
};
use std::sync::Arc;
use tower::ServiceBuilder;
@@ -68,6 +71,9 @@ struct Cli {
/// Docker socket proxy
#[arg(long)]
docker_proxy: Option<String>,
#[arg(long)]
mode: Option<DaemonMode>,
}
impl From<Cli> for CliArgs {
@@ -84,6 +90,7 @@ impl From<Cli> for CliArgs {
concurrent_scans: cli.concurrent_scans,
daemon_api_key: cli.daemon_api_key,
docker_proxy: cli.docker_proxy,
mode: cli.mode,
}
}
}
@@ -118,6 +125,7 @@ async fn main() -> anyhow::Result<()> {
let server_addr = &config_store.get_server_endpoint().await?;
let network_id = &config_store.get_network_id().await?;
let api_key = &config_store.get_api_key().await?;
let mode = &config_store.get_mode().await?;
let state = DaemonAppState::new(config_store, utils).await?;
let runtime_service = state.services.runtime_service.clone();
@@ -164,12 +172,23 @@ async fn main() -> anyhow::Result<()> {
tracing::info!("Missing network ID - waiting for server to hit /api/initialize...");
}
// Spawn heartbeat task in background
tokio::spawn(async move {
if let Err(e) = runtime_service.heartbeat().await {
tracing::warn!("Failed to update heartbeat timestamp: {}", e);
}
});
if *mode == DaemonMode::Push {
tracing::info!("Daemon running in Push mode");
// Spawn heartbeat task in background
tokio::spawn(async move {
if let Err(e) = runtime_service.heartbeat().await {
tracing::warn!("Failed to update heartbeat timestamp: {}", e);
}
});
} else {
tracing::info!("Daemon running in Pull mode");
// Spawn request work in background
tokio::spawn(async move {
if let Err(e) = runtime_service.heartbeat().await {
tracing::warn!("Failed to update heartbeat timestamp: {}", e);
}
});
}
// 7. Keep process alive
tokio::signal::ctrl_c().await?;

View File

@@ -173,6 +173,7 @@ impl DaemonRuntimeService {
has_docker_socket: bool,
) -> Result<()> {
let bind_address = self.config_store.get_bind_address().await?;
let mode = self.config_store.get_mode().await?;
let daemon_ip = if bind_address == "0.0.0.0" || bind_address == "::" {
// If binding to all interfaces, auto-detect the primary IP
@@ -192,6 +193,7 @@ impl DaemonRuntimeService {
network_id,
daemon_ip,
daemon_port,
mode,
capabilities: DaemonCapabilities {
has_docker_socket,
interfaced_subnet_ids: Vec::new(),

View File

@@ -10,6 +10,8 @@ use std::{path::PathBuf, sync::Arc};
use tokio::sync::RwLock;
use uuid::Uuid;
use crate::server::daemons::r#impl::base::DaemonMode;
/// CLI arguments structure (for figment integration)
#[derive(Debug)]
pub struct CliArgs {
@@ -24,6 +26,7 @@ pub struct CliArgs {
pub concurrent_scans: Option<usize>,
pub daemon_api_key: Option<String>,
pub docker_proxy: Option<String>,
pub mode: Option<DaemonMode>,
}
/// Unified configuration struct that handles both startup and runtime config
@@ -48,6 +51,7 @@ pub struct AppConfig {
pub host_id: Option<Uuid>,
pub daemon_api_key: Option<String>,
pub docker_proxy: Option<String>,
pub mode: DaemonMode,
}
impl Default for AppConfig {
@@ -67,6 +71,7 @@ impl Default for AppConfig {
daemon_api_key: None,
concurrent_scans: 15,
docker_proxy: None,
mode: DaemonMode::Push,
}
}
}
@@ -127,6 +132,9 @@ impl AppConfig {
if let Some(docker_proxy) = cli_args.docker_proxy {
figment = figment.merge(("docker_proxy", docker_proxy));
}
if let Some(mode) = cli_args.mode {
figment = figment.merge(("mode", mode));
}
let config: AppConfig = figment
.extract()
@@ -250,6 +258,11 @@ impl ConfigStore {
Ok(config.bind_address.clone())
}
pub async fn get_mode(&self) -> Result<DaemonMode> {
let config = self.config.read().await;
Ok(config.mode)
}
pub async fn set_network_id(&self, network_id: Uuid) -> Result<()> {
let mut config = self.config.write().await;
config.network_id = Some(network_id);

View File

@@ -69,6 +69,7 @@ async fn register_daemon(
port: request.daemon_port,
capabilities: request.capabilities.clone(),
last_seen: Utc::now(),
mode: request.mode,
});
daemon.id = request.daemon_id;

View File

@@ -4,7 +4,10 @@ use crate::{
daemon::discovery::types::base::{
DiscoveryPhase, DiscoverySessionInfo, DiscoverySessionUpdate,
},
server::{daemons::r#impl::base::Daemon, discovery::r#impl::types::DiscoveryType},
server::{
daemons::r#impl::base::{Daemon, DaemonMode},
discovery::r#impl::types::DiscoveryType,
},
};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
@@ -26,6 +29,7 @@ pub struct DaemonRegistrationRequest {
pub network_id: Uuid,
pub daemon_ip: IpAddr,
pub daemon_port: u16,
pub mode: DaemonMode,
pub capabilities: DaemonCapabilities,
}

View File

@@ -1,7 +1,9 @@
use std::{fmt::Display, net::IpAddr};
use chrono::{DateTime, Utc};
use clap::ValueEnum;
use serde::{Deserialize, Serialize};
use strum::Display;
use uuid::Uuid;
use crate::server::daemons::r#impl::api::DaemonCapabilities;
@@ -15,6 +17,7 @@ pub struct DaemonBase {
pub port: u16,
#[serde(default)]
pub capabilities: DaemonCapabilities,
pub mode: DaemonMode,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -31,3 +34,12 @@ impl Display for Daemon {
write!(f, "{}: {}", self.base.ip, self.id)
}
}
#[derive(
Debug, Display, Copy, Clone, Serialize, Deserialize, Default, PartialEq, Eq, ValueEnum,
)]
pub enum DaemonMode {
#[default]
Push,
Pull,
}

View File

@@ -7,7 +7,7 @@ use uuid::Uuid;
use crate::server::{
daemons::r#impl::{
api::DaemonCapabilities,
base::{Daemon, DaemonBase},
base::{Daemon, DaemonBase, DaemonMode},
},
shared::storage::traits::{SqlValue, StorableEntity},
};
@@ -63,6 +63,7 @@ impl StorableEntity for Daemon {
port,
capabilities,
last_seen,
mode,
},
} = self.clone();
@@ -77,6 +78,7 @@ impl StorableEntity for Daemon {
"capabilities",
"port",
"ip",
"mode",
],
vec![
SqlValue::Uuid(id),
@@ -88,13 +90,17 @@ impl StorableEntity for Daemon {
SqlValue::DaemonCapabilities(capabilities),
SqlValue::U16(port),
SqlValue::IpAddr(ip),
SqlValue::DaemonMode(mode),
],
))
}
fn from_row(row: &PgRow) -> Result<Self, anyhow::Error> {
let ip: IpAddr = serde_json::from_str(&row.get::<String, _>("ip"))
.map_err(|e| anyhow::anyhow!("Failed to deserialize IP: {}", e))?;
.map_err(|e| anyhow::anyhow!("Failed to deserialize ip: {}", e))?;
let mode: DaemonMode = serde_json::from_str(&row.get::<String, _>("mode"))
.map_err(|e| anyhow::anyhow!("Failed to deserialize mode: {}", e))?;
let capabilities: DaemonCapabilities =
serde_json::from_value(row.get::<serde_json::Value, _>("capabilities"))
@@ -110,6 +116,7 @@ impl StorableEntity for Daemon {
last_seen: row.get("last_seen"),
host_id: row.get("host_id"),
network_id: row.get("network_id"),
mode,
capabilities,
},
})

View File

@@ -86,6 +86,7 @@ where
SqlValue::DiscoveryType(v) => query.bind(serde_json::to_value(v)?),
SqlValue::Email(v) => query.bind(v.as_str()),
SqlValue::UserOrgPermissions(v) => query.bind(serde_json::to_string(v)?),
SqlValue::DaemonMode(v) => query.bind(serde_json::to_string(v)?),
SqlValue::OptionBillingPlan(v) => query.bind(serde_json::to_value(v)?),
SqlValue::OptionBillingPlanStatus(v) => query.bind(serde_json::to_string(v)?),
SqlValue::EdgeStyle(v) => query.bind(v.to_string()),

View File

@@ -10,7 +10,7 @@ use uuid::Uuid;
use crate::server::{
billing::types::base::BillingPlan,
daemons::r#impl::api::DaemonCapabilities,
daemons::r#impl::{api::DaemonCapabilities, base::DaemonMode},
discovery::r#impl::types::{DiscoveryType, RunType},
groups::r#impl::types::GroupType,
hosts::r#impl::{
@@ -93,4 +93,5 @@ pub enum SqlValue {
OptionBillingPlan(Option<BillingPlan>),
OptionBillingPlanStatus(Option<SubscriptionStatus>),
EdgeStyle(EdgeStyle),
DaemonMode(DaemonMode),
}

View File

@@ -2,7 +2,7 @@ use crate::server::{
config::{AppState, ServerConfig},
daemons::r#impl::{
api::DaemonCapabilities,
base::{Daemon, DaemonBase},
base::{Daemon, DaemonBase, DaemonMode},
},
groups::r#impl::{
base::{Group, GroupBase},
@@ -159,6 +159,7 @@ pub fn daemon(network_id: &Uuid, host_id: &Uuid) -> Daemon {
ip: IpAddr::V4(Ipv4Addr::new(192, 168, 1, 50)),
port: 60073,
last_seen: Utc::now(),
mode: DaemonMode::Push,
capabilities: DaemonCapabilities {
has_docker_socket: false,
interfaced_subnet_ids: Vec::new(),