From b7c6d4c459a7207fca07cc238911b3455142a02b Mon Sep 17 00:00:00 2001 From: Zack T Date: Wed, 20 Sep 2023 17:35:54 -0700 Subject: [PATCH] v1.2.0 = Actual Python3 Ready, Bug Fixes, Code Improvements & more! + Numerous bug fixes + *Now* _actually_ Python3 ready... + Better (actual) support for macOS 11 and newer + Updated for Jamf Pro's new icon API + Adding logging function for handling output to Policy Logs (level can be dynamically changed) + Further improved launchctl functions + Additional code improvements --- Jamf Pro/Scripts/jamf_Patcher.py | 467 ++++++++++++++++++------------- 1 file changed, 269 insertions(+), 198 deletions(-) diff --git a/Jamf Pro/Scripts/jamf_Patcher.py b/Jamf Pro/Scripts/jamf_Patcher.py index 75256da..c9d4a64 100644 --- a/Jamf Pro/Scripts/jamf_Patcher.py +++ b/Jamf Pro/Scripts/jamf_Patcher.py @@ -3,15 +3,18 @@ #################################################################################################### # Script Name: jamf_Patcher.py # By: Zack Thompson / Created: 7/10/2019 -# Version: 1.1.1 / Updated: 9/20/2023 / By: ZT +# Version: 1.2.0 / Updated: 9/20/2023 / By: ZT # # Description: This script handles patching of applications with user notifications. # #################################################################################################### +import logging import os import platform import plistlib +import re +import shlex import signal import subprocess import sys @@ -22,59 +25,122 @@ except ImportError: from urllib import request as urllib # For Python 3 -def run_utility(command, continue_on_error=True): - """A helper function for subprocess. +def log_setup(): + """Setup logging""" + + # Create logger + logger = logging.getLogger("Jamf Pro Patcher") + logger.setLevel(logging.DEBUG) + # Create file handler which logs even debug messages + # file_handler = logging.FileHandler("/var/log/JamfPatcher.log") + # file_handler.setLevel(logging.INFO) + # Create console handler with a higher log level + console_handler = logging.StreamHandler() + console_handler.setLevel(logging.INFO) + # Create formatter and add it to the handlers + formatter = logging.Formatter( + "%(asctime)s | %(levelname)s | %(name)s:%(lineno)s - %(funcName)20s() | %(message)s") + # file_handler.setFormatter(formatter) + console_handler.setFormatter(formatter) + # Add the handlers to the logger + # logger.addHandler(file_handler) + logger.addHandler(console_handler) + return logger + + +# Initialize logging +log = log_setup() + + +def execute_process(command, input=None): + """ + A helper function for subprocess. + Args: - command (str): String containing the commands and - arguments that will be passed to a shell. - continue_on_error (bool): Whether to continue on error or not + command (str): The command line level syntax that would be written in a + shell script or a terminal window + Returns: - stdout: output of the command + dict: Results in a dictionary """ - try: - return subprocess.check_output(command, shell=True) + # Validate that command is not a string + if not isinstance(command, str): + raise TypeError("Command must be a str type") - except subprocess.CalledProcessError as error: + # Format the command + command = shlex.split(command) - if continue_on_error: - return "continue" + # Run the command + process = subprocess.Popen( + command, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + shell=False, + universal_newlines=True + ) - print(f"Error code: {error.returncode}\nError: {error}") - return "error" + if input: + (stdout, stderr) = process.communicate(input=input) + + else: + (stdout, stderr) = process.communicate() + + return { + "stdout": (stdout).strip(), + "stderr": (stderr).strip() if stderr != None else None, + "exitcode": process.returncode, + "success": True if process.returncode == 0 else False + } -def plist_reader(plist_file): +def plist_reader(plist_file_path): """A helper function to get the contents of a Property List file. + Args: - plist_file (str): A .plist file path to read in. + plist_file_path (str): A .plist file path to read in. Returns: stdout: Returns the contents of the plist file. """ - if os.path.exists(plist_file): - # print(f"Reading {plist_file}...") + if os.path.exists(plist_file_path): + log.debug(f"Reading {plist_file_path}...") - try: - plist_contents = plistlib.load(plist_file) - except Exception: - file_cmd = f"/usr/bin/file --mime-encoding {plist_file}" - file_response = run_utility(file_cmd) - file_type = file_response.split(": ")[1].strip() - # print(f"File Type: {file_type}") + with open(plist_file_path, "rb") as plist_file: - if file_type == "binary": - # print("Converting plist...") - plutil_cmd = f"/usr/bin/plutil -convert xml1 {plist_file}" - _ = run_utility(plutil_cmd) + try: + plist_contents = plistlib.load(plist_file) + except Exception: + file_cmd = f"/usr/bin/file --mime-encoding {plist_file}" + file_response = execute_process(file_cmd).get("stdout") + file_type = file_response.split(": ")[1].strip() + log.debug(f"File Type: {file_type}") - plist_contents = plistlib.load(plist_file) + if file_type == "binary": + log.debug("Converting plist...") + plutil_cmd = f"/usr/bin/plutil -convert xml1 {plist_file}" + _ = execute_process(plutil_cmd) + + plist_contents = plistlib.load(plist_file) else: - print("Something's terribly wrong here...") + log.warning("Something's terribly wrong here...") return plist_contents +def plist_writer(contents, plist_file_path): + """A helper function to write the contents of a Property List file. + + Args: + contents (str): The content to write to a plist file. + plist_file_path (str): A .plist file path to read in. + """ + + with open(plist_file_path, "wb") as plist_file: + plistlib.dump(contents, plist_file) + + def prompt_to_patch(**parameters): """Uses Jamf Helper to prompt a user to patch a software title, optionally allowing them to delay the patch. @@ -84,47 +150,48 @@ def prompt_to_patch(**parameters): """ # Prompt user to quit app. - prompt = f"""\ - '{parameters.get("jamf_helper")}' \ - -window_type '{parameters.get("window_type")}' \ - -title '{parameters.get("title")}' \ - -icon '{parameters.get("icon")}' \ - -heading '{parameters.get("heading")}' \ - -description '{parameters.get("description")}' \ - -button1 OK \ - -timeout 3600 \ - -countdown \ - -countdownPrompt 'If you wish to delay this patch, please make a selection in ' \ - -alignCountdown center \ - -lockHUD \ - -showDelayOptions ', 600, 3600, 86400' \ - """ + prompt = ( + f"'{parameters.get('jamf_helper')}' " + f"-windowType '{parameters.get('window_type')}' " + f"-title '{parameters.get('title')}' " + f"-icon '{parameters.get('icon')}' " + f"-iconSize '{parameters.get('icon_size')}' " + f"-heading '{parameters.get('heading')}' " + f"-description '{parameters.get('description')}' " + "-button1 OK " + "-timeout 3600 " + "-countdown " + "-countdownPrompt 'If you wish to delay this patch, please make a selection in ' " + "-alignCountdown center " + "-lockHUD " + "-showDelayOptions ', 600, 3600, 86400' " + ) - selection = run_utility(prompt) - print(f"SELECTION: {selection}") + selection = execute_process(prompt).get("stdout") + log.debug(f"User delay selection: {selection}") if selection == "1": - print("User selected to patch now.") + log.info("User selected to patch now.") kill_and_install(**parameters) elif selection[:-1] == "600": - print("DELAY: 600 seconds") + log.info("DELAY: 600 seconds") create_delay_daemon(delayTime=600, **parameters) elif selection[:-1] == "3600": - print("DELAY: 3600 seconds") + log.info("DELAY: 3600 seconds") create_delay_daemon(delayTime=3600, **parameters) elif selection[:-1] == "86400": - print("DELAY: 86400 seconds") + log.info("DELAY: 86400 seconds") create_delay_daemon(delayTime=86400, **parameters) elif selection == "243": - print("TIMED OUT: user did not make a selection") + log.info("TIMED OUT: user did not make a selection") kill_and_install(**parameters) else: - print("Unknown action was taken at prompt.") + log.info("Unknown action was taken at prompt.") kill_and_install(**parameters) @@ -137,35 +204,32 @@ def kill_and_install(**parameters): """ try: - print("Attempting to close app if it's running...") - # Get PID of the application - pid = int(parameters.get("status").split(" ")[0]) - print(f"Process ID: {pid}") + log.info("Attempting to close app if it's running...") # Kill PID - os.kill(pid, signal.SIGTERM) #or signal.SIGKILL + os.kill(parameters.get("pid"), signal.SIGTERM) #or signal.SIGKILL except Exception: - print("Unable to terminate app, assuming it was manually closed...") + log.info("Unable to terminate app, assuming it was manually closed...") - print("Performing install...") + log.info("Performing install...") # Run Policy - run_utility(parameters.get("install_policy")) - # print("Test run, don't run policy!") + execute_process(parameters.get("install_policy")) - prompt = f"""\ - '{parameters.get("jamf_helper")}' \ - -window_type '{parameters.get("window_type")}' \ - -title '{parameters.get("title")}' \ - -icon '{parameters.get("icon")}' \ - -heading '{parameters.get("heading")}' \ - -description '{parameters.get("description_complete")}' \ - -button1 OK \ - -timeout 60 \ - -alignCountdown center \ - -lockHUD \ - """ + prompt = ( + f"'{parameters.get('jamf_helper')}' " + f"-windowType '{parameters.get('window_type')}' " + f"-title '{parameters.get('title')}' " + f"-icon '{parameters.get('icon')}' " + f"-iconSize '{parameters.get('icon_size')}' " + f"-heading '{parameters.get('heading')}' " + f"-description '{parameters.get('description_complete')}' " + "-button1 OK " + "-timeout 60 " + "-alignCountdown center " + "-lockHUD " + ) - run_utility(prompt) + execute_process(prompt) def create_delay_daemon(**parameters): @@ -179,7 +243,7 @@ def create_delay_daemon(**parameters): application_name = parameters.get("application_name") launch_daemon_label = parameters.get('launch_daemon_label') launch_daemon_location = parameters.get("launch_daemon_location") - os_minor_version = parameters.get("os_minor_version") + os_version = parameters.get("os_version") patch_plist = parameters.get("patch_plist") # Configure for delay. @@ -189,12 +253,12 @@ def create_delay_daemon(**parameters): else: patch_plist_contents = { application_name : "Delayed" } - plistlib.dump(patch_plist_contents, patch_plist) + plist_writer(patch_plist_contents, patch_plist) - print("Creating the Patcher LaunchDaemon...") + log.info("Creating the Patcher LaunchDaemon...") launch_daemon_plist = { - "Label": "com.github.mlbz521.jamf.patcher", + "Label": launch_daemon_label, "ProgramArguments": [ "/usr/local/jamf/bin/jamf", "policy", @@ -205,11 +269,10 @@ def create_delay_daemon(**parameters): "AbandonProcessGroup": True, } - plistlib.dump(launch_daemon_plist, launch_daemon_location) + plist_writer(launch_daemon_plist, launch_daemon_location) if os.path.exists(launch_daemon_location): - # Start the LaunchDaemon - start_daemon(os_minor_version, launch_daemon_label, launch_daemon_location) + start_daemon(os_version, launch_daemon_label, launch_daemon_location) def clean_up(**parameters): @@ -220,12 +283,12 @@ def clean_up(**parameters): parameters (kwargs): Key word arguments """ - print("Performing cleanup...") + log.info("Performing cleanup...") application_name = parameters.get("application_name") launch_daemon_label = parameters.get('launch_daemon_label') launch_daemon_location = parameters.get("launch_daemon_location") - os_minor_version = parameters.get("os_minor_version") + os_version = parameters.get("os_version") patch_plist = parameters.get("patch_plist") # Clean up patch_plist. @@ -234,45 +297,38 @@ def clean_up(**parameters): if patch_plist_contents.get(application_name): patch_plist_contents.pop(application_name, None) - print(f"Removing previously delayed app: {application_name}") - plistlib.dump(patch_plist_contents, patch_plist) + log.info(f"Removing previously delayed app: {application_name}") + plist_writer(patch_plist_contents, patch_plist) else: - print(f"App not listed in patch_plist: {application_name}") + log.info(f"App not listed in patch_plist: {application_name}") # Stop LaunchDaemon before deleting it - stop_daemon(os_minor_version, launch_daemon_label, launch_daemon_location) + stop_daemon(os_version, launch_daemon_label, launch_daemon_location) if os.path.exists(launch_daemon_location): os.remove(launch_daemon_location) -def execute_launchctl(sub_cmd, service_target, exit_code_only=False): +def execute_launchctl(sub_cmd, service_target): """A helper function to run launchctl. Args: sub_cmd (str): A launchctl subcommand (option) service_target (str): A launchctl service target (parameter) - exit_code_only (bool, optional): Whether to report only success or failure. - Defaults to False. Returns: bool | str: Depending on `exit_code_only`, returns either a bool or the output from launchctl """ - launchctl_cmd = f"/bin/launchctl {sub_cmd} {service_target}" - - if exit_code_only: - launchctl_cmd = f"{launchctl_cmd} > /dev/null 2>&1; echo $?" - - return run_utility(launchctl_cmd) + return execute_process(f"/bin/launchctl {sub_cmd} {service_target}") -def is_daemon_running(os_minor_version, launch_daemon_label): +def is_daemon_running(os_version, launch_daemon_label): """Checks if the daemon is running. Args: - os_minor_version (int): Used to determines the proper launchctl + os_version (int): Used to determines the proper launchctl syntax based on OS Version of the device launch_daemon_label (str): The LaunchDaemon's Label @@ -280,98 +336,107 @@ def is_daemon_running(os_minor_version, launch_daemon_label): bool: True or False whether or not the LaunchDaemon is running """ - if os_minor_version >= 11: - return execute_launchctl("print", f"system/{launch_daemon_label}", exit_code_only=True) + if os_version >= 10.11: + exit_code = execute_launchctl("print", f"system/'{launch_daemon_label}'").get("exitcode") - elif os_minor_version <= 10: - return execute_launchctl("list", launch_daemon_label, exit_code_only=True) + elif os_version <= 10.10: + exit_code = execute_launchctl("list", f"'{launch_daemon_label}'").get("exitcode") + + return exit_code == 0 -def start_daemon(os_minor_version, launch_daemon_label, launch_daemon_location): +def start_daemon(os_version, launch_daemon_label, launch_daemon_location): """Starts a daemon, if it is running, it will be restarted in case a change was made to the plist file. Args: - os_minor_version (int): Used to determines the proper launchctl + os_version (int): Used to determines the proper launchctl syntax based on OS Version of the device launch_daemon_label (str): The LaunchDaemon's Label launch_daemon_location (str): The file patch to the LaunchDaemon - """ - restart_daemon(os_minor_version, launch_daemon_label, launch_daemon_location) + if is_daemon_running(os_version, launch_daemon_label): + restart_daemon(os_version, launch_daemon_label, launch_daemon_location) + start_daemon(os_version, launch_daemon_label, launch_daemon_location) - print("Loading LaunchDaemon...") + else: + log.info("Loading LaunchDaemon...") - if os_minor_version >= 11: - execute_launchctl("bootstrap", f"system {launch_daemon_location}") - execute_launchctl("enable", f"system/{launch_daemon_label}") + if os_version >= 10.11: + execute_launchctl("bootstrap", f"system '{launch_daemon_location}'") + execute_launchctl("enable", f"system/'{launch_daemon_label}'") - elif os_minor_version <= 10: - execute_launchctl("load", launch_daemon_location) + elif os_version <= 10.10: + execute_launchctl("load", launch_daemon_location) -def restart_daemon(os_minor_version, launch_daemon_label, launch_daemon_location): +def restart_daemon(os_version, launch_daemon_label, launch_daemon_location): """Restarts a daemon if it is running. Args: - os_minor_version (int): Used to determines the proper launchctl + os_version (int): Used to determines the proper launchctl syntax based on OS Version of the device launch_daemon_label (str): The LaunchDaemon's Label launch_daemon_location (str): The file patch to the LaunchDaemon - """ - # Check if the LaunchDaemon is running. - exit_code = is_daemon_running(os_minor_version, launch_daemon_label) - - if int(exit_code) == 0: - print("LaunchDaemon is currently started; stopping now...") - - if os_minor_version >= 11: - execute_launchctl("bootout", f"system/{launch_daemon_label}") - - elif os_minor_version <= 10: - execute_launchctl("unload", launch_daemon_location) + if is_daemon_running(os_version, launch_daemon_label): + log.info("LaunchDaemon is currently started; stopping now...") + stop_daemon(os_version, launch_daemon_label, launch_daemon_location) -def stop_daemon(os_minor_version, launch_daemon_label, launch_daemon_location): +def stop_daemon(os_version, launch_daemon_label, launch_daemon_location): """Stops a daemon. Args: - os_minor_version (int): Used to determines the proper launchctl + os_version (int): Used to determines the proper launchctl syntax based on OS Version of the device launch_daemon_label (str): The LaunchDaemon's Label launch_daemon_location (str): The file patch to the LaunchDaemon - """ - # Check if the LaunchDaemon is running. - exit_code = is_daemon_running(os_minor_version, launch_daemon_label) + if is_daemon_running(os_version, launch_daemon_label): + log.info("Stopping the LaunchDaemon...") - if int(exit_code) == 0: - print("Stopping the LaunchDaemon...") - - if os_minor_version >= 11: - execute_launchctl("bootout", f"system/{launch_daemon_label}") - elif os_minor_version <= 10: - execute_launchctl("unload", launch_daemon_location) + if os_version >= 10.11: + execute_launchctl("bootout", f"system/'{launch_daemon_label}'") + elif os_version <= 10.10: + execute_launchctl("unload", f"'{launch_daemon_location}'") else: - print("LaunchDaemon not running") + log.info("LaunchDaemon not running") + + +def get_major_minor_os_version(): + + os_version = platform.mac_ver()[0] + + if os_version.count('.') > 1: + os_version = os_version.rsplit('.', maxsplit=1)[0] + + return float(os_version) def main(): - print("***** jamf_Patcher process: START *****") + log.info("***** jamf_Patcher process: START *****") ################################################## # Define Script Parameters - print(f"All args: {sys.argv}") + log.info(f"All args: {sys.argv}") department_name = sys.argv[4] # " Technology Office" application_name = sys.argv[5] # "zoom.us" icon_id = sys.argv[6] # "https://jps.server.com:8443/icon?id=49167" patch_id = sys.argv[7] policy_id = sys.argv[8] + log_level = sys.argv[9] + + for handler in log.handlers: + match log_level: + case "DEBUG": + handler.setLevel(logging.DEBUG) + case "INFO": + handler.setLevel(logging.INFO) ################################################## # Define Variables @@ -380,44 +445,37 @@ def main(): jamf_helper = "/Library/Application Support/JAMF/bin/jamfHelper.app/Contents/MacOS/jamfHelper" launch_daemon_label = f"com.github.mlbz521.jamf.patcher.{application_name}" launch_daemon_location = f"/Library/LaunchDaemons/{launch_daemon_label}.plist" - os_minor_version = platform.mac_ver()[0].split(".")[1] + os_version = get_major_minor_os_version() install_policy = f"/usr/local/jamf/bin/jamf policy -id {policy_id}" ################################################## # Define jamfHelper window values title="Security Patch Notification" - window_type="hud" - description = f"{application_name} will be updated to patch a security vulnerability. \ - Please quit {application_name} to apply this update.\n\nIf you have questions, \ - please contact your deskside support group." - description_force = f"{application_name} will be updated to patch a security vulnerability. \ - Please quit {application_name} within the allotted time to apply this update.\n\n\ - If you have questions, please contact your deskside support group." + window_type="utility" + heading = "My Organization" + description = (f"{application_name} needs to be updated to patch a security vulnerability. " + f"Please quit {application_name} to apply this update.\n\nIf you have questions, " + "please contact your deskside support group.") + description_force = (f"{application_name} will be updated to patch a security vulnerability. " + f"Please quit {application_name} within the allotted time to apply this update.\n\n" + "If you have questions, please contact your deskside support group.") description_complete = f"{application_name} has been patched!\n\n." local_icon_path = f"/private/tmp/{application_name}_icon.png" + icon_size = "150" if department_name: - heading = f"My Organization - {department_name}" - else: - heading = "My Organization" + heading = f"{heading} - {department_name}" ################################################## # Bits staged... - process_check = f"/bin/ps -ax -o pid,command | \ - /usr/bin/grep -E '/Applications/{application_name}' | \ - /usr/bin/grep -v 'grep' 2> /dev/null" - status = run_utility(process_check) - print(f"APP STATUS: {status}") - parameters = { "application_name": application_name, "patch_plist": patch_plist, "launch_daemon_label": launch_daemon_label, "launch_daemon_location": launch_daemon_location, - "os_minor_version": os_minor_version, + "os_version": os_version, "patch_id": patch_id, - "status": status, "install_policy": install_policy } @@ -428,21 +486,36 @@ def main(): "heading": heading, "description": description, "description_complete": description_complete, - "icon": local_icon_path + "icon": local_icon_path, + "icon_size": icon_size } - if status == "continue": - print(f"{application_name} is not running, installing now...") - run_utility(install_policy) - # print("Test run, don't run policy!") + # Check if application is running. + process_check = "/bin/ps -ax -o pid,command" + results = execute_process(process_check) + app_running = re.findall( + rf".*/Applications/{application_name}.*", results.get("stdout"), re.MULTILINE) + log.debug(f"Application status: {app_running}") + + if not app_running: + log.info(f"{application_name} is not running, installing now...") + execute_process(install_policy) clean_up(**parameters) else: - print(f"{application_name} is running...") + log.info(f"{application_name} is running...") + + # Get PID of the application + for value in app_running[0].split(" "): + if pid := re.match(r"(\d)+", value): + # pid = int(pid[0]) + log.debug(f"Process ID: {pid}") + parameters |= {"pid": int(pid[0])} + break # Download the icon from the JPS - icon_url = f"{jamf_pro_server}icon?id={icon_id}" + icon_url = f"{jamf_pro_server}api/v1/icon/download/{icon_id}" try: downloaded_icon = requests.get(icon_url) @@ -451,43 +524,41 @@ def main(): sys.exc_clear() urllib.urlretrieve(icon_url, filename=local_icon_path) - if os.path.exists(patch_plist): - patch_plist_contents = plist_reader(patch_plist) - + if ( + os.path.exists(patch_plist) and # Delay Check - if patch_plist_contents.get(application_name): - print("STATUS: Patch has already been delayed; forcing upgrade.") + (patch_plist_contents := plist_reader(patch_plist).get(application_name)) + ): - # Prompt user with one last warning. - prompt = f"\ - '{jamf_helper}' \ - -window_type '{window_type}' \ - -title '{title}' \ - -icon '{local_icon_path}' \ - -heading '{heading}' \ - -description '{description_force}' \ - -button1 OK \ - -timeout 600 \ - -countdown \ - -countdownPrompt '{application_name} will be force closed in ' \ - -alignCountdown center \ - -lockHUD \ - > /dev/null 2>&1 \ - " + log.info("Patch has already been delayed; forcing upgrade...") - run_utility(prompt) - kill_and_install(**parameters, **jamf_helper_parameters) - clean_up(patch_plist_contents=patch_plist_contents, **parameters) + # Prompt user with one last warning. + prompt = ( + f"'{jamf_helper}' " + f"-windowType '{window_type}' " + f"-title '{title}' " + f"-icon '{local_icon_path}' " + f"-iconSize '{icon_size}' " + f"-heading '{heading}' " + f"-description '{description_force}' " + "-button1 OK " + "-timeout 600 " + "-countdown " + f"-countdownPrompt '{application_name} will be force closed in ' " + "-alignCountdown center " + "-lockHUD " + "> /dev/null 2>&1" + ) - else: - print("STATUS: Patch has not been delayed; prompting user.") - prompt_to_patch(**parameters, **jamf_helper_parameters) + execute_process(prompt) + kill_and_install(**parameters, **jamf_helper_parameters) + clean_up(patch_plist_contents=patch_plist_contents, **parameters) else: - print("STATUS: Patch has not been delayed; prompting user.") + log.info("Patch has not been delayed; prompting user...") prompt_to_patch(**parameters, **jamf_helper_parameters) - print("***** jamf_Patcher process: SUCCESS *****") + log.info("***** jamf_Patcher process: SUCCESS *****") if __name__ == "__main__": main()