mirror of
https://github.com/munki/munki.git
synced 2025-12-30 19:20:10 -06:00
* Use PyObjC recommended pattern to call superclass init to fix #1183 * Use the pylint comments from gurl.
240 lines
8.4 KiB
Python
Executable File
240 lines
8.4 KiB
Python
Executable File
#!/usr/local/munki/munki-python
|
|
# encoding: utf-8
|
|
#
|
|
# Copyright 2017-2023 Greg Neagle.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the 'License');
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# https://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an 'AS IS' BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
"""
|
|
app_usage_monitor
|
|
|
|
Created by Greg Neagle 14 Feb 2017
|
|
Refactored April 2018 as user-level agent
|
|
|
|
A tool to monitor application usage and record it to a database.
|
|
Borrowing lots of code and ideas from the crankd project, part of pymacadmin:
|
|
https://github.com/MacSysadmin/pymacadmin
|
|
and the application_usage scripts created by Google MacOps:
|
|
https://github.com/google/macops/tree/master/crankd
|
|
"""
|
|
from __future__ import absolute_import
|
|
|
|
# standard Python libs
|
|
import logging
|
|
import os
|
|
import select
|
|
import socket
|
|
import sys
|
|
|
|
# builtin super doesn't work with Cocoa classes in recent PyObjC releases.
|
|
# pylint: disable=redefined-builtin,no-name-in-module
|
|
from objc import super
|
|
# pylint: enable=redefined-builtin,no-name-in-module
|
|
|
|
try:
|
|
# Apple frameworks
|
|
from AppKit import NSWorkspace
|
|
from Foundation import NSDate
|
|
from Foundation import NSDictionary
|
|
from Foundation import NSDistributedNotificationCenter
|
|
from Foundation import NSObject
|
|
from Foundation import NSRunLoop
|
|
except ImportError:
|
|
logging.critical("PyObjC wrappers for Apple frameworks are missing.")
|
|
sys.exit(-1)
|
|
|
|
# our libs
|
|
from munkilib.wrappers import writePlistToString, PlistWriteError
|
|
|
|
|
|
# Filesystem path of the Unix domain socket used to connect to appusaged
APPUSAGED_SOCKET = "/var/run/appusaged"
|
|
|
|
|
|
class AppUsageClientError(Exception):
    '''Error raised by AppUsageClient, for example when it cannot connect
    to appusaged'''
|
|
|
|
|
|
class AppUsageClient(object):
    '''Handles communication with the appusaged daemon.

    Requests are serialized with writePlistToString and sent over a Unix
    domain stream socket at APPUSAGED_SOCKET. process() is the usual entry
    point: it connects, sends one request, and always disconnects.
    '''

    def __init__(self):
        '''Start out disconnected so disconnect() is always safe to call'''
        self.socket = None

    def connect(self):
        '''Connect to appusaged.

        Raises:
            AppUsageClientError: if the socket cannot be created or
                connected.
        '''
        try:
            self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            self.socket.connect(APPUSAGED_SOCKET)
        except socket.error as err:
            # don't leave a half-open socket behind on failure
            self.disconnect()
            # strerror can be None for some socket errors; fall back to
            # the exception itself so the message is never "None"
            raise AppUsageClientError(
                "Couldn't connect to appusaged: %s" % (err.strerror or err))

    def send_request(self, request):
        '''Send a request to appusaged and return its reply.

        Args:
            request: object to serialize with writePlistToString (the
                callers in this file pass dicts).

        Returns:
            The decoded reply with trailing whitespace stripped, or a
            string starting with "ERROR:" when there is no connection, the
            request cannot be serialized, the socket fails, or no reply
            arrives within two seconds.
        '''
        if self.socket is None:
            return "ERROR:Not connected"
        try:
            request_string = writePlistToString(request)
        except PlistWriteError as err:
            return "ERROR:Bad request: %s" % err

        try:
            # sendall() keeps writing until the whole request has been
            # sent; a bare send() may transmit only part of the buffer
            self.socket.sendall(request_string)
            # use select so we don't hang indefinitely if appusaged dies
            ready = select.select([self.socket.fileno()], [], [], 2)
            if ready[0]:
                reply = self.socket.recv(8192).decode("UTF-8")
            else:
                reply = ''
        except (socket.error, select.error) as err:
            # report the failure to the caller instead of letting a socket
            # error escape into the notification callbacks
            return "ERROR:Couldn't communicate with appusaged: %s" % err

        if reply:
            return reply.rstrip()
        return "ERROR:No reply"

    def disconnect(self):
        '''Disconnect from appusaged; does nothing if not connected'''
        sock, self.socket = self.socket, None
        if sock is not None:
            sock.close()

    def process(self, request):
        '''Connect, send a request, and return the result.

        Returns:
            The reply from send_request(), or "ERROR:<reason>" if the
            connection could not be made. The socket is closed either way.
        '''
        try:
            self.connect()
            return self.send_request(request)
        except AppUsageClientError as err:
            return "ERROR:%s" % err
        finally:
            self.disconnect()
