#!/usr/bin/env python
# encoding: utf-8
#
# Copyright 2016 Greg Neagle.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
repoclean

Created by Greg Neagle on 2016-06-22.

A tool to remove older, unused software items from a Munki repo.
"""

import subprocess
import sys
import os
import optparse
import re

# The code below runs unchanged on Python 2.7 and Python 3. To stay
# compatible with both without a __future__ import, print() is only ever
# called with exactly one argument.
PY2 = sys.version_info[0] == 2
if PY2:
    # These names only exist on Python 2; this branch is never evaluated
    # on Python 3.
    TEXT_TYPE = unicode
    STRING_TYPES = basestring
    read_input = raw_input
else:
    TEXT_TYPE = str
    STRING_TYPES = str
    read_input = input

try:
    from munkilib import FoundationPlist as plistlib
    LOCAL_PREFS_SUPPORT = True
except ImportError:
    try:
        import FoundationPlist as plistlib
        LOCAL_PREFS_SUPPORT = True
    except ImportError:
        # maybe we're not on an OS X machine...
        sys.stderr.write("WARNING: FoundationPlist is not available, "
                         "using plistlib instead.\n")
        import plistlib
        LOCAL_PREFS_SUPPORT = False


def _to_unicode(path):
    """Returns path as Unicode text, decoding byte strings as UTF-8."""
    if isinstance(path, bytes):
        return path.decode('utf-8')
    if not isinstance(path, TEXT_TYPE):
        return TEXT_TYPE(path)
    return path


try:
    from munkilib.munkicommon import listdir, get_version
except ImportError:
    # munkilib is not available
    def listdir(path):
        """OS X HFS+ string encoding safe listdir().

        Args:
            path: path to list contents of
        Returns:
            list of contents, items as str or unicode types
        """
        # if os.listdir() is supplied a unicode object for the path,
        # it will return unicode filenames instead of their raw fs-dependent
        # version, which is decomposed utf-8 on OS X.
        #
        # we use this to our advantage here and have Python do the decoding
        # work for us, instead of decoding each item in the output list.
        #
        # references:
        # https://docs.python.org/howto/unicode.html#unicode-filenames
        # https://developer.apple.com/library/mac/#qa/qa2001/qa1235.html
        # http://lists.zerezo.com/git/msg643117.html
        # http://unicode.org/reports/tr15/    section 1.2
        return os.listdir(_to_unicode(path))

    def get_version():
        '''Placeholder if munkilib is not available'''
        return 'UNKNOWN'


def _for_output(text):
    """Returns text in the form the running interpreter should print.

    Python 2: Unicode text is encoded to UTF-8; byte strings pass through
    untouched (calling .encode() on a non-ASCII byte string would raise
    UnicodeDecodeError).
    Python 3: byte strings are decoded as UTF-8 so they are not shown as
    b'...'; everything else passes through.
    """
    if PY2:
        if isinstance(text, TEXT_TYPE):
            return text.encode('UTF-8')
        return text
    if isinstance(text, bytes):
        return text.decode('UTF-8', 'replace')
    return text


def print_utf8(text):
    '''Print Unicode text as UTF-8'''
    print(_for_output(text))


def print_err_utf8(text):
    '''Print Unicode text to stderr as UTF-8'''
    sys.stderr.write('%s\n' % (_for_output(text),))


def _read_plist(filepath):
    """Reads the plist at filepath with whichever plist module was imported.

    FoundationPlist and older standard-library plistlib versions provide
    readPlist(); Python 3.9 removed it from the standard library, so fall
    back to plistlib.load() there.
    """
    reader = getattr(plistlib, 'readPlist', None)
    if reader is not None:
        return reader(filepath)
    with open(filepath, 'rb') as fileobj:
        return plistlib.load(fileobj)


def nameAndVersion(aString):
    """Splits a string into the name and version number.

    Name and version must be seperated with a hyphen ('-')
    or double hyphen ('--').
    'TextWrangler-2.3b1' becomes ('TextWrangler', '2.3b1')
    'AdobePhotoshopCS3--11.2.1' becomes ('AdobePhotoshopCS3', '11.2.1')
    'MicrosoftOffice2008-12.2.1' becomes ('MicrosoftOffice2008', '12.2.1')

    Returns:
        (name, version) tuple; version is '' (and name is the whole input)
        when the text after the last delimiter does not start with a digit.
    """
    for delim in ('--', '-'):
        if aString.count(delim) > 0:
            chunks = aString.split(delim)
            vers = chunks.pop()
            name = delim.join(chunks)
            # `vers` is empty when the string ends with the delimiter
            # ('Foo-'); test for that before indexing into it.
            if vers and vers[0] in '0123456789':
                return (name, vers)
    return (aString, '')


def humanReadable(bytes):
    """Returns sizes in human-readable units.

    Args:
        bytes: size in bytes (the parameter name shadows the builtin but is
            kept so existing keyword callers keep working)
    Returns:
        string such as '1.5 KB'; sizes of 1024 TB and above are still
        expressed in TB.
    """
    units = [(" bytes", 2**10),
             (" KB", 2**20),
             (" MB", 2**30),
             (" GB", 2**40),
             (" TB", 2**50)]
    for suffix, limit in units:
        if bytes < limit:
            break
    # If no unit matched, `suffix` and `limit` still hold the largest unit,
    # so oversized values are reported in TB rather than returning None.
    return str(round(bytes / float(limit // 2**10), 1)) + suffix


def make_catalogs(repopath):
    """Calls makecatalogs to rebuild our catalogs

    Args:
        repopath: path to the root of the Munki repo
    """
    # first look for a makecatalogs in the same dir as us
    mydir = os.path.dirname(os.path.abspath(__file__))
    makecatalogs_path = os.path.join(mydir, 'makecatalogs')
    if not os.path.exists(makecatalogs_path):
        # didn't find it; assume the default install path
        makecatalogs_path = '/usr/local/munki/makecatalogs'
    print_utf8('Rebuilding catalogs at %s...' % repopath)
    try:
        proc = subprocess.Popen([makecatalogs_path, repopath],
                                bufsize=-1, stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
    except OSError as err:
        print_err_utf8('Could not run %s: %s' % (makecatalogs_path, err))
        return
    # communicate() drains stdout and stderr together; reading stdout to
    # EOF before touching stderr can deadlock once the child fills the
    # stderr pipe. makecatalogs' normal output is intentionally discarded.
    _, errors = proc.communicate()
    if errors:
        print('\nThe following errors occurred while building catalogs:\n')
        print_utf8(errors)


def _unique_pkgs_to_delete(items_to_delete, pkgs_to_keep):
    """Returns the distinct pkg paths that really should be removed.

    Items with no pkg_path (or an empty one), pkgs that must be kept, and
    repeats of a pkg shared by several pkginfo items are all left out.
    Order of first appearance is preserved.
    """
    unique_paths = []
    seen = set()
    for item in items_to_delete:
        pkg_path = item.get('pkg_path')
        if not pkg_path or pkg_path in pkgs_to_keep or pkg_path in seen:
            continue
        seen.add(pkg_path)
        unique_paths.append(pkg_path)
    return unique_paths


def count_pkgs_to_delete(items_to_delete, pkgs_to_keep):
    '''Returns the number of distinct pkgs that would be deleted'''
    return len(_unique_pkgs_to_delete(items_to_delete, pkgs_to_keep))


def _size_or_zero(path):
    """Returns the size of the file at path, or 0 if it cannot be stat-ed."""
    try:
        return os.stat(path).st_size
    except (OSError, IOError):
        # best effort only: the totals are an estimate shown to the user
        return 0


def get_file_sizes(repopath, items_to_delete, pkgs_to_keep):
    '''Returns human-readable sizes for the pkginfo items and
    pkgs that are to be deleted, as a (pkginfo_size, pkg_size) tuple'''
    pkginfo_total_size = 0
    for item in items_to_delete:
        if 'relative_path' in item:
            pkginfo_total_size += _size_or_zero(
                os.path.join(repopath, 'pkgsinfo', item['relative_path']))
    pkg_total_size = 0
    for pkg_path in _unique_pkgs_to_delete(items_to_delete, pkgs_to_keep):
        pkg_total_size += _size_or_zero(
            os.path.join(repopath, 'pkgs', pkg_path))
    return (humanReadable(pkginfo_total_size), humanReadable(pkg_total_size))


def _remove_file(path_to_remove):
    """Announces and removes one file; failures are reported on stderr."""
    print_utf8('Removing %s' % path_to_remove)
    try:
        os.unlink(path_to_remove)
    except (OSError, IOError) as err:
        print_err_utf8(TEXT_TYPE(err))


def delete_items(repopath, items_to_delete, pkgs_to_keep):
    '''Deletes items from the repo

    For each item its pkginfo file is removed, followed by its pkg unless
    the pkg must be kept, has no path, or was already removed for an
    earlier item that shares it.'''
    pending_pkgs = set(_unique_pkgs_to_delete(items_to_delete, pkgs_to_keep))
    for item in items_to_delete:
        if 'relative_path' in item:
            _remove_file(
                os.path.join(repopath, 'pkgsinfo', item['relative_path']))
        pkg_path = item.get('pkg_path')
        if pkg_path in pending_pkgs:
            pending_pkgs.discard(pkg_path)
            _remove_file(os.path.join(repopath, 'pkgs', pkg_path))


def repo_plists(repo_subdir):
    '''Generator function that returns plist filepaths

    Walks repo_subdir (following symlinks) and yields the path of every
    file found, skipping files and directories whose names start with a
    period.'''
    for dirpath, dirnames, filenames in os.walk(repo_subdir, followlinks=True):
        # don't recurse into directories that start with a period.
        # Prune with a slice assignment: os.walk only honours in-place
        # changes, and calling remove() while iterating over the same list
        # skips the entry that follows each removed one.
        dirnames[:] = [name for name in dirnames if not name.startswith('.')]
        for filename in filenames:
            if filename.startswith('.'):
                # skip files that start with a period as well
                continue
            yield os.path.join(dirpath, filename)


# Manifest keys whose values are lists of item names.
MANIFEST_ITEM_KEYS = ('managed_installs', 'managed_uninstalls',
                      'managed_updates', 'optional_installs')

# pkginfo keys that together identify one "variant" of an item; versions
# are only compared with each other within the same variant.
VARIANT_KEYS = ('name', 'catalogs', 'minimum_munki_version',
                'minimum_os_version', 'maximum_os_version',
                'supported_architectures', 'installable_condition')

# Leading text of each version line in the report.
REPORT_INDENT = " "

# Same tokenizer distutils' LooseVersion uses.
_VERSION_COMPONENT_RE = re.compile(r'(\d+ | [a-z]+ | \.)', re.VERBOSE)


def _version_sort_key(vstring):
    """Returns a sort key that orders version strings like LooseVersion.

    The string is split the way distutils' LooseVersion splits it. Numeric
    components are tagged 0 and textual ones 1, so a number always sorts
    before text in the same position. That matches how Python 2 compared
    mixed int/str lists, without raising TypeError on Python 3 and without
    importing distutils (removed in Python 3.12).
    """
    key = []
    for component in _VERSION_COMPONENT_RE.split(vstring):
        if not component or component == '.':
            continue
        try:
            key.append((0, int(component)))
        except ValueError:
            key.append((1, component))
    return key


def _as_list(value):
    """Wraps a lone string in a list; other values are returned as-is."""
    if isinstance(value, STRING_TYPES):
        return [value]
    return value


def _load_plist(filepath, errors):
    """Reads a plist, appending a message to errors on failure.

    Returns the parsed plist, or None if it could not be read. Only
    Exception subclasses are caught so that Ctrl-C and sys.exit() still
    stop the run.
    """
    try:
        return _read_plist(filepath)
    except IOError as inst:
        errors.append("IO error for %s: %s" % (filepath, inst))
    except Exception as inst:
        errors.append("Unexpected error for %s: %s" % (filepath, inst))
    return None


def _require_repo_subdir(repopath, subdir):
    """Returns the Unicode path of a repo subdirectory, exiting if absent.

    The path is made Unicode so that os.walk later gives us Unicode names
    back.
    """
    path = _to_unicode(os.path.join(repopath, subdir))
    if not os.path.exists(path):
        print_err_utf8("%s path %s doesn't exist!" % (subdir, path))
        sys.exit(-1)
    return path


def _collect_manifest_items(manifestspath, errors):
    """Scans all manifests for the items they reference.

    Returns (manifest_items, manifest_items_with_versions): the set of item
    names found, and the set of (name, version) pairs for "Foo-1.0" style
    references, which pin a specific version that must not be deleted.
    """
    manifest_items = set()
    manifest_items_with_versions = set()
    for filepath in repo_plists(manifestspath):
        manifest = _load_plist(filepath, errors)
        if manifest is None:
            continue
        # the manifest itself, then each of its conditional_items
        sections = [manifest]
        sections.extend(manifest.get('conditional_items', []))
        for section in sections:
            for key in MANIFEST_ITEM_KEYS:
                for item in section.get(key, []):
                    itemname, itemvers = nameAndVersion(item)
                    manifest_items.add(itemname)
                    if itemvers:
                        manifest_items_with_versions.add((itemname, itemvers))
    return manifest_items, manifest_items_with_versions


def _variant_key(pkginfo):
    """Builds the multi-line text that identifies a pkginfo's variant."""
    lines = []
    for key in VARIANT_KEYS:
        value = pkginfo.get(key)
        if value:
            if key == 'catalogs':
                value = ', '.join(value)
            lines.append(u"%s: %s" % (key, value))
    return u'\n'.join(lines).rstrip('\n')


