Files
netbox-sync/module/netbox/connection.py
2020-11-15 22:24:59 +01:00

703 lines
25 KiB
Python

import json
from datetime import datetime
import requests
from http.client import HTTPConnection
import urllib3
import pickle
import os
from packaging import version
import pprint
from module import plural
from module.common.misc import grab, do_error_exit, dump
from module.netbox.object_classes import *
from module.common.logging import get_logger, DEBUG3
log = get_logger()
class NetBoxHandler:
"""
Handles NetBox connection state and interaction with API
"""
minimum_api_version = "2.9"
# permitted settings and defaults
settings = {
"api_token": None,
"host_fqdn": None,
"port": None,
"disable_tls": False,
"validate_tls_certs": True,
"prune_enabled": False,
"prune_delay_in_days": 30,
"default_netbox_result_limit": 200,
"timeout": 30,
"max_retry_attempts": 4
}
primary_tag = "NetBox-synced"
orphaned_tag = f"{primary_tag}: Orphaned"
cache_directory = None
use_netbox_caching = True
inventory = None
instance_tags = None
instance_interfaces = {}
instance_virtual_interfaces = {}
resolved_dependencies = set()
testing_cache = False
def __init__(self, settings=None, inventory=None):
self.settings = settings
self.inventory = inventory
# set primary tag
setattr(self.inventory, "primary_tag", self.primary_tag)
self.parse_config_settings(settings)
# flood the console
if log.level == DEBUG3:
log.warning("Log level is set to DEBUG3, Request logs will only be printed to console")
HTTPConnection.debuglevel = 1
proto = "https"
if bool(self.disable_tls) is True:
proto = "http"
# disable TLS insecure warnings if user explicitly switched off validation
if bool(self.validate_tls_certs) is False:
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
port = ""
if self.port is not None:
port = f":{self.port}"
self.url = f"{proto}://{self.host_fqdn}{port}/api/"
self.session = self.create_session()
# check for minimum version
if version.parse(self.get_api_version()) < version.parse(self.minimum_api_version):
do_error_exit(f"Netbox API version '{self.api_version}' not supported. "
f"Minimum API version: {self.minimum_api_version}")
self.setup_caching()
def setup_caching(self):
cache_folder_name = "cache"
base_dir = os.sep.join(__file__.split(os.sep)[0:-3])
if cache_folder_name[0] != os.sep:
cache_folder_name = f"{base_dir}/{cache_folder_name}"
self.cache_directory = os.path.realpath(cache_folder_name)
# check if directory is a file
if os.path.isfile(self.cache_directory):
log.warning(f"The cache directory ({self.cache_directory}) seems to be file.")
self.use_netbox_caching = False
# check if directory exists
if not os.path.exists(self.cache_directory):
# try to create directory
try:
os.makedirs(self.cache_directory, 0o700)
except OSError:
log.warning(f"Unable to create cache directory: {self.cache_directory}")
self.use_netbox_caching = False
except Exception as e:
log.warning(f"Unknown exception while creating cache directory {self.cache_directory}: {e}")
self.use_netbox_caching = False
# check if directory is writable
if not os.access(self.cache_directory, os.X_OK | os.W_OK):
log.warning(f"Error writing to cache directory: {self.cache_directory}")
self.use_netbox_caching = False
if self.use_netbox_caching is False:
log.warning("NetBox caching DISABLED")
else:
log.debug(f"Successfully configured cache directory: {self.cache_directory}")
def parse_config_settings(self, config_settings):
validation_failed = False
for setting in ["host_fqdn", "api_token"]:
if config_settings.get(setting) is None:
log.error(f"Config option '{setting}' in 'netbox' can't be empty/undefined")
validation_failed = True
for setting in ["prune_delay_in_days", "default_netbox_result_limit", "timeout", "max_retry_attempts"]:
if not isinstance(config_settings.get(setting), int):
log.error(f"Config option '{setting}' in 'netbox' must be an integer.")
validation_failed = True
if validation_failed is True:
log.error("Config validation failed. Exit!")
exit(1)
for setting in self.settings.keys():
setattr(self, setting, config_settings.get(setting))
def create_session(self):
"""
Creates a session with NetBox
:return: `True` if session created else `False`
:rtype: bool
"""
header = {
"Authorization": f"Token {self.api_token}",
"User-Agent": "netbox-sync/0.0.1"
}
session = requests.Session()
session.headers.update(header)
log.debug("Created new requests Session for NetBox.")
return session
def get_api_version(self):
"""
Determines the current NetBox API Version
:return: NetBox API version
:rtype: float
"""
response = None
try:
response = self.session.get(
self.url,
timeout=self.timeout,
verify=self.validate_tls_certs)
except Exception as e:
do_error_exit(str(e))
result = str(response.headers["API-Version"])
log.info(f"Successfully connected to NetBox '{self.host_fqdn}'")
log.debug(f"Detected NetBox API version: {result}")
return result
def request(self, object_class, req_type="GET", data=None, params=dict(), nb_id=None):
result = None
request_url = f"{self.url}{object_class.api_path}/"
# append NetBox ID
if nb_id is not None:
request_url += f"{nb_id}/"
if not isinstance(params, dict):
log.debug(f"Params passed to NetBox request need to be a dict, got: {params}")
params = dict()
if req_type == "GET":
if "limit" not in params.keys():
params["limit"] = self.default_netbox_result_limit
# always exclude config context
params["exclude"] = "config_context"
# prepare request
this_request = self.session.prepare_request(
requests.Request(req_type, request_url, params=params, json=data)
)
# issue request
response = self.single_request(this_request)
try:
result = response.json()
except json.decoder.JSONDecodeError:
pass
if response.status_code == 200:
# retrieve paginated results
if this_request.method == "GET" and result is not None:
while response.json().get("next") is not None:
this_request.url = response.json().get("next")
log.debug2("NetBox results are paginated. Getting next page")
response = self.single_request(this_request)
result["results"].extend(response.json().get("results"))
elif response.status_code in [201, 204]:
action = "created" if response.status_code == 201 else "deleted"
object_name = None
if req_type == "DELETE":
object_name = self.inventory.get_by_id(object_class, nb_id)
if object_name is not None:
object_name = object_name.get_display_name()
else:
object_name = result.get(object_class.primary_key)
log.info(f"NetBox successfully {action} {object_class.name} object '{object_name}'.")
if response.status_code == 204:
result = True
# token issues
elif response.status_code == 403:
do_error_exit("NetBox returned: %s: %s" % (response.reason, grab(result, "detail")))
# we screw up something else
elif response.status_code >= 400 and response.status_code < 500:
log.error(f"NetBox returned: {this_request.method} {this_request.path_url} {response.reason}")
log.error(f"NetBox returned body: {result}")
result = None
elif response.status_code >= 500:
do_error_exit(f"NetBox returned: {response.status_code} {response.reason}")
return result
def single_request(self, this_request):
response = None
if log.level == DEBUG3:
pprint.pprint(vars(this_request))
for _ in range(self.max_retry_attempts):
log_message = f"Sending {this_request.method} to '{this_request.url}'"
if this_request.body is not None:
log_message += f" with data '{this_request.body}'."
log.debug2(log_message)
try:
response = self.session.send(this_request,
timeout=self.timeout, verify=self.validate_tls_certs)
except (ConnectionError, requests.exceptions.ConnectionError,
requests.exceptions.ReadTimeout):
log.warning(f"Request failed, trying again: {log_message}")
continue
else:
break
else:
do_error_exit(f"Giving up after {self.max_retry_attempts} retries.")
log.debug2("Received HTTP Status %s.", response.status_code)
# print debugging information
if log.level == DEBUG3:
log.debug("Response Body:")
try:
pprint.pprint(response.json())
except json.decoder.JSONDecodeError as e:
log.error(e)
return response
def query_current_data(self, netbox_objects_to_query=None):
if netbox_objects_to_query is None:
raise AttributeError(f"Attribute netbox_objects_to_query is: '{netbox_objects_to_query}'")
# query all dependencies
for nb_object_class in netbox_objects_to_query:
if nb_object_class not in NetBoxObject.__subclasses__():
raise AttributeError(f"Class '{nb_object_class.__name__}' must be a subclass of '{NetBoxObject.__name__}'")
# if objects are multiple times requested but already retrieved
if nb_object_class in self.resolved_dependencies:
continue
# initialize cache variables
cached_nb_data = None
cache_file = f"{self.cache_directory}{os.sep}{nb_object_class.__name__}.cache"
cache_this_class = False
latest_update = None
# check if cache file is accessible
if self.use_netbox_caching is True:
cache_this_class = True
if os.path.exists(cache_file) and not os.access(cache_file, os.R_OK):
log.warning("Got no permission to read existing cache file: {cache_file}")
cache_this_class = False
if os.path.exists(cache_file) and not os.access(cache_file, os.W_OK):
log.warning("Got no permission to write to existing cache file: {cache_file}")
cache_this_class = False
# read data from cache file
if cache_this_class is True:
try:
cached_nb_data = pickle.load( open( cache_file, "rb" ) )
except Exception:
pass
# get date of latest update in cache file
if cached_nb_data is not None:
latest_update_list = [x.get("last_updated") for x in cached_nb_data if x.get("last_updated") is not None]
if len(latest_update_list) > 0:
latest_update = sorted(latest_update_list)[-1]
log.debug(f"Successfully read cached data with {len(cached_nb_data)} '{nb_object_class.name}%s', last updated '{latest_update}'" % plural(len(cached_nb_data)))
elif self.testing_cache is False:
cache_this_class = False
if self.testing_cache is True and cached_nb_data is not None:
for object_data in cached_nb_data:
self.inventory.add_item_from_netbox(nb_object_class, data=object_data)
# mark this object class as retrieved
self.resolved_dependencies.add(nb_object_class)
continue
full_nb_data = None
brief_nb_data = None
updated_nb_data = None
# no cache data found
if latest_update is None:
# get all objects of this class
log.debug(f"Requesting all {nb_object_class.name}s from NetBox")
full_nb_data = self.request(nb_object_class)
if full_nb_data.get("results") is None:
log.error(f"Result data from NetBox for object {nb_object_class.__name__} missing!")
do_error_exit("Reading data from NetBox failed.")
else:
# request a brief list of existing objects
log.debug(f"Requesting a brief list of {nb_object_class.name}s from NetBox")
brief_nb_data = self.request(nb_object_class, params={"brief":1, "limit": 500})
log.debug("NetBox returned %d results." % len(brief_nb_data.get("results", list())))
log.debug(f"Requesting the last updates since {latest_update} of {nb_object_class.name}s from NetBox")
updated_nb_data = self.request(nb_object_class, params={"last_updated__gte": latest_update})
log.debug("NetBox returned %d results." % len(updated_nb_data.get("results", list())))
if brief_nb_data.get("results") is None or updated_nb_data.get("results") is None:
log.error(f"Result data from NetBox for object {nb_object_class.__name__} missing!")
do_error_exit("Reading data from NetBox failed.")
# read a full set from NetBox
nb_objects = list()
if full_nb_data is not None:
nb_objects = full_nb_data.get("results")
elif self.testing_cache is True:
nb_objects = cached_nb_data
# read the delta from NetBox and
else:
currently_existing_ids = [x.get("id") for x in brief_nb_data.get("results")]
changed_ids = [x.get("id") for x in updated_nb_data.get("results")]
for object in cached_nb_data:
if object.get("id") in currently_existing_ids and object.get("id") not in changed_ids:
nb_objects.append(object)
nb_objects.extend(updated_nb_data.get("results"))
if cache_this_class is True:
try:
pickle.dump(nb_objects, open( cache_file, "wb" ) )
log.debug("Successfully cached %d objects." % (len(nb_objects)))
except Exception as e:
log.warning(f"Failed to write NetBox data to cache file: {e}")
log.debug(f"Processing %s returned {nb_object_class.name}%s" % (len(nb_objects),plural(len(nb_objects))))
for object_data in nb_objects:
self.inventory.add_item_from_netbox(nb_object_class, data=object_data)
# mark this object class as retrieved
self.resolved_dependencies.add(nb_object_class)
return
def inizialize_basic_data(self):
"""
Adds the two basic tags to keep track of objects and see which
objects are no longer exists in source to automatically remove them
"""
log.debug("Checking/Adding NetBox Sync dependencies")
prune_text = f"Pruning is enabled and Objects will be automatically removed after {self.prune_delay_in_days} days"
if self.prune_enabled is False:
prune_text = f"Objects would be automatically removed after {self.prune_delay_in_days} days but pruning is currently disabled."
self.inventory.add_update_object(NBTags, data = {
"name": self.orphaned_tag,
"color": "607d8b",
"description": "A source which has previously provided this object no "
f"longer states it exists. {prune_text}"
})
self.inventory.add_update_object(NBTags, data = {
"name": self.primary_tag,
"description": "Created and used by NetBox Sync Script to keep track of created items. "
"DO NOT change this tag, otherwise syncing can't keep track of deleted objects."
})
def update_object(self, nb_object_sub_class, unset=False):
for object in self.inventory.get_all_items(nb_object_sub_class):
# resolve dependencies
for dependency in object.get_dependencies():
if dependency not in self.resolved_dependencies:
log.debug2("Resolving dependency: %s" % (dependency.name))
self.update_object(dependency)
# unset data if requested
if unset is True:
if len(object.unset_items) == 0:
continue
unset_data = {x: None for x in object.unset_items}
log.info("Updating NetBox '%s' object '%s' with data: %s" % (object.name, object.get_display_name(), unset_data))
returned_object_data = self.request(nb_object_sub_class, req_type="PATCH", data=unset_data, nb_id=object.nb_id)
if returned_object_data is not None:
object.update(data = returned_object_data, read_from_netbox=True)
object.resolve_relations()
else:
log.error(f"Request Failed for {nb_object_sub_class.name}. Used data: {unset_data}")
continue
data_to_patch = dict()
unresolved_dependency_data = dict()
for key, value in object.data.items():
if key in object.updated_items:
if isinstance(value, (NetBoxObject,NBObjectList)):
if value.get_nb_reference() is None:
unresolved_dependency_data[key] = value
else:
data_to_patch[key] = value.get_nb_reference()
else:
data_to_patch[key] = value
issued_request = False
returned_object_data = None
if len(data_to_patch.keys()) > 0:
# default is a new object
nb_id = None
req_type = "POST"
action = "Creating new"
# if its not a new object then update it
if object.is_new is False:
nb_id = object.nb_id
req_type = "PATCH"
action = "Updating"
log.info("%s NetBox '%s' object '%s' with data: %s" % (action, object.name, object.get_display_name(), data_to_patch))
returned_object_data = self.request(nb_object_sub_class, req_type=req_type, data=data_to_patch, nb_id=nb_id)
issued_request = True
if returned_object_data is not None:
object.update(data = returned_object_data, read_from_netbox=True)
elif issued_request is True:
log.error(f"Request Failed for {nb_object_sub_class.name}. Used data: {data_to_patch}")
# add unresolved dependencies back to object
if len(unresolved_dependency_data.keys()) > 0:
log.debug2("Adding unresolved dependencies back to object: %s" % list(unresolved_dependency_data.keys()))
object.update(data=unresolved_dependency_data)
object.resolve_relations()
# add class to resolved dependencies
self.resolved_dependencies.add(nb_object_sub_class)
def update_instance(self):
log.info("Updating changed data in NetBox")
# update all items in NetBox but unset items first
log.debug("First run, unset attributes if necessary.")
self.resolved_dependencies = set()
for nb_object_sub_class in NetBoxObject.__subclasses__():
self.update_object(nb_object_sub_class, unset=True)
# update all items
log.debug("Second run, update all items")
self.resolved_dependencies = set()
for nb_object_sub_class in NetBoxObject.__subclasses__():
self.update_object(nb_object_sub_class)
# run again to updated objects with previous unresolved dependencies
log.debug("Third run, update all items with previous unresolved items")
self.resolved_dependencies = set()
for nb_object_sub_class in NetBoxObject.__subclasses__():
self.update_object(nb_object_sub_class)
# ToDo: check for objects with unresolved relations
def prune_data(self):
if self.prune_enabled == False:
log.debug("Pruning disabled. Skipping")
return
log.info("Pruning orphaned data in NetBox")
# update all items in NetBox accordingly
today = datetime.now()
for nb_object_sub_class in reversed(self.inventory.resolved_dependencies):
for object in self.inventory.get_all_items(nb_object_sub_class):
if object.source is not None:
continue
if self.orphaned_tag not in object.get_tags():
continue
date_last_update = grab(object, "data.last_updated")
if date_last_update is None:
continue
# only need the date including seconds
date_last_update = date_last_update[0:19]
log.debug2(f"Object '{object.name}' '{object.get_display_name()}' is Orphaned. Last time changed: {date_last_update}")
# check prune delay.
last_updated = None
try:
last_updated = datetime.strptime(date_last_update,"%Y-%m-%dT%H:%M:%S")
except Exception:
continue
days_since_last_update = (today - last_updated).days
# it seems we need to delete this object
if last_updated is not None and days_since_last_update >= self.prune_delay_in_days:
log.info(f"{nb_object_sub_class.name.capitalize()} '{object.get_display_name()}' is orphaned for {days_since_last_update} days and will be deleted.")
self.request(nb_object_sub_class, req_type="DELETE", nb_id=object.nb_id)
return
def just_delete_all_the_things(self):
"""
Using a brute force approach. Try to delete everything 10 times.
This way we don't need to care about dependencies.
"""
log.info("Querying necessary objects from Netbox. This might take a while.")
self.query_current_data(NetBoxObject.__subclasses__())
log.info("Finished querying necessary objects from Netbox")
self.inventory.resolve_relations()
log.warning(f"Starting purge now. All objects with the tag '{self.primary_tag}' will be deleted!!!")
for iteration in range(10):
log.debug("Iteration %d trying to deleted all the objects." % (iteration + 1))
found_objects_to_delete = False
for nb_object_sub_class in reversed(NetBoxObject.__subclasses__()):
# tags need to be deleted at the end
if nb_object_sub_class == NBTags:
continue
# object has no tags so we can't be sure it was created with this program
if NBTagList not in nb_object_sub_class.data_model.values():
continue
for object in self.inventory.get_all_items(nb_object_sub_class):
# already deleted
if getattr(object, "deleted", False) is True:
continue
found_objects_to_delete = True
if self.primary_tag in object.get_tags():
log.info(f"{nb_object_sub_class.name} '{object.get_display_name()}' will be deleted now")
result = self.request(nb_object_sub_class, req_type="DELETE", nb_id=object.nb_id)
if result is not None:
object.deleted = True
if found_objects_to_delete is False:
# ToDo: test delete all
# get tag objects
primary_tag = self.inventory.get_by_data(NBTags, data = {"name": self.primary_tag})
orpahned_tag = self.inventory.get_by_data(NBTags, data = {"name": self.orphaned_tag})
# try to delete them
log.info(f"{NBTags.name} '{primary_tag.get_display_name()}' will be deleted now")
self.request(NBTags, req_type="DELETE", nb_id=primary_tag.nb_id)
log.info(f"{NBTags.name} '{orpahned_tag.get_display_name()}' will be deleted now")
self.request(NBTags, req_type="DELETE", nb_id=orpahned_tag.nb_id)
log.info("Successfully deleted all objects which were synced and tagged by this program.")
break
else:
log.warning("Unfortunately we were not able to delete all objects. Sorry")
return
# EOF