adds all new config parsing to include env vars #227

This commit is contained in:
ricardo.bartels@telekom.de
2023-02-17 21:22:52 +01:00
parent 76e1ac5a1e
commit 6f68c428cd
30 changed files with 2647 additions and 1184 deletions
+89 -8
View File
@@ -38,6 +38,7 @@ This ensures stale objects are removed from NetBox keeping an accurate current s
* pyvmomi==7.0.3
* aiodns==2.0.0
* setuptools>=62.00.0
* pyyaml==6.0
### Environment
* NetBox >= 2.9
@@ -113,8 +114,8 @@ Run the containerized application in a kubernetes cluster
```shell
docker build -t netbox-vsphere-sync .
docker image tag netbox-vsphere-sync your-registry.host/netbox-vsphere-sync:v1.2.0
docker image push your-registry.host/netbox-vsphere-sync:v1.2.0
docker image tag netbox-vsphere-sync your-registry.host/netbox-vsphere-sync:latest
docker image push your-registry.host/netbox-vsphere-sync:latest
kubectl create secret generic netbox-vsphere-sync --from-file=settings.ini
kubectl apply -f netbox-vsphere-sync-cronjob.yaml
@@ -132,7 +133,7 @@ A short description can be found [here](https://netbox.readthedocs.io/en/stable/
# Running the script
```
usage: netbox-sync.py [-h] [-c settings.ini]
usage: netbox-sync.py [-h] [-c settings.ini [settings.ini ...]] [-g]
[-l {DEBUG3,DEBUG2,DEBUG,INFO,WARNING,ERROR}] [-n] [-p]
Sync objects from various sources to NetBox
@@ -140,12 +141,14 @@ Sync objects from various sources to NetBox
Version: 1.3.0 (2022-09-06)
Project URL: https://github.com/bb-ricardo/netbox-sync
optional arguments:
options:
-h, --help show this help message and exit
-c settings.ini, --config settings.ini
-c settings.ini [settings.ini ...], --config settings.ini [settings.ini ...]
points to the config file to read config data from
which is not installed under the default path
'./settings.ini'
-g, --generate_config
generates default config file.
-l {DEBUG3,DEBUG2,DEBUG,INFO,WARNING,ERROR}, --log_level {DEBUG3,DEBUG2,DEBUG,INFO,WARNING,ERROR}
set log level (overrides config)
-n, --dry_run Operate as usual but don't change anything in NetBox.
@@ -160,9 +163,87 @@ optional arguments:
It is recommended to set log level to `DEBUG2` this way the program should tell you what is happening and why.
Also use the dry run option `-n` at the beginning to avoid changes directly in NetBox.
## Setup
Copy the [settings-example.ini](https://github.com/bb-Ricardo/netbox-sync/blob/main/settings-example.ini) sample settings file to `settings.ini`.
All options are described in the example file.
## Configuration
There are two ways to define configuration. Any combination of config file(s) and environment variables is possible.
* config files (the [default config](https://github.com/bb-Ricardo/netbox-sync/blob/main/settings-example.ini) file name is set to `./settings.ini`.)
* environment variables
The config from the environment variables will have precedence over the config file definitions.
### Config files
Following config file types are supported:
* ini
* yaml
There is also more than one config file permitted. Example (config file names are also just examples):
```bash
/opt/netbox-sync/netbox-sync.py -c common.ini all-sources.yaml additional-config.yaml
```
All files are parsed in order of the definition and options will overwrite the same options if defined in a
previous config file.
To get config file examples which include descriptions and all default values, the `-g` can be used:
```bash
# this will create an ini example
/opt/netbox-sync/netbox-sync.py -g -c settings-example.ini
# and this will create an example config file in yaml format
/opt/netbox-sync/netbox-sync.py -g -c settings-example.yaml
```
### Environment variables
Each setting which can be defined in a config file can also be defined using an environment variable.
The prefix for all environment variables to be used in netbox-sync is: `NBS`
For configuration in the `common` and `netbox` section a variable is defined like this
```
<PREFIX>_<SECTION_NAME>_<CONFIG_OPTION_KEY>=value
```
Following example represents the same configuration:
```yaml
# yaml config example
common:
log_level: DEBUG2
netbox:
host_fqdn: netbox-host.example.com
prune_enabled: true
```
```bash
# this variable definition is equal to the yaml config sample above
NBS_COMMON_LOG_LEVEL="DEBUG2"
NBS_netbox_host_fqdn="netbox-host.example.com"
NBS_NETBOX_PRUNE_ENABLED="true"
```
This way it is possible to expose for example the `NBS_NETBOX_API_KEY` only via an env variable.
The config definitions for `sources` need to be defined using an index. Following conditions apply:
* a single source needs to use the same index
* the index can be number or a name (but contain any special characters to support env var parsing)
* the source needs to be named with `_NAME` variable
Example of defining a source with config and environment variables.
```ini
; example for a source
[source/example-vcenter]
enabled = True
type = vmware
host_fqdn = vcenter.example.com
username = vcenter-readonly
```
```bash
# define the password on command line
# here we use '1' as index
NBS_SOURCE_1_NAME="example-vcenter"
NBS_SOURCE_1_PASSWORD="super-secret-and-not-saved-to-the-config-file"
NBS_SOURCE_1_custom_dns_servers="10.0.23.23, 10.0.42.42"
```
Even to just define one source variable like `NBS_SOURCE_1_PASSWORD` the `NBS_SOURCE_1_NAME` needs to be defined as
to associate to the according source definition.
## Cron job
In Order to sync all items regularly you can add a cron job like this one
+7
View File
@@ -6,3 +6,10 @@
# This work is licensed under the terms of the MIT license.
# For a copy, see file LICENSE.txt included in this
# repository or visit: <https://opensource.org/licenses/MIT>.
__version__ = "1.3.0"
__version_date__ = "2022-09-06"
__author__ = "Ricardo Bartels <ricardo.bartels@telekom.de>"
__description__ = "NetBox Sync"
__license__ = "MIT"
__url__ = "https://github.com/bb-ricardo/netbox-sync"
+23 -18
View File
@@ -12,24 +12,18 @@ import os
from argparse import ArgumentParser, RawDescriptionHelpFormatter
from module.common.logging import valid_log_levels
from module.config import default_config_file_path
from module import __version__, __version_date__, __url__
def parse_command_line(version=None, self_description=None, version_date=None, url=None, default_config_file_path=None):
def parse_command_line(self_description=None):
"""
parse command line arguments, also add current version and version date to description
Parameters
----------
version: str
version of this program
self_description: str
short self description of this program
version_date: str
release date of this version
url: str
project url
default_config_file_path: str
path to default config file
Returns
-------
@@ -37,19 +31,21 @@ def parse_command_line(version=None, self_description=None, version_date=None, u
"""
# define command line options
description = f"{self_description}\nVersion: {version} ({version_date})\nProject URL: {url}"
description = f"{self_description}\nVersion: {__version__} ({__version_date__})\nProject URL: {__url__}"
parser = ArgumentParser(
description=description,
formatter_class=RawDescriptionHelpFormatter)
parser.add_argument("-c", "--config", default=default_config_file_path, dest="config_file",
help="points to the config file to read config data from " +
"which is not installed under the default path '" +
default_config_file_path + "'",
metavar="settings.ini")
parser.add_argument("-c", "--config", default=[], dest="config_files", nargs='+',
help=f"points to the config file to read config data from which is not installed "
f"under the default path '{default_config_file_path}'",
metavar=os.path.basename(default_config_file_path))
parser.add_argument("-l", "--log_level", choices=valid_log_levels, dest="log_level",
parser.add_argument("-g", "--generate_config", action="store_true",
help="generates default config file.")
parser.add_argument("-l", "--log_level", choices=valid_log_levels,
help="set log level (overrides config)")
parser.add_argument("-n", "--dry_run", action="store_true",
@@ -63,8 +59,17 @@ def parse_command_line(version=None, self_description=None, version_date=None, u
args = parser.parse_args()
# fix supplied config file path
if args.config_file != default_config_file_path and args.config_file[0] != os.sep:
args.config_file = os.path.realpath(os.getcwd() + os.sep + args.config_file)
fixed_config_files = list()
for config_file in args.config_files:
if len(config_file) == 0:
continue
if config_file != default_config_file_path and config_file[0] != os.sep:
config_file = os.path.realpath(os.getcwd() + os.sep + config_file)
fixed_config_files.append(config_file)
args.config_files = fixed_config_files
return args
+58
View File
@@ -0,0 +1,58 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2020 - 2022 Ricardo Bartels. All rights reserved.
#
# netbox-sync.py
#
# This work is licensed under the terms of the MIT license.
# For a copy, see file LICENSE.txt included in this
# repository or visit: <https://opensource.org/licenses/MIT>.
from module.config.option import ConfigOption
from module.config.base import ConfigBase
from module.config import common_config_section_name
from module.common.logging import log_file_max_rotation, log_file_max_size_in_mb
class CommonConfig(ConfigBase):
"""Controls the parameters for logging
"""
section_name = common_config_section_name
def __init__(self):
self.options = [
ConfigOption("log_level",
str,
description="""\
Logs will always be printed to stdout/stderr.
Logging can be set to following log levels:
ERROR: Fatal Errors which stops regular a run
WARNING: Warning messages won't stop the syncing process but mostly worth
to have a look at.
INFO: Information about objects that will be create/updated/deleted in NetBox
DEBUG: Will log information about retrieved information, changes in internal
data structure and parsed config
DEBUG2: Will also log information about how/why data is parsed or skipped.
DEBUG3: Logs all source and NetBox queries/results to stdout. Very useful for
troubleshooting, but will log any sensitive data contained within a query.
""",
default_value="INFO"),
ConfigOption("log_to_file",
bool,
description="""Enabling this options will write all
logs to a log file defined in 'log_file'
""",
default_value=False),
ConfigOption("log_file",
str,
description=f"""Destination of the log file if "log_to_file" is enabled.
Log file will be rotated maximum {log_file_max_rotation} times once
the log file reaches size of {log_file_max_size_in_mb} MB
""",
default_value="log/netbox_sync.log")
]
super().__init__()
-164
View File
@@ -1,164 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2020 - 2022 Ricardo Bartels. All rights reserved.
#
# netbox-sync.py
#
# This work is licensed under the terms of the MIT license.
# For a copy, see file LICENSE.txt included in this
# repository or visit: <https://opensource.org/licenses/MIT>.
import configparser
import os
from module.common.misc import do_error_exit
from module.common.logging import get_logger
log = get_logger()
def get_config_file(config_file):
"""
get absolute path to provided config file string
Parameters
----------
config_file: str
config file path
Returns
-------
str: absolute path to config file
"""
if config_file is None or config_file == "":
do_error_exit("ERROR: Config file not defined.")
base_dir = os.sep.join(__file__.split(os.sep)[0:-3])
if config_file[0] != os.sep:
config_file = f"{base_dir}{os.sep}{config_file}"
return os.path.realpath(config_file)
def open_config_file(config_file):
"""
Open config file with a ConfigParser and return handler. Bail out of opening or parsing fails
Parameters
----------
config_file: str
absolute path of config file to open
Returns
-------
ConfigParser: handler with supplied config file
"""
if config_file is None or config_file == "":
do_error_exit("ERROR: Config file not defined.")
# setup config parser and read config
config_handler = configparser.ConfigParser(strict=True, allow_no_value=True,
empty_lines_in_values=False, interpolation=None)
# noinspection PyBroadException
try:
config_handler.read_file(open(config_file))
except configparser.Error as e:
do_error_exit(f"ERROR: Problem while config file parsing: {e}")
# noinspection PyBroadException
except Exception:
do_error_exit(f"ERROR: Unable to open file '{config_file}'")
return config_handler
def get_config(config_handler=None, section=None, valid_settings=None, deprecated_settings=None, removed_settings=None):
"""
read config items from a defined section
Parameters
----------
config_handler: ConfigParser
a config file handler to read config data from
section: str
name of the section to read
valid_settings: dict
a dictionary with valid config items to read from this section.
key: is the config item name
value: default value if config option is undefined
deprecated_settings: dict
a dictionary of deprecated config settings
key: name of deprecated setting
value: name of superseding setting or None if no substitution applies
removed_settings: dict
a dictionary of removed setting names
key: name of removed setting
value: name of superseding setting or None if no substitution applies
Returns
-------
dict: parsed config items from defined $section
"""
def get_config_option(this_section, item, default=None):
if isinstance(default, bool):
value = config_handler.getboolean(this_section, item, fallback=default)
elif isinstance(default, int):
value = config_handler.getint(this_section, item, fallback=default)
else:
value = config_handler.get(this_section, item, fallback=default)
if value == "":
value = None
config_dict[item] = value
# take care of logging sensitive data
for sensitive_item in ["token", "pass"]:
if sensitive_item.lower() in item.lower():
value = value[0:3] + "***"
log.debug(f"Config: {this_section}.{item} = {value}")
config_dict = {}
if valid_settings is None:
log.error("No valid settings passed to config parser!")
# read specified section section
if section is None:
return config_dict
if section not in config_handler.sections():
log.error(f"Section '{section}' not found in config_file")
return config_dict
for config_item, default_value in valid_settings.items():
get_config_option(section, config_item, default=default_value)
# check for deprecated settings
if isinstance(deprecated_settings, dict):
for deprecated_setting, alternative_setting in deprecated_settings.items():
if config_handler.get(section, deprecated_setting, fallback=None) is not None:
log_text = f"Setting '{deprecated_setting}' is deprecated and will be removed soon."
if alternative_setting is not None:
log_text += f" Consider changing your config to use the '{alternative_setting}' setting."
log.warning(log_text)
# check for removed settings
if isinstance(removed_settings, dict):
for removed_setting, alternative_setting in removed_settings.items():
if config_handler.get(section, removed_setting, fallback=None) is not None:
log_text = f"Setting '{removed_setting}' has been removed " \
f"but is still defined in config section '{section}'."
if alternative_setting is not None:
log_text += f" You need to switch to '{alternative_setting}' setting."
log.warning(log_text)
return config_dict
# EOF
+6 -7
View File
@@ -14,7 +14,6 @@ from logging.handlers import RotatingFileHandler
from module.common.misc import do_error_exit
# define DEBUG2 and DEBUG3 log levels
DEBUG2 = 6 # extended messages
DEBUG3 = 3 # extra extended messages
@@ -27,6 +26,9 @@ logging.addLevelName(DEBUG2, "DEBUG2")
# add log level DEBUG3
logging.addLevelName(DEBUG3, "DEBUG3")
log_file_max_size_in_mb = 10
log_file_max_rotation = 5
def debug2(self, message, *args, **kws):
if self.isEnabledFor(DEBUG2):
@@ -72,17 +74,14 @@ def setup_logging(log_level=None, log_file=None):
log handler to use for logging
"""
log_file_max_size_in_mb = 10
log_file_max_rotation = 5
log_format = '%(asctime)s - %(levelname)s: %(message)s'
if log_level is None or log_level == "":
do_error_exit("ERROR: log level undefined or empty. Check config please.")
do_error_exit("log level undefined or empty. Check config please.")
# check set log level against self defined log level array
if not log_level.upper() in valid_log_levels:
do_error_exit(f"ERROR: Invalid log level: {log_level}")
do_error_exit(f"Passed invalid log level: {log_level}")
# check the provided log level
if log_level == "DEBUG2":
@@ -122,7 +121,7 @@ def setup_logging(log_level=None, log_file=None):
backupCount=log_file_max_rotation
)
except Exception as e:
do_error_exit(f"ERROR: Problems setting up log file: {e}")
do_error_exit(f"Problems setting up log file: {e}")
log_file_handler.setFormatter(log_format)
logger.addHandler(log_file_handler)
-77
View File
@@ -7,7 +7,6 @@
# For a copy, see file LICENSE.txt included in this
# repository or visit: <https://opensource.org/licenses/MIT>.
from ipaddress import ip_interface, ip_address
import asyncio
import aiodns
@@ -45,82 +44,6 @@ def normalize_mac_address(mac_address=None):
return mac_address
def ip_valid_to_add_to_netbox(ip, permitted_subnets, excluded_subnets=None, interface_name=None):
"""
performs a couple of checks to see if an IP address is valid and allowed
to be added to NetBox
IP address must always be passed as interface notation
* 192.168.0.1/24
* fd00::0/64
* 192.168.23.24/255.255.255.24
Parameters
----------
ip: str
IP address to validate
permitted_subnets: list
list of permitted subnets where each subnet/prefix is an instance of IP4Network or IP6Network
excluded_subnets: list
list of excluded subnets where each subnet/prefix is an instance of IP4Network or IP6Network
interface_name: str
name of the interface this IP shall be added. Important for meaningful log messages
Returns
-------
bool: if IP address is valid
"""
if ip is None:
log.error("No IP address provided")
return False
if permitted_subnets is None:
return False
if excluded_subnets is None:
excluded_subnets = list()
ip_text = f"'{ip}'"
if interface_name is not None:
ip_text = f"{ip_text} for {interface_name}"
try:
if "/" in ip:
ip_a = ip_interface(ip).ip
else:
ip_a = ip_address(ip)
except ValueError:
log.error(f"IP address {ip_text} invalid!")
return False
if ip_a.is_link_local is True:
log.debug(f"IP address {ip_text} is a link local address. Skipping.")
return False
if ip_a.is_loopback is True:
log.debug(f"IP address {ip_text} is a loopback address. Skipping.")
return False
ip_permitted = False
for permitted_subnet in permitted_subnets:
if ip_a in permitted_subnet:
ip_permitted = True
break
for excluded_subnet in excluded_subnets:
if ip_a in excluded_subnet:
ip_permitted = False
break
if ip_permitted is False:
log.debug(f"IP address {ip_text} not part of any permitted subnet. Skipping.")
return False
return True
def perform_ptr_lookups(ips, dns_servers=None):
"""
Perform DNS reverse lookups for IP addresses to find corresponding DNS name
+17
View File
@@ -0,0 +1,17 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2020 - 2022 Ricardo Bartels. All rights reserved.
#
# netbox-sync.py
#
# This work is licensed under the terms of the MIT license.
# For a copy, see file LICENSE.txt included in this
# repository or visit: <https://opensource.org/licenses/MIT>.
default_config_file_path = "./settings.ini"
common_config_section_name = "common"
netbox_config_section_name = "netbox"
source_config_section_name = "source"
env_var_prefix = "NBS"
env_var_source_prefix = f"{env_var_prefix}_{source_config_section_name.upper()}"
+166
View File
@@ -0,0 +1,166 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2020 - 2022 Ricardo Bartels. All rights reserved.
#
# netbox-sync.py
#
# This work is licensed under the terms of the MIT license.
# For a copy, see file LICENSE.txt included in this
# repository or visit: <https://opensource.org/licenses/MIT>.
from module.common.misc import grab
from module.common.logging import get_logger
from module.config.parser import ConfigParser
from module.config.option import ConfigOption
from module.config.group import ConfigOptionGroup
log = get_logger()
class ConfigOptions:
def __init__(self, **kwargs):
for name in kwargs:
setattr(self, name, kwargs[name])
def __eq__(self, other):
if not isinstance(other, ConfigOptions):
return NotImplemented
return vars(self) == vars(other)
def __contains__(self, key):
return key in self.__dict__
class ConfigBase:
"""
Base class to parse config data
"""
section_name = None
_parsing_failed = False
options = list()
config_content = dict()
def __init__(self):
config = ConfigParser()
if config.parsing_finished is True:
self.config_content = config.content
# stub function, needs to implemented in each config class
def validate_options(self):
pass
def set_validation_failed(self):
self._parsing_failed = True
def get_option_by_name(self, name: str) -> ConfigOption:
for option in self.options:
if option.key == name:
return option
def parse(self, do_log: bool = True):
def _log(handler, message):
if do_log is True:
handler(message)
def get_value(key: str = None):
separator = "|"
path = [self.section_name]
source_name = getattr(self, "source_name", None)
if source_name is not None:
path.append(source_name)
if key is not None:
path.append(key)
return grab(self.config_content, separator.join(path), separator=separator)
if self.section_name is None:
raise KeyError(f"Class '{self.__class__.__name__}' is missing 'section_name' attribute")
config_option_location = self.section_name
if hasattr(self, "source_name"):
config_option_location += f".{self.source_name}"
options_returned = list()
input_options = list()
for config_object in self.options:
if isinstance(config_object, ConfigOption):
input_options.append(config_object)
elif isinstance(config_object, ConfigOptionGroup):
input_options.extend(config_object.options)
for config_object in input_options:
if not isinstance(config_object, ConfigOption):
continue
config_value = get_value(config_object.key)
alt_key_used = False
if config_value is None and config_object.alt_key is not None:
alt_key_used = True
config_value = get_value(config_object.alt_key)
# check for deprecated settings
if config_value is not None and config_object.deprecated is True:
log_text = f"Setting '{config_object.key}' in '{config_option_location}' is deprecated " \
"and will be removed soon."
if len(config_object.deprecation_message) > 0:
log_text += " " + config_object.deprecation_message
_log(log.warning, log_text)
# check for removed settings
if config_value is not None and config_object.removed is True:
object_key = config_object.key
if alt_key_used is True:
object_key = config_object.alt_key
log_text = f"Setting '{object_key}' has been removed " \
f"but is still defined in config section '{config_option_location}'."
if len(config_object.deprecation_message) > 0:
log_text += " " + config_object.deprecation_message
_log(log.warning, log_text)
continue
if config_object.removed is True:
continue
# set value
config_object.set_value(config_value)
_log(log.debug, f"Config: {config_option_location}.{config_object.key} = {config_object.sensitive_value}")
if config_object.mandatory is True and config_object.value is None:
self._parsing_failed = True
_log(log.error, f"Config option '{config_object.key}' in "
f"'{config_option_location}' can't be empty/undefined")
if config_object.parsing_failed is True:
self._parsing_failed = True
options_returned.append(config_object)
self.options = options_returned
# check for unknown config options
config_options = get_value()
if not isinstance(config_options, dict):
config_options = dict()
for option_key in config_options.keys():
if option_key not in [x.key for x in input_options]:
_log(log.warning, f"Found unknown config option '{option_key}' for '{config_option_location}' config")
# validate parsed config
self.validate_options()
if self._parsing_failed is True:
log.error("Config validation failed. Exit!")
exit(1)
return ConfigOptions(**{x.key: x.value for x in self.options})
+250
View File
@@ -0,0 +1,250 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2020 - 2022 Ricardo Bartels. All rights reserved.
#
# netbox-sync.py
#
# This work is licensed under the terms of the MIT license.
# For a copy, see file LICENSE.txt included in this
# repository or visit: <https://opensource.org/licenses/MIT>.
import os
from module.config.formatter import DescriptionFormatterMixin
from module.config.group import ConfigOptionGroup
from module.config.option import ConfigOption
from module.common.config import CommonConfig
from module.netbox.config import NetBoxConfig
from module.sources.vmware.config import VMWareConfig
from module.sources.check_redfish.config import CheckRedfishConfig
from module.common.logging import get_logger
from module.config import default_config_file_path, source_config_section_name
from module.config.files import ConfigFile, ConfigFileINI, ConfigFileYAML
from module import __version__, __version_date__, __description__, __url__
log = get_logger()
class ConfigFileOutput(DescriptionFormatterMixin):
base_config_list = [
CommonConfig,
NetBoxConfig
]
source_config_list = [
VMWareConfig,
CheckRedfishConfig
]
header = f"Welcome to the {__description__} configuration file."
_description = """The values in this file override the default values used by the system if
a config option is not specified. The commented out lines are the configuration
field and the default value used. Uncommenting a line and changing the value
will change the value used at runtime when the process is restarted.
"""
source_description = """Controls the parameters of a defined source. The string past the slash
will be used as a sources name. Sources can be defined multiple times to
represent different sources.
"""
config_file_type = None
indentation_level = 0
lines = list()
def __init__(self, args):
if args is None or args.generate_config is False:
return
if len(args.config_files) == 0:
self.output_file = default_config_file_path
else:
self.output_file = args.config_files[0]
if os.path.exists(self.output_file):
log.error(f'ERROR: Config file "{self.output_file}" already present')
exit(1)
self.config_file_type = ConfigFile.get_file_type(self.output_file)
if self.config_file_type is None:
log.error(f"ERROR: Unknown/Unsupported config file type "
f"'{ConfigFile.get_suffix(self.output_file)}' for {self.output_file}")
exit(1)
self.comment_prefix = self.config_file_type.comment_prefix
self.format()
self._set_indent_level(0)
self._add_blank_line()
self._add_line(f"{self.comment_prefix}EOF")
self._add_blank_line()
self.lines = [x.rstrip() for x in self.lines]
try:
with open(self.output_file, "w") as fp:
fp.write('\n'.join(self.lines))
except Exception as e:
log.error(f"Error: Unable to write to file '{self.output_file}': {e}")
exit(1)
exit(0)
def format(self):
self._add_line(f"{self.comment_prefix*3} {self.header}\n")
self._add_line(f"{self.comment_prefix*3} Version: {__version__} ({__version_date__})\n")
self._add_line(f"{self.comment_prefix * 3} Project URL: {__url__}\n")
self._add_blank_line()
self._add_lines(self.config_description(prefix=self.comment_prefix).split("\n"))
self._add_blank_line()
for config_section in self.base_config_list:
config_instance = config_section()
self._format_section_description(config_instance)
self._add_blank_line()
if self.config_file_type is ConfigFileINI:
self._add_line(f"[{config_instance.section_name}]")
elif self.config_file_type is ConfigFileYAML:
self._set_indent_level(0)
self._add_line(f"{config_section.section_name}:")
self._set_indent_level(1)
self._format_options(config_instance.options)
self._set_indent_level(0)
# write out section description
self._format_section_description(section_name="source/*", section_description=self.source_description)
self._add_blank_line()
if self.config_file_type is ConfigFileYAML:
self._add_line(f"{source_config_section_name}:")
for config_section in self.source_config_list:
config_instance = config_section()
self._format_section_description(config_instance)
self._add_blank_line()
if self.config_file_type is ConfigFileINI:
self._add_line(f"[{config_instance.section_name}/{config_instance.source_name_example}]")
elif self.config_file_type is ConfigFileYAML:
self._set_indent_level(1)
self._add_line(f"{config_instance.source_name_example}:")
self._set_indent_level(2)
self._format_options(config_instance.options)
self._set_indent_level(1)
def _add_line(self, line: str):
indent = ""
indent_size = 2
if self.config_file_type is ConfigFileYAML:
indent = " " * indent_size * self.indentation_level
self.lines.append(f"{indent}{line}")
def _add_lines(self, lines: list):
for line in lines:
self._add_line(line)
def _set_indent_level(self, level: int):
self.indentation_level = level
def _format_options(self, option_list: list):
for option in option_list:
if isinstance(option, ConfigOption):
self._format_config_option(option)
elif isinstance(option, ConfigOptionGroup):
self._format_config_option_group(option)
self._add_blank_line()
def _format_section_description(self,
config_instance=None,
section_name: str = None,
section_description: str = None):
wide_prefix = self.comment_prefix * 3
if config_instance is not None:
if section_description is None:
section_description = config_instance.__doc__
if section_name is None:
section_name = f"{config_instance.section_name}"
if section_description is not None and section_name is not None:
section_formatter = DescriptionFormatterMixin()
section_formatter._description = section_description
self._add_blank_line()
self._add_line(wide_prefix)
self._add_line(f"{wide_prefix} [{section_name}]")
self._add_line(wide_prefix)
self._add_lines(section_formatter.config_description(prefix=wide_prefix).split("\n"))
self._add_line(wide_prefix)
def _add_blank_line(self):
if len(self.lines) > 0 and self.lines[-1] != "":
self.lines.append("")
def _format_config_option(self, option):
if option is None or option.removed is True or option.deprecated is True:
return
if len(option.description()) > 0:
self._add_blank_line()
self._add_lines(option.config_description(prefix=self.comment_prefix).split("\n"))
option_key = option.key
option_value = None
if option.mandatory is False:
option_key = f"{self.comment_prefix}{option_key}"
if option_value is None and option.default_value is not None:
option_value = option.default_value
if option_value is None and option.config_example is not None:
option_value = option.config_example
if option_value is None:
option_value = ""
if self.config_file_type is ConfigFileINI:
self._add_line(f"{option_key} = {option_value}")
elif self.config_file_type is ConfigFileYAML:
self._add_line(f"{option_key}: {option_value}")
def _format_config_option_group(self, group):
if group is None:
return
if isinstance(group.title, str) and len(group.title) > 0:
self._add_blank_line()
self._add_line(f"{self.comment_prefix} {group.title} options")
if len(group.description()) > 0:
self._add_blank_line()
self._add_lines(group.config_description(prefix=self.comment_prefix).split("\n"))
if isinstance(group.config_example, str) and len(group.config_example) > 0:
formatter = DescriptionFormatterMixin()
formatter._description = group.config_example
self._add_line(self.comment_prefix)
self._add_lines(formatter.config_description(prefix=self.comment_prefix).split("\n"))
for option in group.options:
self._format_config_option(option)
+51
View File
@@ -0,0 +1,51 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2020 - 2022 Ricardo Bartels. All rights reserved.
#
# netbox-sync.py
#
# This work is licensed under the terms of the MIT license.
# For a copy, see file LICENSE.txt included in this
# repository or visit: <https://opensource.org/licenses/MIT>.
class ConfigFileINI:
suffixes = [
"ini"
]
comment_prefix = ";"
class ConfigFileYAML:
suffixes = [
"yml",
"yaml"
]
comment_prefix = "#"
class ConfigFile:
supported_config_file_types = [
ConfigFileINI,
ConfigFileYAML
]
@classmethod
def get_file_type(cls, config_file_name: str):
suffix = cls.get_suffix(config_file_name)
if suffix is None:
return
for possible_file_type in cls.supported_config_file_types:
if suffix in possible_file_type.suffixes:
return possible_file_type
@classmethod
def get_suffix(cls, config_file_name):
if not isinstance(config_file_name, str):
return
return config_file_name.lower().split(".")[-1]
+74
View File
@@ -0,0 +1,74 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2020 - 2022 Ricardo Bartels. All rights reserved.
#
# netbox-sync.py
#
# This work is licensed under the terms of the MIT license.
# For a copy, see file LICENSE.txt included in this
# repository or visit: <https://opensource.org/licenses/MIT>.
from textwrap import fill, indent, dedent
default_output_width = 90
class DescriptionFormatterMixin:
_description = ""
def description(self, width: int = default_output_width) -> str:
"""
return description as a string wrapped at 'width'
SPECIAL: if self._description starts with a blank character,
the description will be strip of indentation and NO line wrapping will be applied.
Parameters
----------
width: int
max characters per line
Returns
-------
str: single string containing new line characters
"""
if not isinstance(width, int):
raise ValueError("value for 'width' must be of type int")
if self._description.startswith(" "):
return dedent(self._description.rstrip())
else:
return fill(" ".join(self._description.split()), width=width)
def config_description(self, prefix: str = "#", width: int = default_output_width) -> str:
"""
prefixes each description line with one or more 'prefix' characters
and a blank between prefix and description line text
Parameters
----------
prefix: str
string to prefix each line with
width: int
max characters per line
Returns
-------
str: single string containing new line characters
"""
if not isinstance(width, int):
raise ValueError("value for 'width' must be of type int")
if not isinstance(prefix, str):
raise ValueError("value for 'prefix' must be of type str")
prefix += " "
if width - len(prefix) < 3:
width = 3
else:
width = width - len(prefix)
return indent(self.description(width), prefix, lambda line: True)
+32
View File
@@ -0,0 +1,32 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2020 - 2022 Ricardo Bartels. All rights reserved.
#
# netbox-sync.py
#
# This work is licensed under the terms of the MIT license.
# For a copy, see file LICENSE.txt included in this
# repository or visit: <https://opensource.org/licenses/MIT>.
from module.config.option import ConfigOption
from module.config.formatter import DescriptionFormatterMixin
class ConfigOptionGroup(DescriptionFormatterMixin):
def __init__(self,
title: str = "",
description: str = "",
config_example: str = "",
options: list = None):
self.title = title
self._description = description
self.config_example = config_example
self.options = options
if not isinstance(self.options, list):
raise AttributeError("parameter options is not a list of config options")
else:
for option in self.options:
if not isinstance(option, ConfigOption):
raise AttributeError(f"option {option} needs to be of type {ConfigOption.__class__.__name__}")
+125
View File
@@ -0,0 +1,125 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2020 - 2022 Ricardo Bartels. All rights reserved.
#
# netbox-sync.py
#
# This work is licensed under the terms of the MIT license.
# For a copy, see file LICENSE.txt included in this
# repository or visit: <https://opensource.org/licenses/MIT>.
from typing import Any
from module.config.formatter import DescriptionFormatterMixin
from module.common.logging import get_logger
log = get_logger()
class ConfigOption(DescriptionFormatterMixin):
"""
handles all attributes of a single config option
"""
def __init__(self,
key: str,
value_type: Any,
description: str = "",
default_value: Any = None,
config_example: Any = None,
mandatory: bool = False,
alt_key: str = None,
deprecated: bool = False,
deprecation_message: str = "",
removed: bool = False,
sensitive: bool = False):
self.key = key
self._value = None
self.value_type = value_type
self._description = description
self.default_value = default_value
self.config_example = config_example
self.mandatory = mandatory
self.alt_key = alt_key
self.deprecated = deprecated
self.deprecation_message = deprecation_message
self.removed = removed
self.sensitive = sensitive
self.parsing_failed = False
if self.config_example is None:
self.config_example = self.default_value
if self.default_value is not None:
self.set_value(self.default_value)
if not isinstance(self._description, str):
raise ValueError(f"value for 'description' of '{self.key}' must be of type str")
if not isinstance(self.deprecation_message, str):
raise ValueError(f"value for 'deprecation_message' of '{self.key}' must be of type str")
if self.config_example is not None and not isinstance(self.config_example, self.value_type):
raise ValueError(f"value for 'config_example' of '{self.key}' must be of '{self.value_type}'")
def __repr__(self):
return f"{self.key}: {self._value}"
@property
def value(self):
return self._value
@property
def sensitive_value(self):
if self.sensitive is True and self._value is not None:
return self._value[0:3] + "***"
return self._value
def set_value(self, value):
if value is None:
return
if self.value_type == bool:
try:
config_value = self.to_bool(value)
except ValueError:
log.error(f"Unable to parse '{value}' for '{self.key}' as bool")
self.parsing_failed = True
return
elif self.value_type == int:
try:
config_value = int(value)
except ValueError:
log.error(f"Unable to parse '{value}' for '{self.key}' as int")
self.parsing_failed = True
return
else:
if len(str(value)) == 0:
return
config_value = value
self._value = config_value
@staticmethod
def to_bool(value):
"""
converts a string to a boolean
"""
valid = {
'true': True, 't': True, '1': True, 'yes': True,
'false': False, 'f': False, '0': False, 'no': False
}
if isinstance(value, bool):
return value
elif isinstance(value, str):
if value.lower() in valid:
return valid[value.lower()]
raise ValueError
+335
View File
@@ -0,0 +1,335 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2020 - 2022 Ricardo Bartels. All rights reserved.
#
# netbox-sync.py
#
# This work is licensed under the terms of the MIT license.
# For a copy, see file LICENSE.txt included in this
# repository or visit: <https://opensource.org/licenses/MIT>.
import os
import configparser
from typing import Dict
import yaml
from module.common.logging import get_logger
from module.common.misc import grab, do_error_exit
from module.config import *
from module.config.files import ConfigFile, ConfigFileINI, ConfigFileYAML
log = get_logger()
class ConfigParser:
"""
Singleton class to parse a given list of config files
"""
file_list = list()
content = dict()
config_errors = list()
config_warnings = list()
parsing_finished = False
def __new__(cls):
it = cls.__dict__.get("__it__")
if it is not None:
return it
cls.__it__ = it = object.__new__(cls)
it.init()
return it
def init(self) -> None:
pass
def add_config_file(self, config_file_name: str) -> None:
if isinstance(config_file_name, str) and len(config_file_name) > 0:
self.file_list.append(self.get_config_file_path(config_file_name))
def add_config_file_list(self, config_file_name_list: list) -> None:
if not isinstance(config_file_name_list, list):
return
for config_file_name in config_file_name_list:
self.add_config_file(config_file_name)
def log_end_exit_on_errors(self) -> None:
for error in self.config_errors:
log.error(error)
for warning in self.config_warnings:
log.warning(warning)
if len(self.config_errors) > 0:
do_error_exit("Unable to open/parse one or more config files")
def _add_error(self, message: str = "") -> None:
if isinstance(message, str) and len(message) > 0:
self.config_errors.append(message)
def _add_warning(self, message: str = "") -> None:
if isinstance(message, str) and len(message) > 0:
self.config_warnings.append(message)
def read_config(self) -> None:
"""
Read config files in the given order
"""
if self.parsing_finished is True:
return
default_config_file = self.get_config_file_path(default_config_file_path)
# check if default config file actually exists
# and add it to the list of files to parse
if os.path.exists(default_config_file) and len(self.file_list) == 0:
self.file_list.append(default_config_file)
# check if config file exists
for f in self.file_list:
# check if file exists
if not os.path.exists(f):
self._add_error(f'Config file "{f}" not found')
self.file_list.remove(f)
continue
# check if it's an actual file
if not os.path.isfile(f):
self._add_error(f'Config file "{f}" is not an actual file')
self.file_list.remove(f)
continue
# check if config file is readable
if not os.access(f, os.R_OK):
self._add_error(f'Config file "{f}" not readable')
self.file_list.remove(f)
continue
for config_file in self.file_list:
config_file_type = ConfigFile.get_file_type(config_file)
if config_file_type is None:
self._add_error(f"Unknown/Unsupported config file type "
f"'{ConfigFile.get_suffix(config_file)}' for {config_file}")
continue
if config_file_type == ConfigFileINI:
config_data = self._parse_ini(config_file)
elif config_file_type == ConfigFileYAML:
config_data = self._parse_yaml(config_file)
else:
continue
self._add_config_data(config_data, config_file)
# parse common and netbox config from env
for section in [common_config_section_name, netbox_config_section_name]:
env_config_data = self._parse_section_env_vars(section)
self._add_config_data(env_config_data)
# parse source data from env
env_config_data = self._parse_source_env_vars()
self._add_config_data(env_config_data)
self.parsing_finished = True
def _add_config_data(self, config_data: dict, config_file: str = "") -> None:
if not isinstance(config_data, dict):
self._add_error(f"Parsed config data from file '{config_file}' is not a dictionary")
return
for section, section_data in config_data.items():
if section == source_config_section_name:
if not isinstance(section_data, dict):
self._add_error(f"Parsed config data from file '{config_file}' for '{section}' is not a dictionary")
continue
if self.content.get(section) is None:
self.content[section] = dict()
for source_name, source_data in section_data.items():
current_data = grab(self.content, f"{section}|{source_name}", separator="|")
if current_data is None:
# source_name needs to be a string
self.content[section][str(source_name)] = source_data
else:
for key, value in source_data.items():
current_data[key] = value
else:
if section_data is None:
continue
if not isinstance(section_data, dict):
self._add_error(f"Parsed config data from file '{config_file}' for '{section}' is not a dictionary")
continue
if self.content.get(section) is None:
self.content[section] = dict()
for key, value in section_data.items():
self.content[section][str(key)] = value
@staticmethod
def get_config_file_path(config_file: str) -> str:
"""
get absolute path to provided config file string
Parameters
----------
config_file: str
config file path
Returns
-------
str: absolute path to config file
"""
if not isinstance(config_file, str):
raise ValueError("value for 'config_file' of 'parse_ini' must be of type str")
if len(config_file) == 0:
raise ValueError(f"value for 'config_file' can't be empty")
base_dir = os.sep.join(__file__.split(os.sep)[0:-3])
if config_file[0] != os.sep:
config_file = f"{base_dir}{os.sep}{config_file}"
return os.path.realpath(config_file)
def _parse_ini(self, config_file: str = "") -> Dict:
if not isinstance(config_file, str):
raise ValueError("value for 'config_file' of 'parse_ini' must be of type str")
if len(config_file) == 0:
raise ValueError(f"value for 'config_file' can't be empty")
# setup config parser and read config
config_handler = configparser.ConfigParser(strict=True, allow_no_value=True,
empty_lines_in_values=False, interpolation=None)
return_data = {
source_config_section_name: dict(dict())
}
try:
config_handler.read_file(open(config_file))
except configparser.Error as e:
self._add_error(f"Problem while config file '{config_file}' parsing: {e}")
return return_data
except Exception as e:
self._add_error(f"Unable to open file '{config_file}': {e}")
return return_data
for section in config_handler.sections():
source_data = dict(config_handler.items(section))
if section.startswith(f"{source_config_section_name}/"):
return_data[source_config_section_name][section.replace(f"{source_config_section_name}/", "")] = \
source_data
else:
return_data[section] = source_data
return return_data
def _parse_yaml(self, config_file: str = "") -> Dict:
if not isinstance(config_file, str):
raise ValueError("value for 'config_file' of 'parse_yaml_or_toml' must be of type str")
if len(config_file) == 0:
raise ValueError(f"value for 'config_file' can't be empty")
return_data = dict()
with open(config_file, "r") as stream:
try:
return_data = yaml.safe_load(stream)
except yaml.YAMLError as e:
self._add_error(f"Problem while config file '{config_file}' parsing: {e}")
return return_data
except Exception as e:
self._add_error(f"Unable to open file '{config_file}': {e}")
return return_data
if isinstance(return_data.get("sources"), dict) and return_data.get(source_config_section_name) is None:
return_data[source_config_section_name] = return_data.get("sources")
del return_data["sources"]
return return_data
@staticmethod
def _parse_section_env_vars(section: str) -> Dict:
return_data = {
section: dict()
}
section_prefix = f"{env_var_prefix}_{section}".upper()
for key, value in os.environ.items():
if key.upper().startswith(section_prefix):
return_data[section][key.replace(f"{section_prefix}_", "", 1).lower()] = value
return return_data
def _parse_source_env_vars(self) -> Dict:
source_indexes = set()
env_var_list = dict()
env_var_names = dict() # keep list of var names to point out possible config errors
return_data = {
source_config_section_name: dict(dict())
}
# compile dict of relevant env vars and values
for key, value in os.environ.items():
if key.upper().startswith(f"{env_var_source_prefix}_"):
env_var_list[key.upper()] = value
env_var_names[key.upper()] = key
for env_var in env_var_list.keys():
# try to find a var which contains the source name
if env_var.endswith("_NAME"):
source_indexes.add(env_var.replace(f"{env_var_source_prefix}_", "", 1).replace("_NAME", "", 1))
for source_index in source_indexes:
source_env_config = dict()
source_prefix = f"{env_var_source_prefix}_{source_index}"
source_name = env_var_list.get(f"{source_prefix}_NAME")
if source_name is None:
continue
for key, value in env_var_list.items():
# skip var which contains the name
if key.upper().endswith("NAME"):
continue
if key.startswith(source_prefix):
source_env_config[key.replace(f"{source_prefix}_", "", 1).lower()] = value
del env_var_names[key]
if len(source_env_config) > 0:
return_data[source_config_section_name][source_name] = source_env_config
# point out possible env var config mistakes
for _, key in env_var_names.items():
self._add_warning(f"Found ENV var '{key}' which cannot be associated with any source due to "
f"missing '{env_var_source_prefix}_<index>_NAME' var")
return return_data
+145
View File
@@ -0,0 +1,145 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2020 - 2022 Ricardo Bartels. All rights reserved.
#
# netbox-sync.py
#
# This work is licensed under the terms of the MIT license.
# For a copy, see file LICENSE.txt included in this
# repository or visit: <https://opensource.org/licenses/MIT>.
from module.config.option import ConfigOption
from module.config.base import ConfigBase
from module.config import netbox_config_section_name
from module.common.logging import get_logger
log = get_logger()
class NetBoxConfig(ConfigBase):
"""Controls the connection parameters to your netBox instance
"""
section_name = netbox_config_section_name
def __init__(self):
self.options = [
ConfigOption("api_token",
str,
description="""Requires an NetBox API token with full permissions on all objects except
'auth', 'secrets' and 'users'
""",
config_example="XYZ",
mandatory=True,
sensitive=True),
ConfigOption("host_fqdn",
str,
description="Requires a hostname or IP which points to your NetBox instance",
config_example="netbox.example.com",
mandatory=True),
ConfigOption("port",
int,
description="""Define the port your NetBox instance is listening on.
If 'disable_tls' is set to "true" this option might be set to 80
""",
default_value=443),
ConfigOption("disable_tls",
bool,
description="Whether TLS encryption is enabled or disabled",
default_value=False),
ConfigOption("validate_tls_certs",
bool,
description="""Enforces TLS certificate validation. If this system doesn't trust the NetBox
web server certificate then this option needs to be changed
""",
default_value=True),
ConfigOption("proxy",
str,
description="""Defines a proxy which will be used to connect to NetBox.
Proxy setting needs to include the schema.
Proxy basic auth example: http://user:pass@10.10.1.10:312
""",
config_example="http://example.com:3128"),
ConfigOption("client_cert",
str,
description="Specify a client certificate which can be used to authenticate to NetBox",
config_example="client.pem"),
ConfigOption("client_cert_key",
str,
description="Specify the client certificate private key belonging to the client cert",
config_example="client.key"),
ConfigOption("prune_enabled",
bool,
description="""Whether items which were created by this program but
can't be found in any source anymore will be deleted or not
""",
default_value=False),
ConfigOption("prune_delay_in_days",
int,
description="""Orphaned objects will first be tagged before they get deleted.
Once the amount of days passed the object will actually be deleted
""",
default_value=30),
ConfigOption("ignore_unknown_source_object_pruning",
bool,
description="""This will tell netbox-sync to ignore objects in NetBox
with tag 'NetBox-synced' from pruning if the source is not defined in
this config file (https://github.com/bb-Ricardo/netbox-sync/issues/176)
""",
default_value=False),
ConfigOption("default_netbox_result_limit",
int,
description="""The maximum number of objects returned in a single request.
If a NetBox instance is very quick responding the value should be raised
""",
default_value=200),
ConfigOption("timeout",
int,
description="""The maximum time a query is allowed to execute before being
killed and considered failed
""",
default_value=30),
ConfigOption("max_retry_attempts",
int,
description="""The amount of times a failed request will be reissued.
Once the maximum is reached the syncing process will be stopped completely.
""",
default_value=4),
ConfigOption("use_caching",
bool,
description="""Defines if caching of NetBox objects is used or not.
If problems with unresolved dependencies occur, switching off caching might help.
""",
default_value=True),
ConfigOption("cache_directory_location",
str,
description="The location of the directory where the cache files should be stored",
default_value="cache")
]
super().__init__()
def validate_options(self):
for option in self.options:
if option.key == "proxy" and option.value is not None:
if "://" not in option.value or \
(not option.value.startswith("http") and not option.value.startswith("socks5")):
log.error(f"Config option 'proxy' in '{NetBoxConfig.section_name}' must contain the schema "
f"http, https, socks5 or socks5h")
self.set_validation_failed()
+45 -105
View File
@@ -21,6 +21,9 @@ from packaging import version
from module.common.logging import get_logger, DEBUG3
from module.common.misc import grab, do_error_exit, plural
from module.netbox.object_classes import *
from module.netbox.inventory import NetBoxInventory
from module.netbox.config import NetBoxConfig
from module import __version__
log = get_logger()
@@ -33,26 +36,6 @@ class NetBoxHandler:
# minimum API version necessary
minimum_api_version = "2.9"
# permitted settings and defaults
settings = {
"api_token": None,
"host_fqdn": None,
"port": None,
"disable_tls": False,
"validate_tls_certs": True,
"proxy": None,
"client_cert": None,
"client_cert_key": None,
"prune_enabled": False,
"prune_delay_in_days": 30,
"default_netbox_result_limit": 200,
"timeout": 30,
"max_retry_attempts": 4,
"use_caching": True,
"cache_directory_location": "cache",
"ignore_unknown_source_object_pruning": False
}
# This tag gets added to all objects create/updated/inherited by this program
primary_tag = "NetBox-synced"
@@ -65,22 +48,13 @@ class NetBoxHandler:
# this is only used to speed up testing, NEVER SET TO True IN PRODUCTION
testing_cache = False
# pointer to inventory object
inventory = None
# keep track of already resolved dependencies
resolved_dependencies = set()
# set bogus default version
version = "0.0.1"
def __init__(self):
def __init__(self, settings=None, inventory=None, nb_sync_version=None):
self.settings = settings
self.inventory = inventory
self.version = nb_sync_version
self.parse_config_settings(settings)
self.settings = NetBoxConfig().parse()
self.inventory = NetBoxInventory()
# flood the console
if log.level == DEBUG3:
@@ -89,18 +63,18 @@ class NetBoxHandler:
HTTPConnection.debuglevel = 1
proto = "https"
if bool(self.disable_tls) is True:
if bool(self.settings.disable_tls) is True:
proto = "http"
# disable TLS insecure warnings if user explicitly switched off validation
if bool(self.validate_tls_certs) is False:
if bool(self.settings.validate_tls_certs) is False:
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
port = ""
if self.port is not None:
port = f":{self.port}"
if self.settings.port is not None:
port = f":{self.settings.port}"
self.url = f"{proto}://{self.host_fqdn}{port}/api/"
self.url = f"{proto}://{self.settings.host_fqdn}{port}/api/"
self.session = self.create_session()
@@ -124,10 +98,10 @@ class NetBoxHandler:
If a condition fails, caching is switched of.
"""
if self.use_caching is False:
if self.settings.use_caching is False:
return
cache_folder_name = self.cache_directory_location
cache_folder_name = self.settings.cache_directory_location
base_dir = os.sep.join(__file__.split(os.sep)[0:-3])
if cache_folder_name[0] != os.sep:
@@ -138,7 +112,7 @@ class NetBoxHandler:
# check if directory is a file
if os.path.isfile(self.cache_directory):
log.warning(f"The cache directory ({self.cache_directory}) seems to be file.")
self.use_caching = False
self.settings.use_caching = False
# check if directory exists
if not os.path.exists(self.cache_directory):
@@ -147,58 +121,22 @@ class NetBoxHandler:
os.makedirs(self.cache_directory, 0o700)
except OSError:
log.warning(f"Unable to create cache directory: {self.cache_directory}")
self.use_caching = False
self.settings.use_caching = False
except Exception as e:
log.warning(f"Unknown exception while creating cache directory {self.cache_directory}: {e}")
self.use_caching = False
self.settings.use_caching = False
# check if directory is writable
if not os.access(self.cache_directory, os.X_OK | os.W_OK):
log.warning(f"Error writing to cache directory: {self.cache_directory}")
self.use_caching = False
self.settings.use_caching = False
if self.use_caching is False:
if self.settings.use_caching is False:
log.warning("NetBox caching DISABLED")
else:
log.debug(f"Successfully configured cache directory: {self.cache_directory}")
def parse_config_settings(self, config_settings):
"""
Validate parsed settings from config file
Parameters
----------
config_settings: dict
dict of config settings
"""
validation_failed = False
for setting in ["host_fqdn", "api_token"]:
if config_settings.get(setting) is None:
log.error(f"Config option '{setting}' in 'netbox' can't be empty/undefined")
validation_failed = True
for setting in ["prune_delay_in_days", "default_netbox_result_limit", "timeout", "max_retry_attempts"]:
if not isinstance(config_settings.get(setting), int):
log.error(f"Config option '{setting}' in 'netbox' must be an integer.")
validation_failed = True
proxy = config_settings.get("proxy")
if proxy is not None:
if "://" not in proxy or (not proxy.startswith("http") and not proxy.startswith("socks5")):
log.error(f"Config option 'proxy' in 'netbox' must contain the schema "
f"http, https, socks5 or socks5h")
validation_failed = True
if validation_failed is True:
log.error("Config validation failed. Exit!")
exit(1)
for setting in self.settings.keys():
setattr(self, setting, config_settings.get(setting))
def create_session(self):
def create_session(self) -> requests.Session:
"""
Create a new NetBox session using api_token
@@ -208,8 +146,8 @@ class NetBoxHandler:
"""
header = {
"Authorization": f"Token {self.api_token}",
"User-Agent": f"netbox-sync/{self.version}",
"Authorization": f"Token {self.settings.api_token}",
"User-Agent": f"netbox-sync/{__version__}",
"Content-Type": "application/json"
}
@@ -217,18 +155,18 @@ class NetBoxHandler:
session.headers.update(header)
# adds proxy to the session
if self.proxy is not None:
if self.settings.proxy is not None:
session.proxies.update({
"http": self.proxy,
"https": self.proxy
"http": self.settings.proxy,
"https": self.settings.proxy
})
# adds client cert to session
if self.client_cert is not None:
if self.client_cert_key is not None:
session.cert = (self.client_cert, self.client_cert_key)
if self.settings.client_cert is not None:
if self.settings.client_cert_key is not None:
session.cert = (self.settings.client_cert, self.settings.client_cert_key)
else:
session.cert = self.client_cert
session.cert = self.settings.client_cert
log.debug("Created new requests Session for NetBox.")
@@ -254,14 +192,14 @@ class NetBoxHandler:
try:
response = self.session.get(
self.url,
timeout=self.timeout,
verify=self.validate_tls_certs)
timeout=self.settings.timeout,
verify=self.settings.validate_tls_certs)
except Exception as e:
do_error_exit(f"NetBox connection: {e}")
result = str(response.headers.get("API-Version"))
log.info(f"Successfully connected to NetBox '{self.host_fqdn}'")
log.info(f"Successfully connected to NetBox '{self.settings.host_fqdn}'")
log.debug(f"Detected NetBox API version: {result}")
return result
@@ -307,7 +245,7 @@ class NetBoxHandler:
params = dict()
if "limit" not in params.keys():
params["limit"] = self.default_netbox_result_limit
params["limit"] = self.settings.default_netbox_result_limit
# always exclude config context
params["exclude"] = "config_context"
@@ -390,7 +328,7 @@ class NetBoxHandler:
if log.level == DEBUG3:
pprint.pprint(vars(this_request))
for _ in range(self.max_retry_attempts):
for _ in range(self.settings.max_retry_attempts):
log_message = f"Sending {this_request.method} to '{this_request.url}'"
@@ -400,7 +338,9 @@ class NetBoxHandler:
log.debug2(log_message)
try:
response = self.session.send(this_request, timeout=self.timeout, verify=self.validate_tls_certs)
response = self.session.send(this_request,
timeout=self.settings.timeout,
verify=self.settings.validate_tls_certs)
except (ConnectionError, requests.exceptions.ConnectionError, requests.exceptions.ReadTimeout):
log.warning(f"Request failed, trying again: {log_message}")
@@ -408,7 +348,7 @@ class NetBoxHandler:
else:
break
else:
do_error_exit(f"Giving up after {self.max_retry_attempts} retries.")
do_error_exit(f"Giving up after {self.settings.max_retry_attempts} retries.")
log.debug2("Received HTTP Status %s.", response.status_code)
@@ -457,7 +397,7 @@ class NetBoxHandler:
latest_update = None
# check if cache file is accessible
if self.use_caching is True:
if self.settings.use_caching is True:
cache_this_class = True
if os.path.exists(cache_file) and not os.access(cache_file, os.R_OK):
@@ -534,7 +474,7 @@ class NetBoxHandler:
# read a full set from NetBox
nb_objects = list()
if full_nb_data is not None:
if isinstance(full_nb_data, dict):
nb_objects = full_nb_data.get("results")
elif self.testing_cache is True:
@@ -553,7 +493,7 @@ class NetBoxHandler:
nb_objects.extend(updated_nb_data.get("results"))
if self.use_caching is True:
if self.settings.use_caching is True:
try:
pickle.dump(nb_objects, open(cache_file, "wb"))
if cache_this_class is True:
@@ -578,10 +518,10 @@ class NetBoxHandler:
log.debug("Checking/Adding NetBox Sync dependencies")
prune_text = f"Pruning is enabled and Objects will be automatically " \
f"removed after {self.prune_delay_in_days} days"
f"removed after {self.settings.prune_delay_in_days} days"
if self.prune_enabled is False:
prune_text = f"Objects would be automatically removed after {self.prune_delay_in_days} days " \
if self.settings.prune_enabled is False:
prune_text = f"Objects would be automatically removed after {self.settings.prune_delay_in_days} days " \
f"but pruning is currently disabled."
self.inventory.add_update_object(NBTag, data={
@@ -762,7 +702,7 @@ class NetBoxHandler:
deleted from NetBox.
"""
if self.prune_enabled is False:
if self.settings.prune_enabled is False:
log.debug("Pruning disabled. Skipping")
return
@@ -817,7 +757,7 @@ class NetBoxHandler:
days_since_last_update = (today - last_updated).days
# it seems we need to delete this object
if last_updated is not None and days_since_last_update >= self.prune_delay_in_days:
if last_updated is not None and days_since_last_update >= self.settings.prune_delay_in_days:
log.info(f"{nb_object_sub_class.name.capitalize()} '{this_object.get_display_name()}' is orphaned "
f"for {days_since_last_update} days and will be deleted.")
+16 -7
View File
@@ -16,17 +16,25 @@ log = get_logger()
class NetBoxInventory:
"""
Class to manage a inventory of NetBoxObject objects
Singleton class to manage a inventory of NetBoxObject objects
"""
base_structure = dict()
source_list = list()
def __init__(self):
# track NetBox API version and provided it for all sources
netbox_api_version = "0.0.0"
# track NetBox API version and provided it for all sources
self.netbox_api_version = "0.0.0"
def __new__(cls):
it = cls.__dict__.get("__it__")
if it is not None:
return it
cls.__it__ = it = object.__new__(cls)
it.init()
return it
def init(self):
for object_type in NetBoxObject.__subclasses__():
@@ -240,13 +248,13 @@ class NetBoxInventory:
return self.base_structure.get(object_type.name, list())
def get_all_interfaces(self, this_object):
def get_all_interfaces(self, this_object: (NBVM, NBDevice)):
"""
Return all interfaces items for a NBVM, NBDevice object
Parameters
----------
this_object: (NBVM, NBDevice)
this_object: NBVM, NBDevice
object instance to return interfaces for
Returns
@@ -286,7 +294,8 @@ class NetBoxInventory:
"""
all_sources_tags = [x.source_tag for x in self.source_list]
disabled_sources_tags = [x.source_tag for x in self.source_list if getattr(x, "enabled") is False]
disabled_sources_tags = \
[x.source_tag for x in self.source_list if grab(x, "settings.enabled", fallback=False) is False]
for object_type in NetBoxObject.__subclasses__():
+1 -1
View File
@@ -1477,7 +1477,7 @@ class NBCluster(NetBoxObject):
primary_key = "name"
secondary_key = "site"
prune = False
#include_secondary_key_if_present = True
# include_secondary_key_if_present = True
def __init__(self, *args, **kwargs):
self.data_model = {
+19 -38
View File
@@ -11,9 +11,11 @@
from module.sources.vmware.connection import VMWareHandler
from module.sources.check_redfish.import_inventory import CheckRedfish
from module.common.configuration import get_config
from module.common.logging import get_logger
from module.netbox.inventory import NetBoxInventory
from module.config.parser import ConfigParser
from module.config.base import ConfigOptions
from module.config import source_config_section_name
# list of valid sources
valid_sources = [VMWareHandler, CheckRedfish]
@@ -40,7 +42,7 @@ def validate_source(source_class_object=None, state="pre"):
"init_successful": bool,
"inventory": NetBoxInventory,
"name": str,
"settings": dict,
"settings": ConfigOptions,
"source_tag": str,
"source_type": str,
}
@@ -65,72 +67,51 @@ def validate_source(source_class_object=None, state="pre"):
raise ValueError(f"Value for attribute '{attr}' can't be empty.")
def instantiate_sources(config_handler=None, inventory=None):
def instantiate_sources():
"""
Instantiate a source handler and add necessary attributes. Also
validate source handler pre and post initialization.
Parameters
----------
config_handler: ConfigParser
a config file handler to read config data from
inventory: inventory object
inventory to be passed to source handler
Returns
-------
source handler object: instantiated source handler
"""
config = ConfigParser()
inventory = NetBoxInventory()
log = get_logger()
if config_handler is None:
raise Exception("No config handler defined!")
if inventory is None:
raise Exception("No inventory defined!")
# first validate all available sources
for possible_source_class in valid_sources:
validate_source(possible_source_class)
sources = list()
# iterate over sources and validate them
for source_section in config_handler.sections():
source_config = dict()
if isinstance(config.content, dict):
source_config = config.content.get(source_config_section_name)
# a source section needs to start with "source/"
if not source_section.startswith("source/"):
for source_name, source_config in source_config.items():
source_config_type = source_config.get("type")
if source_config_type is None:
log.error(f"Source {source_name} option 'type' is undefined")
continue
# get type of source
source_type = config_handler.get(source_section, "type", fallback=None)
if source_type is None:
log.error(f"Source {source_section} option 'type' is undefined")
source_class = None
for possible_source_class in valid_sources:
validate_source(possible_source_class)
if possible_source_class.implements(source_type):
if possible_source_class.implements(source_config_type):
source_class = possible_source_class
break
if source_class is None:
log.error(f"Unknown source type '{source_type}' defined for '{source_section}'")
log.error(f"Unknown source type '{source_config_type}' defined for '{source_name}'")
continue
source_config = get_config(config_handler,
section=source_section,
valid_settings=source_class.settings,
deprecated_settings=getattr(source_class, "deprecated_settings", None),
removed_settings=getattr(source_class, "removed_settings", None)
)
source_handler = source_class(name=source_section.replace("source/", ""),
inventory=inventory,
settings=source_config)
source_handler = source_class(name=source_name)
validate_source(source_handler, "post")
+103
View File
@@ -0,0 +1,103 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2020 - 2022 Ricardo Bartels. All rights reserved.
#
# netbox-sync.py
#
# This work is licensed under the terms of the MIT license.
# For a copy, see file LICENSE.txt included in this
# repository or visit: <https://opensource.org/licenses/MIT>.
import os
from module.config import source_config_section_name
from module.config.base import ConfigBase
from module.config.option import ConfigOption
from module.sources.common.conifg import *
from module.common.logging import get_logger
from module.sources.common.permitted_subnets import PermittedSubnets
log = get_logger()
class CheckRedfishConfig(ConfigBase):
section_name = source_config_section_name
source_name = None
source_name_example = "my-redfish-example"
def __init__(self):
self.options = [
ConfigOption(**config_option_enabled_definition),
ConfigOption("type",
str,
description="type of source. This defines which source handler to use",
config_example="check_redfish",
mandatory=True),
ConfigOption("inventory_file_path",
str,
description="define the full path where the check_redfish inventory json files are located",
config_example="/full/path/to/inventory/files",
mandatory=True),
ConfigOption(**config_option_permitted_subnets_definition),
ConfigOption("overwrite_host_name",
bool,
description="""define if the host name discovered via check_redfish
overwrites the device host name in NetBox""",
default_value=False),
ConfigOption("overwrite_power_supply_name",
bool,
description="""define if the name of the power supply discovered via check_redfish
overwrites the power supply name in NetBox""",
default_value=False),
ConfigOption("overwrite_power_supply_attributes",
bool,
description="""define if existing power supply attributes are overwritten with data discovered
via check_redfish if False only data which is not preset in NetBox will be added""",
default_value=True),
ConfigOption("overwrite_interface_name",
bool,
description="""define if the name of the interface discovered via check_redfish
overwrites the interface name in NetBox""",
default_value=False),
ConfigOption("overwrite_interface_attributes",
bool,
description="""define if existing interface attributes are overwritten with data discovered
via check_redfish if False only data which is not preset in NetBox will be added""",
default_value=True)
]
super().__init__()
def validate_options(self):
for option in self.options:
if option.key == "inventory_file_path":
if not os.path.exists(option.value):
log.error(f"Inventory file path '{option.value}' not found.")
self.set_validation_failed()
if os.path.isfile(option.value):
log.error(f"Inventory file path '{option.value}' needs to be a directory.")
self.set_validation_failed()
if not os.access(option.value, os.X_OK | os.R_OK):
log.error(f"Inventory file path '{option.value}' not readable.")
self.set_validation_failed()
permitted_subnets_option = self.get_option_by_name("permitted_subnets")
if permitted_subnets_option is not None:
permitted_subnets = PermittedSubnets(permitted_subnets_option.value)
if permitted_subnets.validation_failed is True:
self.set_validation_failed()
permitted_subnets_option.set_value(permitted_subnets)
+20 -105
View File
@@ -7,7 +7,6 @@
# For a copy, see file LICENSE.txt included in this
# repository or visit: <https://opensource.org/licenses/MIT>.
from ipaddress import ip_network
import os
import glob
import json
@@ -15,9 +14,11 @@ import json
from packaging import version
from module.sources.common.source_base import SourceBase
from module.sources.check_redfish.config import CheckRedfishConfig
from module.common.logging import get_logger
from module.common.misc import grab, get_string_or_none
from module.common.support import normalize_mac_address, ip_valid_to_add_to_netbox
from module.common.support import normalize_mac_address
from module.netbox.inventory import NetBoxInventory
from module.netbox.object_classes import (
NetBoxInterfaceType,
NBTag,
@@ -74,40 +75,27 @@ class CheckRedfish(SourceBase):
NBCustomField
]
settings = {
"enabled": True,
"inventory_file_path": None,
"permitted_subnets": None,
"overwrite_host_name": False,
"overwrite_power_supply_name": False,
"overwrite_power_supply_attributes": True,
"overwrite_interface_name": False,
"overwrite_interface_attributes": True,
}
init_successful = False
inventory = None
name = None
source_tag = None
source_type = "check_redfish"
enabled = False
inventory_file_path = None
device_object = None
inventory_file_content = None
def __init__(self, name=None, settings=None, inventory=None):
def __init__(self, name=None):
if name is None:
raise ValueError(f"Invalid value for attribute 'name': '{name}'.")
self.inventory = inventory
self.inventory = NetBoxInventory()
self.name = name
self.parse_config_settings(settings)
# parse settings
settings_handler = CheckRedfishConfig()
settings_handler.source_name = self.name
self.settings = settings_handler.parse()
self.source_tag = f"Source: {name}"
self.set_source_tag()
if self.enabled is False:
if self.settings.enabled is False:
log.info(f"Source '{name}' is currently disabled. Skipping")
return
@@ -115,77 +103,6 @@ class CheckRedfish(SourceBase):
self.interface_adapter_type_dict = dict()
def parse_config_settings(self, config_settings):
"""
Validate parsed settings from config file
Parameters
----------
config_settings: dict
dict of config settings
"""
validation_failed = False
for setting in ["inventory_file_path"]:
if config_settings.get(setting) is None:
log.error(f"Config option '{setting}' in 'source/{self.name}' can't be empty/undefined")
validation_failed = True
inv_path = config_settings.get("inventory_file_path")
if not os.path.exists(inv_path):
log.error(f"Inventory file path '{inv_path}' not found.")
validation_failed = True
if os.path.isfile(inv_path):
log.error(f"Inventory file path '{inv_path}' needs to be a directory.")
validation_failed = True
if not os.access(inv_path, os.X_OK | os.R_OK):
log.error(f"Inventory file path '{inv_path}' not readable.")
validation_failed = True
# check permitted ip subnets
permitted_subnets = list()
excluded_subnets = list()
self.settings["excluded_subnets"] = excluded_subnets
if config_settings.get("permitted_subnets") is None:
log.info(f"Config option 'permitted_subnets' in 'source/{self.name}' is undefined. "
f"No IP addresses will be populated to NetBox!")
else:
config_settings["permitted_subnets"] = \
[x.strip() for x in config_settings.get("permitted_subnets").split(",") if x.strip() != ""]
# add "invisible" config option
self.settings["excluded_subnets"] = None
for subnet in config_settings["permitted_subnets"]:
excluded = False
if subnet[0] == "!":
excluded = True
subnet = subnet[1:].strip()
try:
if excluded is True:
excluded_subnets.append(ip_network(subnet))
else:
permitted_subnets.append(ip_network(subnet))
except Exception as e:
log.error(f"Problem parsing permitted subnet: {e}")
validation_failed = True
config_settings["permitted_subnets"] = permitted_subnets
config_settings["excluded_subnets"] = excluded_subnets
if validation_failed is True:
log.error("Config validation failed. Exit!")
exit(1)
for setting in self.settings.keys():
setattr(self, setting, config_settings.get(setting))
def apply(self):
"""
Main source handler method. This method is called for each source from "main" program
@@ -200,7 +117,7 @@ class CheckRedfish(SourceBase):
# first add all custom fields we need for this source
self.add_necessary_base_objects()
for filename in glob.glob(f"{self.inventory_file_path}/*.json"):
for filename in glob.glob(f"{self.settings.inventory_file_path}/*.json"):
self.reset_inventory_state()
@@ -328,7 +245,7 @@ class CheckRedfish(SourceBase):
if serial is not None:
device_data["serial"] = serial
if name is not None and self.overwrite_host_name is True:
if name is not None and self.settings.overwrite_host_name is True:
device_data["name"] = name
if "dell" in str(manufacturer).lower():
chassi = grab(self.inventory_file_content, "inventory.chassi.0")
@@ -441,10 +358,10 @@ class CheckRedfish(SourceBase):
if ps_object is None:
self.inventory.add_object(NBPowerPort, data=ps_data, source=self)
else:
if self.overwrite_power_supply_name is False:
if self.settings.overwrite_power_supply_name is False:
del(ps_data["name"])
data_to_update = self.patch_data(ps_object, ps_data, self.overwrite_power_supply_attributes)
data_to_update = self.patch_data(ps_object, ps_data, self.settings.overwrite_power_supply_attributes)
ps_object.update(data=data_to_update, source=self)
current_ps.remove(ps_object)
@@ -878,15 +795,13 @@ class CheckRedfish(SourceBase):
# collect ip addresses
nic_ips[port_name] = list()
for ipv4_address in grab(nic_port, "ipv4_addresses", fallback=list()):
if ip_valid_to_add_to_netbox(ipv4_address, self.permitted_subnets,
excluded_subnets=self.excluded_subnets, interface_name=port_name) is False:
if self.settings.permitted_subnets.permitted(ipv4_address, interface_name=port_name) is False:
continue
nic_ips[port_name].append(ipv4_address)
for ipv6_address in grab(nic_port, "ipv6_addresses", fallback=list()):
if ip_valid_to_add_to_netbox(ipv6_address, self.permitted_subnets,
excluded_subnets=self.excluded_subnets, interface_name=port_name) is False:
if self.settings.permitted_subnets.permitted(ipv6_address, interface_name=port_name) is False:
continue
nic_ips[port_name].append(ipv6_address)
@@ -913,12 +828,12 @@ class CheckRedfish(SourceBase):
# create or update interface with data
if nic_object is not None:
if self.overwrite_interface_name is False and port_data.get("name") is not None:
if self.settings.overwrite_interface_name is False and port_data.get("name") is not None:
del(port_data["name"])
this_link_type = port_data.get("type")
mgmt_only = port_data.get("mgmt_only")
data_to_update = self.patch_data(nic_object, port_data, self.overwrite_interface_attributes)
data_to_update = self.patch_data(nic_object, port_data, self.settings.overwrite_interface_attributes)
# always overwrite nic type if discovered
if port_data.get("type") != "other":
+25
View File
@@ -0,0 +1,25 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2020 - 2022 Ricardo Bartels. All rights reserved.
#
# netbox-sync.py
#
# This work is licensed under the terms of the MIT license.
# For a copy, see file LICENSE.txt included in this
# repository or visit: <https://opensource.org/licenses/MIT>.
config_option_enabled_definition = {
"key": "enabled",
"value_type": bool,
"description": "Defines if this source is enabled or not",
"default_value": True
}
config_option_permitted_subnets_definition = {
"key": "permitted_subnets",
"value_type": str,
"description": """IP networks eligible to be synced to NetBox. If an IP address is not part of
this networks then it WON'T be synced to NetBox. To excluded small blocks from bigger IP blocks
a leading '!' has to be added
""",
"config_example": "172.16.0.0/12, 10.0.0.0/8, 192.168.0.0/16, fd00::/8, !10.23.42.0/24"
}
+113
View File
@@ -0,0 +1,113 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2020 - 2022 Ricardo Bartels. All rights reserved.
#
# netbox-sync.py
#
# This work is licensed under the terms of the MIT license.
# For a copy, see file LICENSE.txt included in this
# repository or visit: <https://opensource.org/licenses/MIT>.
from ipaddress import ip_address, ip_network, ip_interface
from module.common.logging import get_logger
log = get_logger()
class PermittedSubnets:
"""
initializes and verifies if an IP address is part of an permitted subnet
"""
def __init__(self, config_string: str):
self._validation_failed = False
self.included_subnets = list()
self.excluded_subnets = list()
if config_string is None:
log.info(f"Config option 'permitted_subnets' is undefined. No IP addresses will be populated to NetBox!")
return
if not isinstance(config_string, str):
raise ValueError("permitted subnets need to be of type string")
subnet_list = [x.strip() for x in config_string.split(",") if x.strip() != ""]
for subnet in subnet_list:
excluded = False
if subnet[0] == "!":
excluded = True
subnet = subnet[1:].strip()
try:
if excluded is True:
self.excluded_subnets.append(ip_network(subnet))
else:
self.included_subnets.append(ip_network(subnet))
except Exception as e:
log.error(f"Problem parsing permitted subnet: {e}")
self._validation_failed = True
@property
def validation_failed(self) -> bool:
return self._validation_failed
def permitted(self, ip, interface_name=None) -> bool:
"""
performs a couple of checks to see if an IP address is valid and allowed
to be added to NetBox
IP address must always be passed as interface notation
* 192.168.0.1/24
* fd00::0/64
* 192.168.23.24/255.255.255.24
Parameters
----------
ip: str
IP address to validate
interface_name: str
name of the interface this IP shall be added. Important for meaningful log messages
Returns
-------
bool: if IP address is valid
"""
if ip is None:
log.warning("No IP address passed to validate if this IP belongs to a permitted subnet")
return False
ip_text = f"'{ip}'"
if interface_name is not None:
ip_text = f"{ip_text} for {interface_name}"
try:
if "/" in ip:
ip_a = ip_interface(ip).ip
else:
ip_a = ip_address(ip)
except ValueError:
log.error(f"IP address {ip_text} invalid!")
return False
if ip_a.is_link_local is True:
log.debug(f"IP address {ip_text} is a link local address. Skipping.")
return False
if ip_a.is_loopback is True:
log.debug(f"IP address {ip_text} is a loopback address. Skipping.")
return False
for excluded_subnet in self.excluded_subnets:
if ip_a in excluded_subnet:
return False
for permitted_subnet in self.included_subnets:
if ip_a in permitted_subnet:
return True
log.debug(f"IP address {ip_text} not part of any permitted subnet. Skipping.")
return False
+7 -1
View File
@@ -36,6 +36,12 @@ class SourceBase:
inventory = None
source_tag = None
settings = None
init_successful = False
name = None
def set_source_tag(self):
self.source_tag = f"Source: {self.name}"
@classmethod
def implements(cls, source_type):
@@ -696,7 +702,7 @@ class SourceBase:
return return_data
def add_update_custom_field(self, data):
def add_update_custom_field(self, data) -> NBCustomField:
"""
Adds/updates a NBCustomField object with data.
Update will only update the 'content_types' attribute.
+483
View File
@@ -0,0 +1,483 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2020 - 2022 Ricardo Bartels. All rights reserved.
#
# netbox-sync.py
#
# This work is licensed under the terms of the MIT license.
# For a copy, see file LICENSE.txt included in this
# repository or visit: <https://opensource.org/licenses/MIT>.
import re
from ipaddress import ip_address
from module.common.misc import quoted_split
from module.config import source_config_section_name
from module.config.base import ConfigBase
from module.config.option import ConfigOption
from module.config.group import ConfigOptionGroup
from module.sources.common.conifg import *
from module.sources.common.permitted_subnets import PermittedSubnets
from module.common.logging import get_logger
log = get_logger()
class VMWareConfig(ConfigBase):
section_name = source_config_section_name
source_name = None
source_name_example = "my-vcenter-example"
def __init__(self):
self.options = [
ConfigOption(**config_option_enabled_definition),
ConfigOption("type",
str,
description="type of source. This defines which source handler to use",
config_example="vmware",
mandatory=True),
ConfigOption("host_fqdn",
str,
description="host name / IP address of the vCenter",
config_example="vcenter.example.com",
mandatory=True),
ConfigOption("port",
int,
description="TCP port to connect to",
default_value=443),
ConfigOption("username",
str,
description="username to use to log into vCenter",
config_example="vcenter-readonly",
mandatory=True),
ConfigOption("password",
str,
description="password to use to log into vCenter",
config_example="super-secret",
sensitive=True,
mandatory=True),
ConfigOption("validate_tls_certs",
bool,
description="""Enforces TLS certificate validation.
If vCenter uses a valid TLS certificate then this option should be set
to 'true' to ensure a secure connection.""",
default_value=False),
ConfigOption("proxy_host",
str,
description="""EXPERIMENTAL: Connect to a vCenter using a proxy server
(socks proxies are not supported). define a host name or an IP address""",
config_example="10.10.1.10"),
ConfigOption("proxy_port",
int,
description="""EXPERIMENTAL: Connect to a vCenter using a proxy server
(socks proxies are not supported).
define proxy server port number""",
config_example=3128),
ConfigOption(**config_option_permitted_subnets_definition),
ConfigOptionGroup(title="filter",
description="""filters can be used to include/exclude certain objects from importing
into NetBox. Include filters are checked first and exclude filters after.
An object name has to pass both filters to be synced to NetBox.
If a filter is unset it will be ignored. Filters are all treated as regex expressions!
If more then one expression should match, a '|' needs to be used
""",
config_example="""Example: (exclude all VMs with "replica" in their name
and all VMs starting with "backup"): vm_exclude_filter = .*replica.*|^backup.*""",
options=[
ConfigOption("cluster_exclude_filter",
str,
description="""If a cluster is excluded from sync then ALL VMs and HOSTS
inside the cluster will be ignored! a cluster can be specified
as "Cluster-name" or "Datacenter-name/Cluster-name" if
multiple clusters have the same name"""),
ConfigOption("cluster_include_filter", str),
ConfigOption("host_exclude_filter",
str,
description="""This will only include/exclude the host,
not the VM if Host is part of a multi host cluster"""),
ConfigOption("host_include_filter", str),
ConfigOption("vm_exclude_filter",
str, description="simply include/exclude VMs"),
ConfigOption("vm_include_filter", str)
]),
ConfigOptionGroup(title="relations",
options=[
ConfigOption("cluster_site_relation",
str,
description="""\
This option defines which vCenter cluster is part of a NetBox site.
This is done with a comma separated key = value list.
key: defines the cluster name as regex
value: defines the NetBox site name (use quotes if name contains commas)
This is a quite important config setting as IP addresses, prefixes, VLANs
and VRFs are site dependent. In order to assign the correct prefix to an IP
address it is important to pick the correct site.
A VM always depends on the cluster site relation
a cluster can be specified as "Cluster-name" or
"Datacenter-name/Cluster-name" if multiple clusters have the same name
""",
config_example="Cluster_NYC = New York, Cluster_FFM.* = Frankfurt, Datacenter_TOKIO/.* = Tokio"),
ConfigOption("host_site_relation",
str,
description="""Same as cluster site but on host level.
If unset it will fall back to cluster_site_relation""",
config_example="nyc02.* = New York, ffm01.* = Frankfurt"),
ConfigOption("cluster_tenant_relation",
str,
description="""\
This option defines which cluster/host/VM belongs to which tenant.
This is done with a comma separated key = value list.
key: defines a hosts/VM name as regex
value: defines the NetBox tenant name (use quotes if name contains commas)
a cluster can be specified as "Cluster-name" or
"Datacenter-name/Cluster-name" if multiple clusters have the same name
""",
config_example="Cluster_NYC.* = Customer A"),
ConfigOption("host_tenant_relation", str, config_example="esxi300.* = Infrastructure"),
ConfigOption("vm_tenant_relation", str, config_example="grafana.* = Infrastructure"),
ConfigOption("vm_platform_relation",
str,
description="""\
This option defines custom platforms if the VMWare created platforms are not suitable.
Pretty much a mapping of VMWare platform name to your own platform name.
This is done with a comma separated key = value list.
key: defines a VMWare returned platform name
value: defines the desired NetBox platform name""",
config_example="centos-7.* = centos7, microsoft-windows-server-2016.* = Windows2016"),
ConfigOption("host_role_relation",
str,
description="""\
Define the NetBox device role used for hosts and VMs. The default is
set to "Server". This is done with a comma separated key = value list.
key: defines a hosts/VM name as regex
value: defines the NetBox role name (use quotes if name contains commas)
""",
config_example=".* = Server"),
ConfigOption("vm_role_relation", str, config_example=".* = Server"),
ConfigOption("cluster_tag_relation",
str,
description="""\
Define NetBox tags which are assigned to a cluster, host or VM. This is
done with a comma separated key = value list.
key: defines a hosts/VM name as regex
value: defines the NetBox tag (use quotes if name contains commas)
a cluster can be specified as "Cluster-name" or
"Datacenter-name/Cluster-name" if multiple clusters have the same name""",
config_example="Cluster_NYC.* = Infrastructure"),
ConfigOption("host_tag_relation", str, config_example="esxi300.* = Infrastructure"),
ConfigOption("vm_tag_relation", str, config_example="grafana.* = Infrastructure")
]),
ConfigOption("match_host_by_serial",
bool,
description="""Try to find existing host based on serial number. This can cause issues
with blade centers if VMWare does not report the blades serial number properly.""",
default_value=True),
ConfigOption("collect_hardware_asset_tag",
bool,
description="Attempt to collect asset tags from vCenter hosts",
default_value=True),
ConfigOption("dns_name_lookup",
bool,
description="""Perform a reverse lookup for all collected IP addresses.
If a dns name was found it will be added to the IP address object in NetBox
""",
default_value=True),
ConfigOption("custom_dns_servers",
str,
description="use custom DNS server to do the reverse lookups",
config_example="192.168.1.11, 192.168.1.12"),
ConfigOption("set_primary_ip",
str,
description="""\
define how the primary IPs should be set
possible values:
always: will remove primary IP from the object where this address is
currently set as primary and moves it to new object
when-undefined:
only sets primary IP if undefined, will cause ERRORs if same IP is
assigned more then once to different hosts and IP is set as the
objects primary IP
never: don't set any primary IPs, will cause the same ERRORs
as "when-undefined"
""",
default_value="when-undefined"),
ConfigOption("skip_vm_comments",
bool,
description="Do not sync notes from a VM in vCenter to the comments field on a VM in netbox",
default_value=False),
ConfigOption("skip_vm_templates",
bool,
description="Do not sync template VMs",
default_value=True),
ConfigOption("skip_offline_vms",
bool,
description="""\
Skip virtual machines which are reported as offline.
ATTENTION: this option will keep purging stopped VMs if activated!
""",
default_value=False),
ConfigOption("skip_srm_placeholder_vms",
bool,
description="""If the VMware Site Recovery Manager is used to can skip syncing
placeholder/replicated VMs from fail-over site to NetBox.""",
default_value=False),
ConfigOption("strip_host_domain_name",
bool,
description="strip domain part from host name before syncing device to NetBox",
default_value=False),
ConfigOption("strip_vm_domain_name",
bool,
description="strip domain part from VM name before syncing VM to NetBox",
default_value=False),
ConfigOptionGroup(title="tag source",
description="""\
sync tags assigned to clusters, hosts and VMs in vCenter to NetBox
INFO: this requires the installation of the 'vsphere-automation-sdk',
see docs about installation possible values:
* object : the host or VM itself
* parent_folder_1 : the direct folder this object is organized in (1 level up)
* parent_folder_2 : the indirect folder this object is organized in (2 levels up)
* cluster : the cluster this object is organized in
* datacenter : the datacenter this object is organized in
this is a comma separated list of options. example: vm_tag_source = object, cluster
""",
config_example="Example: vm_tag_source = object, cluster",
options=[
ConfigOption("cluster_tag_source", str),
ConfigOption("host_tag_source", str),
ConfigOption("vm_tag_source", str)
]),
ConfigOption("sync_custom_attributes",
bool,
description="""sync custom attributes defined for hosts and VMs
in vCenter to NetBox as custom fields""",
default_value=False),
ConfigOptionGroup(title="custom object attributes",
description="""\
add arbitrary host/vm object attributes as custom fields to NetBox.
multiple attributes can be defined comma separated.
to get a list of available attributes use '-l DEBUG3' as cli param (CAREFUL: output might be long)
and here 'https://gist.github.com/bb-Ricardo/538768487bdac4efafabe56e005cb4ef' can be seen how to
access these attributes
""",
options=[
ConfigOption("host_custom_object_attributes",
str,
config_example="summary.runtime.bootTime"),
ConfigOption("vm_custom_object_attributes",
str,
config_example="config.uuid")
]),
ConfigOption("set_source_name_as_cluster_group",
bool,
description="""this will set the sources name as cluster group name instead of the datacenter.
This works if the vCenter has ONLY ONE datacenter configured.
Otherwise it will rename all datacenters to the source name!""",
default_value=False),
ConfigOption("sync_vm_dummy_interfaces",
bool,
description="""activating this option will also include "dummy/virtual" interfaces
which are only visible inside the VM and are exposed through VM guest tools.
Dummy interfaces without an IP address will be skipped.""",
default_value=False),
ConfigOption("disable_vlan_sync",
bool,
description="disables syncing of any VLANs visible in vCenter to NetBox",
default_value=False),
ConfigOption("track_vm_host",
bool,
description="""enabling this option will add the ESXi host
this VM is running on to the VM details""",
default_value=False),
ConfigOption("overwrite_device_interface_name",
bool,
description="""define if the name of the device interface discovered overwrites the
interface name in NetBox. The interface will only be matched by identical MAC address""",
default_value=True),
ConfigOption("overwrite_vm_interface_name",
bool,
description="""define if the name of the VM interface discovered overwrites the
interface name in NetBox. The interface will only be matched by identical MAC address""",
default_value=True),
ConfigOption("host_management_interface_match",
str,
description="""set a matching value for ESXi host management interface description
(case insensitive, comma separated). Used to figure out the ESXi primary IP address""",
default_value="management, mgmt"),
ConfigOption("ip_tenant_inheritance_order",
str,
description="""\
define in which order the IP address tenant will be assigned if tenant is undefined.
possible values:
* device : host or VM tenant will be assigned to the IP address
* prefix : if the IP address belongs to an existing prefix and this prefix has a tenant assigned, then this one is used
* disabled : no tenant assignment to the IP address will be performed
the order of the definition is important, the default is "device, prefix" which means:
If the device has a tenant then this one will be used. If not, the prefix tenant will be used if defined
""",
default_value="device, prefix"
),
# removed settings
ConfigOption("netbox_host_device_role",
str,
deprecation_message="You need to switch to 'host_role_relation'.",
removed=True),
ConfigOption("netbox_vm_device_role",
str,
deprecation_message="You need to switch to 'vm_role_relation'.",
removed=True),
ConfigOption("sync_tags",
bool,
deprecation_message="You need to switch to 'host_tag_source', 'vm_tag_source' or 'cluster_tag_source'",
removed=True),
ConfigOption("sync_parent_tags",
bool,
deprecation_message="You need to switch to 'host_tag_source', 'vm_tag_source' or 'cluster_tag_source'",
removed=True)
]
super().__init__()
def validate_options(self):
valid_tag_sources = [
"object", "parent_folder_1", "parent_folder_2", "cluster", "datacenter"
]
for option in self.options:
if option.value is None:
continue
if "filter" in option.key:
re_compiled = None
try:
re_compiled = re.compile(option.value)
except Exception as e:
log.error(f"Problem parsing regular expression for '{self.source_name}.{option.key}': {e}")
self.set_validation_failed()
option.set_value(re_compiled)
continue
if "relation" in option.key:
relation_data = list()
relation_type = option.key.split("_")[1]
for relation in quoted_split(option.value):
object_name = relation.split("=")[0].strip(' "')
relation_name = relation.split("=")[1].strip(' "')
if len(object_name) == 0 or len(relation_name) == 0:
log.error(f"Config option '{relation}' malformed got '{object_name}' for "
f"object name and '{relation_name}' for {relation_type} name.")
self.set_validation_failed()
continue
try:
re_compiled = re.compile(object_name)
except Exception as e:
log.error(f"Problem parsing regular expression '{object_name}' for '{relation}': {e}")
self.set_validation_failed()
continue
relation_data.append({
"object_regex": re_compiled,
"assigned_name": relation_name
})
option.set_value(relation_data)
continue
if "custom_object_attributes" in option.key:
option.set_value(quoted_split(option.value))
continue
if "tag_source" in option.key:
option.set_value(quoted_split(option.value))
for tag_source_option in option.value:
if tag_source_option not in valid_tag_sources:
log.error(f"Tag source '{tag_source_option}' for '{option.key}' option invalid.")
self.set_validation_failed()
continue
if option.key == "set_primary_ip":
if option.value not in ["always", "when-undefined", "never"]:
log.error(f"Primary IP option '{option.key}' value '{option.value}' invalid.")
self.set_validation_failed()
if option.key == "custom_dns_servers":
dns_name_lookup = self.get_option_by_name("dns_name_lookup")
if not isinstance(dns_name_lookup, ConfigOption) or dns_name_lookup.value is False:
continue
custom_dns_servers = quoted_split(option.value)
tested_custom_dns_servers = list()
for custom_dns_server in custom_dns_servers:
try:
tested_custom_dns_servers.append(str(ip_address(custom_dns_server)))
except ValueError:
log.error(f"Config option 'custom_dns_servers' value '{custom_dns_server}' "
f"does not appear to be an IP address.")
self.set_validation_failed()
option.set_value(tested_custom_dns_servers)
continue
if option.key == "host_management_interface_match":
option.set_value(quoted_split(option.value))
continue
if option.key == "ip_tenant_inheritance_order":
option.set_value(quoted_split(option.value))
for ip_tenant_inheritance in option.value:
if ip_tenant_inheritance not in ["device", "prefix", "disabled"]:
log.error(f"Config value '{ip_tenant_inheritance}' invalid for "
f"config option 'ip_tenant_inheritance_order'!")
self.set_validation_failed()
if len(option.value) > 2:
log.error("Config option 'ip_tenant_inheritance_order' can contain only 2 items max")
self.set_validation_failed()
permitted_subnets_option = self.get_option_by_name("permitted_subnets")
if permitted_subnets_option is not None:
permitted_subnets = PermittedSubnets(permitted_subnets_option.value)
if permitted_subnets.validation_failed is True:
self.set_validation_failed()
permitted_subnets_option.set_value(permitted_subnets)
+117 -340
View File
@@ -9,9 +9,8 @@
import datetime
import pprint
import re
import ssl
from ipaddress import ip_address, ip_network, ip_interface
from ipaddress import ip_address, ip_interface
from urllib.parse import unquote
import urllib3
@@ -26,9 +25,11 @@ from pyVmomi import vim
from pyVmomi.VmomiSupport import VmomiJSONEncoder
from module.sources.common.source_base import SourceBase
from module.sources.vmware.config import VMWareConfig
from module.common.logging import get_logger, DEBUG3
from module.common.misc import grab, dump, get_string_or_none, plural, quoted_split
from module.common.support import normalize_mac_address, ip_valid_to_add_to_netbox
from module.common.misc import grab, dump, get_string_or_none, plural
from module.common.support import normalize_mac_address
from module.netbox.inventory import NetBoxInventory
from module.netbox.object_classes import (
NetBoxInterfaceType,
NBTag,
@@ -92,73 +93,6 @@ class VMWareHandler(SourceBase):
NBCustomField
]
settings = {
"enabled": True,
"host_fqdn": None,
"port": 443,
"username": None,
"password": None,
"validate_tls_certs": False,
"proxy_host": None,
"proxy_port": None,
"cluster_exclude_filter": None,
"cluster_include_filter": None,
"host_exclude_filter": None,
"host_include_filter": None,
"vm_exclude_filter": None,
"vm_include_filter": None,
"permitted_subnets": None,
"collect_hardware_asset_tag": True,
"match_host_by_serial": True,
"cluster_site_relation": None,
"cluster_tag_relation": None,
"cluster_tenant_relation": None,
"cluster_tag_source": None,
"host_role_relation": None,
"host_site_relation": None,
"host_tag_relation": None,
"host_tenant_relation": None,
"host_tag_source": None,
"vm_platform_relation": None,
"vm_role_relation": None,
"vm_tag_relation": None,
"vm_tenant_relation": None,
"vm_tag_source": None,
"dns_name_lookup": False,
"custom_dns_servers": None,
"set_primary_ip": "when-undefined",
"skip_vm_comments": False,
"skip_vm_templates": True,
"skip_offline_vms": False,
"skip_srm_placeholder_vms": False,
"strip_host_domain_name": False,
"strip_vm_domain_name": False,
"sync_custom_attributes": False,
"host_custom_object_attributes": None,
"vm_custom_object_attributes": None,
"set_source_name_as_cluster_group": False,
"sync_vm_dummy_interfaces": False,
"disable_vlan_sync": False,
"overwrite_device_interface_name": True,
"overwrite_vm_interface_name": True,
"track_vm_host": False,
"host_management_interface_match": "management, mgmt",
"ip_tenant_inheritance_order": "device, prefix"
}
deprecated_settings = {}
removed_settings = {
"netbox_host_device_role": "host_role_relation",
"netbox_vm_device_role": "vm_role_relation",
"sync_tags": "host_tag_source', 'vm_tag_source' or 'cluster_tag_source",
"sync_parent_tags": "host_tag_source', 'vm_tag_source' or 'cluster_tag_source"
}
init_successful = False
inventory = None
name = None
source_tag = None
source_type = "vmware"
recursion_level = 0
@@ -169,20 +103,23 @@ class VMWareHandler(SourceBase):
site_name = None
def __init__(self, name=None, settings=None, inventory=None):
def __init__(self, name=None):
if name is None:
raise ValueError(f"Invalid value for attribute 'name': '{name}'.")
self.inventory = inventory
self.inventory = NetBoxInventory()
self.name = name
self.parse_config_settings(settings)
# parse settings
settings_handler = VMWareConfig()
settings_handler.source_name = self.name
self.settings = settings_handler.parse()
self.source_tag = f"Source: {name}"
self.set_source_tag()
self.site_name = f"vCenter: {name}"
if self.enabled is False:
if self.settings.enabled is False:
log.info(f"Source '{name}' is currently disabled. Skipping")
return
@@ -213,166 +150,6 @@ class VMWareHandler(SourceBase):
self.objects_to_reevaluate = list()
self.parsing_objects_to_reevaluate = False
def parse_config_settings(self, config_settings):
"""
Validate parsed settings from config file
Parameters
----------
config_settings: dict
dict of config settings
"""
validation_failed = False
for setting in ["host_fqdn", "port", "username", "password"]:
if config_settings.get(setting) is None:
log.error(f"Config option '{setting}' in 'source/{self.name}' can't be empty/undefined")
validation_failed = True
# check permitted ip subnets
permitted_subnets = list()
excluded_subnets = list()
self.settings["excluded_subnets"] = excluded_subnets
if config_settings.get("permitted_subnets") is None:
log.info(f"Config option 'permitted_subnets' in 'source/{self.name}' is undefined. "
f"No IP addresses will be populated to NetBox!")
else:
config_settings["permitted_subnets"] = \
[x.strip() for x in config_settings.get("permitted_subnets").split(",") if x.strip() != ""]
# add "invisible" config option
self.settings["excluded_subnets"] = None
for subnet in config_settings["permitted_subnets"]:
excluded = False
if subnet[0] == "!":
excluded = True
subnet = subnet[1:].strip()
try:
if excluded is True:
excluded_subnets.append(ip_network(subnet))
else:
permitted_subnets.append(ip_network(subnet))
except Exception as e:
log.error(f"Problem parsing permitted subnet: {e}")
validation_failed = True
config_settings["permitted_subnets"] = permitted_subnets
config_settings["excluded_subnets"] = excluded_subnets
# check include and exclude filter expressions
for setting in [x for x in config_settings.keys() if "filter" in x]:
if config_settings.get(setting) is None or config_settings.get(setting).strip() == "":
continue
re_compiled = None
try:
re_compiled = re.compile(config_settings.get(setting))
except Exception as e:
log.error(f"Problem parsing regular expression for '{setting}': {e}")
validation_failed = True
config_settings[setting] = re_compiled
for relation_option in [x for x in self.settings.keys() if "relation" in x]:
if config_settings.get(relation_option) is None:
continue
relation_data = list()
relation_type = relation_option.split("_")[1]
for relation in quoted_split(config_settings.get(relation_option)):
object_name = relation.split("=")[0].strip(' "')
relation_name = relation.split("=")[1].strip(' "')
if len(object_name) == 0 or len(relation_name) == 0:
log.error(f"Config option '{relation}' malformed got '{object_name}' for "
f"object name and '{relation_name}' for {relation_type} name.")
validation_failed = True
try:
re_compiled = re.compile(object_name)
except Exception as e:
log.error(f"Problem parsing regular expression '{object_name}' for '{relation}': {e}")
validation_failed = True
continue
relation_data.append({
"object_regex": re_compiled,
f"assigned_name": relation_name
})
config_settings[relation_option] = relation_data
for custom_object_option in [x for x in self.settings.keys() if "custom_object_attributes" in x]:
if config_settings.get(custom_object_option) is None:
continue
config_settings[custom_object_option] = quoted_split(config_settings.get(custom_object_option))
for tag_source in [x for x in self.settings.keys() if "tag_source" in x]:
config_settings[tag_source] = quoted_split(config_settings.get(tag_source))
valid_tag_sources = [
"object", "parent_folder_1", "parent_folder_2", "cluster", "datacenter"
]
for tag_source_option in config_settings[tag_source]:
if tag_source_option not in valid_tag_sources:
log.error(f"Tag source '{tag_source_option}' for '{tag_source}' option invalid.")
validation_failed = True
continue
if config_settings.get("dns_name_lookup") is True and config_settings.get("custom_dns_servers") is not None:
custom_dns_servers = \
[x.strip() for x in config_settings.get("custom_dns_servers").split(",") if x.strip() != ""]
tested_custom_dns_servers = list()
for custom_dns_server in custom_dns_servers:
try:
tested_custom_dns_servers.append(str(ip_address(custom_dns_server)))
except ValueError:
log.error(f"Config option 'custom_dns_servers' value '{custom_dns_server}' "
f"does not appear to be an IP address.")
validation_failed = True
config_settings["custom_dns_servers"] = tested_custom_dns_servers
if len(config_settings.get("host_management_interface_match") or "") > 0:
host_management_interface_match = config_settings.get("host_management_interface_match")
else:
host_management_interface_match = self.settings.get("host_management_interface_match")
config_settings["host_management_interface_match"] = \
[x.strip() for x in host_management_interface_match.split(",")]
config_settings["ip_tenant_inheritance_order"] = \
[x.strip() for x in grab(config_settings, "ip_tenant_inheritance_order", fallback="").split(",")]
for ip_tenant_inheritance in config_settings["ip_tenant_inheritance_order"]:
if ip_tenant_inheritance not in ["device", "prefix", "disabled"]:
log.error(f"Config value '{ip_tenant_inheritance}' invalid for "
f"config option 'ip_tenant_inheritance_order'!")
validation_failed = True
if len(config_settings["ip_tenant_inheritance_order"]) > 2:
log.error("Config option 'ip_tenant_inheritance_order' can contain only 2 items max")
validation_failed = True
if validation_failed is True:
log.error("Config validation failed. Exit!")
exit(1)
for setting in self.settings.keys():
setattr(self, setting, config_settings.get(setting))
def create_sdk_session(self):
"""
Initialize SDK session with vCenter
@@ -385,41 +162,42 @@ class VMWareHandler(SourceBase):
if self.session is not None:
return True
log.debug(f"Starting vCenter SDK connection to '{self.host_fqdn}'")
log.debug(f"Starting vCenter SDK connection to '{self.settings.host_fqdn}'")
ssl_context = ssl.create_default_context()
if bool(self.validate_tls_certs) is False:
if self.settings.validate_tls_certs is False:
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
connection_params = dict(
host=self.host_fqdn,
port=self.port,
host=self.settings.host_fqdn,
port=self.settings.port,
sslContext=ssl_context
)
# uses connect.SmartStubAdapter
if self.proxy_host is not None and self.proxy_port is not None:
if self.settings.proxy_host is not None and self.settings.proxy_port is not None:
connection_params.update(
httpProxyHost=self.proxy_host,
httpProxyPort=int(self.proxy_port),
httpProxyHost=self.settings.proxy_host,
httpProxyPort=self.settings.proxy_port,
)
# uses connect.SmartConnect
else:
connection_params.update(
user=self.username,
pwd=self.password,
user=self.settings.username,
pwd=self.settings.password,
)
def_exception_text = f"Unable to connect to vCenter instance '{self.host_fqdn}' on port {self.port}."
def_exception_text = f"Unable to connect to vCenter instance " \
f"'{self.settings.host_fqdn}' on port {self.settings.port}."
try:
if self.proxy_host is not None and self.proxy_port is not None:
if self.settings.proxy_host is not None and self.settings.proxy_port is not None:
smart_stub = connect.SmartStubAdapter(**connection_params)
self._sdk_instance = vim.ServiceInstance('ServiceInstance', smart_stub)
content = self._sdk_instance.RetrieveContent()
content.sessionManager.Login(self.username, self.password, None)
content.sessionManager.Login(self.settings.username, self.settings.password, None)
else:
self._sdk_instance = connect.SmartConnect(**connection_params)
@@ -430,13 +208,13 @@ class VMWareHandler(SourceBase):
log.error(f"{def_exception_text} {e.msg}")
return False
except vim.fault.NoPermission as e:
log.error(f"{def_exception_text} User {self.username} does not have required permission. {e.msg}")
log.error(f"{def_exception_text} User {self.settings.username} does not have required permission. {e.msg}")
return False
except Exception as e:
log.error(f"{def_exception_text} Reason: {e}")
return False
log.info(f"Successfully connected to vCenter SDK '{self.host_fqdn}'")
log.info(f"Successfully connected to vCenter SDK '{self.settings.host_fqdn}'")
return True
@@ -452,51 +230,51 @@ class VMWareHandler(SourceBase):
if self.tag_session is not None:
return True
if len(self.cluster_tag_source) + len(self.host_tag_source) + len(self.vm_tag_source) == 0:
source_tag_settings_list = [
self.settings.cluster_tag_source,
self.settings.host_tag_source,
self.settings.vm_tag_source
]
# check if vm tag syncing is configured
if source_tag_settings_list.count(None) == len(source_tag_settings_list):
return False
log.debug(f"Starting vCenter API connection to '{self.host_fqdn}'")
if vsphere_automation_sdk_available is False:
log.warning(f"Unable to import Python 'vsphere-automation-sdk'. Tag syncing will be disabled.")
return False
if len(self.cluster_tag_source) > 0 or len(self.host_tag_source) > 0 or len(self.vm_tag_source) > 0:
if vsphere_automation_sdk_available is False:
self.__setattr__("cluster_tag_source", list())
self.__setattr__("host_tag_source", list())
self.__setattr__("vm_tag_source", list())
log.warning(f"Unable to import Python 'vsphere-automation-sdk'. Tag syncing will be disabled.")
return False
log.debug(f"Starting vCenter API connection to '{self.settings.host_fqdn}'")
# create a requests session to enable/disable TLS verification
session = requests.session()
session.verify = bool(self.validate_tls_certs)
session.verify = self.settings.validate_tls_certs
# disable TLS insecure warnings if user explicitly switched off validation
if bool(self.validate_tls_certs) is False:
if self.settings.validate_tls_certs is False:
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# adds proxy to the session
if self.proxy_host is not None and self.proxy_port is not None:
if self.settings.proxy_host is not None and self.settings.proxy_port is not None:
session.proxies.update({
"http": f"http://{self.proxy_host}:{self.proxy_port}",
"https": f"http://{self.proxy_host}:{self.proxy_port}",
"http": f"http://{self.settings.proxy_host}:{self.settings.proxy_port}",
"https": f"http://{self.settings.proxy_host}:{self.settings.proxy_port}",
})
try:
self.tag_session = create_vsphere_client(
server=f"{self.host_fqdn}:{self.port}",
username=self.username,
password=self.password,
server=f"{self.settings.host_fqdn}:{self.settings.port}",
username=self.settings.username,
password=self.settings.password,
session=session)
except Exception as e:
self.__setattr__("cluster_tag_source", list())
self.__setattr__("host_tag_source", list())
self.__setattr__("vm_tag_source", list())
log.warning(f"Unable to connect to vCenter API instance '{self.host_fqdn}' on port {self.port}: {e}")
log.warning(f"Unable to connect to vCenter API instance "
f"'{self.settings.host_fqdn}' on port {self.settings.port}: {e}")
log.warning("Tag syncing will be disabled.")
return False
log.info(f"Successfully connected to vCenter API '{self.host_fqdn}'")
log.info(f"Successfully connected to vCenter API '{self.settings.host_fqdn}'")
return True
@@ -524,7 +302,7 @@ class VMWareHandler(SourceBase):
Every update of new/existing objects fot this source has to happen here.
"""
log.info(f"Query data from vCenter: '{self.host_fqdn}'")
log.info(f"Query data from vCenter: '{self.settings.host_fqdn}'")
"""
Mapping of object type keywords to view types and handlers
@@ -580,7 +358,7 @@ class VMWareHandler(SourceBase):
}
# skip virtual machines which are reported offline
if self.skip_offline_vms is True:
if self.settings.skip_offline_vms is True:
log.info("Skipping offline VMs")
del object_mapping["offline virtual machine"]
@@ -899,30 +677,30 @@ class VMWareHandler(SourceBase):
return
tag_list = list()
if vsphere_automation_sdk_available is True:
if len(self.cluster_tag_source) + len(self.host_tag_source) + len(self.vm_tag_source) > 0:
if self.tag_session is not None:
# noinspection PyBroadException
try:
object_tag_ids = self.tag_session.tagging.TagAssociation.list_attached_tags(
DynamicID(type=grab(obj, "_wsdlName"), id=grab(obj, "_moId")))
except Exception as e:
log.error(f"Unable to retrieve vCenter tags for '{obj.name}': {e}")
return
for tag_id in object_tag_ids:
# noinspection PyBroadException
try:
object_tag_ids = self.tag_session.tagging.TagAssociation.list_attached_tags(
DynamicID(type=grab(obj, "_wsdlName"), id=grab(obj, "_moId")))
tag_name = self.tag_session.tagging.Tag.get(tag_id).name
tag_description = self.tag_session.tagging.Tag.get(tag_id).description
except Exception as e:
log.error(f"Unable to retrieve vCenter tags for '{obj.name}': {e}")
object_tag_ids = list()
log.error(f"Unable to retrieve vCenter tag '{tag_id}' for '{obj.name}': {e}")
continue
for tag_id in object_tag_ids:
# noinspection PyBroadException
try:
tag_name = self.tag_session.tagging.Tag.get(tag_id).name
tag_description = self.tag_session.tagging.Tag.get(tag_id).description
except Exception as e:
log.error(f"Unable to retrieve vCenter tag '{tag_id}' for '{obj.name}': {e}")
continue
tag_list.append(self.inventory.add_update_object(NBTag, data={
"name": tag_name,
"description": tag_description
}))
tag_list.append(self.inventory.add_update_object(NBTag, data={
"name": tag_name,
"description": tag_description
}))
return tag_list
@@ -947,15 +725,15 @@ class VMWareHandler(SourceBase):
tag_list = list()
if isinstance(obj, (vim.ClusterComputeResource, vim.ComputeResource)):
tag_source = self.cluster_tag_source
tag_source = self.settings.cluster_tag_source
elif isinstance(obj, vim.HostSystem):
tag_source = self.host_tag_source
tag_source = self.settings.host_tag_source
elif isinstance(obj, vim.VirtualMachine):
tag_source = self.vm_tag_source
tag_source = self.settings.vm_tag_source
else:
raise ValueError(f"Tags for '{grab(obj, '_wsdlName')}' are not supported")
if len(tag_source) == 0 or vsphere_automation_sdk_available is False:
if tag_source is None or self.tag_session is None:
return tag_list
log.debug2(f"Collecting tags for {obj.name}")
@@ -1003,16 +781,16 @@ class VMWareHandler(SourceBase):
return_custom_fields = dict()
custom_value = list()
if self.sync_custom_attributes is True:
if self.settings.sync_custom_attributes is True:
custom_value = grab(obj, "customValue", fallback=list())
if grab(obj, "_wsdlName") == "VirtualMachine":
content_type = "virtualization.virtualmachine"
custom_object_attributes = self.vm_custom_object_attributes or list()
custom_object_attributes = self.settings.vm_custom_object_attributes or list()
object_attribute_prefix = "vm"
else:
content_type = "dcim.device"
custom_object_attributes = self.host_custom_object_attributes or list()
custom_object_attributes = self.settings.host_custom_object_attributes or list()
object_attribute_prefix = "host"
# add basic host data to device
@@ -1132,7 +910,7 @@ class VMWareHandler(SourceBase):
"""
resolved_list = list()
for single_relation in grab(self, relation, fallback=list()):
for single_relation in grab(self.settings, relation, fallback=list()):
object_regex = single_relation.get("object_regex")
match_found = False
if object_regex.match(name):
@@ -1269,7 +1047,7 @@ class VMWareHandler(SourceBase):
if object_type == NBDevice:
if device_vm_object is None and object_data.get("serial") is not None and \
bool(self.match_host_by_serial) is True:
self.settings.match_host_by_serial is True:
log.debug2(f"No match found. Trying to find {object_type.name} based on serial number")
device_vm_object = self.inventory.get_by_data(object_type, data={"serial": object_data.get("serial")})
@@ -1338,9 +1116,9 @@ class VMWareHandler(SourceBase):
for int_name, int_data in nic_data.items():
if object_type == NBDevice and self.overwrite_device_interface_name is False:
if object_type == NBDevice and self.settings.overwrite_device_interface_name is False:
del int_data["name"]
if object_type == NBVM and self.overwrite_vm_interface_name is False:
if object_type == NBVM and self.settings.overwrite_vm_interface_name is False:
del int_data["name"]
# add/update interface with retrieved data
@@ -1363,7 +1141,7 @@ class VMWareHandler(SourceBase):
# set/update/remove primary IP addresses
set_this_primary_ip = False
ip_version = ip_interface_object.ip.version
if self.set_primary_ip == "always":
if self.settings.set_primary_ip == "always":
for object_type in [NBDevice, NBVM]:
@@ -1386,7 +1164,8 @@ class VMWareHandler(SourceBase):
set_this_primary_ip = True
elif self.set_primary_ip != "never" and grab(device_vm_object, f"data.primary_ip{ip_version}") is None:
elif self.settings.set_primary_ip != "never" and \
grab(device_vm_object, f"data.primary_ip{ip_version}") is None:
set_this_primary_ip = True
if set_this_primary_ip is True:
@@ -1472,7 +1251,7 @@ class VMWareHandler(SourceBase):
datacenter object
"""
if self.set_source_name_as_cluster_group is True:
if self.settings.set_source_name_as_cluster_group is True:
name = self.name
else:
name = get_string_or_none(grab(obj, "name"))
@@ -1484,7 +1263,7 @@ class VMWareHandler(SourceBase):
object_data = {"name": name}
if self.set_source_name_as_cluster_group is True:
if self.settings.set_source_name_as_cluster_group is True:
label = "Datacenter Name"
custom_field = self.add_update_custom_field({
"name": f"vcsa_{label}",
@@ -1498,8 +1277,7 @@ class VMWareHandler(SourceBase):
grab(custom_field, "data.name"): get_string_or_none(grab(obj, "name"))
}
self.add_object_to_cache(obj,
self.inventory.add_update_object(NBClusterGroup, data=object_data, source=self))
self.add_object_to_cache(obj, self.inventory.add_update_object(NBClusterGroup, data=object_data, source=self))
def add_cluster(self, obj):
"""
@@ -1513,7 +1291,7 @@ class VMWareHandler(SourceBase):
"""
name = get_string_or_none(grab(obj, "name"))
if self.set_source_name_as_cluster_group is True:
if self.settings.set_source_name_as_cluster_group is True:
group = self.inventory.get_by_data(NBClusterGroup, data={"name": self.name})
else:
group = self.get_object_from_cache(self.get_parent_object_by_class(obj, vim.Datacenter))
@@ -1523,7 +1301,7 @@ class VMWareHandler(SourceBase):
# if we're parsing a single host "cluster" and the hosts domain name should be stripped,
# then the ComputeResources domain name gets stripped as well
if isinstance(obj, vim.ComputeResource) and self.strip_host_domain_name is True:
if isinstance(obj, vim.ComputeResource) and self.settings.strip_host_domain_name is True:
name = name.split(".")[0]
group_name = grab(group, "data.name")
@@ -1532,8 +1310,12 @@ class VMWareHandler(SourceBase):
log.debug(f"Parsing vCenter cluster: {full_cluster_name}")
# check for full name and then for cluster name only
if self.passes_filter(full_cluster_name, self.cluster_include_filter, self.cluster_exclude_filter) is False \
or self.passes_filter(name, self.cluster_include_filter, self.cluster_exclude_filter) is False:
if self.passes_filter(full_cluster_name,
self.settings.cluster_include_filter,
self.settings.cluster_exclude_filter) is False \
or self.passes_filter(name,
self.settings.cluster_include_filter,
self.settings.cluster_exclude_filter) is False:
return
site_name = self.get_site_name(NBCluster, full_cluster_name)
@@ -1704,7 +1486,7 @@ class VMWareHandler(SourceBase):
name = get_string_or_none(grab(obj, "name"))
if name is not None and self.strip_host_domain_name is True:
if name is not None and self.settings.strip_host_domain_name is True:
name = name.split(".")[0]
# parse data
@@ -1741,7 +1523,7 @@ class VMWareHandler(SourceBase):
cluster_name = get_string_or_none(grab(nb_cluster_object, "data.name"))
# get a site for this host
if self.set_source_name_as_cluster_group is True:
if self.settings.set_source_name_as_cluster_group is True:
group = self.inventory.get_by_data(NBClusterGroup, data={"name": self.name})
else:
group = self.get_object_from_cache(self.get_parent_object_by_class(obj, vim.Datacenter))
@@ -1760,7 +1542,7 @@ class VMWareHandler(SourceBase):
self.processed_host_names[site_name].append(name)
# filter hosts by name
if self.passes_filter(name, self.host_include_filter, self.host_exclude_filter) is False:
if self.passes_filter(name, self.settings.host_include_filter, self.settings.host_exclude_filter) is False:
return
#
@@ -1806,7 +1588,7 @@ class VMWareHandler(SourceBase):
# add asset tag if desired and present
asset_tag = None
if bool(self.collect_hardware_asset_tag) is True and "AssetTag" in identifier_dict.keys():
if self.settings.collect_hardware_asset_tag is True and "AssetTag" in identifier_dict.keys():
banned_tags = ["Default string", "NA", "N/A", "None", "Null", "oem", "o.e.m",
"to be filled by o.e.m.", "Unknown"]
@@ -2132,7 +1914,7 @@ class VMWareHandler(SourceBase):
# check if interface has the default route or is described as management interface
vnic_is_primary = False
for management_match in self.host_management_interface_match:
for management_match in self.settings.host_management_interface_match:
if management_match in vnic_description.lower():
vnic_is_primary = True
@@ -2145,7 +1927,7 @@ class VMWareHandler(SourceBase):
int_v4 = "{}/{}".format(grab(vnic, "spec.ip.ipAddress"), grab(vnic, "spec.ip.subnetMask"))
if ip_valid_to_add_to_netbox(int_v4, self.permitted_subnets, self.excluded_subnets, vnic_name) is True:
if self.settings.permitted_subnets.permitted(int_v4, interface_name=vnic_name) is True:
vnic_ips[vnic_name].append(int_v4)
if vnic_is_primary is True and host_primary_ip4 is None:
@@ -2155,7 +1937,7 @@ class VMWareHandler(SourceBase):
int_v6 = "{}/{}".format(grab(ipv6_entry, "ipAddress"), grab(ipv6_entry, "prefixLength"))
if ip_valid_to_add_to_netbox(int_v6, self.permitted_subnets, self.excluded_subnets, vnic_name) is True:
if self.settings.permitted_subnets.permitted(int_v6, interface_name=vnic_name) is True:
vnic_ips[vnic_name].append(int_v6)
# set first valid IPv6 address as primary IPv6
@@ -2200,7 +1982,7 @@ class VMWareHandler(SourceBase):
name = get_string_or_none(grab(obj, "name"))
if name is not None and self.strip_vm_domain_name is True:
if name is not None and self.settings.strip_vm_domain_name is True:
name = name.split(".")[0]
#
@@ -2220,11 +2002,11 @@ class VMWareHandler(SourceBase):
# check if vm is template
template = grab(obj, "config.template")
if bool(self.skip_vm_templates) is True and template is True:
if bool(self.settings.skip_vm_templates) is True and template is True:
log.debug2(f"VM '{name}' is a template. Skipping")
return
if bool(self.skip_srm_placeholder_vms) is True \
if bool(self.settings.skip_srm_placeholder_vms) is True \
and f"{grab(obj, 'config.managedBy.extensionKey')}".startswith("com.vmware.vcDr"):
log.debug2(f"VM '{name}' is a SRM placeholder VM. Skipping")
return
@@ -2244,7 +2026,7 @@ class VMWareHandler(SourceBase):
if cluster_object is None:
cluster_object = self.get_parent_object_by_class(parent_host, vim.ComputeResource)
if self.set_source_name_as_cluster_group is True:
if self.settings.set_source_name_as_cluster_group is True:
group = self.inventory.get_by_data(NBClusterGroup, data={"name": self.name})
else:
group = self.get_parent_object_by_class(cluster_object, vim.Datacenter)
@@ -2276,7 +2058,7 @@ class VMWareHandler(SourceBase):
self.processed_vm_names[cluster_full_name].append(name)
# filter VMs by name
if self.passes_filter(name, self.vm_include_filter, self.vm_exclude_filter) is False:
if self.passes_filter(name, self.settings.vm_include_filter, self.settings.vm_exclude_filter) is False:
return
#
@@ -2302,7 +2084,7 @@ class VMWareHandler(SourceBase):
]) / 1024 / 1024)
annotation = None
if bool(self.skip_vm_comments) is False:
if self.settings.skip_vm_comments is False:
annotation = get_string_or_none(grab(obj, "config.annotation"))
# assign vm_tenant_relation
@@ -2328,7 +2110,7 @@ class VMWareHandler(SourceBase):
if version.parse(self.inventory.netbox_api_version) >= version.parse("3.3.0"):
vm_data["site"] = {"name": site_name}
if self.track_vm_host:
if self.settings.track_vm_host:
vm_data["device"] = {
"name": parent_name,
"cluster": nb_cluster_object,
@@ -2494,8 +2276,7 @@ class VMWareHandler(SourceBase):
int_ip_address = f"{int_ip.ipAddress}/{int_ip.prefixLength}"
if ip_valid_to_add_to_netbox(int_ip_address, self.permitted_subnets,
self.excluded_subnets, int_full_name) is False:
if self.settings.permitted_subnets.permitted(int_ip_address, interface_name=int_full_name) is False:
continue
nic_ips[int_full_name].append(int_ip_address)
@@ -2559,7 +2340,7 @@ class VMWareHandler(SourceBase):
nic_data[int_full_name] = vm_nic_data
# find dummy guest NIC interfaces
if self.sync_vm_dummy_interfaces is True:
if self.settings.sync_vm_dummy_interfaces is True:
for guest_nic in grab(obj, "guest.net", fallback=list()):
# get matching guest NIC MAC
@@ -2583,11 +2364,8 @@ class VMWareHandler(SourceBase):
int_ip_address = f"{int_ip.ipAddress}/{int_ip.prefixLength}"
if ip_valid_to_add_to_netbox(int_ip_address, self.permitted_subnets,
self.excluded_subnets, int_full_name) is False:
continue
nic_ips[int_full_name].append(int_ip_address)
if self.settings.permitted_subnets.permitted(int_ip_address, interface_name=int_full_name) is True:
nic_ips[int_full_name].append(int_ip_address)
vm_nic_data = {
"name": int_full_name,
@@ -2621,7 +2399,7 @@ class VMWareHandler(SourceBase):
self.inventory.add_update_object(NBTag, data={
"name": self.source_tag,
"description": f"Marks objects synced from vCenter '{self.name}' "
f"({self.host_fqdn}) to this NetBox Instance."
f"({self.settings.host_fqdn}) to this NetBox Instance."
})
# update virtual site if present
@@ -2644,5 +2422,4 @@ class VMWareHandler(SourceBase):
"vm_role": True
})
# EOF
+27 -37
View File
@@ -15,25 +15,16 @@ Sync objects from various sources to NetBox
from datetime import datetime
from module.common.misc import grab, get_relative_time
from module.common.misc import grab, get_relative_time, do_error_exit
from module.common.cli_parser import parse_command_line
from module.common.logging import setup_logging
from module.common.configuration import get_config_file, open_config_file, get_config
from module.netbox.connection import NetBoxHandler
from module.netbox.inventory import NetBoxInventory
from module.netbox.object_classes import *
from module.sources import instantiate_sources
__version__ = "1.3.0"
__version_date__ = "2022-09-06"
__author__ = "Ricardo Bartels <ricardo.bartels@telekom.de>"
__description__ = "NetBox Sync"
__license__ = "MIT"
__url__ = "https://github.com/bb-ricardo/netbox-sync"
default_log_level = "INFO"
default_config_file_path = "./settings.ini"
from module.config.parser import ConfigParser
from module.common.config import CommonConfig
from module.config.file_output import ConfigFileOutput
from module import __version__, __version_date__, __description__
def main():
@@ -41,46 +32,45 @@ def main():
start_time = datetime.now()
# parse command line
args = parse_command_line(self_description=self_description,
version=__version__,
version_date=__version_date__,
url=__url__,
default_config_file_path=default_config_file_path)
args = parse_command_line(self_description=self_description)
# get config file path
config_file = get_config_file(args.config_file)
# write out default config file and exit if "generate_config" is defined
ConfigFileOutput(args)
# get config handler
config_handler = open_config_file(config_file)
# parse config files and environment variables
config_parse_handler = ConfigParser()
config_parse_handler.add_config_file_list(args.config_files)
config_parse_handler.read_config()
# get logging configuration
# read common config
common_config = CommonConfig().parse(do_log=False)
# set log level
log_level = default_log_level
# config overwrites default
log_level = config_handler.get("common", "log_level", fallback=log_level)
# cli option overwrites config file
log_level = grab(args, "log_level", fallback=log_level)
log_level = grab(args, "log_level", fallback=common_config.log_level)
log_file = None
if bool(config_handler.getboolean("common", "log_to_file", fallback=False)) is True:
log_file = config_handler.get("common", "log_file", fallback=None)
if common_config.log_to_file is True:
log_file = common_config.log_file
# setup logging
log = setup_logging(log_level, log_file)
# now we are ready to go
log.info(f"Starting {__description__} v{__version__} ({__version_date__})")
log.debug(f"Using config file: {config_file}")
for config_file in config_parse_handler.file_list:
log.debug(f"Using config file: {config_file}")
# exit if any parser errors occurred here
config_parse_handler.log_end_exit_on_errors()
# just to print config options to log/console
CommonConfig().parse()
# initialize an empty inventory which will be used to hold and reference all objects
inventory = NetBoxInventory()
# get config for NetBox handler
netbox_settings = get_config(config_handler, section="netbox", valid_settings=NetBoxHandler.settings)
# establish NetBox connection
nb_handler = NetBoxHandler(settings=netbox_settings, inventory=inventory, nb_sync_version=__version__)
nb_handler = NetBoxHandler()
# if purge was selected we go ahead and remove all items which were managed by this tools
if args.purge is True:
@@ -95,7 +85,7 @@ def main():
# instantiate source handlers and get attributes
log.info("Initializing sources")
sources = instantiate_sources(config_handler, inventory)
sources = instantiate_sources()
# all sources are unavailable
if len(sources) == 0:
+1
View File
@@ -5,3 +5,4 @@ requests==2.28.1
pyvmomi==7.0.3
aiodns==3.0.0
setuptools>=62.00.0
pyyaml==6.0
+292 -276
View File
@@ -1,363 +1,379 @@
### Welcome to the NetBox-Sync configuration file.
;;; Welcome to the NetBox Sync configuration file.
;;; Version: 1.3.0 (2022-09-06)
;;; Project URL: https://github.com/bb-ricardo/netbox-sync
# The values in this file override the default values used by the system if
# a config option is not specified. The commented out lines are the configuration
# field and the default value used. Uncommenting a line and changing the value
# will change the value used at runtime when the process is restarted.
; The values in this file override the default values used by the system if a config
; option is not specified. The commented out lines are the configuration field and the
; default value used. Uncommenting a line and changing the value will change the value
; used at runtime when the process is restarted.
###
### [common]
###
### Controls the parameters for logging.
###
;;;
;;; [common]
;;;
;;; Controls the parameters for logging
;;;
[common]
# Logs will always be printed to stdout/stderr.
# Logging can be set to following log levels
# ERROR: Fatal Errors which stops regular a run
# WARNING: Warning messages won't stop the syncing process but mostly worth
# to have a look at.
# INFO: Information about objects that will be create/updated/deleted in NetBox
# DEBUG: Will log information about retrieved information, changes in internal
# data structure and parsed config
# DEBUG2: Will also log information about how/why data is parsed or skipped.
# DEBUG3: Logs all source and NetBox queries/results to stdout. Very useful for
# troubleshooting, but will log any sensitive data contained within a query.
#
#log_level = INFO
; Logs will always be printed to stdout/stderr.
; Logging can be set to following log levels:
; ERROR: Fatal Errors which stops regular a run
; WARNING: Warning messages won't stop the syncing process but mostly worth
; to have a look at.
; INFO: Information about objects that will be create/updated/deleted in NetBox
; DEBUG: Will log information about retrieved information, changes in internal
; data structure and parsed config
; DEBUG2: Will also log information about how/why data is parsed or skipped.
; DEBUG3: Logs all source and NetBox queries/results to stdout. Very useful for
; troubleshooting, but will log any sensitive data contained within a query.
;log_level = INFO
# Enabling this options will write all logs to a log file defined in "log_file"
#log_to_file = False
; Enabling this options will write all logs to a log file defined in 'log_file'
;log_to_file = False
# Destination of the log file if "log_to_file" is enabled.
# Log file will be rotated maximum 5 times once the log file reaches size of 10 MB
#log_file = log/netbox_sync.log
; Destination of the log file if "log_to_file" is enabled. Log file will be rotated
; maximum 5 times once the log file reaches size of 10 MB
;log_file = log/netbox_sync.log
###
### [netbox]
###
### Controls the connection parameters to your netBox instance
###
;;;
;;; [netbox]
;;;
;;; Controls the connection parameters to your netBox instance
;;;
[netbox]
# Requires an NetBox API token with full permissions on all objects except:
# * auth
# * secrets
# * users
; Requires an NetBox API token with full permissions on all objects except 'auth',
; 'secrets' and 'users'
api_token = XYZ
# Requires a hostname or IP which points to your NetBox instance
; Requires a hostname or IP which points to your NetBox instance
host_fqdn = netbox.example.com
# Define the port your NetBox instance is listening on. If "disable_tls" is
# set to "true" this option might be set to 80
#port = 443
; Define the port your NetBox instance is listening on. If 'disable_tls' is set to "true"
; this option might be set to 80
;port = 443
# Weather TLS encryption is enabled or disabled.
#disable_tls = false
; Whether TLS encryption is enabled or disabled
;disable_tls = False
# Enforces TLS certificate validation. If this system doesn't trust the NetBox
# web server certificate then this option needs to be changed
#validate_tls_certs = true
; Enforces TLS certificate validation. If this system doesn't trust the NetBox web server
; certificate then this option needs to be changed
;validate_tls_certs = True
# Defines a proxy which will be used to connect to NetBox.
# proxy setting needs to include the scheme.
# proxy basic auth example: http://user:pass@10.10.1.10:3128
#proxy = http://example.com:3128
; Defines a proxy which will be used to connect to NetBox. Proxy setting needs to include
; the schema. Proxy basic auth example: http://user:pass@10.10.1.10:312
;proxy = http://example.com:3128
# Specify a client certificate which can be used to authenticate to NetBox.
#client_cert = client.pem
; Specify a client certificate which can be used to authenticate to NetBox
;client_cert = client.pem
# Specify the client certificate private key belonging to the client cert.
#client_cert_key = client.key
; Specify the client certificate private key belonging to the client cert
;client_cert_key = client.key
# Whether items which were created by this program but can't be found in any
# source anymore will be deleted or not.
#prune_enabled = false
; Whether items which were created by this program but can't be found in any source
; anymore will be deleted or not
;prune_enabled = False
# Orphaned objects will first be tagged before they get deleted. Once the amount
# of days passed the object will actually be deleted.
#prune_delay_in_days = 30
; Orphaned objects will first be tagged before they get deleted. Once the amount of days
; passed the object will actually be deleted
;prune_delay_in_days = 30
# This will tell netbox-sync to ignore objects in NetBox with tag 'NetBox-synced' from pruning
# if the source is not defined in this config file.
# https://github.com/bb-Ricardo/netbox-sync/issues/176
#ignore_unknown_source_object_pruning = False
; This will tell netbox-sync to ignore objects in NetBox with tag 'NetBox-synced' from
; pruning if the source is not defined in this config file (https://github.com/bb-
; Ricardo/netbox-sync/issues/176)
;ignore_unknown_source_object_pruning = False
# The maximum number of objects returned in a single request. If a NetBox instance
# is very quick responding the value should be raised.
#default_netbox_result_limit = 200
; The maximum number of objects returned in a single request. If a NetBox instance is very
; quick responding the value should be raised
;default_netbox_result_limit = 200
# The maximum time a query is allowed to execute before being killed and considered failed.
#timeout = 30
; The maximum time a query is allowed to execute before being killed and considered failed
;timeout = 30
# The amount of times a failed request will be reissued. Once the maximum is reached the
# syncing process will be stopped completely.
#max_retry_attempts = 4
; The amount of times a failed request will be reissued. Once the maximum is reached the
; syncing process will be stopped completely.
;max_retry_attempts = 4
# Defines if caching of NetBox objects is used or not. If problems with unresolved
# dependencies occur, switching off caching might help.
#use_caching = true
; Defines if caching of NetBox objects is used or not. If problems with unresolved
; dependencies occur, switching off caching might help.
;use_caching = True
# The location of the directory where the cache files should be stored
#cache_directory_location = cache
###
### [sources/*]
###
### Controls the parameters of a defined source. The string past the slash
### will be used as a sources name. Sources can be defined multiple times to
### represent different sources. It is planned to support different types of sources.
; The location of the directory where the cache files should be stored
;cache_directory_location = cache
;;;
;;; [source/*]
;;;
;;; Controls the parameters of a defined source. The string past the slash will be used as
;;; a sources name. Sources can be defined multiple times to represent different sources.
;;;
[source/my-vcenter-example]
# Defines if this source is enabled or not
#enabled = true
; Defines if this source is enabled or not
;enabled = True
# type of source. This defines which source handler to use.
; type of source. This defines which source handler to use
type = vmware
# host name / IP address of the vCenter
; host name / IP address of the vCenter
host_fqdn = vcenter.example.com
# TCP port to connect to
#port = 443
; TCP port to connect to
;port = 443
# EXPERIMENTAL: Connect to a vCenter using a proxy (socks proxies are not supported)
#proxy_host = 10.10.1.10
#proxy_port = 3128
; username to use to log into vCenter
username = vcenter-readonly
# Enforces TLS certificate validation. If vCenter uses a valid TLS certificate then
# this option should be set to 'true' to ensure a secure connection.
#validate_tls_certs = false
; password to use to log into vCenter
password = super-secret
# username and password to use to log into vCenter
username = vcenteruser
password = supersecret
; Enforces TLS certificate validation. If vCenter uses a valid TLS certificate then this
; option should be set to 'true' to ensure a secure connection.
;validate_tls_certs = False
# IP networks eligible to be synced to NetBox.
# If an IP address is not part of this networks then it WON'T be synced to NetBox.
# To excluded small blocks from bigger IP blocks a leading '!' has to be added.
# example: 10.0.0.0/8, !10.23.42.0/24
permitted_subnets = 172.16.0.0/12, 10.0.0.0/8, 192.168.0.0/16, fd00::/8
; EXPERIMENTAL: Connect to a vCenter using a proxy server (socks proxies are not
; supported). define a host name or an IP address
;proxy_host = 10.10.1.10
# filters can be used to include/exclude certain objects from importing into NetBox
# Include filters are checked first and exclude filters after. An object name has to
# pass both filters to be synced to NetBox. If a filter is unset it will be ignored.
# Filters are all treated as regex expressions!
# If more then one expression should match a '|' needs to be used
# Example: (exclude all VMs with "replica" in their name and all VMs starting with "backup")
# vm_exclude_filter = .*replica.*|^backup.*
; EXPERIMENTAL: Connect to a vCenter using a proxy server (socks proxies are not
; supported). define proxy server port number
;proxy_port = 3128
# If a cluster is excluded from sync then ALL VMs and HOSTS inside the cluster will
# be ignored!
# a cluster can be specified as "Cluster-name" or "Datacenter-name/Cluster-name" if
# multiple clusters have the same name
#cluster_exclude_filter =
#cluster_include_filter =
; IP networks eligible to be synced to NetBox. If an IP address is not part of this
; networks then it WON'T be synced to NetBox. To excluded small blocks from bigger IP
; blocks a leading '!' has to be added
;permitted_subnets = 172.16.0.0/12, 10.0.0.0/8, 192.168.0.0/16, fd00::/8, !10.23.42.0/24
# This will only include/exclude the host, not the VM if Host is part of a multi host
# cluster.
#host_exclude_filter =
#host_include_filter =
; filter options
# simply include/exclude VMs
#vm_exclude_filter =
#vm_include_filter =
; filters can be used to include/exclude certain objects from importing into NetBox.
; Include filters are checked first and exclude filters after. An object name has to pass
; both filters to be synced to NetBox. If a filter is unset it will be ignored. Filters
; are all treated as regex expressions! If more then one expression should match, a '|'
; needs to be used
;
; Example: (exclude all VMs with "replica" in their name and all VMs starting with
; "backup"): vm_exclude_filter = .*replica.*|^backup.*
# This option defines which vCenter cluster is part of a NetBox site. This is done
# with a comma separated key = value list.
# key: defines the cluster name as regex
# value: defines the NetBox site name (use quotes if name contains commas)
# This is a quite important config setting as IP addresses, prefixes, VLANs and
# VRFs are site dependent. In order to assign the correct prefix to an IP
# address it is important to pick the correct site.
# A VM always depends on the cluster site relation
# a cluster can be specified as "Cluster-name" or "Datacenter-name/Cluster-name" if
# multiple clusters have the same name
#cluster_site_relation = Cluster_NYC = New York, Cluster_FFM.* = Frankfurt, Datacenter_TOKIO/.* = Tokio
; If a cluster is excluded from sync then ALL VMs and HOSTS inside the cluster will be
; ignored! a cluster can be specified as "Cluster-name" or "Datacenter-name/Cluster-name"
; if multiple clusters have the same name
;cluster_exclude_filter =
;cluster_include_filter =
# Same as cluster site but on host level. If unset it will fall back
# to cluster_site_relation.
#host_site_relation = nyc02.* = New York, ffm01.* = Frankfurt
; This will only include/exclude the host, not the VM if Host is part of a multi host
; cluster
;host_exclude_filter =
;host_include_filter =
# This option defines which cluster/host/VM belongs to which tenant. This is done
# with a comma separated key = value list.
# key: defines a hosts/VM name as regex
# value: defines the NetBox tenant name (use quotes if name contains commas)
# a cluster can be specified as "Cluster-name" or "Datacenter-name/Cluster-name" if
# multiple clusters have the same name
#cluster_tenant_relation = Cluster_NYC.* = Customer A
#host_tenant_relation = esxi300.* = Infrastructure
#vm_tenant_relation = grafana.* = Infrastructure
; simply include/exclude VMs
;vm_exclude_filter =
;vm_include_filter =
# This option defines custom platforms if the VMWare created platforms are not suitable.
# Pretty much a mapping of VMWare platform name to your own platform name.
# This is done with a comma separated key = value list.
# key: defines a VMWare returned platform name
# value: defines the desired NetBox platform name
#vm_platform_relation = centos-7.* = centos7, microsoft-windows-server-2016.* = Windows2016
; relations options
# Define the NetBox device role used for hosts and VMs. The default is set to "Server". This is done
# with a comma separated key = value list.
# key: defines a hosts/VM name as regex
# value: defines the NetBox role name (use quotes if name contains commas)
#host_role_relation = .* = Server
#vm_role_relation = .* = Server
; This option defines which vCenter cluster is part of a NetBox site.
; This is done with a comma separated key = value list.
; key: defines the cluster name as regex
; value: defines the NetBox site name (use quotes if name contains commas)
; This is a quite important config setting as IP addresses, prefixes, VLANs
; and VRFs are site dependent. In order to assign the correct prefix to an IP
; address it is important to pick the correct site.
; A VM always depends on the cluster site relation
; a cluster can be specified as "Cluster-name" or
; "Datacenter-name/Cluster-name" if multiple clusters have the same name
;cluster_site_relation = Cluster_NYC = New York, Cluster_FFM.* = Frankfurt, Datacenter_TOKIO/.* = Tokio
# Define NetBox tags which are assigned to a cluster, host or VM. This is done
# with a comma separated key = value list.
# key: defines a hosts/VM name as regex
# value: defines the NetBox tag (use quotes if name contains commas)
# a cluster can be specified as "Cluster-name" or "Datacenter-name/Cluster-name" if
# multiple clusters have the same name
#cluster_tag_relation = Cluster_NYC.* = Infrastructure
#host_tag_relation = esxi300.* = Infrastructure
#vm_tag_relation = grafana.* = Infrastructure
; Same as cluster site but on host level. If unset it will fall back to
; cluster_site_relation
;host_site_relation = nyc02.* = New York, ffm01.* = Frankfurt
# Try to find existing host based on serial number. This can cause issues with blade centers if VMWare does not
# report the blades serial number properly.
#match_host_by_serial = True
; This option defines which cluster/host/VM belongs to which tenant.
; This is done with a comma separated key = value list.
; key: defines a hosts/VM name as regex
; value: defines the NetBox tenant name (use quotes if name contains commas)
; a cluster can be specified as "Cluster-name" or
; "Datacenter-name/Cluster-name" if multiple clusters have the same name
;cluster_tenant_relation = Cluster_NYC.* = Customer A
;host_tenant_relation = esxi300.* = Infrastructure
;vm_tenant_relation = grafana.* = Infrastructure
# Attempt to collect asset tags from vCenter hosts
#collect_hardware_asset_tag = True
; This option defines custom platforms if the VMWare created platforms are not suitable.
; Pretty much a mapping of VMWare platform name to your own platform name.
; This is done with a comma separated key = value list.
; key: defines a VMWare returned platform name
; value: defines the desired NetBox platform name
;vm_platform_relation = centos-7.* = centos7, microsoft-windows-server-2016.* = Windows2016
# Perform a reverse lookup for all collected IP addresses. If a dns name
# was found it will be added to the IP address object in NetBox
#dns_name_lookup = True
; Define the NetBox device role used for hosts and VMs. The default is
; set to "Server". This is done with a comma separated key = value list.
; key: defines a hosts/VM name as regex
; value: defines the NetBox role name (use quotes if name contains commas)
;host_role_relation = .* = Server
;vm_role_relation = .* = Server
# use custom DNS server to do the reverse lookups
#custom_dns_servers = 192.168.1.11, 192.168.1.12
; Define NetBox tags which are assigned to a cluster, host or VM. This is
; done with a comma separated key = value list.
; key: defines a hosts/VM name as regex
; value: defines the NetBox tag (use quotes if name contains commas)
; a cluster can be specified as "Cluster-name" or
; "Datacenter-name/Cluster-name" if multiple clusters have the same name
;cluster_tag_relation = Cluster_NYC.* = Infrastructure
;host_tag_relation = esxi300.* = Infrastructure
;vm_tag_relation = grafana.* = Infrastructure
# define how the primary IPs should be set
# possible values
#
# always: will remove primary IP from the object where this address is
# currently set as primary and moves it to new object
#
# when-undefined: (default)
# only sets primary IP if undefined, will cause ERRORs if same IP is
# assigned more then once to different hosts and IP is set as the
# objects primary IP
#
# never: don't set any primary IPs, will cause the same ERRORs
# as "when-undefined"
; Try to find existing host based on serial number. This can cause issues with blade
; centers if VMWare does not report the blades serial number properly.
;match_host_by_serial = True
#set_primary_ip = when-undefined
; Attempt to collect asset tags from vCenter hosts
;collect_hardware_asset_tag = True
# Do not sync notes from a VM in vCenter to the comments field on a VM in netbox
#skip_vm_comments = False
; Perform a reverse lookup for all collected IP addresses. If a dns name was found it will
; be added to the IP address object in NetBox
;dns_name_lookup = True
# Do not sync template VMs
#skip_vm_templates = True
; use custom DNS server to do the reverse lookups
;custom_dns_servers = 192.168.1.11, 192.168.1.12
# Skip virtual machines which are reported as offline.
# ATTENTION: this option will keep purging stopped VMs if activated!
#skip_offline_vms = False
; define how the primary IPs should be set
; possible values:
;
; always: will remove primary IP from the object where this address is
; currently set as primary and moves it to new object
;
; when-undefined:
; only sets primary IP if undefined, will cause ERRORs if same IP is
; assigned more then once to different hosts and IP is set as the
; objects primary IP
;
; never: don't set any primary IPs, will cause the same ERRORs
; as "when-undefined"
;set_primary_ip = when-undefined
# If the VMware Site Recovery Manager is used to can skip syncing
# placeholder/replicated VMs from fail-over site to NetBox.
#skip_srm_placeholder_vms = False
; Do not sync notes from a VM in vCenter to the comments field on a VM in netbox
;skip_vm_comments = False
# strip domain part from host name before syncing device to NetBox
#strip_host_domain_name = False
; Do not sync template VMs
;skip_vm_templates = True
# strip domain part from VM name before syncing VM to NetBox
#strip_vm_domain_name = False
; Skip virtual machines which are reported as offline.
; ATTENTION: this option will keep purging stopped VMs if activated!
;skip_offline_vms = False
# sync tags assigned to clusters, hosts and VMs in vCenter to NetBox
# INFO: this requires the installation of the 'vsphere-automation-sdk', ses docs about installation
# possible values:
# * object : the host or VM itself
# * parent_folder_1 : the direct folder this object is organized in (1 level up)
# * parent_folder_2 : the indirect folder this object is organized in (2 levels up)
# * cluster : the cluster this object is organized in
# * datacenter : the datacenter this object is organized in
# this is a comma separated list of options. example: vm_tag_source = object, cluster
#cluster_tag_source =
#host_tag_source =
#vm_tag_source =
; If the VMware Site Recovery Manager is used to can skip syncing placeholder/replicated
; VMs from fail-over site to NetBox.
;skip_srm_placeholder_vms = False
# sync custom attributes defined for hosts and VMs in vCenter to NetBox as custom fields
#sync_custom_attributes = False
; strip domain part from host name before syncing device to NetBox
;strip_host_domain_name = False
# add arbitrary host/vm object attributes as custom fields to NetBox.
# multiple attributes can be defined comma separated.
# to get a list of available attributes use '-l DEBUG3' as cli param (CAREFUL: output might be long)
# and here 'https://gist.github.com/bb-Ricardo/538768487bdac4efafabe56e005cb4ef' can be seen how to
# access these attributes
#host_custom_object_attributes = summary.runtime.bootTime
#vm_custom_object_attributes = config.uuid
; strip domain part from VM name before syncing VM to NetBox
;strip_vm_domain_name = False
# this will set the sources name as cluster group name instead of the datacenter. This works if the
# vCenter has ONLY ONE datacenter configured. Otherwise it will rename all datacenters to the source name!
#set_source_name_as_cluster_group = False
; tag source options
# activating this option will also include "dummy/virtual" interfaces which are only visible inside the VM
# and are exposed through VM guest tools. Dummy interfaces without an IP address will be skipped.
#sync_vm_dummy_interfaces = False
; sync tags assigned to clusters, hosts and VMs in vCenter to NetBox
; INFO: this requires the installation of the 'vsphere-automation-sdk',
; see docs about installation possible values:
; * object : the host or VM itself
; * parent_folder_1 : the direct folder this object is organized in (1 level up)
; * parent_folder_2 : the indirect folder this object is organized in (2 levels up)
; * cluster : the cluster this object is organized in
; * datacenter : the datacenter this object is organized in
; this is a comma separated list of options. example: vm_tag_source = object, cluster
;
; Example: vm_tag_source = object, cluster
;cluster_tag_source =
;host_tag_source =
;vm_tag_source =
# disables syncing of any VLANs visible in vCenter to NetBox
#disable_vlan_sync = False
; sync custom attributes defined for hosts and VMs in vCenter to NetBox as custom fields
;sync_custom_attributes = False
# enabling this option will add the ESXi host this VM is running on to the VM details
#track_vm_host = False
; custom object attributes options
# define if the name of the (device/VM) interface discovered overwrites the interface name in NetBox
# the interface will only be matched by identical MAC address
#overwrite_device_interface_name = True
#overwrite_vm_interface_name = True
; add arbitrary host/vm object attributes as custom fields to NetBox.
; multiple attributes can be defined comma separated.
; to get a list of available attributes use '-l DEBUG3' as cli param (CAREFUL: output might be long)
; and here 'https://gist.github.com/bb-Ricardo/538768487bdac4efafabe56e005cb4ef' can be seen how to
; access these attributes
;host_custom_object_attributes = summary.runtime.bootTime
;vm_custom_object_attributes = config.uuid
# set a matching value for ESXi host management interface description (case insensitive, comma separated).
# used to figure out the ESXi primary IP address
# default: management, mgmt
#host_management_interface_match = management, mgmt
; this will set the sources name as cluster group name instead of the datacenter. This
; works if the vCenter has ONLY ONE datacenter configured. Otherwise it will rename all
; datacenters to the source name!
;set_source_name_as_cluster_group = False
# define in which order the IP address tenant will be assigned if tenant is undefined.
# possible values:
# * device : host or VM tenant will be assigned to the IP address
# * prefix : if the IP address belongs to an existing prefix and this prefix has a tenant assigned, then this one is used
# * disabled : no tenant assignment to the IP address will be performed
# the order of the definition is important, the default is "device, prefix" which means:
# If the device has a tenant then this one will be used. If not, the prefix tenant will be used if defined
#ip_tenant_inheritance_order = device, prefix
; activating this option will also include "dummy/virtual" interfaces which are only
; visible inside the VM and are exposed through VM guest tools. Dummy interfaces without
; an IP address will be skipped.
;sync_vm_dummy_interfaces = False
; disables syncing of any VLANs visible in vCenter to NetBox
;disable_vlan_sync = False
; enabling this option will add the ESXi host this VM is running on to the VM details
;track_vm_host = False
; define if the name of the device interface discovered overwrites the interface name in
; NetBox. The interface will only be matched by identical MAC address
;overwrite_device_interface_name = True
; define if the name of the VM interface discovered overwrites the interface name in
; NetBox. The interface will only be matched by identical MAC address
;overwrite_vm_interface_name = True
; set a matching value for ESXi host management interface description (case insensitive,
; comma separated). Used to figure out the ESXi primary IP address
;host_management_interface_match = management, mgmt
; define in which order the IP address tenant will be assigned if tenant is undefined.
; possible values:
; * device : host or VM tenant will be assigned to the IP address
; * prefix : if the IP address belongs to an existing prefix and this prefix has a tenant assigned, then this one is used
; * disabled : no tenant assignment to the IP address will be performed
; the order of the definition is important, the default is "device, prefix" which means:
; If the device has a tenant then this one will be used. If not, the prefix tenant will be used if defined
;ip_tenant_inheritance_order = device, prefix
[source/my-redfish-example]
# Defines if this source is enabled or not
#enabled = true
; Defines if this source is enabled or not
;enabled = True
# type of source. This defines which source handler to use.
; type of source. This defines which source handler to use
type = check_redfish
# define the full path where the check_redfish inventory json files are located
; define the full path where the check_redfish inventory json files are located
inventory_file_path = /full/path/to/inventory/files
# IP networks eligible to be synced to NetBox.
# If an IP address is not part of this networks then it WON'T be synced to NetBox.
# To excluded small blocks from bigger IP blocks a leading '!' has to be added.
# example: 10.0.0.0/8, !10.23.42.0/24
permitted_subnets = 172.16.0.0/12, 10.0.0.0/8, 192.168.0.0/16, fd00::/8
; IP networks eligible to be synced to NetBox. If an IP address is not part of this
; networks then it WON'T be synced to NetBox. To excluded small blocks from bigger IP
; blocks a leading '!' has to be added
;permitted_subnets = 172.16.0.0/12, 10.0.0.0/8, 192.168.0.0/16, fd00::/8, !10.23.42.0/24
# define if the host name discovered via check_redfish overwrites the device host name in NetBox
#overwrite_host_name = False
; define if the host name discovered via check_redfish overwrites the device host name in
; NetBox
;overwrite_host_name = False
# define if the name of the power supply discovered via check_redfish overwrites the power supply name in NetBox
#overwrite_power_supply_name = False
; define if the name of the power supply discovered via check_redfish overwrites the power
; supply name in NetBox
;overwrite_power_supply_name = False
# define if existing power supply attributes are overwritten with data discovered via check_redfish
# if False only data which is not preset in NetBox will be added
#overwrite_power_supply_attributes = True
; define if existing power supply attributes are overwritten with data discovered via
; check_redfish if False only data which is not preset in NetBox will be added
;overwrite_power_supply_attributes = True
# define if the name of the interface discovered via check_redfish overwrites the interface name in NetBox
#overwrite_interface_name = False
; define if the name of the interface discovered via check_redfish overwrites the
; interface name in NetBox
;overwrite_interface_name = False
# define if existing interface attributes are overwritten with data discovered via check_redfish
# if False only data which is not preset in NetBox will be added
#overwrite_interface_attributes = True
; define if existing interface attributes are overwritten with data discovered via
; check_redfish if False only data which is not preset in NetBox will be added
;overwrite_interface_attributes = True
# EOF
;EOF