#!/usr/local/munki/munki-python # encoding: utf-8 # # Copyright 2011-2025 Greg Neagle. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ installhelper Created 2024-04-23. A helper tool for handling launchd items when installing Munki """ from __future__ import absolute_import # standard libs import glob import os import logging import logging.handlers import re import subprocess import time import sys from pathlib import Path # Append munki's path, to load our libs when not running under munki # pylint: disable = C0413 sys.path.append('/usr/local/munki') # our libs from munkilib import osutils from munkilib import prefs from munkilib import FoundationPlist # globals LAUNCHD_PREFIX = 'com.googlecode.munki.' APPUSAGE_AGENT = LAUNCHD_PREFIX + 'app_usage_monitor' APPUSAGE_DAEMON = LAUNCHD_PREFIX + 'appusaged' INSTALL_HELPER = LAUNCHD_PREFIX + 'installhelper' PROG_NAME = 'managedsoftwareupdate' APP_NAME = 'installhelper' APP_VERSION = '0.1' def main(): '''Main function''' # If INSTALLHELPER_RUN_TYPE isn't set if not os.environ.get('INSTALLHELPER_RUN_TYPE'): # Set launchd_group to the first value passed launchd_group = sys.argv[1].lower() # If INSTALLHELPER_RUN_TYPE is set and we have a valid value passed elif (os.environ.get('INSTALLHELPER_RUN_TYPE') == 'appusage' or os.environ.get('INSTALLHELPER_RUN_TYPE') == 'launchd'): # Set launchd_group to the value of INSTALLHELPER_RUN_TYPE launchd_group = os.environ.get('INSTALLHELPER_RUN_TYPE') # If we've got here, something isn't right else: # Error message logging.info("ERROR: Cannot ascertain correct value for passed argument") # Exit sys.exit(1) # Setup logging setup_logging() # If not launched via launchd if not os.environ.get('INSTALLHELPER_RUN_TYPE'): # Launched manually logging.info("Launched manually - arg: %s", launchd_group) create_launch_daemon(launchd_group) # If launched via launchd else: # Launched via launchd logging.info("Launched via launchd - arg: %s", launchd_group) process_munki_launchd(launchd_group) def create_launch_daemon(launchd_group): '''Creates and loads the launch daemon''' # Set name and path based on the value of launchd_group launch_daemon_name = 'com.googlecode.munki.installhelper-' + launchd_group launch_daemon_path = '/Library/LaunchDaemons/' + launch_daemon_name + '.plist' # Progress notification logging.info("Creating launch daemon") # Retrieve all running launch daemons cmd = ['/bin/launchctl', 'list'] cmd_out = subprocess.run(cmd, capture_output = True, check = False) launchd_labels = split_launchctl_list(cmd_out, launchd_group) # Check to see if launch_daemon_name is already loaded, and stop if so if launch_daemon_name in launchd_labels: # Stop the launch daemon at the login window cmd = ['/bin/launchctl', 'bootout', 'system/' + launch_daemon_name] subprocess.call(cmd) # Populate launch deamon dict launch_daemon = {} launch_daemon['EnvironmentVariables'] = {'INSTALLHELPER_RUN_TYPE': launchd_group} launch_daemon['Label'] = launch_daemon_name launch_daemon['ProgramArguments'] = ['/usr/local/munki/installhelper'] launch_daemon['RunAtLoad'] = True # Create the launch daemon FoundationPlist.writePlist(launch_daemon, launch_daemon_path) # Set the launch daemons owner and perms os.chown(launch_daemon_path, 0, 0) os.chmod(launch_daemon_path, int('644', 8)) # Load the launch daemon logging.info("Starting: %s", launch_daemon_path) cmd = ['/bin/launchctl', 'bootstrap', 'system', launch_daemon_path] subprocess.call(cmd) def get_logged_in_users(): '''Returns a dict containing any logged in users uid and username''' # Var declaration user_details = {} # Retrieve details of all running processes cmd = ['/bin/ps', '-axo', 'uid,login,args'] cmd_out = subprocess.run(cmd, capture_output = True, check = False) # Split out the above output for some_process in cmd_out.stdout.decode().splitlines(): # Retrieve details for users that are running loginwindow.app, excluding root if "loginwindow.app" in some_process.lower() and some_process.split()[0] != "0": # Capture the userid and username for all users running the loginwindow user_details[some_process.split()[0]] = some_process.split()[1] # Returns either an empty dict, or a dict with userid and usernames return user_details def is_managedsoftwareupdate_running(): '''If managedsoftwareupdate is running, check again in x seconds''' # Check to see if managedsoftwareupdate is running managedsoftwareupdate_pid = osutils.pythonScriptRunning(PROG_NAME) # An instance of managedsoftwareupdate is running, so we need to try again later while managedsoftwareupdate_pid: managedsoftwareupdate_pid = osutils.pythonScriptRunning(PROG_NAME) logging.info("*" * 60) logging.info("%s is running as pid %s.", PROG_NAME, os.getpid()) logging.info("checking again in 10 seconds") logging.info("*" * 60) time.sleep(10) # If managedsoftwareupdate is not running.. return from function to proceed logging.info("%s is not running, proceeding...", (PROG_NAME)) def process_launch_daemons(launch_daemons, launchd_group, loginwindow_launchds): '''Processes Munki's Launch Daemons''' # Progress notification logging.info("Processing Launch Daemons") # Var declaration matched_labels = [] # Retrieve all running launch daemons cmd = ['/bin/launchctl', 'list'] cmd_out = subprocess.run(cmd, capture_output = True, check = False) launchd_labels = split_launchctl_list(cmd_out, launchd_group) # For matched launchd item in /Library/LaunchDaemons/ for launch_daemon in launch_daemons: try: # Make sure content is valid, before proceeding to reload daemon_content = FoundationPlist.readPlist(launch_daemon) except FoundationPlist.NSPropertyListSerializationException: break # Get the Label daemon_label = daemon_content.get('Label') # If we have a label and it's not an INSTALL_HELPER launchd item if daemon_label and not INSTALL_HELPER in daemon_label: # Check to see if the launch daemon is loaded, before stopping if daemon_label in launchd_labels: # Stop the launch daemon logging.info("Stopping: system/%s", daemon_label) cmd = ['/bin/launchctl', 'bootout', 'system/' + daemon_label] subprocess.call(cmd) # Enable the launch daemon logging.info("Enabling: system/%s", daemon_label) cmd = ['/bin/launchctl', 'enable', 'system/' + daemon_label] subprocess.call(cmd) # Load the launch daemon logging.info("Starting: %s", launch_daemon) cmd = ['/bin/launchctl', 'bootstrap', 'system', launch_daemon] subprocess.call(cmd) # Append the launchd items label to matched_labels matched_labels.append(daemon_label) # If we're to interact with all non-appusage launchd items if launchd_group == 'launchd': # For each label in launchd_labels for launchd_label in launchd_labels: # If we have a label, it's not in matched_labels or loginwindow_launchds if launchd_label not in matched_labels + loginwindow_launchds: # Stop the launch daemon logging.info("Stopping orphaned launch daemon: system/%s", launchd_label) cmd = ['/bin/launchctl', 'bootout', 'system/' + launchd_label] subprocess.call(cmd) def process_loginwindow_launch_agents(launch_agents): '''Processes Munki's LoginWindow Launch Agents''' # Progress notification logging.info("Processing LoginWindow Launch Agents") # Var declaration loginwindow_launchds = [] # Get the labels for any launchd running at the LoginWindow, this will only list LoginWindow # launch agents when ran at the LoginWindow cmd = ['/bin/launchctl', 'dumpstate'] cmd_out = subprocess.run(cmd, capture_output = True, check = False) launchd_labels = re.findall("loginwindow/.*/(" + LAUNCHD_PREFIX + ".*) =", cmd_out.stdout.decode()) # For matched launchd item in /Library/LaunchAgents/ for launch_agent in launch_agents: try: # Make sure content is valid, before proceeding to reload agent_content = FoundationPlist.readPlist(launch_agent) except FoundationPlist.NSPropertyListSerializationException: break # If we're to limit loading if agent_content.get('LimitLoadToSessionType'): # If the launch agent includes LimitLoadToSessionType, and it contains LoginWindow if 'LoginWindow' in agent_content.get('LimitLoadToSessionType'): # If the launch agent is to be loaded at the login window, get the Label agent_label = agent_content.get('Label') # If we have a label and it's not an INSTALL_HELPER launchd item if agent_label and not INSTALL_HELPER in agent_label: # Check to see if the launch agent is loaded, before stopping if agent_label in launchd_labels: # Stop the launch agent at the login window logging.info("Stopping: %s", launch_agent) cmd = ['/bin/launchctl', 'unload', '-S', 'LoginWindow', launch_agent] subprocess.call(cmd) # Load the launch agent at the login window logging.info("Starting: %s", launch_agent) cmd = ['/bin/launchctl', 'load', '-S', 'LoginWindow', launch_agent] subprocess.call(cmd) # Append the launchd items label to loginwindow_launchds loginwindow_launchds.append(agent_label) # For each label in launchd_labels for launchd_label in launchd_labels: # If we have a label, it's not in loginwindow_launchds and it's not an INSTALL_HELPER # launchd item if (launchd_label not in loginwindow_launchds and not launchd_label.startswith(INSTALL_HELPER)): # Stop the launch agent logging.info("Stopping orphaned LoginWindow launch agent: %s", launchd_label) cmd = ['/bin/launchctl', 'bootout', 'loginwindow/' + launchd_label] subprocess.call(cmd) # Returns loginwindow_launchds return loginwindow_launchds def process_munki_launchd(launchd_group): '''Reload Munki's launchd''' # If we're to interact only with the appusage launchd items if launchd_group == 'appusage': # returns a list containing just the path to the app_usage_monitor launch agent launch_agents = ['/Library/LaunchAgents/' + APPUSAGE_AGENT+ '.plist'] # returns a list containing just the path to the appusaged launch daemon launch_daemons = ['/Library/LaunchDaemons/' + APPUSAGE_DAEMON + '.plist'] # If we're to interact with all non-appusage launchd items if launchd_group == 'launchd': # Returns a list of Munki's launch agents, excluding the APPUSAGE_AGENT agent launch_agents = [launch_agent for launch_agent in glob.glob('/Library/LaunchAgents/' + LAUNCHD_PREFIX + '*') if not APPUSAGE_AGENT in launch_agent] # Returns a list of Munki's launch daemons, excluding the APPUSAGE_DAEMON and # INSTALL_HELPER launchd items launch_daemons = [launch_daemon for launch_daemon in glob.glob('/Library/LaunchDaemons/' + LAUNCHD_PREFIX + '*') if not (APPUSAGE_DAEMON and INSTALL_HELPER) in launch_daemon] # Only proceed if PROG_NAME isn't running, looping until it's not running is_managedsoftwareupdate_running() # Get the userid and username of any logged in users user_details = get_logged_in_users() # If users are logged in if user_details: # If users are logged in, reload the required launch agents as them process_user_launch_agents(launch_agents, launchd_group, user_details) # If we're not only looking to reload the appusage launchd's if launchd_group != 'appusage': # reload munki's loginwindow launch agents, returning a list of labels loginwindow_launchds = process_loginwindow_launch_agents(launch_agents) # If we're just looking to reload the appusage launchd's else: # Set to an empty list loginwindow_launchds = [] # Reload launch daemons as needed process_launch_daemons(launch_daemons, launchd_group, loginwindow_launchds) # Set name and path based on the value of launchd_group launch_daemon_name = INSTALL_HELPER + '-' + launchd_group launch_daemon_path = '/Library/LaunchDaemons/' + launch_daemon_name + '.plist' # If the launch daemon exists if os.path.isfile(launch_daemon_path): # Progress notification logging.info("Completed tasks, tidying up") # Delete the launch daemon logging.info("Deleting: %s", launch_daemon_path) os.unlink(launch_daemon_path) # Unload the launch daemon logging.info("Unloading: %s", launch_daemon_name) cmd = ['/bin/launchctl', 'bootout', 'system/' + launch_daemon_name] subprocess.call(cmd) def process_user_launch_agents(launch_agents, launchd_group, user_details): '''Processes Munki's user Launch Agents''' # For each logged in user for userid, username in user_details.items(): # Var declaration matched_labels = [] # Log who we're reloading the Launch Agents as logging.info("Processing Launch Agents for: %s", username) # Retrieve all running launch agents for the user cmd = ['/bin/launchctl', 'asuser', str(userid), '/bin/launchctl', 'list'] cmd_out = subprocess.run(cmd, capture_output = True, check = False) launchd_labels = split_launchctl_list(cmd_out, launchd_group) # For each plist in /Library/LaunchAgents/ prefixed with LAUNCHD_PREFIX for launch_agent in launch_agents: try: # Make sure content is valid, before proceeding to reload agent_content = FoundationPlist.readPlist(launch_agent) except FoundationPlist.NSPropertyListSerializationException: break # If the launch agent doesn't contain LimitLoadToSessionType or # LimitLoadToSessionType exists and it contains Aqua if (not agent_content.get('LimitLoadToSessionType') or 'Aqua' in agent_content.get('LimitLoadToSessionType')): # If the launch agent is to be loaded at the login window, get the Label agent_label = agent_content.get('Label') # If we have a label if agent_label: # Check to see if the launch agent is loaded, before stopping if agent_label in launchd_labels: # Stop the launch agent logging.info("Stopping: %s", launch_agent) cmd = ['/bin/launchctl', 'bootout', 'gui/' + str(userid) + '/' + agent_label] subprocess.call(cmd) # Enable the launch agent logging.info("Enabling: gui/%s", agent_label) cmd = ['/bin/launchctl', 'enable', 'gui/' + agent_label] subprocess.call(cmd) # Load the launch agent logging.info("Starting: %s", launch_agent) cmd = ['/bin/launchctl', 'bootstrap', 'gui/' + str(userid), launch_agent] subprocess.call(cmd) # Append the launchd items label to matched_labels matched_labels.append(agent_label) # If we're to interact with all non-appusage launchd items if launchd_group == 'launchd': # For each label in launchd_labels for launchd_label in launchd_labels: # If we have a label, it's not in matched_labels if launchd_label not in matched_labels + [APPUSAGE_AGENT]: # Stop the launch agent logging.info("Stopping orphaned launch agent: %s", launchd_label) cmd = ['/bin/launchctl', 'bootout', 'gui/' + str(userid) + '/' + launchd_label] subprocess.call(cmd) def setup_logging(): '''Configure logging''' # Creates the logging dir if doesn't exist Path(os.path.dirname(prefs.pref('LogFile'))).mkdir(parents=True, exist_ok=True) # Store the log in the same directory as ManagedSoftwareUpdate.log log_path = os.path.join(os.path.dirname(prefs.pref('LogFile')), APP_NAME + '.log') # Generic settings log_level = logging.getLevelName('INFO') # Log format log_format = logging.Formatter('%(asctime)s %(levelname)s %(message)s', '%b %d %Y %H:%M:%S %z') # Change root logger level from WARNING to NOTSET in order for all messages to be delegated. # https://stackoverflow.com/a/59431653 logging.getLogger('').setLevel(logging.NOTSET) # Create a rotating log file handler, rotate at 10MB, keeping 9 backups file_logger = logging.handlers.RotatingFileHandler(log_path, 'a', 100000, 9, 'utf-8') # Set file logging level file_logger.setLevel(log_level) # Set file log format file_logger.setFormatter(log_format) # Add to logger logging.getLogger('').addHandler(file_logger) # Add console handler to logging object console_logger = logging.StreamHandler() # Set console logging level console_logger.setLevel(log_level) # Set console logging format console_logger.setFormatter(log_format) # Add to logger logging.getLogger('').addHandler(console_logger) # Start log logging.info("Starting %s - %s", APP_NAME, APP_VERSION) def split_launchctl_list(cmd_out, launchd_group): '''Takes launchctl list output and returns a list with just the labels''' # Var declaration launchd_labels = [] # If we're to interact only with the appusage launchd items if launchd_group == 'appusage': # Exclude list exclude_items = [INSTALL_HELPER + '-' + launchd_group] # If we're to interact with all non-appusage launchd items if launchd_group == 'launchd': # Exclude list exclude_items = [APPUSAGE_DAEMON, APPUSAGE_AGENT, INSTALL_HELPER + '-' + launchd_group] # Strip cmd_out to just the labels of the running launch agents for launchd_item in cmd_out.stdout.decode().splitlines(): # Get the launchd label from the output launchd_label = launchd_item.split('\t')[2] # If the launchd_item starts with LAUNCHD_PREFIX, and doesn't include some others if launchd_label.startswith(LAUNCHD_PREFIX) and not launchd_label in exclude_items: # Add the label to launchd_labels launchd_labels.append(launchd_item.split('\t')[2]) # Returns a list of launchd labels return launchd_labels if __name__ == '__main__': # Check to see if we're root if os.geteuid() != 0: print("ERROR: You must run this as root!", file=sys.stderr) sys.exit(1) # If INSTALLHELPER_RUN_TYPE isn't set if not os.environ.get('INSTALLHELPER_RUN_TYPE'): # If we have no arguments passed or the arguments passed are not expected.. exit if len(sys.argv) == 1 or not (sys.argv[1].lower() == 'appusage' or sys.argv[1].lower() == 'launchd'): print("ERROR: Requires either \'appusage\' or \'launchd\' arguments to be passed.") sys.exit(1) # If INSTALLHELPER_RUN_TYPE is set else: # Else if we have INSTALLHELPER_RUN_TYPE defined, and it's not what's expected.. exit if not (os.environ.get('INSTALLHELPER_RUN_TYPE') != 'appusage' or os.environ.get('INSTALLHELPER_RUN_TYPE') != 'launchd'): # Set launchd_group to the value of INSTALLHELPER_RUN_TYPE print("ERROR: Requires INSTALLHELPER_RUN_TYPE to be set to either \'appusage\' or " "\'launchd\'") sys.exit(1) # Proceed to run main main()