#!/usr/bin/env python # encoding: utf-8 # # Copyright 2009-2017 Greg Neagle. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ makecatalogs Created by Greg Neagle on 2009-03-30. Recursively scans a directory, looking for installer item info files. Builds a repo catalog from these files. Assumes a pkgsinfo directory under repopath. User calling this needs to be able to write to repo/catalogs. """ import hashlib import os import optparse import plistlib import sys from munkilib.cliutils import pref, path2url, print_utf8, print_err_utf8 from munkilib.cliutils import get_version from munkilib import munkirepo def list_items_of_kind(repo, kind): '''Returns a list of items of kind. Relative pathnames are prepended with kind. (example: ['icons/Bar.png', 'icons/Foo.png'])''' return [os.path.join(kind, item) for item in repo.itemlist(kind)] def makecatalogs(repo, options): '''Assembles all pkginfo files into catalogs. Assumes a pkgsinfo directory under repopath. User calling this needs to be able to write to the repo/catalogs directory.''' # start with no errors! errors = [] exit_code = 0 icons = {} iconhashing = False icon_list = list_items_of_kind(repo, 'icons') if icon_list: iconhashing = True for icon_ref in icon_list: print_utf8("Hashing %s..." % (icon_ref)) # Try to read the icon file try: icondata = repo.get(icon_ref) icons[os.path.basename(icon_ref)] = hashlib.sha256( icondata).hexdigest() except munkirepo.RepoError, err: errors.append('RepoError for %s: %s' % (icon_ref, unicode(err))) exit_code = -1 continue except IOError, err: errors.append("IO error for %s: %s" % (icon_ref, err)) exit_code = -1 continue except BaseException, err: errors.append("Unexpected error for %s: %s" % (icon_ref, err)) exit_code = -1 continue # get a list of pkgsinfo items try: pkgsinfo_list = list_items_of_kind(repo, 'pkgsinfo') except munkirepo.RepoError, err: print_err_utf8( "Error getting list of pkgsinfo items: %s" % unicode(err)) exit(-1) # get a list of pkgs items try: pkgs_list = list_items_of_kind(repo, 'pkgs') except munkirepo.RepoError, err: print_err_utf8("Error getting list of pkgs items: %s" % unicode(err)) exit(-1) # start with empty catalogs dict catalogs = {} catalogs['all'] = [] # Walk through the pkginfo files for pkginfo_ref in pkgsinfo_list: # Try to read the pkginfo file try: data = repo.get(pkginfo_ref) pkginfo = plistlib.readPlistFromString(data) except IOError, err: errors.append("IO error for %s: %s" % (pkginfo_ref, err)) exit_code = -1 continue except BaseException, err: errors.append("Unexpected error for %s: %s" % (pkginfo_ref, err)) exit_code = -1 continue if not 'name' in pkginfo: errors.append("WARNING: %s is missing name" % pkginfo_ref) continue # don't copy admin notes to catalogs. if pkginfo.get('notes'): del pkginfo['notes'] # strip out any keys that start with "_" # (example: pkginfo _metadata) for key in pkginfo.keys(): if key.startswith('_'): del pkginfo[key] if iconhashing: name = pkginfo.get('name') if pkginfo.get('icon_name'): iconhash = (icons.get(pkginfo['icon_name']) or icons.get(pkginfo['icon_name'] + '.png')) if not iconhash: errors.append( "WARNING: icon_name specified in %s but it does not " "exist" % pkginfo_ref) else: iconhash = icons.get(name + '.png') if iconhash is not None: pkginfo['icon_hash'] = iconhash iconhash = None #simple sanity checking do_pkg_check = True installer_type = pkginfo.get('installer_type') if installer_type in ['nopkg', 'apple_update_metadata']: do_pkg_check = False if pkginfo.get('PackageCompleteURL'): do_pkg_check = False if pkginfo.get('PackageURL'): do_pkg_check = False if do_pkg_check: if not 'installer_item_location' in pkginfo: errors.append( "WARNING: %s is missing installer_item_location" % pkginfo_ref) # Skip this pkginfo unless we're running with force flag if not options.force: exit_code = -1 continue # Try to form a path and fail if the # installer_item_location is not a valid type try: installeritempath = os.path.join( "pkgs", pkginfo['installer_item_location']) except TypeError: errors.append("WARNING: invalid installer_item_location " "in %s" % pkginfo_ref) exit_code = -1 continue # Check if the installer item actually exists if not installeritempath in pkgs_list: # do a case-insenstive comparison found_caseinsensitive_match = False for repo_pkg in pkgs_list: if installeritempath.lower() == repo_pkg.lower(): errors.append( "WARNING: %s refers to installer item: %s. " "The pathname of the item in the repo has " "different case: %s. This may cause issues " "depending on the case-sensitivity of the " "underlying filesystem." % (pkginfo_ref, pkginfo['installer_item_location'], repo_pkg)) found_caseinsensitive_match = True break if not found_caseinsensitive_match: errors.append( "WARNING: %s refers to missing installer item: %s" % (pkginfo_ref, pkginfo['installer_item_location'])) # Skip this pkginfo unless we're running with force flag if not options.force: exit_code = -1 continue #uninstaller sanity checking uninstaller_type = pkginfo.get('uninstall_method') if uninstaller_type in ['AdobeCCPUninstaller']: # uninstaller_item_location is required if not 'uninstaller_item_location' in pkginfo: errors.append( "WARNING: %s is missing uninstaller_item_location" % pkginfo_ref) # Skip this pkginfo unless we're running with force flag if not options.force: exit_code = -1 continue # if an uninstaller_item_location is specified, sanity-check it if 'uninstaller_item_location' in pkginfo: try: uninstalleritempath = os.path.join( "pkgs", pkginfo['uninstaller_item_location']) except TypeError: errors.append("WARNING: invalid uninstaller_item_location " "in %s" % pkginfo_ref) exit_code = -1 continue # Check if the uninstaller item actually exists if not uninstalleritempath in pkgs_list: # do a case-insenstive comparison found_caseinsensitive_match = False for repo_pkg in pkgs_list: if uninstalleritempath.lower() == repo_pkg.lower(): errors.append( "WARNING: %s refers to uninstaller item: %s. " "The pathname of the item in the repo has " "different case: %s. This may cause issues " "depending on the case-sensitivity of the " "underlying filesystem." % (pkginfo_ref, pkginfo['uninstaller_item_location'], repo_pkg)) found_caseinsensitive_match = True break if not found_caseinsensitive_match: errors.append( "WARNING: %s refers to missing uninstaller item: %s" % (pkginfo_ref, pkginfo['uninstaller_item_location'])) # Skip this pkginfo unless we're running with force flag if not options.force: exit_code = -1 continue catalogs['all'].append(pkginfo) for catalogname in pkginfo.get("catalogs", []): if not catalogname: errors.append("WARNING: %s has an empty catalogs array!" % pkginfo_ref) exit_code = -1 continue if not catalogname in catalogs: catalogs[catalogname] = [] catalogs[catalogname].append(pkginfo) print_utf8("Adding %s to %s..." % (pkginfo_ref, catalogname)) # look for catalog names that differ only in case duplicate_catalogs = [] for key in catalogs: if key.lower() in [item.lower() for item in catalogs if item != key]: duplicate_catalogs.append(key) if duplicate_catalogs: errors.append("WARNING: There are catalogs with names that differ only " "by case. This may cause issues depending on the case-" "sensitivity of the underlying filesystem: %s" % duplicate_catalogs) if errors: # group all errors at the end for better visibility print for error in errors: print_err_utf8(error) # clear out old catalogs try: catalog_list = repo.itemlist('catalogs') except munkirepo.RepoError: catalog_list = [] for catalog_name in catalog_list: if catalog_name not in catalogs.keys(): catalog_ref = os.path.join('catalogs', catalog_name) try: repo.delete(catalog_ref) except munkirepo.RepoError: print_err_utf8('Could not delete catalog %s' % catalog_name) # write the new catalogs print for key in catalogs: catalogpath = os.path.join("catalogs", key) if len(catalogs[key]): catalog_data = plistlib.writePlistToString(catalogs[key]) try: repo.put(catalogpath, catalog_data) print "Created %s..." % (catalogpath) except munkirepo.RepoError, err: print_err_utf8( u'Failed to create catalog %s: %s' % (key, unicode(err))) else: print_err_utf8( "WARNING: Did not create catalog %s because it is empty" % key) exit_code = -1 # Exit with "exit_code" if we got this far. This will be -1 if there were # any errors that prevented the catalogs to be written. exit(exit_code) def main(): '''Main''' usage = "usage: %prog [options] [/path/to/repo_root]" parser = optparse.OptionParser(usage=usage) parser.add_option('--version', '-V', action='store_true', help='Print the version of the munki tools and exit.') parser.add_option('--force', '-f', action='store_true', dest='force', help='Disable sanity checks.') parser.add_option('--repo_url', '--repo-url', help='Optional repo URL that takes precedence ' 'over the default repo_url specified via ' '--configure.') parser.add_option('--plugin', help='Specify a custom plugin to connect to repo.') parser.set_defaults(force=False) options, arguments = parser.parse_args() if options.version: print get_version() exit(0) # backwards compatibility if not options.repo_url and not options.plugin: if arguments: options.repo_url = path2url(arguments[0]) elif pref('repo_path'): options.repo_url = path2url(pref('repo_path')) else: options.repo_url = pref('repo_url') options.plugin = pref('plugin') if not options.repo_url: parser.print_usage() exit(-1) # Connect to the repo try: repo = munkirepo.connect(options.repo_url, options.plugin) except munkirepo.RepoError, err: print >> sys.stderr, (u'Could not connect to munki repo: %s' % unicode(err)) exit(-1) # Make the catalogs makecatalogs(repo, options) if __name__ == '__main__': main()