def _collect_pkginfos(pkgsinfopath, manifest_items, errors):
    """Scans all pkginfo files and groups them by variant and version.

    manifest_items is updated in place: anything required by, or an
    update_for, an item already in it is added to it.

    Returns (pkginfodb, required_items, pkginfo_count) where pkginfodb maps
    variant key -> version -> list of item dicts, and required_items is the
    set of (name, version) pairs named in other items' 'requires'.
    """
    pkginfodb = {}
    required_items = set()
    pkginfo_count = 0
    for filepath in repo_plists(pkgsinfopath):
        pkginfo = _load_plist(filepath, errors)
        if pkginfo is None:
            continue
        missing = [key for key in ('name', 'version') if key not in pkginfo]
        if missing:
            # a malformed pkginfo should be reported, not abort the run
            errors.append("Missing required key(s) %s in %s"
                          % (', '.join(missing), filepath))
            continue
        name = pkginfo['name']
        version = pkginfo['version']
        relpath = filepath[len(pkgsinfopath):].lstrip(os.path.sep)
        pkgpath = pkginfo.get('installer_item_location', '')

        # track required items; if these are in "Foo-1.0" format, we need to
        # note these so we don't delete the specific referenced version.
        # _as_list fixes things if 'requires' was specified as a string
        # instead of an array of strings
        if 'requires' in pkginfo:
            for dependency in _as_list(pkginfo['requires']):
                required_name, required_vers = nameAndVersion(dependency)
                if required_vers:
                    required_items.add((required_name, required_vers))
                # if this item is in a manifest, then anything it requires
                # should be treated as if it, too, is in a manifest.
                if name in manifest_items:
                    manifest_items.add(required_name)

        # now process update_for: if this is an update_for an item that is
        # in manifest_items, it should be treated as if it, too is in a
        # manifest
        if 'update_for' in pkginfo:
            for update_item in _as_list(pkginfo['update_for']):
                if nameAndVersion(update_item)[0] in manifest_items:
                    # add our name
                    manifest_items.add(name)

        versions = pkginfodb.setdefault(_variant_key(pkginfo), {})
        versions.setdefault(version, []).append({
            'name': name,
            'version': version,
            'relative_path': relpath,
            'pkg_path': pkgpath,
        })
        pkginfo_count += 1
    return pkginfodb, required_items, pkginfo_count


