WIP: base for parsing config files of different types #227

This commit is contained in:
ricardo.bartels@telekom.de
2023-02-01 02:23:03 +01:00
parent 76e1ac5a1e
commit 6a84a0226e
9 changed files with 535 additions and 9 deletions

View File

@@ -43,11 +43,10 @@ def parse_command_line(version=None, self_description=None, version_date=None, u
description=description,
formatter_class=RawDescriptionHelpFormatter)
parser.add_argument("-c", "--config", default=default_config_file_path, dest="config_file",
help="points to the config file to read config data from " +
"which is not installed under the default path '" +
default_config_file_path + "'",
metavar="settings.ini")
parser.add_argument("-c", "--config", default=[], dest="config_file", nargs='+',
help=f"points to the config file to read config data from which is not installed "
f"under the default path '{default_config_file_path}'",
metavar=os.path.basename(default_config_file_path))
parser.add_argument("-l", "--log_level", choices=valid_log_levels, dest="log_level",
help="set log level (overrides config)")
@@ -63,8 +62,17 @@ def parse_command_line(version=None, self_description=None, version_date=None, u
args = parser.parse_args()
# fix supplied config file path
if args.config_file != default_config_file_path and args.config_file[0] != os.sep:
args.config_file = os.path.realpath(os.getcwd() + os.sep + args.config_file)
fixed_config_files = list()
for config_file in args.config_file:
if len(config_file) == 0:
continue
if config_file != default_config_file_path and config_file[0] != os.sep:
config_file = os.path.realpath(os.getcwd() + os.sep + config_file)
fixed_config_files.append(config_file)
args.config_file = fixed_config_files
return args

View File

@@ -0,0 +1,8 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2020 - 2022 Ricardo Bartels. All rights reserved.
#
# netbox-sync.py
#
# This work is licensed under the terms of the MIT license.
# For a copy, see file LICENSE.txt included in this
# repository or visit: <https://opensource.org/licenses/MIT>.

View File

@@ -0,0 +1,114 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2020 - 2022 Ricardo Bartels. All rights reserved.
#
# netbox-sync.py
#
# This work is licensed under the terms of the MIT license.
# For a copy, see file LICENSE.txt included in this
# repository or visit: <https://opensource.org/licenses/MIT>.
import configparser
import os
from module.common.misc import do_error_exit
from module.common.logging import get_logger
log = get_logger()
class ConfigBase:
"""
Base class to parse config data
"""
sensitive_keys = [
"password",
"token",
]
not_config_vars = [
"config_section_name",
"__module__",
"__doc__"
]
parser_error = False
def __init__(self, config_data: configparser.ConfigParser):
if not isinstance(config_data, configparser.ConfigParser):
do_error_exit("config data is not a config parser object")
self.parse_config(config_data)
@staticmethod
def to_bool(value):
"""
converts a string to a boolean
"""
valid = {
'true': True, 't': True, '1': True,
'false': False, 'f': False, '0': False,
}
if isinstance(value, bool):
return value
elif isinstance(value, str):
if value.lower() in valid:
return valid[value.lower()]
raise ValueError
def parse_config(self, config_data):
"""
generic method to parse config data and also takes care of reading equivalent env var
"""
config_section_name = getattr(self.__class__, "config_section_name")
if config_section_name is None:
raise KeyError(f"Class '{self.__class__.__name__}' is missing 'config_section_name' attribute")
for config_option in [x for x in vars(self.__class__) if x not in self.__class__.not_config_vars]:
var_config = getattr(self.__class__, config_option)
if not isinstance(var_config, dict):
continue
var_type = var_config.get("type", str)
var_alt = var_config.get("alt")
var_default = var_config.get("default")
config_value = config_data.get(config_section_name, config_option, fallback=None)
if config_value is None and var_alt is not None:
config_value = config_data.get(config_section_name, var_alt, fallback=None)
config_value = os.environ.get(f"{config_section_name}_{config_option}".upper(), config_value)
if config_value is not None and var_type == bool:
try:
config_value = self.to_bool(config_value)
except ValueError:
log.error(f"Unable to parse '{config_value}' for '{config_option}' as bool")
config_value = var_default
elif config_value is not None and var_type == int:
try:
config_value = int(config_value)
except ValueError:
log.error(f"Unable to parse '{config_value}' for '{config_option}' as int")
config_value = var_default
else:
if config_value is None:
config_value = var_default
debug_value = config_value
if isinstance(debug_value, str) and config_option in self.sensitive_keys:
debug_value = config_value[0:3] + "***"
log.debug(f"Config: {config_section_name}.{config_option} = {debug_value}")
setattr(self, config_option, config_value)

View File

@@ -0,0 +1,249 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2020 - 2022 Ricardo Bartels. All rights reserved.
#
# netbox-sync.py
#
# This work is licensed under the terms of the MIT license.
# For a copy, see file LICENSE.txt included in this
# repository or visit: <https://opensource.org/licenses/MIT>.
import os
import configparser
from typing import List, Dict
import yaml
import toml
from module.common.logging import get_logger
from module.common.misc import do_error_exit
log = get_logger()
class ConfigFiles:
names = list()
data = dict()
config_file_errors = False
def __init__(self, config_file_list: List, default_config_file: str = None):
"""
Read config files in the given order
Parameters
----------
config_file_list: list
list of paths of config files to parse
default_config_file: str
path to default config file
"""
self.default_config_file = self.get_config_file_path(default_config_file)
# check if default config file actually exists
# and add it to the list of files to parse
if os.path.exists(self.default_config_file) and len(config_file_list) == 0:
self.names.append(self.default_config_file)
else:
self.names = config_file_list
# check if config file exists
for f in self.names:
f = self.get_config_file_path(f)
# check if file exists
if not os.path.exists(f):
log.error(f'Config file "{f}" not found')
self.config_file_errors = True
continue
# check if it's an actual file
if not os.path.isfile(f):
log.error(f'Config file "{f}" is not an actual file')
self.config_file_errors = True
continue
# check if config file is readable
if not os.access(f, os.R_OK):
log.error(f'Config file "{f}" not readable')
self.config_file_errors = True
continue
config_file_type_parser_methods = {
"ini": self.parse_ini,
"yaml": self.parse_yaml,
"yml": self.parse_yaml,
"toml": self.parse_toml
}
for config_file in self.names:
suffix = config_file.lower().split(".")[-1]
parser_method = config_file_type_parser_methods.get(suffix)
if parser_method is None:
log.error(f"Unknown/Unsupported config file type '{suffix}' for {config_file}")
self.config_file_errors = True
continue
# noinspection PyArgumentList
self.add_config_data(parser_method(config_file=config_file), config_file)
if self.config_file_errors:
do_error_exit("Unable to open/parse one or more config files.")
log.info("Done reading config files")
def add_config_data(self, config_data: dict, config_file: str) -> None:
if not isinstance(config_data, dict):
log.error(f"Parsed config data from file '{config_file}' is not a directory")
self.config_file_errors = True
return
for section, section_data in config_data.items():
if section == "sources":
if not isinstance(section_data, list):
log.error(f"Parsed config data from file '{config_file}' for '{section}' is not a list")
self.config_file_errors = True
continue
if self.data.get(section) is None:
self.data[section] = list()
for source in section_data:
current_data = None
for current_sources in self.data.get(section):
# find source by name
if current_sources.get("name") == source.get("name"):
current_data = current_sources
break
if current_data is None:
self.data[section].append(source)
else:
for key, value in source.items():
current_data[key] = value
else:
if not isinstance(section_data, dict):
log.error(f"Parsed config data from file '{config_file}' for '{section}' is not a directory")
self.config_file_errors = True
continue
if self.data.get(section) is None:
self.data[section] = dict()
for key, value in section_data.items():
self.data[section][key] = value
@staticmethod
def get_config_file_path(config_file: str) -> str:
"""
get absolute path to provided config file string
Parameters
----------
config_file: str
config file path
Returns
-------
str: absolute path to config file
"""
if not isinstance(config_file, str):
raise ValueError("value for 'config_file' of 'parse_ini' must be of type str")
if len(config_file) == 0:
raise ValueError(f"value for 'config_file' can't be empty")
base_dir = os.sep.join(__file__.split(os.sep)[0:-3])
if config_file[0] != os.sep:
config_file = f"{base_dir}{os.sep}{config_file}"
return os.path.realpath(config_file)
def parse_ini(self, config_file: str = "") -> Dict:
if not isinstance(config_file, str):
raise ValueError("value for 'config_file' of 'parse_ini' must be of type str")
if len(config_file) == 0:
raise ValueError(f"value for 'config_file' can't be empty")
# setup config parser and read config
config_handler = configparser.ConfigParser(strict=True, allow_no_value=True,
empty_lines_in_values=False, interpolation=None)
return_data = dict()
try:
config_handler.read_file(open(config_file))
except configparser.Error as e:
log.error(f"Problem while config file '{config_file}' parsing: {e}")
self.config_file_errors = True
return return_data
except Exception as e:
log.error(f"Unable to open file '{config_file}': {e}")
self.config_file_errors = True
return return_data
for section in config_handler.sections():
if section.startswith("source/"):
if return_data.get("sources") is None:
return_data["sources"] = list()
source_data = dict(config_handler.items(section))
source_data["name"] = section.replace("source/", "")
return_data["sources"].append(source_data)
else:
return_data[section] = dict(config_handler.items(section))
return return_data
def parse_yaml_or_toml(self, config_file: str = "", config_type: str = "yaml") -> Dict:
if not isinstance(config_file, str):
raise ValueError("value for 'config_file' of 'parse_yaml_or_toml' must be of type str")
if len(config_file) == 0:
raise ValueError(f"value for 'config_file' can't be empty")
return_data = dict()
if config_type == "yaml":
parser = yaml.safe_load
elif config_type == "toml":
parser = toml.load
else:
log.error(f"Unknown config type '{config_type}' for config file '{config_file}'.")
self.config_file_errors = True
return return_data
with open(config_file, "r") as stream:
try:
return_data = parser(stream)
except (yaml.YAMLError, toml.TomlDecodeError) as e:
log.error(f"Problem while config file '{config_file}' parsing: {e}")
self.config_file_errors = True
return return_data
except Exception as e:
log.error(f"Unable to open file '{config_file}': {e}")
self.config_file_errors = True
return return_data
if isinstance(return_data.get("source"), list) and return_data.get("sources") is None:
return_data["sources"] = return_data.get("source")
del return_data["source"]
return return_data
def parse_yaml(self, config_file: str = "") -> Dict:
return self.parse_yaml_or_toml(config_file, "yaml")
def parse_toml(self, config_file: str = "") -> Dict:
return self.parse_yaml_or_toml(config_file, "toml")

View File

@@ -0,0 +1,71 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2020 - 2022 Ricardo Bartels. All rights reserved.
#
# netbox-sync.py
#
# This work is licensed under the terms of the MIT license.
# For a copy, see file LICENSE.txt included in this
# repository or visit: <https://opensource.org/licenses/MIT>.
from typing import Any
from textwrap import wrap, fill, indent
class ConfigOption:
def __init__(self,
key: str,
value_type: Any,
description: str = "",
default_value: Any = None,
config_example: Any = None,
mandatory: bool = False,
alt_key: str = None,
deprecated: bool = False,
deprecation_message: str = None):
self.key = key
self.value_type = value_type
self._description = description
self.default_value = default_value
self.config_example = config_example
self.mandatory = mandatory
self.alt_key = alt_key
self.deprecated = deprecated
self.deprecation_message = deprecation_message
if self.config_example is None:
self.config_example = self.default_value
if not isinstance(self._description, str):
raise ValueError(f"value for 'description' of '{self.key}' must be of type str")
if len(self._description) == 0:
raise ValueError(f"value for 'description' of '{self.key}' can't be empty")
if self.config_example is not None and not isinstance(self.config_example, self.value_type):
raise ValueError(f"value for 'config_example' of '{self.key}' must be of '{self.value_type}'")
def description(self, width: int = 80) -> str:
if not isinstance(width, int):
raise ValueError("value for 'width' must be of type int")
return fill(" ".join(wrap(self._description)), width=width)
def config_description(self, prefix: str = "#", width: int = 80) -> str:
if not isinstance(width, int):
raise ValueError("value for 'width' must be of type int")
if not isinstance(prefix, str):
raise ValueError("value for 'prefix' must be of type str")
prefix += " "
if width - len(prefix) < 3:
width = 3
else:
width = width - len(prefix)
return indent(self.description(width), prefix)

View File

@@ -25,10 +25,20 @@ from module.netbox.inventory import (
)
from module.common.logging import get_logger
from module.common.misc import grab
from module.config.config_option import ConfigOption
log = get_logger()
class SourceBaseConfig:
"""
Common config options all sources share
"""
name = ConfigOption("name", str, description="Name of this source")
enabled = ConfigOption("enabled", bool, description="Defines if this source is enabled or not", default_value=True)
class SourceBase:
"""
This is the base class for all import source classes. It provides some helpful common methods.

View File

@@ -0,0 +1,60 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2020 - 2022 Ricardo Bartels. All rights reserved.
#
# netbox-sync.py
#
# This work is licensed under the terms of the MIT license.
# For a copy, see file LICENSE.txt included in this
# repository or visit: <https://opensource.org/licenses/MIT>.
from module.config.config_option import ConfigOption
from module.sources.common.source_base import SourceBaseConfig
class VMWareConfig(SourceBaseConfig):
source_type = "vmware"
host_fqdn = ConfigOption("host_fqdn",
str,
description="host name / IP address of the vCenter",
config_example="my-netbox.local",
mandatory=True)
port = ConfigOption("port",
int,
description="TCP port to connect to",
default_value=443,
mandatory=True)
username = ConfigOption("username",
str,
description="username to use to log into vCenter",
config_example="vcenter-admin",
mandatory=True)
password = ConfigOption("password",
str,
description="password to use to log into vCenter",
config_example="super-secret",
mandatory=True)
validate_tls_certs = ConfigOption("validate_tls_certs",
bool,
description="""Enforces TLS certificate validation.
If vCenter uses a valid TLS certificate then this option should be set
to 'true' to ensure a secure connection.""")
proxy_host = ConfigOption("proxy_host",
str,
description="""EXPERIMENTAL: Connect to a vCenter using a proxy server
(socks proxies are not supported).
define a host name or an IP address""",
config_example="10.10.1.10")
proxy_port = ConfigOption("proxy_port",
int,
description="""EXPERIMENTAL: Connect to a vCenter using a proxy server
(socks proxies are not supported).
define proxy server port number""",
config_example=3128)

View File

@@ -15,7 +15,7 @@ Sync objects from various sources to NetBox
from datetime import datetime
from module.common.misc import grab, get_relative_time
from module.common.misc import grab, get_relative_time, dump
from module.common.cli_parser import parse_command_line
from module.common.logging import setup_logging
from module.common.configuration import get_config_file, open_config_file, get_config
@@ -23,7 +23,7 @@ from module.netbox.connection import NetBoxHandler
from module.netbox.inventory import NetBoxInventory
from module.netbox.object_classes import *
from module.sources import instantiate_sources
from module.config.config_files import ConfigFiles
__version__ = "1.3.0"
__version_date__ = "2022-09-06"
@@ -48,6 +48,10 @@ def main():
default_config_file_path=default_config_file_path)
# get config file path
x = ConfigFiles(args.config_file, default_config_file_path)
import pprint
pprint.pprint(x.data)
exit(0)
config_file = get_config_file(args.config_file)
# get config handler

View File

@@ -5,3 +5,5 @@ requests==2.28.1
pyvmomi==7.0.3
aiodns==3.0.0
setuptools>=62.00.0
pyyaml==6.0
toml==0.10.2