diff --git a/Jamf Pro/Scripts/jamf_CollectDiagnostics.py b/Jamf Pro/Scripts/Collect-Diagnostics.py similarity index 56% rename from Jamf Pro/Scripts/jamf_CollectDiagnostics.py rename to Jamf Pro/Scripts/Collect-Diagnostics.py index d38f6b7..1b23dce 100644 --- a/Jamf Pro/Scripts/jamf_CollectDiagnostics.py +++ b/Jamf Pro/Scripts/Collect-Diagnostics.py @@ -1,9 +1,9 @@ #!/opt/ManagedFrameworks/Python.framework/Versions/Current/bin/python3 """ -Script Name: jamf_CollectDiagnostics.py +Script Name: Collect-Diagnostics.py By: Zack Thompson / Created: 8/22/2019 -Version: 1.5.0 / Updated: 12/03/2021 By: ZT +Version: 1.6.0 / Updated: 5/2/2022 By: ZT Description: This script allows you to upload a compressed zip of specified files to a computers' inventory record. @@ -14,18 +14,20 @@ import argparse import base64 import csv import datetime -from Foundation import NSBundle import json -import objc import os import plistlib -import requests -import shutil +import re +import shlex import sqlite3 import subprocess import sys import zipfile +from Foundation import NSBundle, NSString +import objc +import requests + # Jamf Function to obfuscate credentials. def DecryptString(inputString, salt, passphrase): @@ -33,66 +35,52 @@ def DecryptString(inputString, salt, passphrase): Usage: >>> DecryptString("Encrypted String", "Salt", "Passphrase") """ - return runUtility( - "echo '{inputString}' | /usr/bin/openssl enc -aes256 -d -a -A -S '{salt}' -k '{passphrase}'".format( - salt=salt, passphrase=passphrase, inputString=inputString)) + return execute_process( + "/usr/bin/openssl enc -aes256 -d -a -A -S '{salt}' -k '{passphrase}'".format( + salt=salt, passphrase=passphrase), input=inputString)["stdout"] -def runUtility(command): - """A helper function for subprocess. +def execute_process(command, input=None): + """ + A helper function for subprocess. + Args: - command: String containing the commands and arguments that will be passed to a shell. + command (str): The command line level syntax that would be written in a + shell script or a terminal window + Returns: - stdout: output of the command + dict: Results in a dictionary """ - try: - process = subprocess.check_output(command, shell=True) - except subprocess.CalledProcessError as error: - print ("Error code: {}".format(error.returncode)) - print ("Error: {}".format(error)) - process = "error" + # Validate that command is not a string + if not isinstance(command, str): + raise TypeError("Command must be a str type") - return process + # Format the command + command = shlex.split(command) + # Run the command + process = subprocess.Popen( + command, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + shell=False, + universal_newlines=True + ) -def plistReader(plist_file_path, verbose): - """A helper function to get the contents of a Property List. - Args: - plist_file_path: A .plist file to read in. - Returns: - stdout: Returns the contents of the plist file. - """ + if input: + (stdout, stderr) = process.communicate(input=input) - if os.path.exists(plist_file_path): - if verbose: - print("Opening plist: {}".format(plist_file_path)) - - try: - # Get the contents of the plist file. - with open(plist_file_path, "rb") as plist_file: - plist_Contents = plistlib.load(plist_file) - except Exception: - file_cmd = "/usr/bin/file --mime-encoding {}".format(plist_file_path) - file_response = runUtility(file_cmd) - file_type = file_response.split(": ")[1].strip() - if verbose: - print("File Type: {}".format(file_type)) - - if file_type == "binary": - if verbose: - print("Converting plist...") - plutil_cmd = "/usr/bin/plutil -convert xml1 {}".format(plist_file_path) - plutil_response = runUtility(plutil_cmd) - - # Get the contents of the plist file. - with open(plist_file_path, "rb") as plist_file: - plist_Contents = plistlib.load(plist_file) else: - print("ERROR: Unable to locate the specified plist file!") - sys.exit(3) + (stdout, stderr) = process.communicate() - return plist_Contents + return { + "stdout": (stdout).strip(), + "stderr": (stderr).strip() if stderr != None else None, + "exitcode": process.returncode, + "success": True if process.returncode == 0 else False + } # Modified from: https://stackoverflow.com/a/36211470 @@ -126,46 +114,86 @@ def dbTableWriter(database, table): return os.path.abspath(file_name) -# Credit to (Mikey Mike/Froger/Pudquick/etc) for this logic: -# https://gist.github.com/pudquick/c7dd1262bd81a32663f0 -def get_system(attribute): - """A helper function to get specific system attributes. +def get_system_info(): + """ + A helper function to get specific system attributes. + + Credit: Mikey Mike/Froger/Pudquick/etc + Source: https://gist.github.com/pudquick/c7dd1262bd81a32663f0 + Notes: Modified from source + Args: - type: The system attribute desired. + attribute: The system attribute desired. + Returns: stdout: The system attribute value. """ IOKit_bundle = NSBundle.bundleWithIdentifier_("com.apple.framework.IOKit") functions = [ - ( "IOServiceGetMatchingService", b"II@" ), - ( "IOServiceMatching", b"@*" ), - ( "IORegistryEntryCreateCFProperty", b"@I@@I" ) + ("IORegistryEntryCreateCFProperty", b"@I@@I"), + ("IOServiceGetMatchingService", b"II@"), + ("IOServiceMatching", b"@*") ] objc.loadBundleFunctions(IOKit_bundle, globals(), functions) - def io_key(keyname): - return IORegistryEntryCreateCFProperty( - int( IOServiceGetMatchingService( - 0, IOServiceMatching( - "IOPlatformExpertDevice".encode("utf-8")))), keyname, None, 0) + def io_key(key_name, service_name="IOPlatformExpertDevice"): + service = IOServiceMatching(service_name.encode("utf-8")) + key = NSString.stringWithString_(key_name) + return IORegistryEntryCreateCFProperty(IOServiceGetMatchingService(0, service), key, None, 0) def get_hardware_uuid(): return io_key("IOPlatformUUID") # def get_hardware_serial(): - # return io_key("IOPlatformSerialNumber".encode("utf-8")) + # return io_key("IOPlatformSerialNumber") # def get_board_id(): - # return str(io_key("board-id".encode("utf-8"))).rstrip("\x00") + # try: + # return bytes(io_key("board-id")).decode().rstrip("\x00") + # except TypeError: + # return "" - options = {"uuid" : get_hardware_uuid #, - # "serial" : get_hardware_serial, - # "boardID" : get_board_id + # def get_model_id(): + # return bytes(io_key("model")).decode().rstrip("\x00") + + # def lookup_model(lookup_code): + # xml = requests.get("https://support-sp.apple.com/sp/product?cc={}".format(lookup_code)).text + + # try: + # tree = ElementTree.fromstringlist(xml) + # return tree.find(".//configCode").text + + # except ElementTree.ParseError as err: + # print("Failed to retrieve model name: {}".format(err.strerror)) + # return "" + + + # serial_number = get_hardware_serial() + # sn_length = len(serial_number) + # model = "" + + # if sn_length == 10: + # results = execute_process("/usr/sbin/ioreg -arc IOPlatformDevice -k product-name") + + # if results["success"]: + # plist_contents = plistlib.loads(results["stdout"].encode()) + # model = plist_contents[0].get("product-name").decode().rstrip("\x00") + + # elif sn_length == 12: + # model = lookup_model(serial_number[-4:]) + + # elif sn_length == 11: + # model = lookup_model(serial_number[-3:]) + + return { + # "serial_number": serial_number, + "uuid": get_hardware_uuid(), + # "board_id": get_board_id(), + # "model_id": get_model_id(), + # "model_friendly": model } - return options[attribute]() - def apiGET(**parameters): """A helper function that performs a GET to the Jamf API. @@ -196,7 +224,8 @@ def apiGET(**parameters): statusCode = response.status_code json_response = response.json() - except Exception: + except Exception as error: + print("Requests error:\n{}".format(error)) # If `requests` fails, resort to using curl. if parameters.get("verbose"): @@ -207,8 +236,8 @@ def apiGET(**parameters): 'statusCode:%{{http_code}}' --location --header 'Accept: application/json' \ --header 'Authorization: Basic {jps_credentials}' --url {url} --request GET".format( jps_credentials=parameters.get("jps_credentials"), url=url) - response = runUtility(curl_cmd) - json_content, statusCode = response.split(b"statusCode:") + response = execute_process(curl_cmd)['stdout'] + json_content, statusCode = response.split("statusCode:") json_response = json.loads(json_content) return statusCode, json_response @@ -268,14 +297,67 @@ def apiPOST(**parameters): file_to_upload=parameters.get("file_to_upload") ) - response = runUtility(curl_cmd) - content, statusCode = response.split(b"statusCode:") + response = execute_process(curl_cmd)['stdout'] + content, statusCode = response.split("statusCode:") return statusCode, content +def archiver(path, archive, mode="a", verbose=True): + """A Context Manager for creating or modifying a compressed archive. + + Args: + path (str): Path to a file or directory to include in the archive + archive (str): Path to the archive file; will be created if it does not exist + mode (str, optional): The mode that will be used to open the archive. Defaults to "a". + verbose (bool, optional): Print verbose messages. Defaults to True. + """ + + if verbose: + print("Archiving: {}".format(os.path.abspath(path))) + + if os.path.exists(path): + + with zipfile.ZipFile(archive, 'a', zipfile.ZIP_DEFLATED) as zip_file: + + if os.path.isdir(path): + + for root, dirs, files in os.walk(path): + + for file in files: + + zip_file.write( + os.path.join(root, file), + os.path.relpath( + os.path.join(root, file), + os.path.join(path, '..') + ) + ) + + else: + + zip_file.write(os.path.abspath(path), compress_type=zipfile.ZIP_DEFLATED) + + else: + print("WARNING: Unable to locate the specified file!") + + def main(): - # print("All calling args: {}".format(sys.argv)) + # print("All calling args: {}\n".format(sys.argv)) + + parse_args = [] + + for arg in sys.argv: + + if arg != "": + + if re.match(r'.*("|\').*', arg): + parse_args.extend(shlex.split(arg)) + + else: + parse_args.append(arg) + + # print("Parsed args: {}\n".format(parse_args)) ################################################## # Define Script Parameters @@ -283,43 +365,48 @@ def main(): parser = argparse.ArgumentParser( description="This script allows you to upload a compressed zip \ of specified files to a computers' inventory record") - collection = parser.add_mutually_exclusive_group() - parser.add_argument("--api-username", "-u", help="Provide the encrypted string for the API Username", required=True) parser.add_argument("--api-password", "-p", help="Provide the encrypted string for the API Password", required=True) - collection.add_argument("--defaults", default=True, + parser.add_argument("--defaults", default=True, help="Collects the default files.", required=False) - collection.add_argument("--file", "-f", type=str, nargs=1, + parser.add_argument("--file", "-f", type=str, nargs="*", help="Specify specific file to collect.", required=False) - collection.add_argument("--directory", "-d", metavar="/path/to/directory/", type=str, + parser.add_argument("--directory", "-d", metavar="/path/to/directory/", type=str, nargs="*", help="Specify a specific directory to collect.", required=False) parser.add_argument("--quiet", "-q", action="store_true", help="Do not print verbose messages.", required=False) - args = parser.parse_known_args() + args = parser.parse_known_args(args=parse_args) args = args[0] print("Argparse args: {}".format(args)) # sys.exit(0) if len(sys.argv) > 1: + upload_items = [] + if args.file: - upload_items = [] - upload_items.append((args.file[0]).strip()) - elif args.directory: - upload_items = (args.directory).strip() - elif args.defaults: - upload_items = [ - "/private/var/log/jamf.log", - "/private/var/log/install.log", - "/private/var/log/system.log", - "/private/var/log/jamf_RecoveryAgent.log", - "/private/var/log/jamf_ReliableEnrollment.log", - "/private/var/log/32bitApps_inventory.log", - "/opt/ManagedFrameworks/EA_History.log" + for file in args.file: + upload_items.append((file).strip()) + + if args.directory: + for folder in args.directory: + upload_items.append((folder).strip()) + + if args.defaults: + upload_items.extend( + [ + "/private/var/log/jamf.log", + "/private/var/log/install.log", + "/private/var/log/system.log", + "/private/var/log/jamf_RecoveryAgent.log", + "/private/var/log/jamf_ReliableEnrollment.log", + "/private/var/log/32bitApps_inventory.log", + "/opt/ManagedFrameworks/EA_History.log" ] + ) # Setup databases that we want to collect info from db_kext = {} @@ -345,25 +432,30 @@ def main(): (args.api_username).strip(), "", "" - )).strip().decode() + ) + ).strip() jps_api_password = ( DecryptString( (args.api_password).strip(), "", "") - ).strip().decode() + ) + ).strip() jps_credentials = ( base64.b64encode( "{}:{}".format(jps_api_user, jps_api_password).encode() )).decode() time_stamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S") - archive_file = "/private/tmp/{}_logs".format(time_stamp) - archive_max_size = 40000000 + archive_file = "/private/tmp/{}_logs.zip".format(time_stamp) + archive_max_size = 50000000 # 50MB # Get the systems' Jamf Pro Server if os.path.exists(jamf_plist): - jamf_plist_contents = plistReader(jamf_plist, verbose) + with open(jamf_plist, "rb") as plist: + jamf_plist_contents = plistlib.load(plist) + jps_url = jamf_plist_contents["jss_url"] + if verbose: print("Jamf Pro Server URL: {}".format(jps_url)) else: @@ -371,7 +463,7 @@ def main(): sys.exit(1) # Get the system's UUID - hw_UUID = get_system("uuid") + hw_UUID = get_system_info().get("uuid") if verbose: print("System UUID: {}".format(hw_UUID)) @@ -383,41 +475,28 @@ def main(): if database_items: print("Requested databases: {}".format(database_items)) - if args.directory: - if os.path.exists(upload_items): - parent_directory = os.path.abspath(os.path.join(upload_items, os.pardir)) - shutil.make_archive(archive_file, "zip", parent_directory, upload_items ) - else: - print("ERROR: Unable to locate the provided directory!") - sys.exit(4) - else: - zip_file = zipfile.ZipFile("{}.zip".format(archive_file), "w") - for upload_item in upload_items: + for upload_item in upload_items: + archiver(upload_item, archive=archive_file, verbose=verbose) + + for database_item in database_items: + if os.path.exists(database_item["database"]): if verbose: - print("Archiving file: {}".format(os.path.abspath(upload_item))) - if os.path.exists(upload_item): - zip_file.write(os.path.abspath(upload_item), compress_type=zipfile.ZIP_DEFLATED) - else: - print("WARNING: Unable to locate the specified file!") - for database_item in database_items: - if os.path.exists(database_item["database"]): + print( + "Archiving tables from database: {}".format( + os.path.abspath(database_item["database"]))) + for table in database_item["tables"]: if verbose: - print( - "Archiving tables from database: {}".format( - os.path.abspath(database_item["database"]))) - for table in database_item["tables"]: - if verbose: - print("Creating csv and archiving table: {}".format(table)) - file_name = dbTableWriter(database_item["database"], table) - zip_file.write(os.path.abspath(file_name), compress_type=zipfile.ZIP_DEFLATED) - else: - print("WARNING: Unable to locate the specified file!") + print("Creating csv and archiving table: {}".format(table)) + file_name = dbTableWriter(database_item["database"], table) - zip_file.close() + archiver(os.path.abspath(file_name), archive=archive_file, verbose=verbose) + else: + print("WARNING: Unable to locate the specified database!") + + archive_size = os.path.getsize(archive_file) - archive_size = os.path.getsize("{}.zip".format(archive_file)) if verbose: - print("Archive name: {}.zip".format(archive_file)) + print("Archive name: {}".format(archive_file)) print("Archive size: {}".format(archive_size)) if archive_size > archive_max_size: @@ -445,7 +524,7 @@ def main(): jps_url=jps_url, jps_credentials=jps_credentials, endpoint="/fileuploads/computers/id/{id}".format(id=computer_id), - file_to_upload="{}.zip".format(archive_file), + file_to_upload=archive_file, archive_size=archive_size, verbose=verbose )