def _select_items_to_delete(pkginfodb, manifest_items,
                            manifest_items_with_versions, required_items,
                            options):
    """Prints the per-variant report and decides what gets deleted.

    Within each variant, versions are listed newest first. A version is
    kept if a manifest or another pkginfo names it explicitly, or if it is
    among the newest options.keep versions; the rest are marked for
    deletion.

    Returns (items_to_delete, pkgs_to_keep).
    """
    items_to_delete = []
    pkgs_to_keep = set()
    for key in sorted(pkginfodb):
        versions = pkginfodb[key]
        print_this = options.show_all or len(versions) > options.keep
        item_name = next(iter(versions.values()))[0]['name']
        if print_this:
            print_utf8(key)
            if item_name not in manifest_items:
                print("[not in any manifests]")
            print("versions:")
        ordered = sorted(versions, key=_version_sort_key, reverse=True)
        for index, version in enumerate(ordered, 1):
            item_list = versions[version]
            name_and_version = (item_list[0]['name'], version)
            line_info = ''
            keep = True
            if name_and_version in manifest_items_with_versions:
                line_info = "(REQUIRED by a manifest)"
            elif name_and_version in required_items:
                line_info = "(REQUIRED by another pkginfo item)"
            elif index > options.keep:
                keep = False
                items_to_delete.extend(item_list)
                line_info = "[to be DELETED]"
            if keep:
                pkgs_to_keep.update(
                    item['pkg_path'] for item in item_list
                    if item['pkg_path'])
            if len(item_list) > 1:
                line_info = (
                    "(multiple items share this version number) " + line_info)
            else:
                line_info = "(%s) %s" % (
                    item_list[0]['relative_path'], line_info)
            if print_this:
                print_utf8(u"%s %s %s" % (REPORT_INDENT, version, line_info))
                if len(item_list) > 1:
                    for item in item_list:
                        print_utf8(u"%s %s (%s)" % (
                            REPORT_INDENT, " " * len(version),
                            item['relative_path']))
        if print_this:
            print('')
    return items_to_delete, pkgs_to_keep


