#!/usr/bin/env python # encoding: utf-8 # # Copyright 2016-2020 Greg Neagle. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ repoclean Created by Greg Neagle on 2016-06-22. A tool to remove older, unused software items from a Munki repo. """ from __future__ import absolute_import, print_function import subprocess import sys import os import optparse from distutils.version import LooseVersion from munkilib.cliutils import get_version, pref, path2url from munkilib import munkirepo from munkilib.wrappers import (is_a_string, get_input, readPlistFromString, unicode_or_str, PlistReadError) def name_and_version(a_string): """Splits a string into the name and version number. Name and version must be separated with a hyphen ('-') or double hyphen ('--'). 'TextWrangler-2.3b1' becomes ('TextWrangler', '2.3b1') 'AdobePhotoshopCS3--11.2.1' becomes ('AdobePhotoshopCS3', '11.2.1') 'MicrosoftOffice2008-12.2.1' becomes ('MicrosoftOffice2008', '12.2.1') """ for delim in ('--', '-'): if a_string.count(delim) > 0: chunks = a_string.split(delim) vers = chunks.pop() name = delim.join(chunks) if vers[0] in '0123456789': return (name, vers) return (a_string, '') class RepoCleaner(object): '''Encapsulates our repo cleaning logic''' def __init__(self, repo, options): '''Setup data storage''' self.repo = repo self.options = options self.errors = [] self.manifest_items = set() self.manifest_items_with_versions = set() self.pkginfodb = {} self.referenced_pkgs = set() self.orphaned_pkgs = [] self.required_items = set() self.pkginfo_count = 0 self.items_to_delete = [] self.pkgs_to_keep = set() def get_items_to_delete_stats(self): '''Returns count the number of installer and uninstaller pkgs we will delete and human-readable sizes for the pkginfo items and pkgs that are to be deleted''' def human_readable(size_in_bytes): """Returns sizes in human-readable units.""" units = ((" bytes", 2**10), (" KB", 2**20), (" MB", 2**30), (" GB", 2**40), (" TB", 2**50), (" PB", 2**60),) # set suffix and limit to last items in units in case the value # is so big it falls off the edge suffix = units[-1][0] limit = units[-1][1] # find an appropriate suffix for test_suffix, test_limit in units: if size_in_bytes > (test_limit - 1): continue else: suffix = test_suffix limit = test_limit break if limit == 2**10: # no decimal since "1.0 bytes" is silly return str(size_in_bytes) + " bytes" return str(round(size_in_bytes/float(limit/2**10), 1)) + suffix count = len(self.orphaned_pkgs) pkginfo_total_size = 0 pkg_total_size = 0 for item in self.items_to_delete: pkginfo_total_size += int(item.get('item_size', 0)) if (item.get('pkg_path') and not item['pkg_path'] in self.pkgs_to_keep): count += 1 pkg_total_size += int(item.get('pkg_size', 0)) if (item.get('uninstallpkg_path') and not item['uninstallpkg_path'] in self.pkgs_to_keep): count += 1 pkg_total_size += int(item.get('uninstallpkg_size', 0)) return (count, human_readable(pkginfo_total_size), human_readable(pkg_total_size)) def analyze_manifests(self): '''Examine all manifests and populate our sets of manifest_items and manifest_items_with_versions''' print('Analyzing manifest files...') # look through all manifests for "Foo-1.0" style items # we need to note these so the specific referenced version is not # deleted try: manifests_list = self.repo.itemlist('manifests') except munkirepo.RepoError as err: self.errors.append( "Repo error getting list of manifests: %s" % unicode_or_str(err)) manifests_list = [] for manifest_name in manifests_list: try: data = self.repo.get(os.path.join('manifests', manifest_name)) manifest = readPlistFromString(data) except (munkirepo.RepoError, IOError, OSError, PlistReadError) as err: self.errors.append("Unexpected error for %s: %s" % (manifest_name, unicode_or_str(err))) continue for key in ['managed_installs', 'managed_uninstalls', 'managed_updates', 'optional_installs']: for item in manifest.get(key, []): itemname, itemvers = name_and_version(item) self.manifest_items.add(itemname) if itemvers: self.manifest_items_with_versions.add( (itemname, itemvers)) # next check conditional_items within the manifest for conditional_item in manifest.get('conditional_items', []): for key in ['managed_installs', 'managed_uninstalls', 'managed_updates', 'optional_installs']: for item in conditional_item.get(key, []): itemname, itemvers = name_and_version(item) self.manifest_items.add(itemname) if itemvers: self.manifest_items_with_versions.add( (itemname, itemvers)) def analyze_pkgsinfo(self): '''Examines all pkginfo files and populates self.pkginfodb, self.required_items and self.pkginfo_count''' print('Analyzing pkginfo files...') try: pkgsinfo_list = self.repo.itemlist('pkgsinfo') except munkirepo.RepoError as err: self.errors.append( "Repo error getting list of pkgsinfo: %s" % unicode_or_str(err)) pkgsinfo_list = [] for pkginfo_name in pkgsinfo_list: pkginfo_identifier = os.path.join('pkgsinfo', pkginfo_name) try: data = self.repo.get(pkginfo_identifier) pkginfo = readPlistFromString(data) except (munkirepo.RepoError, IOError, OSError, PlistReadError) as err: self.errors.append("Unexpected error for %s: %s" % (pkginfo_name, unicode_or_str(err))) continue try: name = pkginfo['name'] version = pkginfo['version'] except KeyError: self.errors.append( "Missing 'name' or 'version' keys in %s" % pkginfo_name) continue pkgpath = pkginfo.get('installer_item_location', '') pkgsize = pkginfo.get('installer_item_size', 0) * 1024 uninstallpkgpath = pkginfo.get('uninstaller_item_location', '') uninstallpkgsize = pkginfo.get('uninstaller_item_size', 0) * 1024 if pkgpath: self.referenced_pkgs.add(pkgpath) if uninstallpkgpath: self.referenced_pkgs.add(uninstallpkgpath) # track required items; if these are in "Foo-1.0" format, we need # to note these so we don't delete the specific referenced version if 'requires' in pkginfo: dependencies = pkginfo['requires'] # fix things if 'requires' was specified as a string # instead of an array of strings if is_a_string(dependencies): dependencies = [dependencies] for dependency in dependencies: required_name, required_vers = name_and_version(dependency) if required_vers: self.required_items.add((required_name, required_vers)) # if this item is in a manifest, then anything it requires # should be treated as if it, too, is in a manifest. if name in self.manifest_items: self.manifest_items.add(required_name) # now process update_for: if this is an update_for an item that is # in manifest_items, it should be treated as if it, too is in a # manifest if 'update_for' in pkginfo: update_items = pkginfo['update_for'] # fix things if 'update_for' was specified as a string # instead of an array of strings if is_a_string(update_items): update_items = [update_items] for update_item in update_items: (update_item_name, dummy_vers) = name_and_version(update_item) if update_item_name in self.manifest_items: # add our name self.manifest_items.add(name) metakey = '' keys_to_hash = ['name', 'catalogs', 'minimum_munki_version', 'minimum_os_version', 'maximum_os_version', 'supported_architectures', 'installable_condition'] if pkginfo.get('uninstall_method') == 'removepackages': keys_to_hash.append('receipts') for key in keys_to_hash: if pkginfo.get(key): value = pkginfo[key] if key == 'catalogs': value = ', '.join(sorted(value)) if key == 'receipts': value = ', '.join( [item.get('packageid', '') for item in value]) metakey += u"%s: %s\n" % (key, value) metakey = metakey.rstrip('\n') if metakey not in self.pkginfodb: self.pkginfodb[metakey] = {} if version not in self.pkginfodb[metakey]: self.pkginfodb[metakey][version] = [] self.pkginfodb[metakey][version].append({ 'name': name, 'version': version, 'resource_identifier': pkginfo_identifier, 'item_size': len(data), 'pkg_path': pkgpath, 'pkg_size': pkgsize, 'uninstallpkg_path': uninstallpkgpath, 'uninstallpkg_size': uninstallpkgsize }) self.pkginfo_count += 1 def find_orphaned_pkgs(self): '''Finds installer items that are not referred to by any pkginfo file''' print('Analyzing installer items...') try: pkgs_list = self.repo.itemlist('pkgs') except munkirepo.RepoError as err: self.errors.append( "Repo error getting list of pkgs: %s" % unicode_or_str(err)) pkgs_list = [] for pkg in pkgs_list: if pkg not in self.referenced_pkgs: self.orphaned_pkgs.append(pkg) def find_cleanup_items(self): '''Using the info on manifests and pkgsinfo, find items to clean up. Populates self.items_to_delete: a list of pkginfo items to remove, and self.pkgs_to_keep: pkgs (install and uninstall items) that we need to keep.''' for key in sorted(list(self.pkginfodb.keys())): print_this = ( self.options.show_all or len(list(self.pkginfodb[key].keys())) > self.options.keep ) item_name = self.pkginfodb[key][ list(self.pkginfodb[key].keys())[0]][0]['name'] if print_this: print(key) if item_name not in self.manifest_items: print("[not in any manifests]") print("versions:") index = 0 for version in sorted(list(self.pkginfodb[key].keys()), key=LooseVersion, reverse=True): line_info = '' index += 1 item_list = self.pkginfodb[key][version] if ((item_list[0]['name'], version) in self.manifest_items_with_versions): for item in item_list: if item['pkg_path']: self.pkgs_to_keep.add(item['pkg_path']) if item['uninstallpkg_path']: self.pkgs_to_keep.add(item['uninstallpkg_path']) line_info = "(REQUIRED by a manifest)" elif (item_list[0]['name'], version) in self.required_items: for item in item_list: if item['pkg_path']: self.pkgs_to_keep.add(item['pkg_path']) if item['uninstallpkg_path']: self.pkgs_to_keep.add(item['uninstallpkg_path']) line_info = "(REQUIRED by another pkginfo item)" elif index <= self.options.keep: for item in item_list: if item['pkg_path']: self.pkgs_to_keep.add(item['pkg_path']) if item['uninstallpkg_path']: self.pkgs_to_keep.add(item['uninstallpkg_path']) else: for item in item_list: self.items_to_delete.append(item) line_info = "[to be DELETED]" if len(item_list) > 1: line_info = ( "(multiple items share this version number) " + line_info) else: line_info = "(%s) %s" % (item['resource_identifier'], line_info) if print_this: print(" ", version, line_info) if len(item_list) > 1: for item in item_list: print(" ", " " * len(version), end=' ') print("(%s)" % item['resource_identifier']) if print_this: print() if self.orphaned_pkgs: print('The following pkgs are not referred to by any pkginfo ' 'item:') for pkg in self.orphaned_pkgs: print("\t%s" % pkg) if print_this: print() print("Total pkginfo items: %s" % self.pkginfo_count) print("Item variants: %s" % len(list(self.pkginfodb.keys()))) print("pkginfo items to delete: %s" % len(self.items_to_delete)) pkg_count, pkginfo_size, pkg_size = self.get_items_to_delete_stats() print("pkgs to delete: %s" % pkg_count) print("pkginfo space savings: %s" % pkginfo_size) print("pkg space savings: %s" % pkg_size) if len(self.orphaned_pkgs): print(" " "(Unknown additional pkg space savings from %s orphaned pkgs)" % len(self.orphaned_pkgs)) if self.errors: print("\nErrors encountered when processing repo:\n", file=sys.stderr) for error in self.errors: print(error, file=sys.stderr) def delete_items(self): '''Deletes items from the repo''' # remove old pkginfo and referenced pkgs for item in self.items_to_delete: if 'resource_identifier' in item: print('Removing %s' % item['resource_identifier'], file=sys.stderr) try: self.repo.delete(item['resource_identifier']) except munkirepo.RepoError as err: print(unicode_or_str(err), file=sys.stderr) if (item.get('pkg_path') and not item['pkg_path'] in self.pkgs_to_keep): pkg_to_remove = os.path.join('pkgs', item['pkg_path']) print('Removing %s' % pkg_to_remove) try: self.repo.delete(pkg_to_remove) except munkirepo.RepoError as err: print(unicode_or_str(err), file=sys.stderr) if (item.get('uninstallpkg_path') and not item['uninstallpkg_path'] in self.pkgs_to_keep): pkg_to_remove = os.path.join('pkgs', item['uninstallpkg_path']) print('Removing %s' % pkg_to_remove, file=sys.stderr) try: self.repo.delete(pkg_to_remove) except munkirepo.RepoError as err: print(unicode_or_str(err), file=sys.stderr) # remove orphaned pkgs for pkg in self.orphaned_pkgs: pkg_to_remove = os.path.join('pkgs', pkg) print('Removing %s' % pkg_to_remove) try: self.repo.delete(pkg_to_remove) except munkirepo.RepoError as err: print(unicode_or_str(err), file=sys.stderr) def make_catalogs(self): """Calls makecatalogs to rebuild our catalogs""" # first look for a makecatalogs in the same dir as us if hasattr(self.repo, 'authtoken'): # Build an environment dict so we can put the authtoken # into makecatalogs' environment env = {'MUNKIREPO_AUTHTOKEN': self.repo.authtoken} else: env = None mydir = os.path.dirname(os.path.abspath(__file__)) makecatalogs_path = os.path.join(mydir, 'makecatalogs') if not os.path.exists(makecatalogs_path): # didn't find it; assume the default install path makecatalogs_path = '/usr/local/munki/makecatalogs' print('Rebuilding catalogs at %s...' % self.options.repo_url) cmd = [makecatalogs_path] cmd.append('--repo-url') cmd.append(self.options.repo_url) cmd.append('--plugin') cmd.append(self.options.plugin) proc = subprocess.Popen(cmd, bufsize=-1, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE) while True: output = proc.stdout.readline() if not output and (proc.poll() != None): break # we don't print stdout -- too much info #print output.rstrip('\n').decode('UTF-8') errors = proc.stderr.read().decode('UTF-8') if errors: print('\nThe following issues occurred while building catalogs:\n') print(errors) def clean(self): '''Clean our repo!''' self.analyze_manifests() self.analyze_pkgsinfo() self.find_orphaned_pkgs() self.find_cleanup_items() if self.items_to_delete or self.orphaned_pkgs: print() if not self.options.auto: answer = get_input( 'Delete pkginfo and pkg items marked as [to be DELETED]? ' 'WARNING: This action cannot be undone. [y/N] ') if answer.lower().startswith('y'): answer = get_input( 'Are you sure? This action cannot be undone. [y/N] ') if answer.lower().startswith('y'): self.delete_items() self.make_catalogs() else: print('Auto mode selected, deleting pkginfo and pkg items ' 'marked as [to be DELETED]') self.delete_items() self.make_catalogs() def main(): '''Main''' usage = "usage: %prog [options] [/path/to/repo_root]" parser = optparse.OptionParser(usage=usage) parser.add_option('--version', '-V', action='store_true', help='Print the version of the munki tools and exit.') parser.add_option('--keep', '-k', default=2, help='Keep this many versions of a specific variation. ' 'Defaults to 2.') parser.add_option('--show-all', action='store_true', help='Show all items even if none will be deleted.') parser.add_option('--delete-items-in-no-manifests', action='store_true', help='Also delete items that are not referenced in any ' 'manifests. Not yet implemented.') parser.add_option('--repo_url', '--repo-url', help='Optional repo URL. If specified, overrides any ' 'repo_url specified via --configure.') parser.add_option('--plugin', default=pref('plugin'), help='Optional plugin to connect to repo. If specified, ' 'overrides any plugin specified via --configure.') parser.add_option('--auto', '-a', action='store_true', default=False, help='Do not prompt for confirmation before deleting ' 'repo items. Use with caution.') options, arguments = parser.parse_args() if options.version: print(get_version()) exit(0) if not options.repo_url: if arguments: options.repo_url = path2url(arguments[0]) elif pref('repo_path'): options.repo_url = path2url(pref('repo_path')) if not options.plugin: options.plugin = 'FileRepo' try: options.keep = int(options.keep) except ValueError: print('--keep value must be a positive integer!', file=sys.stderr) exit(-1) if options.keep < 1: print('--keep value must be a positive integer!', file=sys.stderr) exit(-1) # Make sure we have a repo_url to work with if not options.repo_url: print("Need to specify a path to the repo root!", file=sys.stderr) exit(-1) else: print("Using repo url: %s" % options.repo_url, file=sys.stderr) try: repo = munkirepo.connect(options.repo_url, options.plugin) except munkirepo.RepoError as err: print(u'Could not connect to munki repo: %s' % unicode_or_str(err), file=sys.stderr) exit(-1) # clean up the repo RepoCleaner(repo, options).clean() if __name__ == '__main__': main()