fixes orphan tagging if ignore_unknown_source_object_puning is set to true #273

This commit is contained in:
ricardo.bartels@telekom.de
2023-01-26 21:12:58 +01:00
parent a7e2b8184b
commit affd3875bb
3 changed files with 59 additions and 39 deletions

View File

@@ -22,6 +22,7 @@ class NetBoxInventory:
base_structure = dict()
source_tags_of_disabled_sources = list()
source_tags_of_enabled_sources = list()
def __init__(self):
@@ -32,6 +33,19 @@ class NetBoxInventory:
self.base_structure[object_type.name] = list()
def add_enabled_source_tag(self, source_tag=None):
"""
adds $source_tag to list of enabled sources
Parameters
----------
source_tag: str
source tag of disabled source
"""
if source_tag is not None:
self.source_tags_of_enabled_sources.append(source_tag)
def add_disabled_source_tag(self, source_tag=None):
"""
adds $source_tag to list of disabled sources
@@ -305,40 +319,40 @@ class NetBoxInventory:
f"from a currently disabled source. Skipping orphaned tagging.")
continue
if getattr(this_object, "prune", False) is True:
if getattr(this_object, "prune", False) is False:
# test for different conditions.
if netbox_handler.primary_tag not in this_object.get_tags():
# or just remove primary tag if pruning is disabled
this_object.remove_tags(netbox_handler.primary_tag)
this_object.remove_tags(netbox_handler.orphaned_tag)
continue
# test for different conditions.
if netbox_handler.primary_tag not in this_object.get_tags():
continue
if bool(set(this_object.get_tags()).intersection(self.source_tags_of_enabled_sources)) is False \
and netbox_handler.ignore_unknown_source_object_pruning is True:
continue
# don't mark IPs as orphaned if vm/device is only switched off
if isinstance(this_object, NBIPAddress):
device_vm_object = this_object.get_device_vm()
if device_vm_object is not None and \
grab(device_vm_object, "data.status") is not None and \
"active" not in str(grab(device_vm_object, "data.status")):
if netbox_handler.orphaned_tag in this_object.get_tags():
this_object.remove_tags(netbox_handler.orphaned_tag)
log.debug2(f"{device_vm_object.name} '{device_vm_object.get_display_name()}' has IP "
f"'{this_object.get_display_name()}' assigned but is in status "
f"{grab(device_vm_object, 'data.status')}. "
f"IP address will not marked as orphaned.")
continue
if netbox_handler.ignore_unknown_source_object_pruning is True:
continue
# don't mark IPs as orphaned if vm/device is only switched off
if isinstance(this_object, NBIPAddress):
device_vm_object = this_object.get_device_vm()
if device_vm_object is not None and \
grab(device_vm_object, "data.status") is not None and \
"active" not in str(grab(device_vm_object, "data.status")):
if netbox_handler.orphaned_tag in this_object.get_tags():
this_object.remove_tags(netbox_handler.orphaned_tag)
log.debug2(f"{device_vm_object.name} '{device_vm_object.get_display_name()}' has IP "
f"'{this_object.get_display_name()}' assigned but is in status "
f"{grab(device_vm_object, 'data.status')}. "
f"IP address will not marked as orphaned.")
continue
this_object.add_tags(netbox_handler.orphaned_tag)
# or just remove primary tag if pruning is disabled
else:
if netbox_handler.primary_tag in this_object.get_tags():
this_object.remove_tags(netbox_handler.primary_tag)
if netbox_handler.orphaned_tag in this_object.get_tags():
this_object.remove_tags(netbox_handler.orphaned_tag)
this_object.add_tags(netbox_handler.orphaned_tag)
def query_ptr_records_for_all_ips(self):
"""

View File

@@ -8,8 +8,8 @@
# repository or visit: <https://opensource.org/licenses/MIT>.
# define all available sources here
from .vmware.connection import VMWareHandler
from .check_redfish.import_inventory import CheckRedfish
from module.sources.vmware.connection import VMWareHandler
from module.sources.check_redfish.import_inventory import CheckRedfish
from module.common.configuration import get_config
from module.common.logging import get_logger
@@ -56,7 +56,7 @@ def validate_source(source_class_object=None, state="pre"):
# post initialization validation
for attr, value_type in necessary_attributes.items():
value = getattr(source_class_object, attr)
value = getattr(source_class_object, attr, None)
if not isinstance(value, value_type):
raise ValueError(f"Value for attribute '{attr}' needs to be {value_type}")
@@ -112,11 +112,8 @@ def instantiate_sources(config_handler=None, inventory=None):
source_class = None
for possible_source_class in valid_sources:
validate_source(possible_source_class)
source_class_type = getattr(possible_source_class, "source_type", None)
if source_class_type is None:
raise AttributeError("'%s' class attribute 'source_type' not defined." % source_class_type.__name__)
if source_class_type == source_type:
if possible_source_class.implements(source_type):
source_class = possible_source_class
break
@@ -140,6 +137,7 @@ def instantiate_sources(config_handler=None, inventory=None):
# add to list of source handlers
if source_handler.init_successful is True:
sources.append(source_handler)
inventory.add_enabled_source_tag(source_handler.source_tag)
elif getattr(source_handler, "enabled") is False:
inventory.add_disabled_source_tag(source_handler.source_tag)

View File

@@ -37,7 +37,15 @@ class SourceBase:
inventory = None
source_tag = None
# dummy function to implement a finish call for each source
@classmethod
def implements(cls, source_type):
if getattr(cls, "source_type", None) == source_type:
return True
return False
# stub function to implement a finish call for each source
def finish(self):
pass