def _confirmed(prompt):
    """Asks a yes/no question; end-of-input counts as "no"."""
    try:
        answer = read_input(prompt)
    except EOFError:
        return False
    return answer.lower().startswith('y')


def clean_repo(repopath, options):
    '''Clean it all up

    Analyzes the manifests and pkginfo files under repopath, prints what
    could be removed and how much space that frees, and -- after two
    confirmations -- deletes those items and rebuilds the catalogs.

    Args:
        repopath: path to the root of the Munki repo
        options: parsed command-line options; `keep` and `show_all` are
            used here'''
    # Make sure the manifests and pkgsinfo directories exist
    manifestspath = _require_repo_subdir(repopath, 'manifests')
    pkgsinfopath = _require_repo_subdir(repopath, 'pkgsinfo')

    errors = []

    print_utf8('Analyzing manifest files...')
    manifest_items, manifest_items_with_versions = _collect_manifest_items(
        manifestspath, errors)

    print_utf8('Analyzing pkginfo files...')
    pkginfodb, required_items, pkginfo_count = _collect_pkginfos(
        pkgsinfopath, manifest_items, errors)

    items_to_delete, pkgs_to_keep = _select_items_to_delete(
        pkginfodb, manifest_items, manifest_items_with_versions,
        required_items, options)

    print_utf8("Total pkginfo items:     %s" % pkginfo_count)
    print_utf8("Item variants:           %s" % len(pkginfodb))
    print_utf8("pkginfo items to delete: %s" % len(items_to_delete))
    print_utf8("pkgs to delete:          %s"
               % count_pkgs_to_delete(items_to_delete, pkgs_to_keep))
    pkginfo_size, pkg_size = get_file_sizes(
        repopath, items_to_delete, pkgs_to_keep)
    print_utf8("pkginfo space savings:   %s" % pkginfo_size)
    print_utf8("pkg space savings:       %s" % pkg_size)

    if errors:
        print_err_utf8("\nErrors encountered when processing repo:\n")
        for error in errors:
            print_err_utf8(error)

    if items_to_delete:
        print('')
        if (_confirmed(
                'Delete pkginfo and pkg items marked as [to be DELETED]? '
                'WARNING: This action cannot be undone. [y/n] ')
                and _confirmed(
                    'Are you sure? This action cannot be undone. [y/n] ')):
            delete_items(repopath, items_to_delete, pkgs_to_keep)
            make_catalogs(repopath)


def pref(prefname):
    """Returns a preference for prefname

    Returns None when FoundationPlist is unavailable, when the preferences
    file cannot be read, or when prefname is not set."""
    if not LOCAL_PREFS_SUPPORT:
        return None
    try:
        _prefs = _read_plist(PREFSPATH)
    except Exception:
        # missing or unreadable preferences simply mean "no preference"
        return None
    if prefname in _prefs:
        return _prefs[prefname]
    return None


PREFSNAME = 'com.googlecode.munki.munkiimport.plist'
PREFSPATH = os.path.expanduser(os.path.join('~/Library/Preferences',
                                            PREFSNAME))


def main():
    '''Main: parses the command line and cleans the chosen repo'''
    usage = "usage: %prog [options] [/path/to/repo_root]"
    parser = optparse.OptionParser(usage=usage)
    parser.add_option('--version', '-V', action='store_true',
                      help='Print the version of the munki tools and exit.')
    parser.add_option('--keep', '-k', default=2,
                      help='Keep this many versions of a specific variation. '
                           'Defaults to 2.')
    parser.add_option('--show-all', action='store_true',
                      help='Show all items even if none will be deleted.')
    parser.add_option('--delete-items-in-no-manifests', action='store_true',
                      help='Also delete items that are not referenced in any '
                           'manifests. Not yet implemented.')
    options, arguments = parser.parse_args()

    if options.version:
        print(get_version())
        sys.exit(0)

    try:
        options.keep = int(options.keep)
    except ValueError:
        options.keep = 0  # rejected by the range check just below
    if options.keep < 1:
        print_err_utf8('--keep value must be a positive integer!')
        sys.exit(-1)

    # Make sure we have a path to work with
    if not arguments:
        repopath = pref('repo_path')
        if not repopath:
            print_err_utf8("Need to specify a path to the repo root!")
            sys.exit(-1)
        print_utf8("Using repo path: %s" % repopath)
    else:
        repopath = arguments[0].rstrip("/")

    # Make sure the repo path exists
    if not os.path.exists(repopath):
        print_err_utf8("Repo root path %s doesn't exist!" % repopath)
        sys.exit(-1)

    # clean up the repo
    clean_repo(repopath, options)


if __name__ == '__main__':
    main()