add hetzner source

This commit is contained in:
dedys
2026-02-13 09:07:26 +02:00
parent 34f910904f
commit ab0b5837e1
10 changed files with 325 additions and 1 deletions
+12
View File
@@ -0,0 +1,12 @@
# Source: hetzner
## Setup
You need to have a source section in your `settings.ini` file with following type:
```ini
type = hetzner
```
### Hetzner api
You need to create a "Read-only" api_token
https://docs.hetzner.com/cloud/api/getting-started/generating-api-token/
+2 -1
View File
@@ -9,6 +9,7 @@
# define all available sources here
from module.sources.vmware.connection import VMWareHandler
from module.sources.hetzner.connection import HetznerHandler
from module.sources.check_redfish.import_inventory import CheckRedfish
from module.common.logging import get_logger
@@ -18,7 +19,7 @@ from module.config.base import ConfigOptions
from module.config import source_config_section_name
# list of valid sources
valid_sources = [VMWareHandler, CheckRedfish]
valid_sources = [VMWareHandler, CheckRedfish, HetznerHandler]
def validate_source(source_class_object=None, state="pre"):
+1
View File
@@ -0,0 +1 @@
from module.sources.hetzner.connection import HetznerHandler
+10
View File
@@ -0,0 +1,10 @@
from hcloud import Client
class HetznerClient:
def __init__(self, token):
self.client = Client(token=token)
def get_servers(self):
return self.client.servers.get_all()
+17
View File
@@ -0,0 +1,17 @@
from module.config import source_config_section_name
from module.config.base import ConfigBase
from module.config.option import ConfigOption
class HetznerConfig(ConfigBase):
section_name = source_config_section_name
def __init__(self):
self.options = [
ConfigOption("enabled", bool, default_value=True),
ConfigOption("type", str),
ConfigOption("api_token", str, mandatory=True),
]
super().__init__()
+140
View File
@@ -0,0 +1,140 @@
from module.common.logging import get_logger
from module.sources.common.source_base import SourceBase
from module.sources.hetzner.client import HetznerClient
from module.sources.hetzner.config import HetznerConfig
from module.sources.hetzner.network import sync_vm_network
from module.sources.hetzner.disk import sync_vm_disks
from module.netbox.inventory import (
NetBoxInventory,
NBVM,
NBSite,
NBCluster,
NBClusterType,
NBVMInterface,
NBIPAddress,
NBVirtualDisk,
)
class HetznerHandler(SourceBase):
source_type = "hetzner"
source_tag = "hetzner"
settings = HetznerConfig()
dependent_netbox_objects = [
NBVM,
NBCluster,
NBSite,
NBClusterType,
NBIPAddress,
NBVirtualDisk,
NBVMInterface,
]
def __init__(self, name=None):
if name is None:
raise ValueError(f"Invalid value for attribute 'name': '{name}'.")
self.inventory = NetBoxInventory()
self.name = name
self.log = get_logger()
settings_handler = HetznerConfig()
settings_handler.source_name = self.name
self.settings = settings_handler.parse()
self.set_source_tag()
if self.settings.enabled is False:
log.info(f"Source '{name}' is currently disabled. Skipping")
return
self.init_successful = True
@classmethod
def implements(cls, source_type):
return source_type == "hetzner"
def apply(self):
token = self.settings.api_token
self.log.error(f"TOKEN DEBUG >>> {repr(token)}")
if not token:
self.log.error("Hetzner api_token not defined in settings.ini")
return
self.client = HetznerClient(token=token)
servers = self.client.get_servers()
self.log.info(f"Connected to Hetzner, found {len(servers)} servers")
# ---------------------------
# main object
# ---------------------------
site = self.inventory.add_update_object(
NBSite,
data={"name": "cloud"},
source=self,
)
cluster_type = self.inventory.add_update_object(
NBClusterType,
data={"name": "cloud"},
source=self,
)
cluster_name = f"Hetzner: {self.name}"
cluster = self.inventory.add_update_object(
NBCluster,
data={
"name": cluster_name,
"type": cluster_type,
"scope_type": 17,
"scope_id": site,
},
source=self,
)
# ---------------------------
# servers loop
# ---------------------------
for server in servers:
# -------- VM --------
vm = self.inventory.add_update_object(
NBVM,
data={
"name": server.name,
"status": "active",
"cluster": cluster,
"site": site,
},
source=self,
)
# -------- interfaces --------
sync_vm_network(self, vm, server)
# -------- disks --------
sync_vm_disks(self, vm, server)
+42
View File
@@ -0,0 +1,42 @@
from module.netbox.inventory import NBVirtualDisk
def sync_vm_disks(handler, vm, server):
"""
Sync Hetzner volumes → NetBox virtual disks
"""
inventory = handler.inventory
if not server.volumes:
return
for volume in server.volumes:
disk_name = f"{server.name}-{volume.name}"[:60]
size_mb = int(volume.size) * 1024 # Hetzner size = GB
disk_data = {
"name": disk_name,
"virtual_machine": vm, # object, не id
"size": size_mb,
}
existing_disk = None
for disk in inventory.get_all_items(NBVirtualDisk):
if (
disk.data.get("name") == disk_name
and disk.data.get("virtual_machine") == vm
):
existing_disk = disk
break
if existing_disk is None:
inventory.add_object(
NBVirtualDisk,
data=disk_data,
source=handler,
)
else:
existing_disk.update(disk_data, source=handler)
+93
View File
@@ -0,0 +1,93 @@
from module.netbox.inventory import NBVMInterface, NBIPAddress
def sync_vm_network(handler, vm, server):
"""
Create interfaces + assign IPs for Hetzner VM
"""
inventory = handler.inventory
interfaces = []
# -----------------------
# interfaces
# -----------------------
# public → eth0
if server.public_net and server.public_net.ipv4:
iface = inventory.add_update_object(
NBVMInterface,
data={
"name": "eth0",
"virtual_machine": vm,
"enabled": True,
},
source=handler,
)
interfaces.append(iface)
# private → ethX
if server.private_net:
start_index = 1 if len(interfaces) > 0 else 0
for idx, net in enumerate(server.private_net, start=start_index):
iface = inventory.add_update_object(
NBVMInterface,
data={
"name": f"eth{idx}",
"virtual_machine": vm,
"enabled": True,
},
source=handler,
)
interfaces.append(iface)
# -----------------------
# IP assignment
# -----------------------
# public ip
if server.public_net and server.public_net.ipv4 and len(interfaces) >= 1:
ip_addr = server.public_net.ipv4.ip
if "/" not in ip_addr:
ip_addr += "/32"
assign_ip(inventory, handler, ip_addr, interfaces[0])
# private ips
if server.private_net:
private_start_index = 1 if (server.public_net and server.public_net.ipv4) else 0
for idx, net in enumerate(server.private_net, start=private_start_index):
if len(interfaces) <= idx:
continue
ip_addr = net.ip
if "/" not in ip_addr:
ip_addr += "/32"
assign_ip(inventory, handler, ip_addr, interfaces[idx])
def assign_ip(inventory, handler, ip_addr, interface):
"""
Safe IP assign without duplicates
"""
ip_data = {
"address": ip_addr,
"assigned_object_type": "virtualization.vminterface",
"assigned_object_id": interface,
}
existing_ip = next(
(ip for ip in inventory.get_all_items(NBIPAddress)
if ip.data.get("address") == ip_addr),
None
)
if existing_ip is None:
inventory.add_object(NBIPAddress, data=ip_data, source=handler)
else:
existing_ip.update(ip_data, source=handler)
+1
View File
@@ -6,3 +6,4 @@ pyvmomi==8.0.2.0.1
aiodns==3.0.0
pycares==4.0.0
pyyaml==6.0.1
hcloud
+7
View File
@@ -450,4 +450,11 @@ inventory_file_path = /full/path/to/inventory/files
; If the device has a tenant then this one will be used. If not, the prefix tenant will be used if defined
;ip_tenant_inheritance_order = device, prefix
[source/hetzner]
; Defines if this source is enabled or not
;enabled = True
; type of source. This defines which source handler to use
;type = hetzner
;api_token =
;EOF