#!/usr/bin/python
# encoding: utf-8
#
# Copyright 2017-2018 Greg Neagle.
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
app_usage_monitor

Created by Greg Neagle 14 Feb 2017
Refactored April 2018 as user-level agent

A tool to monitor application usage and record it to a database.
Borrowing lots of code and ideas from the crankd project, part of pymacadmin:
    https://github.com/MacSysadmin/pymacadmin
and the application_usage scripts created by Google MacOps:
    https://github.com/google/macops/tree/master/crankd
"""

# standard Python libs
import logging
import os
import plistlib
import select
import socket
import sys

try:
    # Apple frameworks
    from AppKit import NSWorkspace
    from Foundation import NSDate
    from Foundation import NSDictionary
    from Foundation import NSDistributedNotificationCenter
    from Foundation import NSObject
    from Foundation import NSRunLoop
except ImportError:
    logging.critical("PyObjC wrappers for Apple frameworks are missing.")
    sys.exit(-1)


# Path of the Unix domain socket the appusaged daemon listens on
APPUSAGED_SOCKET = "/var/run/appusaged"

# Seconds to wait for (more of) a reply before giving up on appusaged
REPLY_TIMEOUT = 2

# Maximum number of bytes to pull from the socket per recv() call
RECV_SIZE = 4096


class AppUsageClientError(Exception):
    '''Exception to raise for errors in AppUsageClient'''
    pass


def _serialize_request(request):
    '''Return request serialized as an XML plist byte string.

    Uses plistlib.dumps where it exists and falls back to the
    Python 2-only plistlib.writePlistToString otherwise.'''
    dumps = getattr(plistlib, 'dumps', None)
    if dumps is None:
        dumps = plistlib.writePlistToString
    return dumps(request)


class AppUsageClient(object):
    '''Handles communication with appusaged daemon'''

    def __init__(self):
        # No connection yet; connect() creates the socket
        self.socket = None

    def connect(self):
        '''Connect to appusaged

        Raises:
            AppUsageClientError: if the socket cannot be created or
                connected.'''
        try:
            self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            self.socket.connect(APPUSAGED_SOCKET)
        except socket.error as err:
            raise AppUsageClientError(
                "Couldn't connect to appusaged: %s" % err.strerror)

    def send_request(self, request):
        '''Send a request to appusaged

        Args:
            request: a plist-serializable mapping
        Returns:
            The reply with trailing whitespace removed, or
            "ERROR:No reply" if nothing was received.'''
        # sendall() keeps writing until the whole request has been sent
        self.socket.sendall(_serialize_request(request))
        # Read directly from the socket. Wrapping its descriptor in a file
        # object would close the descriptor a second time on disconnect().
        chunks = []
        while True:
            # use select so we don't hang indefinitely if appusaged dies
            ready = select.select([self.socket], [], [], REPLY_TIMEOUT)
            if not ready[0]:
                break
            chunk = self.socket.recv(RECV_SIZE)
            if not chunk:
                # peer closed the connection: the reply is complete
                break
            chunks.append(chunk)
        reply = b''.join(chunks)
        if not isinstance(reply, str):
            # bytes and str differ on Python 3; always hand back text
            reply = reply.decode('utf-8', 'replace')
        if reply:
            return reply.rstrip()
        return "ERROR:No reply"

    def disconnect(self):
        '''Disconnect from appusaged'''
        sock, self.socket = self.socket, None
        if sock is not None:
            sock.close()

    def process(self, request):
        '''Send a request and return the result

        Raises:
            AppUsageClientError: if connecting to appusaged fails.'''
        try:
            self.connect()
            return self.send_request(request)
        finally:
            # safe even when connect() never created a socket
            self.disconnect()


def _send_request(client, request):
    '''Send request through client, logging communication failures.

    Kept at module level (not a NotificationHandler method) so it is plain
    Python and never considered for Objective-C selector translation.

    Returns:
        The reply from the client, or None if the request could not be
        delivered.'''
    try:
        return client.process(request)
    except (AppUsageClientError, socket.error, select.error) as err:
        logging.error('Could not send request to appusaged: %s', err)
        return None


def _send_app_event(client, event, app_dict):
    '''Report an application event unless there is no app info to report.'''
    if app_dict is None:
        logging.warning(
            'No application info for %s event; not recorded', event)
        return None
    return _send_request(client, {'event': event, 'app_dict': app_dict})


class NotificationHandler(NSObject):
    """A subclass of NSObject to handle workspace notifications"""

    # Disable PyLint complaining about 'invalid' camelCase names
    # pylint: disable=C0103

    def init(self):
        """NSObject-compatible initializer"""
        self = super(NotificationHandler, self).init()
        if self is None:
            return None

        self.usage = AppUsageClient()
        self.ws_nc = NSWorkspace.sharedWorkspace().notificationCenter()
        self.ws_nc.addObserver_selector_name_object_(
            self, 'didLaunchApplicationNotification:',
            'NSWorkspaceDidLaunchApplicationNotification', None)
        self.ws_nc.addObserver_selector_name_object_(
            self, 'didActivateApplicationNotification:',
            'NSWorkspaceDidActivateApplicationNotification', None)
        self.ws_nc.addObserver_selector_name_object_(
            self, 'didTerminateApplicationNotification:',
            'NSWorkspaceDidTerminateApplicationNotification', None)

        self.dnc = NSDistributedNotificationCenter.defaultCenter()
        self.dnc.addObserver_selector_name_object_(
            self, 'requestedItemForInstall:',
            'com.googlecode.munki.managedsoftwareupdate.installrequest',
            None)
        return self  # NOTE: Unlike Python, NSObject's init() must return self!

    def __del__(self):
        """Unregister for all the notifications we registered for"""
        # The attributes may be missing if init() did not run to completion
        for center in (getattr(self, 'ws_nc', None),
                       getattr(self, 'dnc', None)):
            if center is not None:
                center.removeObserver_(self)

    def get_app_dict(self, app_object):
        """Returns a dict with info about an application.

        Args:
            app_object: NSRunningApplication object
        Returns:
            app_dict: {bundle_id: str,
                       path: str,
                       version: str}
            or None if app_object is empty or the application cannot be
            identified by bundle identifier or bundle path."""
        # pylint: disable=no-self-use
        if not app_object:
            return None

        try:
            # bundleURL() may be None, which also raises AttributeError
            app_path = app_object.bundleURL().path()
        except AttributeError:
            app_path = None

        try:
            bundle_id = app_object.bundleIdentifier()
        except AttributeError:
            bundle_id = None
        if not bundle_id and app_path:
            # use the base filename
            bundle_id = os.path.basename(app_path)
        if not bundle_id:
            # neither an identifier nor a path: nothing useful to record
            return None

        app_version = '0'
        if app_path:
            app_info_plist = NSDictionary.dictionaryWithContentsOfFile_(
                '%s/Contents/Info.plist' % app_path)
            if app_info_plist:
                app_version = app_info_plist.get(
                    'CFBundleShortVersionString',
                    app_info_plist.get('CFBundleVersion', '0'))

        # plists cannot carry None, so a missing path is sent as ''
        return {'bundle_id': bundle_id,
                'path': app_path or '',
                'version': app_version}

    def didLaunchApplicationNotification_(self, notification):
        """Handle NSWorkspaceDidLaunchApplicationNotification"""
        app_object = notification.userInfo().get('NSWorkspaceApplicationKey')
        app_dict = self.get_app_dict(app_object)
        _send_app_event(self.usage, 'launch', app_dict)

    def didActivateApplicationNotification_(self, notification):
        """Handle NSWorkspaceDidActivateApplicationNotification"""
        app_object = notification.userInfo().get('NSWorkspaceApplicationKey')
        app_dict = self.get_app_dict(app_object)
        _send_app_event(self.usage, 'activate', app_dict)

    def didTerminateApplicationNotification_(self, notification):
        """Handle NSWorkspaceDidTerminateApplicationNotification"""
        app_object = notification.userInfo().get('NSWorkspaceApplicationKey')
        app_dict = self.get_app_dict(app_object)
        _send_app_event(self.usage, 'quit', app_dict)

    def requestedItemForInstall_(self, notification):
        """Handle com.googlecode.munki.managedsoftwareupdate.installrequest"""
        logging.info('got install request notification')
        # NOTE(review): userInfo() is passed to the plist serializer as-is;
        # confirm it is accepted there without conversion to a plain dict.
        user_info = notification.userInfo()
        _send_request(self.usage, user_info)


def main():
    """Initialize our handler object and let NSWorkspace's notification center
    know we are interested in notifications"""

    # PyLint can't tell that NotificationHandler' NSObject superclass
    # has an alloc() method
    # pylint: disable=no-member
    # Bound to a name so the observer stays referenced while the loop runs
    notification_handler = NotificationHandler.alloc().init()
    # pylint: enable=no-member

    while True:
        # listen for notifications forever
        NSRunLoop.currentRunLoop().runUntilDate_(
            NSDate.dateWithTimeIntervalSinceNow_(0.1))


if __name__ == '__main__':
    main()