WIP: finishes common and netbox config

This commit is contained in:
ricardo.bartels@telekom.de
2023-02-04 01:54:08 +01:00
parent 6a84a0226e
commit 214d5c9c25
7 changed files with 386 additions and 157 deletions
+53
View File
@@ -0,0 +1,53 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2020 - 2022 Ricardo Bartels. All rights reserved.
#
# netbox-sync.py
#
# This work is licensed under the terms of the MIT license.
# For a copy, see file LICENSE.txt included in this
# repository or visit: <https://opensource.org/licenses/MIT>.
from module.config.config_option import ConfigOption
from module.config.config_base import ConfigBase
class CommonConfig(ConfigBase):
"""
Controls the parameters for logging
"""
section_name = "common"
options = [
ConfigOption("log_level",
str,
description="""\
Logs will always be printed to stdout/stderr.
Logging can be set to following log levels:
ERROR: Fatal Errors which stops regular a run
WARNING: Warning messages won't stop the syncing process but mostly worth
to have a look at.
INFO: Information about objects that will be create/updated/deleted in NetBox
DEBUG: Will log information about retrieved information, changes in internal
content structure and parsed config
DEBUG2: Will also log information about how/why content is parsed or skipped.
DEBUG3: Logs all source and NetBox queries/results to stdout. Very useful for
troubleshooting, but will log any sensitive content contained within a query.
""",
default_value="INFO"),
ConfigOption("log_to_file",
bool,
description="""Enabling this options will write all
logs to a log file defined in 'log_file'
""",
default_value=True),
ConfigOption("log_file",
str,
description="""Destination of the log file if "log_to_file" is enabled.
Log file will be rotated maximum 5 times once the log file reaches size of 10 MB
""",
default_value="log/netbox_sync.log")
]
-57
View File
@@ -16,63 +16,6 @@ from module.common.logging import get_logger
log = get_logger()
def get_config_file(config_file):
"""
get absolute path to provided config file string
Parameters
----------
config_file: str
config file path
Returns
-------
str: absolute path to config file
"""
if config_file is None or config_file == "":
do_error_exit("ERROR: Config file not defined.")
base_dir = os.sep.join(__file__.split(os.sep)[0:-3])
if config_file[0] != os.sep:
config_file = f"{base_dir}{os.sep}{config_file}"
return os.path.realpath(config_file)
def open_config_file(config_file):
"""
Open config file with a ConfigParser and return handler. Bail out of opening or parsing fails
Parameters
----------
config_file: str
absolute path of config file to open
Returns
-------
ConfigParser: handler with supplied config file
"""
if config_file is None or config_file == "":
do_error_exit("ERROR: Config file not defined.")
# setup config parser and read config
config_handler = configparser.ConfigParser(strict=True, allow_no_value=True,
empty_lines_in_values=False, interpolation=None)
# noinspection PyBroadException
try:
config_handler.read_file(open(config_file))
except configparser.Error as e:
do_error_exit(f"ERROR: Problem while config file parsing: {e}")
# noinspection PyBroadException
except Exception:
do_error_exit(f"ERROR: Unable to open file '{config_file}'")
return config_handler
def get_config(config_handler=None, section=None, valid_settings=None, deprecated_settings=None, removed_settings=None):
"""
read config items from a defined section
+63 -71
View File
@@ -7,11 +7,12 @@
# For a copy, see file LICENSE.txt included in this
# repository or visit: <https://opensource.org/licenses/MIT>.
import configparser
import os
from module.common.misc import do_error_exit
from module.common.misc import do_error_exit, grab
from module.common.logging import get_logger
from module.config.config_files import ConfigFilesParser
from module.config.config_option import ConfigOption
log = get_logger()
@@ -21,94 +22,85 @@ class ConfigBase:
Base class to parse config data
"""
sensitive_keys = [
"password",
"token",
]
env_var_prefix = "NBS"
section_name = None
not_config_vars = [
"config_section_name",
"__module__",
"__doc__"
]
options = list()
parser_error = False
def __init__(self, config_file_handler: ConfigFilesParser):
def __init__(self, config_data: configparser.ConfigParser):
if not isinstance(config_data, configparser.ConfigParser):
if not isinstance(config_file_handler, ConfigFilesParser):
do_error_exit("config data is not a config parser object")
self.parse_config(config_data)
self._parse_config_data(config_file_handler.content)
@staticmethod
def to_bool(value):
"""
converts a string to a boolean
"""
valid = {
'true': True, 't': True, '1': True,
'false': False, 'f': False, '0': False,
}
if isinstance(value, bool):
return value
elif isinstance(value, str):
if value.lower() in valid:
return valid[value.lower()]
raise ValueError
def parse_config(self, config_data):
def _parse_config_data(self, config_data):
"""
generic method to parse config data and also takes care of reading equivalent env var
"""
config_section_name = getattr(self.__class__, "config_section_name")
if self.section_name is None:
raise KeyError(f"Class '{self.__class__.__name__}' is missing 'section_name' attribute")
if config_section_name is None:
raise KeyError(f"Class '{self.__class__.__name__}' is missing 'config_section_name' attribute")
for config_object in self.options:
for config_option in [x for x in vars(self.__class__) if x not in self.__class__.not_config_vars]:
var_config = getattr(self.__class__, config_option)
if not isinstance(var_config, dict):
if not isinstance(config_object, ConfigOption):
continue
var_type = var_config.get("type", str)
var_alt = var_config.get("alt")
var_default = var_config.get("default")
config_value = grab(config_data, f"{self.section_name}.{config_object.key}")
config_value = config_data.get(config_section_name, config_option, fallback=None)
if config_value is None and var_alt is not None:
config_value = config_data.get(config_section_name, var_alt, fallback=None)
alt_key_used = False
if config_value is None and config_object.alt_key is not None:
alt_key_used = True
config_value = grab(config_data, f"{self.section_name}.{config_object.alt_key}")
config_value = os.environ.get(f"{config_section_name}_{config_option}".upper(), config_value)
# check for deprecated settings
if config_object.deprecated is True:
log_text = f"Setting '{config_object.key}' is deprecated and will be removed soon."
if len(config_object.deprecation_message) > 0:
log_text += " " + config_object.deprecation_message
log.warning(log_text)
if config_value is not None and var_type == bool:
try:
config_value = self.to_bool(config_value)
except ValueError:
log.error(f"Unable to parse '{config_value}' for '{config_option}' as bool")
config_value = var_default
# check for removed settings
if config_value is not None and config_object.removed is True:
object_key = config_object.key
if alt_key_used is True:
object_key = config_object.alt_key
log_text = f"Setting '{object_key}' has been removed " \
f"but is still defined in config section '{self.section_name}'."
if len(config_object.deprecation_message) > 0:
log_text += " " + config_object.deprecation_message
log.warning(log_text)
continue
elif config_value is not None and var_type == int:
try:
config_value = int(config_value)
except ValueError:
log.error(f"Unable to parse '{config_value}' for '{config_option}' as int")
config_value = var_default
# parse env
env_var_name = f"{self.env_var_prefix}_{self.section_name}_{config_object.key}".upper()
config_value = os.environ.get(env_var_name, config_value)
else:
if config_value is None:
config_value = var_default
# set value
config_object.set_value(config_value)
debug_value = config_value
if isinstance(debug_value, str) and config_option in self.sensitive_keys:
debug_value = config_value[0:3] + "***"
def parse(self):
log.debug(f"Config: {config_section_name}.{config_option} = {debug_value}")
options = dict()
for config_object in self.options:
if isinstance(config_object, ConfigOption) and config_object.removed is False:
log.debug(f"Config: {self.section_name}.{config_object.key} = {config_object.sensitive_value}")
options[config_object.key] = config_object.value
setattr(self, config_option, config_value)
return ConfigOptions(**options)
class ConfigOptions:
def __init__(self, **kwargs):
for name in kwargs:
setattr(self, name, kwargs[name])
def __eq__(self, other):
if not isinstance(other, ConfigOptions):
return NotImplemented
return vars(self) == vars(other)
def __contains__(self, key):
return key in self.__dict__
+12 -9
View File
@@ -20,10 +20,13 @@ from module.common.misc import do_error_exit
log = get_logger()
class ConfigFiles:
class ConfigFilesParser:
"""
parses a given list of config files
"""
names = list()
data = dict()
content = dict()
config_file_errors = False
def __init__(self, config_file_list: List, default_config_file: str = None):
@@ -111,20 +114,20 @@ class ConfigFiles:
self.config_file_errors = True
continue
if self.data.get(section) is None:
self.data[section] = list()
if self.content.get(section) is None:
self.content[section] = list()
for source in section_data:
current_data = None
for current_sources in self.data.get(section):
for current_sources in self.content.get(section):
# find source by name
if current_sources.get("name") == source.get("name"):
current_data = current_sources
break
if current_data is None:
self.data[section].append(source)
self.content[section].append(source)
else:
for key, value in source.items():
current_data[key] = value
@@ -135,10 +138,10 @@ class ConfigFiles:
self.config_file_errors = True
continue
if self.data.get(section) is None:
self.data[section] = dict()
if self.content.get(section) is None:
self.content[section] = dict()
for key, value in section_data.items():
self.data[section][key] = value
self.content[section][key] = value
@staticmethod
def get_config_file_path(config_file: str) -> str:
+111 -3
View File
@@ -8,10 +8,17 @@
# repository or visit: <https://opensource.org/licenses/MIT>.
from typing import Any
from textwrap import wrap, fill, indent
from textwrap import wrap, fill, indent, dedent
from module.common.logging import get_logger
log = get_logger()
class ConfigOption:
"""
handles all attributes of a single config option
"""
def __init__(self,
key: str,
@@ -22,9 +29,12 @@ class ConfigOption:
mandatory: bool = False,
alt_key: str = None,
deprecated: bool = False,
deprecation_message: str = None):
deprecation_message: str = "",
removed: bool = False,
sensitive: bool = False):
self.key = key
self._value = None
self.value_type = value_type
self._description = description
self.default_value = default_value
@@ -33,27 +43,125 @@ class ConfigOption:
self.alt_key = alt_key
self.deprecated = deprecated
self.deprecation_message = deprecation_message
self.removed = removed
self.sensitive = sensitive
if self.config_example is None:
self.config_example = self.default_value
if self.default_value is not None:
self.set_value(self.default_value)
if not isinstance(self._description, str):
raise ValueError(f"value for 'description' of '{self.key}' must be of type str")
if not isinstance(self.deprecation_message, str):
raise ValueError(f"value for 'deprecation_message' of '{self.key}' must be of type str")
if len(self._description) == 0:
raise ValueError(f"value for 'description' of '{self.key}' can't be empty")
if self.config_example is not None and not isinstance(self.config_example, self.value_type):
raise ValueError(f"value for 'config_example' of '{self.key}' must be of '{self.value_type}'")
def __repr__(self):
return f"{self.key}: {self._value}"
@property
def value(self):
return self._value
@property
def sensitive_value(self):
if self.sensitive is True:
return self._value[0:3] + "***"
return self._value
def set_value(self, value):
if value is None:
return
if self.value_type == bool:
try:
config_value = self.to_bool(value)
except ValueError:
log.error(f"Unable to parse '{value}' for '{self.key}' as bool")
return
elif self.value_type == int:
try:
config_value = int(value)
except ValueError:
log.error(f"Unable to parse '{value}' for '{self.key}' as int")
return
else:
config_value = value
self._value = config_value
@staticmethod
def to_bool(value):
"""
converts a string to a boolean
"""
valid = {
'true': True, 't': True, '1': True, 'yes': True,
'false': False, 'f': False, '0': False, 'no': False
}
if isinstance(value, bool):
return value
elif isinstance(value, str):
if value.lower() in valid:
return valid[value.lower()]
raise ValueError
def description(self, width: int = 80) -> str:
"""
return description as a string wrapped at 'width'
SPECIAL: if self._description starts with a blank character,
the description will be dedented and NO line wrapping will be applied.
Parameters
----------
width: int
max characters per line
Returns
-------
str: single string containing new line characters
"""
if not isinstance(width, int):
raise ValueError("value for 'width' must be of type int")
return fill(" ".join(wrap(self._description)), width=width)
if self._description.startswith(" "):
return dedent(self._description)
else:
return fill(" ".join(wrap(self._description)), width=width)
def config_description(self, prefix: str = "#", width: int = 80) -> str:
"""
prefixes each description line with one or more 'prefix' characters
and a blank between prefix and description line text
Parameters
----------
prefix: str
string to prefix each line with
width: int
max characters per line
Returns
-------
str: single string containing new line characters
"""
if not isinstance(width, int):
raise ValueError("value for 'width' must be of type int")
+127
View File
@@ -0,0 +1,127 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2020 - 2022 Ricardo Bartels. All rights reserved.
#
# netbox-sync.py
#
# This work is licensed under the terms of the MIT license.
# For a copy, see file LICENSE.txt included in this
# repository or visit: <https://opensource.org/licenses/MIT>.
from module.config.config_option import ConfigOption
from module.config.config_base import ConfigBase
class NetBoxConfig(ConfigBase):
"""
Controls the parameters for logging
"""
section_name = "netbox"
options = [
ConfigOption("api_token",
str,
description="""Requires an NetBox API token with full permissions on all objects except
'auth', 'secrets' and 'users'
""",
config_example="XYZ",
mandatory=True,
sensitive=True),
ConfigOption("host_fqdn",
str,
description="Requires a hostname or IP which points to your NetBox instance",
config_example="netbox.example.com",
mandatory=True),
ConfigOption("port",
int,
description="""Define the port your NetBox instance is listening on.
If 'disable_tls' is set to "true" this option might be set to 80
""",
default_value=443),
ConfigOption("disable_tls",
bool,
description="Whether TLS encryption is enabled or disabled",
default_value=False),
ConfigOption("validate_tls_certs",
bool,
description="""Enforces TLS certificate validation. If this system doesn't trust the NetBox
web server certificate then this option needs to be changed
""",
default_value=True),
ConfigOption("proxy",
str,
description="""Defines a proxy which will be used to connect to NetBox.
Proxy setting needs to include the schema.
Proxy basic auth example: http://user:pass@10.10.1.10:312
""",
config_example="http://example.com:3128"),
ConfigOption("client_cert",
str,
description="Specify a client certificate which can be used to authenticate to NetBox",
config_example="client.pem"),
ConfigOption("client_cert_key",
str,
description="Specify the client certificate private key belonging to the client cert",
config_example="client.key"),
ConfigOption("prune_enabled",
bool,
description="""Whether items which were created by this program but
can't be found in any source anymore will be deleted or not
""",
default_value=False),
ConfigOption("prune_delay_in_days",
int,
description="""Orphaned objects will first be tagged before they get deleted.
Once the amount of days passed the object will actually be deleted
""",
default_value=30),
ConfigOption("ignore_unknown_source_object_pruning",
bool,
description="""This will tell netbox-sync to ignore objects in NetBox
with tag 'NetBox-synced' from pruning if the source is not defined in
this config file (https://github.com/bb-Ricardo/netbox-sync/issues/176)
""",
default_value=False),
ConfigOption("default_netbox_result_limit",
int,
description="""The maximum number of objects returned in a single request.
If a NetBox instance is very quick responding the value should be raised
"""),
ConfigOption("timeout",
int,
description="""The maximum time a query is allowed to execute before being
killed and considered failed
""",
default_value=30),
ConfigOption("max_retry_attempts",
int,
description="""The amount of times a failed request will be reissued.
Once the maximum is reached the syncing process will be stopped completely.
""",
default_value=4),
ConfigOption("use_caching",
bool,
description="""Defines if caching of NetBox objects is used or not.
If problems with unresolved dependencies occur, switching off caching might help.
""",
default_value=True),
ConfigOption("cache_directory_location",
str,
description="The location of the directory where the cache files should be stored",
default_value="cache")
]
+20 -17
View File
@@ -15,15 +15,17 @@ Sync objects from various sources to NetBox
from datetime import datetime
from module.common.misc import grab, get_relative_time, dump
from module.common.misc import grab, get_relative_time
from module.common.cli_parser import parse_command_line
from module.common.logging import setup_logging
from module.common.configuration import get_config_file, open_config_file, get_config
from module.common.configuration import get_config
from module.netbox.connection import NetBoxHandler
from module.netbox.inventory import NetBoxInventory
from module.netbox.object_classes import *
from module.sources import instantiate_sources
from module.config.config_files import ConfigFiles
from module.config.config_files import ConfigFilesParser
from module.common.config import CommonConfig
from module.netbox.config import NetBoxConfig
__version__ = "1.3.0"
__version_date__ = "2022-09-06"
@@ -48,44 +50,45 @@ def main():
default_config_file_path=default_config_file_path)
# get config file path
x = ConfigFiles(args.config_file, default_config_file_path)
import pprint
pprint.pprint(x.data)
exit(0)
config_file = get_config_file(args.config_file)
config_file_handler = ConfigFilesParser(args.config_file, default_config_file_path)
# get config handler
config_handler = open_config_file(config_file)
import pprint
pprint.pprint(config_file_handler.content)
common_config = CommonConfig(config_file_handler).parse()
# get logging configuration
# set log level
log_level = default_log_level
# config overwrites default
log_level = config_handler.get("common", "log_level", fallback=log_level)
log_level = common_config.log_level
# cli option overwrites config file
log_level = grab(args, "log_level", fallback=log_level)
log_file = None
if bool(config_handler.getboolean("common", "log_to_file", fallback=False)) is True:
log_file = config_handler.get("common", "log_file", fallback=None)
if common_config.log_to_file is True:
log_file = common_config.log_file
# setup logging
log = setup_logging(log_level, log_file)
# now we are ready to go
log.info(f"Starting {__description__} v{__version__} ({__version_date__})")
log.debug(f"Using config file: {config_file}")
for config_file in config_file_handler.names:
log.debug(f"Using config file: {config_file}")
# just to print config options to log/console
CommonConfig(config_file_handler).parse()
# initialize an empty inventory which will be used to hold and reference all objects
inventory = NetBoxInventory()
# get config for NetBox handler
netbox_settings = get_config(config_handler, section="netbox", valid_settings=NetBoxHandler.settings)
netbox_settings = NetBoxConfig(config_file_handler).parse()
exit(0)
# establish NetBox connection
nb_handler = NetBoxHandler(settings=netbox_settings, inventory=inventory, nb_sync_version=__version__)
# if purge was selected we go ahead and remove all items which were managed by this tools
if args.purge is True: