#!/usr/bin/python # encoding: utf-8 # # Copyright 2009-2011 Greg Neagle. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ manifestutil Created by Greg Neagle on 2011-03-04. """ import fnmatch import shlex import subprocess import sys import optparse import os import readline try: from munkilib import FoundationPlist as plistlib except ImportError: try: import FoundationPlist as plistlib except ImportError: # maybe we're not on an OS X machine... print >> sys.stderr, ("WARNING: FoundationPlist is not available, " "using plistlib instead.") import plistlib try: from munkilib.munkicommon import get_version except ImportError: # munkilib is not available def get_version(): '''Placeholder if munkilib is not available''' return 'UNKNOWN' def getInstallerItemNames(cataloglist): '''Returns a list of unique installer item (pkg) names from the given list of catalogs''' item_list = [] catalogs_path = os.path.join(pref('repo_path'), 'catalogs') for filename in os.listdir(catalogs_path): if filename in cataloglist: try: catalog = plistlib.readPlist( os.path.join(catalogs_path, filename)) except Exception: # skip items that aren't valid plists # or that we can't read pass else: item_list.extend([item['name'] for item in catalog if not item.get('update_for')]) item_list = list(set(item_list)) item_list.sort() return item_list def getManifestNames(): '''Returns a list of available manifests''' manifests_path = os.path.join(pref('repo_path'), 'manifests') manifests = [] for dirpath, dirnames, filenames in os.walk(manifests_path): subdir = dirpath[len(manifests_path):] for name in filenames: if name.startswith("."): # don't process these continue manifests.append(os.path.join(subdir, name).lstrip('/')) return manifests def getCatalogs(): '''Returns a list of available catalogs''' catalogs_path = os.path.join(pref('repo_path'), 'catalogs') catalogs = [] for name in os.listdir(catalogs_path): if name.startswith(".") or name == 'all': # don't process these continue try: catalog = plistlib.readPlist( os.path.join(catalogs_path, name)) except Exception: # skip items that aren't valid plists pass else: catalogs.append(name) return catalogs def getManifestPkgSections(): '''Returns a list of manifest sections that can contain pkg names''' return ['managed_installs', 'managed_uninstalls', 'managed_updates', 'optional_installs'] def printplistitem(label, value, indent=0): """Prints a plist item in an 'attractive' way""" indentspace = ' ' if type(value) == type(None): print indentspace*indent, '%s: !NONE!' % label elif type(value) == list or type(value).__name__ == 'NSCFArray': if label: print indentspace*indent, '%s:' % label index = 0 for item in value: index += 1 printplistitem('', item, indent+1) elif type(value) == dict or type(value).__name__ == 'NSCFDictionary': if label: print indentspace*indent, '%s:' % label for subkey in value.keys(): printplistitem(subkey, value[subkey], indent+1) else: if label: print indentspace*indent, '%s: %s' % (label, value) else: print indentspace*indent, '%s' % value def printplist(plistdict): """Prints plist dictionary in a pretty(?) way""" keys = list(plistdict.keys()) keys.sort() for key in keys: printplistitem(key, plistdict[key]) def getManifest(manifest_name): '''Gets the contents of a manifest''' manifest_path = os.path.join( pref('repo_path'), 'manifests', manifest_name) if os.path.exists(manifest_path): try: return plistlib.readPlist(manifest_path) except Exception, errmsg: print >> sys.stderr, \ 'Could not read manifest %s' % manifest_name return None else: print >> sys.stderr, 'Manifest %s doesn\'t exist!' % manifest_name return None def saveManifest(manifest_dict, manifest_name, overwrite_existing=False): '''Saves a manifest to disk''' manifest_path = os.path.join( pref('repo_path'), 'manifests', manifest_name) if not overwrite_existing: if os.path.exists(manifest_path): print >> sys.stderr, '%s already exists!' % manifest_name return False try: plistlib.writePlist(manifest_dict, manifest_path) return True except Exception, errmsg: print >> sys.stderr, 'Saving %s failed: %s' % (manifest_name, errmsg) return False def repoAvailable(): """Checks the repo path for proper directory structure. If the directories look wrong we probably don't have a valid repo path. Returns True if things look OK.""" repo_path = pref('repo_path') if not repo_path: print >> sys.stderr, 'No repo path specified.' return False if not os.path.exists(repo_path): mountRepoCLI() if not os.path.exists(repo_path): return False for subdir in ['catalogs', 'manifests', 'pkgs', 'pkgsinfo']: if not os.path.exists(os.path.join(repo_path, subdir)): print >> sys.stderr, "%s is missing %s" % (repo_path, subdir) return False # if we get this far, the repo path looks OK return True def mountRepoCLI(): """Attempts to connect to the repo fileshare""" global WE_MOUNTED_THE_REPO repo_path = pref('repo_path') repo_url = pref('repo_url') if os.path.exists(repo_path): return os.mkdir(repo_path, 0777) print 'Attempting to mount fileshare %s:' % repo_url cmd = ['/sbin/mount_afp', '-i', repo_url, repo_path] retcode = subprocess.call(cmd) if retcode: os.rmdir(repo_path) else: WE_MOUNTED_THE_REPO = True def unmountRepoCLI(): """Attempts to unmount the repo fileshare""" repo_path = pref('repo_path') if not os.path.exists(repo_path): return cmd = ['/sbin/umount', repo_path] return subprocess.call(cmd) def cleanupAndExit(exitcode): result = 0 if WE_MOUNTED_THE_REPO: answer = raw_input('Unmount the repo fileshare? [y/n] ') if answer.lower().startswith('y'): result = unmountRepoCLI() exit(exitcode or result) _prefs = {} def pref(prefname): """Returns a preference for prefname""" global _prefs if not _prefs: try: _prefs = plistlib.readPlist(PREFSPATH) except Exception: pass if prefname in _prefs: return _prefs[prefname] else: return None def updateCachedManifestList(): '''Updates our cached list of available manifests so our completer will return all the available manifests.''' CMD_ARG_DICT['manifests'] = getManifestNames() ##### subcommand functions ##### class MyOptParseError(Exception): '''Exception for our custom option parser''' pass class MyOptionParser(optparse.OptionParser): '''Custom option parser that overrides the error handler''' def error(self, msg): """error(msg : string) """ self.print_usage(sys.stderr) raise MyOptParseError('option error: %s' % msg) def version(args): '''Prints version number''' if len(args) != 0: print >> sys.stderr, 'Usage: version' return 22 # Invalid argument print get_version() return 0 def list_catalogs(args): '''Prints the names of the available catalogs''' if len(args) != 0: print >> sys.stderr, 'Usage: list-catalogs' return 22 # Invalid argument for item in getCatalogs(): print item return 0 def list_catalog_items(args): '''Lists items in the given catalogs''' if len(args) == 0: print >> sys.stderr, \ 'Usage: list-catalog-items CATALOG_NAME [CATALOG_NAME ...]' return 1 # Operation not permitted available_catalogs = getCatalogs() for catalog in args: if catalog not in available_catalogs: print >> sys.stderr, '%s: no such catalog!' % catalog return 2 # No such file or directory for pkg in getInstallerItemNames(args): print pkg return 0 def list_manifests(args): '''Prints names of available manifests, filtering on arg[0] similar to Unix file globbing''' p = MyOptionParser() p.set_usage( 'list-manifests [FILE_NAME_MATCH_STRING]') if len(args) == 1: list_filter = args[0] elif len(args) == 0: list_filter = '*' else: p.print_usage(sys.stderr) return 7 # Argument list too long for item in getManifestNames(): if fnmatch.fnmatch(item, list_filter): print item return 0 def find(args): '''Find text in manifests, optionally searching just a specific manifest section specified by keyname''' p = MyOptionParser() p.set_usage( 'find FIND_TEXT [--section SECTION_NAME]') p.add_option('--section', metavar='SECTION_NAME', help='Section of the manifest to search for FIND_TEXT') try: options, arguments = p.parse_args(args) except MyOptParseError, errmsg: print >> sys.stderr, str(errmsg) return 22 # Invalid argument if not options.section and len(arguments) == 2: options.section = arguments[1] del arguments[1] if len(arguments) != 1: p.print_usage(sys.stderr) return 7 # Argument list too long findtext = arguments[0] keyname = options.section manifests_path = os.path.join(pref('repo_path'), 'manifests') count = 0 for name in getManifestNames(): pathname = os.path.join(manifests_path, name) manifest = plistlib.readPlist(pathname) if keyname: if keyname in manifest: value = manifest[keyname] if type(value) == list or type(value).__name__ == 'NSCFArray': for item in value: if findtext.upper() in item.upper(): print '%s: %s' % (name, item) count += 1 break elif findtext.upper() in value.upper(): print '%s: %s' % (name, value) count += 1 else: for key in manifest.keys(): value = manifest[key] if type(value) == list or type(value).__name__ == 'NSCFArray': for item in value: if findtext.upper() in item.upper(): print '%s (%s): %s' % (name, key, item) count += 1 break elif findtext.upper() in value.upper(): print '%s (%s): %s' % (name, key, value) count += 1 print '%s items found.' % count return 0 def display_manifest(args): '''Prints contents of a given manifest''' if len(args) != 1: print >> sys.stderr, 'Usage: display-manifest MANIFESTNAME' return 7 # Argument list too long manifestname = args[0] manifest = getManifest(manifestname) if manifest: printplist(manifest) return 0 else: return 2 # No such file or directory def new_manifest(args): '''Creates a new, empty manifest''' if len(args) != 1: print >> sys.stderr, 'Usage: new-manifest MANIFESTNAME' return 7 # Argument list too long manifest_name = args[0] manifest = {'catalogs': [], 'included_manifests': [], 'managed_installs': [], 'managed_uninstalls': []} if saveManifest(manifest, manifest_name): updateCachedManifestList() return 0 else: return 1 # Operation not permitted def copy_manifest(args): '''Copies one manifest to another''' if len(args) != 2: print >> sys.stderr, \ 'Usage: copy-manifest SOURCE_MANIFEST DESTINATION_MANIFEST' return 7 # Argument list too long source_manifest = args[0] dest_manifest = args[1] manifest = getManifest(source_manifest) if manifest and saveManifest(manifest, dest_manifest): updateCachedManifestList() return 0 else: return 1 # Operation not permitted def add_pkg(args): '''Adds a package to a manifest.''' p = MyOptionParser() p.set_usage( 'add-pkg PKGNAME --manifest MANIFESTNAME [--section SECTIONNAME]') p.add_option('--manifest') p.add_option('--section', default='managed_installs') try: options, arguments = p.parse_args(args) except MyOptParseError, errmsg: print >> sys.stderr, str(errmsg) return 22 # Invalid argument if not options.manifest: if len(arguments) == 2: options.manifest = arguments[1] del arguments[1] else: p.print_usage(sys.stderr) return 7 # Argument list too long if len(arguments) != 1: p.print_usage(sys.stderr) return 7 # Argument list too long pkgname = arguments[0] manifest = getManifest(options.manifest) if not manifest: return 2 # No such file or directory for section in getManifestPkgSections(): if pkgname in manifest.get(section, []): print >> sys.stderr, ( 'Package %s is already in section %s of manifest %s.' % (pkgname, section, options.manifest)) return 1 # Operation not permitted manifest_catalogs = manifest.get('catalogs', []) available_pkgnames = getInstallerItemNames(manifest_catalogs) if pkgname not in available_pkgnames: print >> sys.stderr, ( 'WARNING: Package %s is not available in catalogs %s ' 'of manifest %s.' % (pkgname, manifest_catalogs, options.manifest)) if not options.section in manifest: manifest[options.section] = [pkgname] else: manifest[options.section].append(pkgname) if saveManifest(manifest, options.manifest, overwrite_existing=True): print ('Added %s to section %s of manifest %s.' % (pkgname, options.section, options.manifest)) return 0 else: return 1 # Operation not permitted def remove_pkg(args): '''Removes a package from a manifest.''' p = MyOptionParser() p.set_usage( 'remove-pkg PKGNAME --manifest MANIFESTNAME ' '[--section SECTIONNAME]') p.add_option('--manifest') p.add_option('--section', default='managed_installs') try: options, arguments = p.parse_args(args) except MyOptParseError, errmsg: print >> sys.stderr, str(errmsg) return 22 # Invalid argument if not options.manifest: if len(arguments) == 2: options.manifest = arguments[1] del arguments[1] else: p.print_usage(sys.stderr) return 7 # Argument list too long if len(arguments) != 1: p.print_usage(sys.stderr) return 7 # Argument list too long pkgname = arguments[0] manifest = getManifest(options.manifest) if not manifest: return 2 # No such file or directory if not options.section in manifest: print >> sys.stderr, ('Section %s is not in manifest %s.' % (options.section, manifest)) return 1 # Operation not permitted if pkgname not in manifest[options.section]: print >> sys.stderr, ('Package %s is not in section %s ' 'of manifest %s.' % (pkgname, options.section, options.manifest)) return 1 # Operation not permitted else: manifest[options.section].remove(pkgname) if saveManifest(manifest, options.manifest, overwrite_existing=True): print ('Removed %s from section %s of manifest %s.' % (pkgname, options.section, options.manifest)) return 0 else: return 1 # Operation not permitted def add_catalog(args): '''Adds a catalog to a manifest.''' p = MyOptionParser() p.set_usage('add-catalog CATALOGNAME --manifest MANIFESTNAME') p.add_option('--manifest') try: options, arguments = p.parse_args(args) except MyOptParseError, errmsg: print >> sys.stderr, str(errmsg) return 22 # Invalid argument if not options.manifest: if len(arguments) == 2: options.manifest = arguments[1] del arguments[1] else: p.print_usage(sys.stderr) return 7 # Argument list too long if len(arguments) != 1: p.print_usage(sys.stderr) return 7 # Argument list too long catalogname = arguments[0] available_catalogs = getCatalogs() if catalogname not in available_catalogs: print >> sys.stderr, 'Unknown catalog name: %s.' % catalogname return 2 # no such file or directory manifest = getManifest(options.manifest) if not manifest: return 2 # no such file or directory if not 'catalogs' in manifest: manifest['catalogs'] = [] if catalogname in manifest['catalogs']: print >> sys.stderr, ( 'Catalog %s is already in manifest %s.' % (catalogname, options.manifest)) return 1 # Operation not permitted else: # put it at the front of the catalog list as that is usually # what is wanted... manifest['catalogs'].insert(0, catalogname) if saveManifest(manifest, options.manifest, overwrite_existing=True): print ('Added %s to catalogs of manifest %s.' % (catalogname, options.manifest)) return 0 else: return 1 # Operation not permitted def remove_catalog(args): '''Removes a catalog from a manifest.''' p = MyOptionParser() p.set_usage('remove-catalog PKGNAME --manifest MANIFESTNAME') p.add_option('--manifest') try: options, arguments = p.parse_args(args) except MyOptParseError, errmsg: print >> sys.stderr, str(errmsg) return 22 # Invalid argument if not options.manifest: if len(arguments) == 2: options.manifest = arguments[1] del arguments[1] else: p.print_usage(sys.stderr) return 7 # Argument list too long if len(arguments) != 1: p.print_usage(sys.stderr) return 7 # Argument list too long catalogname = arguments[0] manifest = getManifest(options.manifest) if not manifest: return 2 # no such file or directory if catalogname not in manifest.get('catalogs', []): print >> sys.stderr, ( 'Catalog %s is not in manifest %s.' % (catalogname, options.manifest)) return 1 # Operation not permitted else: manifest['catalogs'].remove(catalogname) if saveManifest(manifest, options.manifest, overwrite_existing=True): print ('Removed %s from catalogs of manifest %s.' % (catalogname, options.manifest)) return 0 else: return 1 # Operation not permitted def add_included_manifest(args): '''Adds an included manifest to a manifest.''' p = MyOptionParser() p.set_usage('add-included-manifest MANIFEST_TO_INCLUDE ' '--manifest TARGET_MANIFEST') p.add_option('--manifest') try: options, arguments = p.parse_args(args) except MyOptParseError, errmsg: print >> sys.stderr, str(errmsg) return 22 # Invalid argument if not options.manifest: if len(arguments) == 2: options.manifest = arguments[1] del arguments[1] else: p.print_usage(sys.stderr) return 7 # Argument list too long if len(arguments) != 1: p.print_usage(sys.stderr) return 7 # Argument list too long manifest_to_include = arguments[0] available_manifests = getManifestNames() if manifest_to_include not in available_manifests: print >> sys.stderr, ('Unknown manifest name: %s.' % manifest_to_include) return 2 # no such file or directory if manifest_to_include == options.manifest: print >> sys.stderr, ('Can\'t include %s in itself!.' % manifest_to_include) return 1 # Operation not permitted manifest = getManifest(options.manifest) if not manifest: return 2 # no such file or directory if not 'included_manifests' in manifest: manifest['included_manifests'] = [] if manifest_to_include in manifest['included_manifests']: print >> sys.stderr, ( 'Manifest %s is already included in manifest %s.' % (manifest_to_include, options.manifest)) return 1 # Operation not permitted else: manifest['included_manifests'].append(manifest_to_include) if saveManifest(manifest, options.manifest, overwrite_existing=True): print ('Added %s to included_manifests of manifest %s.' % (manifest_to_include, options.manifest)) return 0 else: return 1 # Operation not permitted def remove_included_manifest(args): '''Removes an included manifest from a manifest.''' p = MyOptionParser() p.set_usage( 'remove-included_manifest INCLUDED_MANIFEST --manifest MANIFESTNAME') p.add_option('--manifest') try: options, arguments = p.parse_args(args) except MyOptParseError, errmsg: print >> sys.stderr, str(errmsg) return 22 # Invalid argument if not options.manifest: if len(arguments) == 2: options.manifest = arguments[1] del arguments[1] else: p.print_usage(sys.stderr) return 7 # Argument list too long if len(arguments) != 1: p.print_usage(sys.stderr) return 7 # Argument list too long included_manifest = arguments[0] manifest = getManifest(options.manifest) if not manifest: return 2 # no such file or directory if included_manifest not in manifest.get('included_manifests', []): print >> sys.stderr, ( 'Manifest %s is not included in manifest %s.' % (included_manifest, options.manifest)) return 1 # Operation not permitted else: manifest['included_manifests'].remove(included_manifest) if saveManifest(manifest, options.manifest, overwrite_existing=True): print ('Removed %s from included_manifests of manifest %s.' % (included_manifest, options.manifest)) return 0 else: return 1 # Operation not permitted def help(args): '''Prints available subcommands''' print "Available sub-commands:" subcommands = CMD_ARG_DICT['cmds'].keys() subcommands.sort() for item in subcommands: print '\t%s' % item return 0 def configure(args): """Configures manifestutil for use""" if len(args): print >> sys.stderr, 'Usage: configure' return 22 # Invalid argument for (key, prompt) in [ ('repo_path', 'Path to munki repo (example: /Volumes/repo)'), ('repo_url', 'Repo fileshare URL (example: afp://munki.pretendco.com/repo)')]: newvalue = raw_input('%15s [%s]: ' % (prompt, pref(key))) _prefs[key] = newvalue or pref(key) or '' try: plistlib.writePlist(_prefs, PREFSPATH) return 0 except Exception: print >> sys.stderr, 'Could not save configuration to %s' % PREFSPATH return 1 # Operation not permitted def tab_completer(text, state): '''Called by the readline lib to calculate possible completions''' array_to_match = None if readline.get_begidx() == 0: # since we are at the start of the line # we are matching commands array_to_match = 'cmds' match_list = CMD_ARG_DICT.get('cmds', {}).keys() else: # we are matching args cmd_line = readline.get_line_buffer()[0:readline.get_begidx()] cmd = shlex.split(cmd_line)[-1] array_to_match = CMD_ARG_DICT.get('cmds', {}).get(cmd) if array_to_match: match_list = CMD_ARG_DICT[array_to_match] else: array_to_match = CMD_ARG_DICT.get('options', {}).get(cmd) if array_to_match: match_list = CMD_ARG_DICT[array_to_match] else: array_to_match = 'options' match_list = CMD_ARG_DICT.get('options',{}).keys() matches = [item for item in match_list if item.upper().startswith(text.upper())] try: return matches[state] except IndexError: return None def setUpTabCompleter(): '''Starts our tab-completer when running interactively''' readline.set_completer(tab_completer) if sys.platform == 'darwin': readline.parse_and_bind ("bind ^I rl_complete") else: readline.parse_and_bind("tab: complete") def handleSubcommand(args): '''Does all our subcommands''' # strip leading hyphens and # replace embedded hyphens with underscores # so '--add-pkg' becomes 'add_pkg' # and 'new-manifest' becomes 'new_manifest' subcommand = args[0].lstrip('-').replace('-', '_') # special case the exit command if subcommand == 'exit': cleanupAndExit(0) if subcommand not in ['version', 'configure', 'help']: if not repoAvailable(): exit(-1) try: # find function to call by looking in the global name table # for a function with a name matching the subcommand subcommand_function = globals()[subcommand] return subcommand_function(args[1:]) except (TypeError, KeyError): print >> sys.stderr, 'Unknown subcommand: %s' % subcommand help(args) return 2 PREFSNAME = 'com.googlecode.munki.munkiimport.plist' PREFSPATH = os.path.expanduser(os.path.join('~/Library/Preferences', PREFSNAME)) WE_MOUNTED_THE_REPO = False INTERACTIVE_MODE = False CMD_ARG_DICT = {} def main(): global INTERACTIVE_MODE global CMD_ARG_DICT cmds = {'add-pkg': 'pkgs', 'add-catalog': 'catalogs', 'add-included-manifest': 'manifests', 'remove-pkg': 'pkgs', 'remove-catalog': 'catalogs', 'remove-included-manifest': 'manifests', 'list-manifests': 'manifests', 'list-catalogs': 'default', 'list-catalog-items': 'catalogs', 'display-manifest': 'manifests', 'find': 'default', 'new-manifest': 'default', 'copy-manifest': 'manifests', 'exit': 'default', 'help': 'default', 'configure': 'default', 'version': 'default' } CMD_ARG_DICT['cmds'] = cmds if len(sys.argv) > 1: # some commands or options were passed at the command line cmd = sys.argv[1].lstrip('-') retcode = handleSubcommand(sys.argv[1:]) cleanupAndExit(retcode) else: # if we get here, no options or commands, # so let's enter interactive mode INTERACTIVE_MODE = True # must have an available repo for interfactive mode if not repoAvailable(): exit(-1) # build the rest of our dict to enable tab completion CMD_ARG_DICT['options'] = {'--manifest': 'manifests', '--section': 'sections'} CMD_ARG_DICT['default'] = [] CMD_ARG_DICT['sections'] = getManifestPkgSections() CMD_ARG_DICT['manifests'] = getManifestNames() CMD_ARG_DICT['catalogs'] = getCatalogs() CMD_ARG_DICT['pkgs'] = getInstallerItemNames(getCatalogs()) setUpTabCompleter() print 'Entering interactive mode... (type "help" for commands)' while 1: try: cmd = raw_input('> ') except KeyboardInterrupt: print # so we finish off the raw_input line cleanupAndExit(0) args = shlex.split(cmd) handleSubcommand(args) if __name__ == '__main__': main()