|
|
|
|
|
|
class NotificationHandler(NSObject):
    """A subclass of NSObject to handle workspace notifications.

    On init it registers for application launch, activate and terminate
    notifications from NSWorkspace's notification center, and for the
    com.googlecode.munki.managedsoftwareupdate.installrequest distributed
    notification. Each notification is forwarded through AppUsageClient.
    """

    # Disable PyLint complaining about 'invalid' camelCase names
    # pylint: disable=C0103

    def init(self):
        """NSObject-compatible initializer"""
        self = super().init()
        if self is None:
            return None

        self.usage = AppUsageClient()
        self.ws_nc = NSWorkspace.sharedWorkspace().notificationCenter()

        # (selector, notification name) pairs for the workspace events
        workspace_notifications = (
            ('didLaunchApplicationNotification:',
             'NSWorkspaceDidLaunchApplicationNotification'),
            ('didActivateApplicationNotification:',
             'NSWorkspaceDidActivateApplicationNotification'),
            ('didTerminateApplicationNotification:',
             'NSWorkspaceDidTerminateApplicationNotification'),
        )
        for selector, name in workspace_notifications:
            self.ws_nc.addObserver_selector_name_object_(
                self, selector, name, None)

        self.dnc = NSDistributedNotificationCenter.defaultCenter()
        self.dnc.addObserver_selector_name_object_(
            self, 'requestedItemForInstall:',
            'com.googlecode.munki.managedsoftwareupdate.installrequest', None)

        return self  # NOTE: Unlike Python, NSObject's init() must return self!

    def __del__(self):
        """Unregister for all the notifications we registered for"""
        # ws_nc and dnc are only set in init(); guard against an object
        # that was never (fully) initialized
        for attr_name in ('ws_nc', 'dnc'):
            center = getattr(self, attr_name, None)
            if center is not None:
                center.removeObserver_(self)

    def getInfoDictForApp_(self, app_object):
        """Returns a dict with info about an application.

        Args:
            app_object: NSRunningApplication object (may be None)
        Returns:
            app_dict: {bundle_id: str,
                       path: str,
                       version: str}
            bundle_id falls back to the base filename of the app path when
            the application reports no bundle identifier; version is "0"
            when it cannot be read from the bundle's Info.plist."""
        # pylint: disable=no-self-use
        bundle_id = ""
        app_path = ""
        app_version = "0"
        if app_object:
            try:
                # bundleURL() can be None, which also raises AttributeError
                # on .path(); path() itself can be None as well
                app_path = app_object.bundleURL().path() or ""
            except AttributeError:
                app_path = ""
            try:
                # bundleIdentifier() returns None rather than raising when
                # the application has no bundle identifier
                bundle_id = app_object.bundleIdentifier() or ""
            except AttributeError:
                bundle_id = ""
            if not bundle_id and app_path:
                # use the base filename
                bundle_id = os.path.basename(app_path)
            if app_path:
                # try to get the version from the bundle's plist
                app_info_plist = NSDictionary.dictionaryWithContentsOfFile_(
                    "%s/Contents/Info.plist" % app_path)
                if app_info_plist:
                    app_version = app_info_plist.get(
                        "CFBundleShortVersionString",
                        app_info_plist.get("CFBundleVersion", "0"))

        return {'bundle_id': bundle_id,
                'path': app_path,
                'version': app_version}

    def didLaunchApplicationNotification_(self, notification):
        """Handle NSWorkspaceDidLaunchApplicationNotification"""
        # userInfo() may be None, so fall back to an empty dict
        user_info = notification.userInfo() or {}
        app_object = user_info.get('NSWorkspaceApplicationKey')
        app_dict = self.getInfoDictForApp_(app_object)
        self.usage.process({'event': 'launch', 'app_dict': app_dict})

    def didActivateApplicationNotification_(self, notification):
        """Handle NSWorkspaceDidActivateApplicationNotification"""
        # userInfo() may be None, so fall back to an empty dict
        user_info = notification.userInfo() or {}
        app_object = user_info.get('NSWorkspaceApplicationKey')
        app_dict = self.getInfoDictForApp_(app_object)
        self.usage.process({'event': 'activate', 'app_dict': app_dict})

    def didTerminateApplicationNotification_(self, notification):
        """Handle NSWorkspaceDidTerminateApplicationNotification"""
        # userInfo() may be None, so fall back to an empty dict
        user_info = notification.userInfo() or {}
        app_object = user_info.get('NSWorkspaceApplicationKey')
        app_dict = self.getInfoDictForApp_(app_object)
        self.usage.process({'event': 'quit', 'app_dict': app_dict})

    def requestedItemForInstall_(self, notification):
        """Handle com.googlecode.munki.managedsoftwareupdate.installrequest"""
        logging.info('got install request notification')
        # userInfo() may be None; every field then defaults to 'unknown'
        user_info = notification.userInfo() or {}
        self.usage.process(
            {'event': user_info.get('event', 'unknown'),
             'name': user_info.get('name', 'unknown'),
             'version': user_info.get('version', 'unknown')
            }
        )
|
|
|
|
|
|
def main():
    """Create the notification handler (which registers itself for the
    notifications we care about), then run the current run loop forever"""

    # PyLint can't tell that NotificationHandler's NSObject superclass
    # has an alloc() method; the handler is never used directly, it only
    # needs to stay referenced.
    # pylint: disable=no-member,unused-variable
    handler = NotificationHandler.alloc().init()
    # pylint: enable=no-member,unused-variable

    # listen for notifications forever, one 0.1 second slice at a time
    while True:
        run_loop = NSRunLoop.currentRunLoop()
        deadline = NSDate.dateWithTimeIntervalSinceNow_(0.1)
        run_loop.runUntilDate_(deadline)
|
|
|
|
|
|
# Only start monitoring when run as a script, not when imported
if __name__ == "__main__":
    main()
|