v1.2.0 = Actual Python3 Ready, Bug Fixes, Code Improvements & more!

+ Numerous bug fixes
    + *Now* _actually_ Python3 ready...
    + Better (actual) support for macOS 11 and newer
    + Updated for Jamf Pro's new icon API
  + Adding logging function for handling output to Policy Logs (level can be dynamically changed)
  + Further improved launchctl functions
  + Additional code improvements
This commit is contained in:
Zack T
2023-09-20 17:35:54 -07:00
parent 2fb5b53fc1
commit b7c6d4c459

View File

@@ -3,15 +3,18 @@
####################################################################################################
# Script Name: jamf_Patcher.py
# By: Zack Thompson / Created: 7/10/2019
# Version: 1.1.1 / Updated: 9/20/2023 / By: ZT
# Version: 1.2.0 / Updated: 9/20/2023 / By: ZT
#
# Description: This script handles patching of applications with user notifications.
#
####################################################################################################
import logging
import os
import platform
import plistlib
import re
import shlex
import signal
import subprocess
import sys
@@ -22,59 +25,122 @@ except ImportError:
from urllib import request as urllib # For Python 3
def run_utility(command, continue_on_error=True):
"""A helper function for subprocess.
def log_setup():
"""Setup logging"""
# Create logger
logger = logging.getLogger("Jamf Pro Patcher")
logger.setLevel(logging.DEBUG)
# Create file handler which logs even debug messages
# file_handler = logging.FileHandler("/var/log/JamfPatcher.log")
# file_handler.setLevel(logging.INFO)
# Create console handler with a higher log level
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
# Create formatter and add it to the handlers
formatter = logging.Formatter(
"%(asctime)s | %(levelname)s | %(name)s:%(lineno)s - %(funcName)20s() | %(message)s")
# file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
# Add the handlers to the logger
# logger.addHandler(file_handler)
logger.addHandler(console_handler)
return logger
# Initialize logging
log = log_setup()
def execute_process(command, input=None):
"""
A helper function for subprocess.
Args:
command (str): String containing the commands and
arguments that will be passed to a shell.
continue_on_error (bool): Whether to continue on error or not
command (str): The command line level syntax that would be written in a
shell script or a terminal window
Returns:
stdout: output of the command
dict: Results in a dictionary
"""
try:
return subprocess.check_output(command, shell=True)
# Validate that command is not a string
if not isinstance(command, str):
raise TypeError("Command must be a str type")
except subprocess.CalledProcessError as error:
# Format the command
command = shlex.split(command)
if continue_on_error:
return "continue"
# Run the command
process = subprocess.Popen(
command,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=False,
universal_newlines=True
)
print(f"Error code: {error.returncode}\nError: {error}")
return "error"
if input:
(stdout, stderr) = process.communicate(input=input)
else:
(stdout, stderr) = process.communicate()
return {
"stdout": (stdout).strip(),
"stderr": (stderr).strip() if stderr != None else None,
"exitcode": process.returncode,
"success": True if process.returncode == 0 else False
}
def plist_reader(plist_file):
def plist_reader(plist_file_path):
"""A helper function to get the contents of a Property List file.
Args:
plist_file (str): A .plist file path to read in.
plist_file_path (str): A .plist file path to read in.
Returns:
stdout: Returns the contents of the plist file.
"""
if os.path.exists(plist_file):
# print(f"Reading {plist_file}...")
if os.path.exists(plist_file_path):
log.debug(f"Reading {plist_file_path}...")
try:
plist_contents = plistlib.load(plist_file)
except Exception:
file_cmd = f"/usr/bin/file --mime-encoding {plist_file}"
file_response = run_utility(file_cmd)
file_type = file_response.split(": ")[1].strip()
# print(f"File Type: {file_type}")
with open(plist_file_path, "rb") as plist_file:
if file_type == "binary":
# print("Converting plist...")
plutil_cmd = f"/usr/bin/plutil -convert xml1 {plist_file}"
_ = run_utility(plutil_cmd)
try:
plist_contents = plistlib.load(plist_file)
except Exception:
file_cmd = f"/usr/bin/file --mime-encoding {plist_file}"
file_response = execute_process(file_cmd).get("stdout")
file_type = file_response.split(": ")[1].strip()
log.debug(f"File Type: {file_type}")
plist_contents = plistlib.load(plist_file)
if file_type == "binary":
log.debug("Converting plist...")
plutil_cmd = f"/usr/bin/plutil -convert xml1 {plist_file}"
_ = execute_process(plutil_cmd)
plist_contents = plistlib.load(plist_file)
else:
print("Something's terribly wrong here...")
log.warning("Something's terribly wrong here...")
return plist_contents
def plist_writer(contents, plist_file_path):
"""A helper function to write the contents of a Property List file.
Args:
contents (str): The content to write to a plist file.
plist_file_path (str): A .plist file path to read in.
"""
with open(plist_file_path, "wb") as plist_file:
plistlib.dump(contents, plist_file)
def prompt_to_patch(**parameters):
"""Uses Jamf Helper to prompt a user to patch a software title,
optionally allowing them to delay the patch.
@@ -84,47 +150,48 @@ def prompt_to_patch(**parameters):
"""
# Prompt user to quit app.
prompt = f"""\
'{parameters.get("jamf_helper")}' \
-window_type '{parameters.get("window_type")}' \
-title '{parameters.get("title")}' \
-icon '{parameters.get("icon")}' \
-heading '{parameters.get("heading")}' \
-description '{parameters.get("description")}' \
-button1 OK \
-timeout 3600 \
-countdown \
-countdownPrompt 'If you wish to delay this patch, please make a selection in ' \
-alignCountdown center \
-lockHUD \
-showDelayOptions ', 600, 3600, 86400' \
"""
prompt = (
f"'{parameters.get('jamf_helper')}' "
f"-windowType '{parameters.get('window_type')}' "
f"-title '{parameters.get('title')}' "
f"-icon '{parameters.get('icon')}' "
f"-iconSize '{parameters.get('icon_size')}' "
f"-heading '{parameters.get('heading')}' "
f"-description '{parameters.get('description')}' "
"-button1 OK "
"-timeout 3600 "
"-countdown "
"-countdownPrompt 'If you wish to delay this patch, please make a selection in ' "
"-alignCountdown center "
"-lockHUD "
"-showDelayOptions ', 600, 3600, 86400' "
)
selection = run_utility(prompt)
print(f"SELECTION: {selection}")
selection = execute_process(prompt).get("stdout")
log.debug(f"User delay selection: {selection}")
if selection == "1":
print("User selected to patch now.")
log.info("User selected to patch now.")
kill_and_install(**parameters)
elif selection[:-1] == "600":
print("DELAY: 600 seconds")
log.info("DELAY: 600 seconds")
create_delay_daemon(delayTime=600, **parameters)
elif selection[:-1] == "3600":
print("DELAY: 3600 seconds")
log.info("DELAY: 3600 seconds")
create_delay_daemon(delayTime=3600, **parameters)
elif selection[:-1] == "86400":
print("DELAY: 86400 seconds")
log.info("DELAY: 86400 seconds")
create_delay_daemon(delayTime=86400, **parameters)
elif selection == "243":
print("TIMED OUT: user did not make a selection")
log.info("TIMED OUT: user did not make a selection")
kill_and_install(**parameters)
else:
print("Unknown action was taken at prompt.")
log.info("Unknown action was taken at prompt.")
kill_and_install(**parameters)
@@ -137,35 +204,32 @@ def kill_and_install(**parameters):
"""
try:
print("Attempting to close app if it's running...")
# Get PID of the application
pid = int(parameters.get("status").split(" ")[0])
print(f"Process ID: {pid}")
log.info("Attempting to close app if it's running...")
# Kill PID
os.kill(pid, signal.SIGTERM) #or signal.SIGKILL
os.kill(parameters.get("pid"), signal.SIGTERM) #or signal.SIGKILL
except Exception:
print("Unable to terminate app, assuming it was manually closed...")
log.info("Unable to terminate app, assuming it was manually closed...")
print("Performing install...")
log.info("Performing install...")
# Run Policy
run_utility(parameters.get("install_policy"))
# print("Test run, don't run policy!")
execute_process(parameters.get("install_policy"))
prompt = f"""\
'{parameters.get("jamf_helper")}' \
-window_type '{parameters.get("window_type")}' \
-title '{parameters.get("title")}' \
-icon '{parameters.get("icon")}' \
-heading '{parameters.get("heading")}' \
-description '{parameters.get("description_complete")}' \
-button1 OK \
-timeout 60 \
-alignCountdown center \
-lockHUD \
"""
prompt = (
f"'{parameters.get('jamf_helper')}' "
f"-windowType '{parameters.get('window_type')}' "
f"-title '{parameters.get('title')}' "
f"-icon '{parameters.get('icon')}' "
f"-iconSize '{parameters.get('icon_size')}' "
f"-heading '{parameters.get('heading')}' "
f"-description '{parameters.get('description_complete')}' "
"-button1 OK "
"-timeout 60 "
"-alignCountdown center "
"-lockHUD "
)
run_utility(prompt)
execute_process(prompt)
def create_delay_daemon(**parameters):
@@ -179,7 +243,7 @@ def create_delay_daemon(**parameters):
application_name = parameters.get("application_name")
launch_daemon_label = parameters.get('launch_daemon_label')
launch_daemon_location = parameters.get("launch_daemon_location")
os_minor_version = parameters.get("os_minor_version")
os_version = parameters.get("os_version")
patch_plist = parameters.get("patch_plist")
# Configure for delay.
@@ -189,12 +253,12 @@ def create_delay_daemon(**parameters):
else:
patch_plist_contents = { application_name : "Delayed" }
plistlib.dump(patch_plist_contents, patch_plist)
plist_writer(patch_plist_contents, patch_plist)
print("Creating the Patcher LaunchDaemon...")
log.info("Creating the Patcher LaunchDaemon...")
launch_daemon_plist = {
"Label": "com.github.mlbz521.jamf.patcher",
"Label": launch_daemon_label,
"ProgramArguments": [
"/usr/local/jamf/bin/jamf",
"policy",
@@ -205,11 +269,10 @@ def create_delay_daemon(**parameters):
"AbandonProcessGroup": True,
}
plistlib.dump(launch_daemon_plist, launch_daemon_location)
plist_writer(launch_daemon_plist, launch_daemon_location)
if os.path.exists(launch_daemon_location):
# Start the LaunchDaemon
start_daemon(os_minor_version, launch_daemon_label, launch_daemon_location)
start_daemon(os_version, launch_daemon_label, launch_daemon_location)
def clean_up(**parameters):
@@ -220,12 +283,12 @@ def clean_up(**parameters):
parameters (kwargs): Key word arguments
"""
print("Performing cleanup...")
log.info("Performing cleanup...")
application_name = parameters.get("application_name")
launch_daemon_label = parameters.get('launch_daemon_label')
launch_daemon_location = parameters.get("launch_daemon_location")
os_minor_version = parameters.get("os_minor_version")
os_version = parameters.get("os_version")
patch_plist = parameters.get("patch_plist")
# Clean up patch_plist.
@@ -234,45 +297,38 @@ def clean_up(**parameters):
if patch_plist_contents.get(application_name):
patch_plist_contents.pop(application_name, None)
print(f"Removing previously delayed app: {application_name}")
plistlib.dump(patch_plist_contents, patch_plist)
log.info(f"Removing previously delayed app: {application_name}")
plist_writer(patch_plist_contents, patch_plist)
else:
print(f"App not listed in patch_plist: {application_name}")
log.info(f"App not listed in patch_plist: {application_name}")
# Stop LaunchDaemon before deleting it
stop_daemon(os_minor_version, launch_daemon_label, launch_daemon_location)
stop_daemon(os_version, launch_daemon_label, launch_daemon_location)
if os.path.exists(launch_daemon_location):
os.remove(launch_daemon_location)
def execute_launchctl(sub_cmd, service_target, exit_code_only=False):
def execute_launchctl(sub_cmd, service_target):
"""A helper function to run launchctl.
Args:
sub_cmd (str): A launchctl subcommand (option)
service_target (str): A launchctl service target (parameter)
exit_code_only (bool, optional): Whether to report only success or failure.
Defaults to False.
Returns:
bool | str: Depending on `exit_code_only`, returns either a bool or
the output from launchctl
"""
launchctl_cmd = f"/bin/launchctl {sub_cmd} {service_target}"
if exit_code_only:
launchctl_cmd = f"{launchctl_cmd} > /dev/null 2>&1; echo $?"
return run_utility(launchctl_cmd)
return execute_process(f"/bin/launchctl {sub_cmd} {service_target}")
def is_daemon_running(os_minor_version, launch_daemon_label):
def is_daemon_running(os_version, launch_daemon_label):
"""Checks if the daemon is running.
Args:
os_minor_version (int): Used to determines the proper launchctl
os_version (int): Used to determines the proper launchctl
syntax based on OS Version of the device
launch_daemon_label (str): The LaunchDaemon's Label
@@ -280,98 +336,107 @@ def is_daemon_running(os_minor_version, launch_daemon_label):
bool: True or False whether or not the LaunchDaemon is running
"""
if os_minor_version >= 11:
return execute_launchctl("print", f"system/{launch_daemon_label}", exit_code_only=True)
if os_version >= 10.11:
exit_code = execute_launchctl("print", f"system/'{launch_daemon_label}'").get("exitcode")
elif os_minor_version <= 10:
return execute_launchctl("list", launch_daemon_label, exit_code_only=True)
elif os_version <= 10.10:
exit_code = execute_launchctl("list", f"'{launch_daemon_label}'").get("exitcode")
return exit_code == 0
def start_daemon(os_minor_version, launch_daemon_label, launch_daemon_location):
def start_daemon(os_version, launch_daemon_label, launch_daemon_location):
"""Starts a daemon, if it is running, it will be restarted in
case a change was made to the plist file.
Args:
os_minor_version (int): Used to determines the proper launchctl
os_version (int): Used to determines the proper launchctl
syntax based on OS Version of the device
launch_daemon_label (str): The LaunchDaemon's Label
launch_daemon_location (str): The file patch to the LaunchDaemon
"""
restart_daemon(os_minor_version, launch_daemon_label, launch_daemon_location)
if is_daemon_running(os_version, launch_daemon_label):
restart_daemon(os_version, launch_daemon_label, launch_daemon_location)
start_daemon(os_version, launch_daemon_label, launch_daemon_location)
print("Loading LaunchDaemon...")
else:
log.info("Loading LaunchDaemon...")
if os_minor_version >= 11:
execute_launchctl("bootstrap", f"system {launch_daemon_location}")
execute_launchctl("enable", f"system/{launch_daemon_label}")
if os_version >= 10.11:
execute_launchctl("bootstrap", f"system '{launch_daemon_location}'")
execute_launchctl("enable", f"system/'{launch_daemon_label}'")
elif os_minor_version <= 10:
execute_launchctl("load", launch_daemon_location)
elif os_version <= 10.10:
execute_launchctl("load", launch_daemon_location)
def restart_daemon(os_minor_version, launch_daemon_label, launch_daemon_location):
def restart_daemon(os_version, launch_daemon_label, launch_daemon_location):
"""Restarts a daemon if it is running.
Args:
os_minor_version (int): Used to determines the proper launchctl
os_version (int): Used to determines the proper launchctl
syntax based on OS Version of the device
launch_daemon_label (str): The LaunchDaemon's Label
launch_daemon_location (str): The file patch to the LaunchDaemon
"""
# Check if the LaunchDaemon is running.
exit_code = is_daemon_running(os_minor_version, launch_daemon_label)
if int(exit_code) == 0:
print("LaunchDaemon is currently started; stopping now...")
if os_minor_version >= 11:
execute_launchctl("bootout", f"system/{launch_daemon_label}")
elif os_minor_version <= 10:
execute_launchctl("unload", launch_daemon_location)
if is_daemon_running(os_version, launch_daemon_label):
log.info("LaunchDaemon is currently started; stopping now...")
stop_daemon(os_version, launch_daemon_label, launch_daemon_location)
def stop_daemon(os_minor_version, launch_daemon_label, launch_daemon_location):
def stop_daemon(os_version, launch_daemon_label, launch_daemon_location):
"""Stops a daemon.
Args:
os_minor_version (int): Used to determines the proper launchctl
os_version (int): Used to determines the proper launchctl
syntax based on OS Version of the device
launch_daemon_label (str): The LaunchDaemon's Label
launch_daemon_location (str): The file patch to the LaunchDaemon
"""
# Check if the LaunchDaemon is running.
exit_code = is_daemon_running(os_minor_version, launch_daemon_label)
if is_daemon_running(os_version, launch_daemon_label):
log.info("Stopping the LaunchDaemon...")
if int(exit_code) == 0:
print("Stopping the LaunchDaemon...")
if os_minor_version >= 11:
execute_launchctl("bootout", f"system/{launch_daemon_label}")
elif os_minor_version <= 10:
execute_launchctl("unload", launch_daemon_location)
if os_version >= 10.11:
execute_launchctl("bootout", f"system/'{launch_daemon_label}'")
elif os_version <= 10.10:
execute_launchctl("unload", f"'{launch_daemon_location}'")
else:
print("LaunchDaemon not running")
log.info("LaunchDaemon not running")
def get_major_minor_os_version():
os_version = platform.mac_ver()[0]
if os_version.count('.') > 1:
os_version = os_version.rsplit('.', maxsplit=1)[0]
return float(os_version)
def main():
print("***** jamf_Patcher process: START *****")
log.info("***** jamf_Patcher process: START *****")
##################################################
# Define Script Parameters
print(f"All args: {sys.argv}")
log.info(f"All args: {sys.argv}")
department_name = sys.argv[4] # "<Organization's> Technology Office"
application_name = sys.argv[5] # "zoom.us"
icon_id = sys.argv[6] # "https://jps.server.com:8443/icon?id=49167"
patch_id = sys.argv[7]
policy_id = sys.argv[8]
log_level = sys.argv[9]
for handler in log.handlers:
match log_level:
case "DEBUG":
handler.setLevel(logging.DEBUG)
case "INFO":
handler.setLevel(logging.INFO)
##################################################
# Define Variables
@@ -380,44 +445,37 @@ def main():
jamf_helper = "/Library/Application Support/JAMF/bin/jamfHelper.app/Contents/MacOS/jamfHelper"
launch_daemon_label = f"com.github.mlbz521.jamf.patcher.{application_name}"
launch_daemon_location = f"/Library/LaunchDaemons/{launch_daemon_label}.plist"
os_minor_version = platform.mac_ver()[0].split(".")[1]
os_version = get_major_minor_os_version()
install_policy = f"/usr/local/jamf/bin/jamf policy -id {policy_id}"
##################################################
# Define jamfHelper window values
title="Security Patch Notification"
window_type="hud"
description = f"{application_name} will be updated to patch a security vulnerability. \
Please quit {application_name} to apply this update.\n\nIf you have questions, \
please contact your deskside support group."
description_force = f"{application_name} will be updated to patch a security vulnerability. \
Please quit {application_name} within the allotted time to apply this update.\n\n\
If you have questions, please contact your deskside support group."
window_type="utility"
heading = "My Organization"
description = (f"{application_name} needs to be updated to patch a security vulnerability. "
f"Please quit {application_name} to apply this update.\n\nIf you have questions, "
"please contact your deskside support group.")
description_force = (f"{application_name} will be updated to patch a security vulnerability. "
f"Please quit {application_name} within the allotted time to apply this update.\n\n"
"If you have questions, please contact your deskside support group.")
description_complete = f"{application_name} has been patched!\n\n."
local_icon_path = f"/private/tmp/{application_name}_icon.png"
icon_size = "150"
if department_name:
heading = f"My Organization - {department_name}"
else:
heading = "My Organization"
heading = f"{heading} - {department_name}"
##################################################
# Bits staged...
process_check = f"/bin/ps -ax -o pid,command | \
/usr/bin/grep -E '/Applications/{application_name}' | \
/usr/bin/grep -v 'grep' 2> /dev/null"
status = run_utility(process_check)
print(f"APP STATUS: {status}")
parameters = {
"application_name": application_name,
"patch_plist": patch_plist,
"launch_daemon_label": launch_daemon_label,
"launch_daemon_location": launch_daemon_location,
"os_minor_version": os_minor_version,
"os_version": os_version,
"patch_id": patch_id,
"status": status,
"install_policy": install_policy
}
@@ -428,21 +486,36 @@ def main():
"heading": heading,
"description": description,
"description_complete": description_complete,
"icon": local_icon_path
"icon": local_icon_path,
"icon_size": icon_size
}
if status == "continue":
print(f"{application_name} is not running, installing now...")
run_utility(install_policy)
# print("Test run, don't run policy!")
# Check if application is running.
process_check = "/bin/ps -ax -o pid,command"
results = execute_process(process_check)
app_running = re.findall(
rf".*/Applications/{application_name}.*", results.get("stdout"), re.MULTILINE)
log.debug(f"Application status: {app_running}")
if not app_running:
log.info(f"{application_name} is not running, installing now...")
execute_process(install_policy)
clean_up(**parameters)
else:
print(f"{application_name} is running...")
log.info(f"{application_name} is running...")
# Get PID of the application
for value in app_running[0].split(" "):
if pid := re.match(r"(\d)+", value):
# pid = int(pid[0])
log.debug(f"Process ID: {pid}")
parameters |= {"pid": int(pid[0])}
break
# Download the icon from the JPS
icon_url = f"{jamf_pro_server}icon?id={icon_id}"
icon_url = f"{jamf_pro_server}api/v1/icon/download/{icon_id}"
try:
downloaded_icon = requests.get(icon_url)
@@ -451,43 +524,41 @@ def main():
sys.exc_clear()
urllib.urlretrieve(icon_url, filename=local_icon_path)
if os.path.exists(patch_plist):
patch_plist_contents = plist_reader(patch_plist)
if (
os.path.exists(patch_plist) and
# Delay Check
if patch_plist_contents.get(application_name):
print("STATUS: Patch has already been delayed; forcing upgrade.")
(patch_plist_contents := plist_reader(patch_plist).get(application_name))
):
# Prompt user with one last warning.
prompt = f"\
'{jamf_helper}' \
-window_type '{window_type}' \
-title '{title}' \
-icon '{local_icon_path}' \
-heading '{heading}' \
-description '{description_force}' \
-button1 OK \
-timeout 600 \
-countdown \
-countdownPrompt '{application_name} will be force closed in ' \
-alignCountdown center \
-lockHUD \
> /dev/null 2>&1 \
"
log.info("Patch has already been delayed; forcing upgrade...")
run_utility(prompt)
kill_and_install(**parameters, **jamf_helper_parameters)
clean_up(patch_plist_contents=patch_plist_contents, **parameters)
# Prompt user with one last warning.
prompt = (
f"'{jamf_helper}' "
f"-windowType '{window_type}' "
f"-title '{title}' "
f"-icon '{local_icon_path}' "
f"-iconSize '{icon_size}' "
f"-heading '{heading}' "
f"-description '{description_force}' "
"-button1 OK "
"-timeout 600 "
"-countdown "
f"-countdownPrompt '{application_name} will be force closed in ' "
"-alignCountdown center "
"-lockHUD "
"> /dev/null 2>&1"
)
else:
print("STATUS: Patch has not been delayed; prompting user.")
prompt_to_patch(**parameters, **jamf_helper_parameters)
execute_process(prompt)
kill_and_install(**parameters, **jamf_helper_parameters)
clean_up(patch_plist_contents=patch_plist_contents, **parameters)
else:
print("STATUS: Patch has not been delayed; prompting user.")
log.info("Patch has not been delayed; prompting user...")
prompt_to_patch(**parameters, **jamf_helper_parameters)
print("***** jamf_Patcher process: SUCCESS *****")
log.info("***** jamf_Patcher process: SUCCESS *****")
if __name__ == "__main__":
main()