updated virtualization data model to only store virtualization info on actual service/host

This commit is contained in:
Maya
2025-10-23 17:57:07 -04:00
parent 9bdb214c28
commit 5c66f2dc13
26 changed files with 253 additions and 237 deletions
@@ -8,8 +8,6 @@ CREATE TABLE IF NOT EXISTS services (
bindings JSONB,
service_definition TEXT NOT NULL,
virtualization JSONB,
vms JSONB,
containers JSONB,
source JSONB NOT NULL,
FOREIGN KEY (host_id) REFERENCES hosts(id) ON DELETE CASCADE
);
+17 -13
View File
@@ -121,6 +121,13 @@ impl DiscoversNetworkedEntities for Discovery<DockerScanDiscovery> {
// Get container info
let containers = self.get_containers_and_summaries().await?;
// Create service for docker daemon
let (_, services) = self.create_docker_daemon_service().await?;
let docker_daemon_service = services
.first()
.ok_or_else(|| anyhow!("Docker daemon service was not created, aborting"))?;
// Combine host interfaces + subnets to get a map of containers to the interfaces they have + subnets those interfaces are for
let containers_interfaces_and_subnets =
self.get_container_interfaces(&containers, &subnets, &mut host_interfaces);
@@ -130,6 +137,7 @@ impl DiscoversNetworkedEntities for Discovery<DockerScanDiscovery> {
cancel.clone(),
containers,
&containers_interfaces_and_subnets,
&docker_daemon_service.id,
)
.await;
@@ -139,16 +147,9 @@ impl DiscoversNetworkedEntities for Discovery<DockerScanDiscovery> {
Err(anyhow::Error::msg(""))
};
let services: Vec<Service> = discovered_hosts_services?
.iter()
.flat_map(|(_, s)| s.clone())
.collect();
self.finish_discovery(discovery_result, cancel.clone())
.await?;
self.create_docker_daemon_service(services).await?;
Ok(())
}
@@ -215,6 +216,7 @@ pub struct ProcessContainerParams<'a> {
pub containers_interfaces_and_subnets: &'a HashMap<String, Vec<(Interface, Subnet)>>,
pub container: &'a ContainerInspectResponse,
pub container_summary: &'a ContainerSummary,
pub docker_service_id: &'a Uuid,
pub scanned_count: Arc<AtomicUsize>,
pub discovered_count: Arc<AtomicUsize>,
pub cancel: CancellationToken,
@@ -235,7 +237,7 @@ impl Discovery<DockerScanDiscovery> {
/// Create docker daemon service which has all discovered containers in containers field
/// Create netvisor daemon service which has container relationship with docker daemon service
pub async fn create_docker_daemon_service(&self, services: Vec<Service>) -> Result<(), Error> {
pub async fn create_docker_daemon_service(&self) -> Result<(Host, Vec<Service>), Error> {
let daemon_id = self.as_ref().config_store.get_id().await?;
let network_id = self
.as_ref()
@@ -255,8 +257,6 @@ impl Discovery<DockerScanDiscovery> {
host_id,
network_id,
virtualization: None,
vms: vec![],
containers: services.iter().map(|s| s.id).collect(),
source: EntitySource::DiscoveryWithMatch {
metadata: vec![DiscoveryMetadata::new(DiscoveryType::SelfReport, daemon_id)],
details: MatchDetails::new_certain("Docker daemon self-report"),
@@ -272,9 +272,7 @@ impl Discovery<DockerScanDiscovery> {
temp_docker_daemon_host.base.services = vec![docker_service.id];
self.create_host(temp_docker_daemon_host, vec![docker_service])
.await?;
Ok(())
.await
}
async fn scan_and_process_containers(
@@ -282,6 +280,7 @@ impl Discovery<DockerScanDiscovery> {
cancel: CancellationToken,
containers: Vec<(ContainerInspectResponse, ContainerSummary)>,
containers_interfaces_and_subnets: &HashMap<String, Vec<(Interface, Subnet)>>,
docker_service_id: &Uuid,
) -> Result<Vec<(Host, Vec<Service>)>> {
let session = self.as_ref().get_session().await?;
let scanned_count = session.scanned_count.clone();
@@ -304,6 +303,7 @@ impl Discovery<DockerScanDiscovery> {
containers_interfaces_and_subnets,
container: &container,
container_summary: &container_summary,
docker_service_id,
scanned_count,
discovered_count,
cancel,
@@ -394,6 +394,7 @@ impl Discovery<DockerScanDiscovery> {
scanned_count,
discovered_count,
cancel,
docker_service_id,
..
} = params;
@@ -443,6 +444,7 @@ impl Discovery<DockerScanDiscovery> {
.clone()
.map(|n| n.trim_start_matches("/").to_string()),
container_id: container.id.clone(),
service_id: **docker_service_id,
})),
};
@@ -477,6 +479,7 @@ impl Discovery<DockerScanDiscovery> {
scanned_count,
discovered_count,
cancel,
docker_service_id,
} = params;
tracing::info!(
@@ -542,6 +545,7 @@ impl Discovery<DockerScanDiscovery> {
.clone()
.map(|n| n.trim_start_matches("/").to_string()),
container_id: container.id.clone(),
service_id: **docker_service_id,
},
)),
},
@@ -175,8 +175,6 @@ impl Discovery<SelfReportDiscovery> {
.collect(),
host_id: host.id,
virtualization: None,
vms: vec![],
containers: vec![],
source: EntitySource::DiscoveryWithMatch {
metadata: vec![DiscoveryMetadata::new(DiscoveryType::SelfReport, daemon_id)],
details: MatchDetails::new_certain("NetVisor Daemon self-report"),
-21
View File
@@ -428,11 +428,6 @@ impl HostService {
.await?
.ok_or_else(|| anyhow::anyhow!("Host {} not found", id))?;
let mut all_services = self
.service_service
.get_all_services(&host.base.network_id)
.await?;
let lock = self.get_host_lock(id).await;
let _guard = lock.lock().await;
@@ -442,22 +437,6 @@ impl HostService {
}
}
let vm_update_futures = all_services.iter_mut().filter_map(|s| {
if s.base.vms.contains(id) {
s.base.vms = s
.base
.vms
.clone()
.into_iter()
.filter(|h_id| h_id != id)
.collect();
return Some(self.service_service.update_service(s.clone()));
}
None
});
try_join_all(vm_update_futures).await?;
self.storage.delete(id).await?;
tracing::info!(
"Deleted host {}: {}; deleted service + associated subnet/group bindings: {}",
@@ -1,6 +1,7 @@
use serde::{Deserialize, Serialize};
use std::hash::Hash;
use strum_macros::IntoStaticStr;
use uuid::Uuid;
use validator::Validate;
use crate::server::shared::{
@@ -9,7 +10,7 @@ use crate::server::shared::{
};
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, IntoStaticStr)]
#[serde(tag = "type", content = "config")]
#[serde(tag = "type", content = "details")]
pub enum HostVirtualization {
Proxmox(ProxmoxVirtualization),
}
@@ -18,6 +19,7 @@ pub enum HostVirtualization {
pub struct ProxmoxVirtualization {
pub vm_name: Option<String>,
pub vm_id: Option<String>,
pub service_id: Uuid,
}
impl HasId for HostVirtualization {
-18
View File
@@ -421,26 +421,8 @@ impl ServiceService {
.await?
.ok_or_else(|| anyhow::anyhow!("Service {} not found", id))?;
let mut all_services = self.get_all_services(&service.base.network_id).await?;
self.update_group_service_bindings(&service, None).await?;
let container_update_futures = all_services.iter_mut().filter_map(|s| {
if s.base.containers.contains(id) {
s.base.containers = s
.base
.containers
.clone()
.into_iter()
.filter(|s_id| s_id != id)
.collect();
return Some(self.update_service(s.clone()));
}
None
});
try_join_all(container_update_futures).await?;
self.storage.delete(id).await?;
tracing::info!(
"Deleted service {}: {} for host {}",
+3 -18
View File
@@ -39,16 +39,14 @@ impl ServiceStorage for PostgresServiceStorage {
let service_def_str = serde_json::to_string(&service.base.service_definition)?;
let bindings_str = serde_json::to_value(&service.base.bindings)?;
let virtualization_str = serde_json::to_value(&service.base.virtualization)?;
let vms_str = serde_json::to_value(&service.base.vms)?;
let containers_str = serde_json::to_value(&service.base.containers)?;
let source_str = serde_json::to_value(&service.base.source)?;
sqlx::query(
r#"
INSERT INTO services (
id, name, host_id, service_definition, bindings, virtualization,
vms, containers, source, created_at, updated_at, network_id
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
source, created_at, updated_at, network_id
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
"#,
)
.bind(service.id)
@@ -57,8 +55,6 @@ impl ServiceStorage for PostgresServiceStorage {
.bind(service_def_str)
.bind(bindings_str)
.bind(virtualization_str)
.bind(vms_str)
.bind(containers_str)
.bind(source_str)
.bind(service.created_at)
.bind(service.updated_at)
@@ -114,14 +110,12 @@ impl ServiceStorage for PostgresServiceStorage {
let service_def_str = serde_json::to_string(&service.base.service_definition)?;
let bindings_str = serde_json::to_value(&service.base.bindings)?;
let virtualization_str = serde_json::to_value(&service.base.virtualization)?;
let vms_str = serde_json::to_value(&service.base.vms)?;
let containers_str = serde_json::to_value(&service.base.containers)?;
let source_str = serde_json::to_value(&service.base.source)?;
sqlx::query(
r#"
UPDATE services SET
name = $2, host_id = $3, service_definition = $4, bindings = $5, virtualization = $6, vms = $7, containers = $8, source = $9, updated_at = $10
name = $2, host_id = $3, service_definition = $4, bindings = $5, virtualization = $6, source = $7, updated_at = $8
WHERE id = $1
"#,
)
@@ -131,8 +125,6 @@ impl ServiceStorage for PostgresServiceStorage {
.bind(service_def_str)
.bind(bindings_str)
.bind(virtualization_str)
.bind(vms_str)
.bind(containers_str)
.bind(source_str)
.bind(service.updated_at)
.execute(&self.pool)
@@ -162,11 +154,6 @@ fn row_to_service(row: sqlx::postgres::PgRow) -> Result<Service, Error> {
let virtualization: Option<ServiceVirtualization> =
serde_json::from_value(row.get::<serde_json::Value, _>("virtualization"))
.or(Err(Error::msg("Failed to deserialize virtualization")))?;
let vms: Vec<Uuid> = serde_json::from_value(row.get::<serde_json::Value, _>("vms"))
.or(Err(Error::msg("Failed to deserialize vms")))?;
let containers: Vec<Uuid> =
serde_json::from_value(row.get::<serde_json::Value, _>("containers"))
.or(Err(Error::msg("Failed to deserialize containers")))?;
let source: EntitySource = serde_json::from_value(row.get::<serde_json::Value, _>("source"))
.or(Err(Error::msg("Failed to deserialize source")))?;
@@ -180,8 +167,6 @@ fn row_to_service(row: sqlx::postgres::PgRow) -> Result<Service, Error> {
host_id: row.get("host_id"),
service_definition,
virtualization,
vms,
containers,
bindings,
source,
},
+1 -23
View File
@@ -110,16 +110,9 @@ async fn test_service_deletion_cleans_up_relationships() {
);
svc.base.bindings = vec![binding];
let mut svc_with_containers = service(&network.id, &host_obj.id);
svc_with_containers.base.containers = vec![svc.id];
svc_with_containers.base.name = "Service with Containers".to_string();
services
.host_service
.create_host_with_services(
host_obj.clone(),
vec![svc.clone(), svc_with_containers.clone()],
)
.create_host_with_services(host_obj.clone(), vec![svc.clone()])
.await
.unwrap();
@@ -129,12 +122,6 @@ async fn test_service_deletion_cleans_up_relationships() {
.await
.unwrap()
.unwrap();
let created_svc_with_containers = services
.service_service
.get_service(&svc_with_containers.id)
.await
.unwrap()
.unwrap();
let mut group_obj = group(&network.id);
group_obj.base.service_bindings = vec![created_svc.base.bindings[0].id()];
@@ -160,13 +147,4 @@ async fn test_service_deletion_cleans_up_relationships() {
.unwrap();
assert!(group_after.base.service_bindings.is_empty());
let container_svc_after = services
.service_service
.get_service(&created_svc_with_containers.id)
.await
.unwrap()
.unwrap();
assert!(container_svc_after.base.containers.is_empty())
}
-10
View File
@@ -26,10 +26,6 @@ pub struct ServiceBase {
pub name: String,
pub bindings: Vec<Binding>,
pub virtualization: Option<ServiceVirtualization>,
/// Host IDs that are VMs managed by service
pub vms: Vec<Uuid>,
/// Service IDs that are VMs managed by service
pub containers: Vec<Uuid>,
pub source: EntitySource,
}
@@ -42,8 +38,6 @@ impl Default for ServiceBase {
name: String::new(),
bindings: Vec::new(),
virtualization: None,
vms: vec![],
containers: vec![],
source: EntitySource::Unknown,
}
}
@@ -247,8 +241,6 @@ impl Service {
name,
bindings: vec![Binding::new_l3(interface.id)],
virtualization: virtualization.clone(),
vms: vec![],
containers: vec![],
source: EntitySource::DiscoveryWithMatch {
metadata: vec![discovery_metadata],
details: result.details.clone(),
@@ -277,8 +269,6 @@ impl Service {
.iter()
.map(|p| Binding::new_l4(p.id, Some(interface.id)))
.collect(),
vms: vec![],
containers: vec![],
source: EntitySource::DiscoveryWithMatch {
metadata: vec![discovery_metadata],
details: result.details.clone(),
@@ -576,11 +576,9 @@ impl Pattern<'_> {
mod tests {
use std::net::IpAddr;
use crate::server::discovery::types::base::DiscoveryType;
use crate::server::services::types::base::Service;
use crate::server::{
discovery::types::base::DiscoveryType,
services::types::virtualization::ServiceVirtualization,
};
use crate::server::services::types::virtualization::ServiceVirtualization;
use crate::tests::{network, user};
use serial_test::serial;
use uuid::Uuid;
@@ -1,6 +1,7 @@
use serde::{Deserialize, Serialize};
use std::hash::Hash;
use strum_macros::IntoStaticStr;
use uuid::Uuid;
use validator::Validate;
use crate::server::shared::{
@@ -18,6 +19,7 @@ pub enum ServiceVirtualization {
pub struct DockerVirtualization {
pub container_name: Option<String>,
pub container_id: Option<String>,
pub service_id: Uuid,
}
impl HasId for ServiceVirtualization {
@@ -100,8 +100,6 @@ pub fn create_remote_host(remote_subnet: &Subnet, network_id: Uuid) -> (Host, Se
service_definition: Box::new(Client),
bindings: vec![binding],
virtualization: None,
vms: vec![],
containers: vec![],
source: EntitySource::System,
});
@@ -143,8 +141,6 @@ pub fn create_internet_connectivity_host(
service_definition: Box::new(WebService),
bindings: vec![binding],
virtualization: None,
vms: vec![],
containers: vec![],
source: EntitySource::System,
});
@@ -184,8 +180,6 @@ pub fn create_public_dns_host(internet_subnet: &Subnet, network_id: Uuid) -> (Ho
service_definition: Box::new(DnsServer),
bindings: vec![binding],
virtualization: None,
vms: vec![],
containers: vec![],
source: EntitySource::System,
});
+24 -6
View File
@@ -2,8 +2,8 @@ use uuid::Uuid;
use crate::server::{
groups::types::Group,
hosts::types::{base::Host, interfaces::Interface},
services::types::base::Service,
hosts::types::{base::Host, interfaces::Interface, virtualization::HostVirtualization},
services::types::{base::Service, virtualization::ServiceVirtualization},
subnets::types::base::Subnet,
topology::types::{
edges::Edge,
@@ -96,13 +96,31 @@ impl<'a> TopologyContext<'a> {
}
pub fn get_host_is_virtualized_by(&self, host_id: &Uuid) -> Option<&Service> {
self.services.iter().find(|s| s.base.vms.contains(host_id))
if let Some(host) = self.get_host_by_id(*host_id) {
if let Some(HostVirtualization::Proxmox(proxmox_virtualization)) =
&host.base.virtualization
{
return self
.services
.iter()
.find(|s| s.id == proxmox_virtualization.service_id);
}
}
None
}
pub fn get_service_is_containerized_by(&self, service_id: &Uuid) -> Option<&Service> {
self.services
.iter()
.find(|s| s.base.containers.contains(service_id))
if let Some(service) = self.get_service_by_id(*service_id) {
if let Some(ServiceVirtualization::Docker(docker_virtualization)) =
&service.base.virtualization
{
return self
.services
.iter()
.find(|s| s.id == docker_virtualization.service_id);
}
}
None
}
pub fn get_interfaces_with_infra_service(&self, subnet: &Subnet) -> Vec<Option<Uuid>> {
@@ -1,12 +1,16 @@
use itertools::Itertools;
use petgraph::{graph::NodeIndex, Graph};
use std::collections::HashMap;
use uuid::Uuid;
use crate::server::topology::{
service::context::TopologyContext,
types::{
edges::{Edge, EdgeHandle, EdgeType},
nodes::Node,
use crate::server::{
services::types::virtualization::ServiceVirtualization,
topology::{
service::context::TopologyContext,
types::{
edges::{Edge, EdgeHandle, EdgeType},
nodes::Node,
},
},
};
@@ -73,10 +77,30 @@ impl EdgeBuilder {
// Host id to subnet id that will be used for grouping, if enabled
let mut docker_bridge_host_subnet_id_to_group_on: HashMap<Uuid, Uuid> = HashMap::new();
let mut docker_service_to_containerized_service_ids: HashMap<Uuid, Vec<Uuid>> =
HashMap::new();
ctx.services.iter().for_each(|s| {
if let Some(ServiceVirtualization::Docker(docker_virtualization)) =
&s.base.virtualization
{
let entry = docker_service_to_containerized_service_ids
.entry(docker_virtualization.service_id)
.or_default();
if !entry.contains(&s.id) {
entry.push(s.id);
}
}
});
let edges = ctx
.services
.iter()
.filter(|s| !s.base.containers.is_empty())
.filter(|s| {
docker_service_to_containerized_service_ids
.keys()
.contains(&s.id)
})
.filter_map(|s| {
let host = ctx.get_host_by_id(s.base.host_id)?;
let origin_interface = host.base.interfaces.first()?;
@@ -136,9 +160,9 @@ impl EdgeBuilder {
}
}
} else {
return s
.base
.containers
return docker_service_to_containerized_service_ids
.get(&s.id)
.unwrap_or(&Vec::new())
.iter()
.filter_map(move |cs| {
let containerized = ctx.get_service_by_id(*cs)?;
-2
View File
@@ -122,8 +122,6 @@ pub fn service(network_id: &Uuid, host_id: &Uuid) -> Service {
network_id: *network_id,
service_definition: service_def,
virtualization: None,
vms: vec![],
containers: vec![],
source: EntitySource::System,
})
}