diff --git a/Jamf Pro/Scripts/jamf_CollectDiagnostics.py b/Jamf Pro/Scripts/jamf_CollectDiagnostics.py index 1bada26..d38f6b7 100644 --- a/Jamf Pro/Scripts/jamf_CollectDiagnostics.py +++ b/Jamf Pro/Scripts/jamf_CollectDiagnostics.py @@ -1,14 +1,13 @@ #!/opt/ManagedFrameworks/Python.framework/Versions/Current/bin/python3 + """ -################################################################################################### -# Script Name: jamf_CollectDiagnostics.py -# By: Zack Thompson / Created: 8/22/2019 -# Version: 1.4.1 / Updated: 12/03/2021 By: ZT -# -# Description: This script allows you to upload a compressed zip of specified files to a -# computers' inventory record. -# -################################################################################################### +Script Name: jamf_CollectDiagnostics.py +By: Zack Thompson / Created: 8/22/2019 +Version: 1.5.0 / Updated: 12/03/2021 By: ZT + +Description: This script allows you to upload a compressed + zip of specified files to a computers' inventory record. + """ import argparse @@ -20,20 +19,23 @@ import json import objc import os import plistlib +import requests import shutil import sqlite3 import subprocess import sys import zipfile -from urllib import request as urllib - # Jamf Function to obfuscate credentials. def DecryptString(inputString, salt, passphrase): - """Usage: >>> DecryptString("Encrypted String", "Salt", "Passphrase")""" - result = runUtility('echo \'{inputString}\' | /usr/bin/openssl enc -aes256 -d -a -A -S \'{salt}\' -k \'{passphrase}\''.format(salt=salt, passphrase=passphrase, inputString=inputString)) - return result + """ + Usage: >>> DecryptString("Encrypted String", "Salt", "Passphrase") + """ + + return runUtility( + "echo '{inputString}' | /usr/bin/openssl enc -aes256 -d -a -A -S '{salt}' -k '{passphrase}'".format( + salt=salt, passphrase=passphrase, inputString=inputString)) def runUtility(command): @@ -47,8 +49,8 @@ def runUtility(command): try: process = subprocess.check_output(command, shell=True) except subprocess.CalledProcessError as error: - print ('Error code: {}'.format(error.returncode)) - print ('Error: {}'.format(error)) + print ("Error code: {}".format(error.returncode)) + print ("Error: {}".format(error)) process = "error" return process @@ -64,30 +66,30 @@ def plistReader(plist_file_path, verbose): if os.path.exists(plist_file_path): if verbose: - print('Opening plist: {}'.format(plist_file_path)) + print("Opening plist: {}".format(plist_file_path)) try: # Get the contents of the plist file. with open(plist_file_path, "rb") as plist_file: plist_Contents = plistlib.load(plist_file) except Exception: - file_cmd = '/usr/bin/file --mime-encoding {}'.format(plist_file_path) + file_cmd = "/usr/bin/file --mime-encoding {}".format(plist_file_path) file_response = runUtility(file_cmd) - file_type = file_response.split(': ')[1].strip() + file_type = file_response.split(": ")[1].strip() if verbose: - print('File Type: {}'.format(file_type)) + print("File Type: {}".format(file_type)) - if file_type == 'binary': + if file_type == "binary": if verbose: - print('Converting plist...') - plutil_cmd = '/usr/bin/plutil -convert xml1 {}'.format(plist_file_path) + print("Converting plist...") + plutil_cmd = "/usr/bin/plutil -convert xml1 {}".format(plist_file_path) plutil_response = runUtility(plutil_cmd) # Get the contents of the plist file. with open(plist_file_path, "rb") as plist_file: plist_Contents = plistlib.load(plist_file) else: - print('ERROR: Unable to locate the specified plist file!') + print("ERROR: Unable to locate the specified plist file!") sys.exit(3) return plist_Contents @@ -103,7 +105,7 @@ def dbTableWriter(database, table): file: Returns the abspath of the file. """ - file_name = '/private/tmp/{}.csv'.format(table) + file_name = "/private/tmp/{}.csv".format(table) # Setup database connection db_connect = sqlite3.connect(database) @@ -113,7 +115,7 @@ def dbTableWriter(database, table): database.execute("select * from {}".format(table)) # Write to file - with open(file_name,'w') as table_csv: + with open(file_name,"w") as table_csv: csv_out = csv.writer(table_csv) # Write header csv_out.writerow([description[0] for description in database.description]) @@ -124,7 +126,8 @@ def dbTableWriter(database, table): return os.path.abspath(file_name) -# Credit to (Mikey Mike/Froger/Pudquick/etc) for this logic: https://gist.github.com/pudquick/c7dd1262bd81a32663f0 +# Credit to (Mikey Mike/Froger/Pudquick/etc) for this logic: +# https://gist.github.com/pudquick/c7dd1262bd81a32663f0 def get_system(attribute): """A helper function to get specific system attributes. Args: @@ -133,12 +136,19 @@ def get_system(attribute): stdout: The system attribute value. """ - IOKit_bundle = NSBundle.bundleWithIdentifier_('com.apple.framework.IOKit') - functions = [("IOServiceGetMatchingService", b"II@"), ("IOServiceMatching", b"@*"), ("IORegistryEntryCreateCFProperty", b"@I@@I"),] + IOKit_bundle = NSBundle.bundleWithIdentifier_("com.apple.framework.IOKit") + functions = [ + ( "IOServiceGetMatchingService", b"II@" ), + ( "IOServiceMatching", b"@*" ), + ( "IORegistryEntryCreateCFProperty", b"@I@@I" ) + ] objc.loadBundleFunctions(IOKit_bundle, globals(), functions) def io_key(keyname): - return IORegistryEntryCreateCFProperty(int(IOServiceGetMatchingService(0, IOServiceMatching("IOPlatformExpertDevice".encode("utf-8")))), keyname, None, 0) + return IORegistryEntryCreateCFProperty( + int( IOServiceGetMatchingService( + 0, IOServiceMatching( + "IOPlatformExpertDevice".encode("utf-8")))), keyname, None, 0) def get_hardware_uuid(): return io_key("IOPlatformUUID") @@ -147,18 +157,21 @@ def get_system(attribute): # return io_key("IOPlatformSerialNumber".encode("utf-8")) # def get_board_id(): - # return str(io_key("board-id".encode("utf-8"))).rstrip('\x00') + # return str(io_key("board-id".encode("utf-8"))).rstrip("\x00") - options = {'uuid' : get_hardware_uuid #, - # 'serial' : get_hardware_serial, - # 'boardID' : get_board_id + options = {"uuid" : get_hardware_uuid #, + # "serial" : get_hardware_serial, + # "boardID" : get_board_id } return options[attribute]() def apiGET(**parameters): - """A helper function that performs a GET to the Jamf API. Attempts to first use the python urllib2 library, but if that fails, falls back to the system curl. + """A helper function that performs a GET to the Jamf API. + Attempts to first use the Python `requests` library, + but if that fails, falls back to the system curl. + Args: jps_url: Jamf Pro Server URL jps_credentials: base64 encoded credentials @@ -167,35 +180,45 @@ def apiGET(**parameters): stdout: json data from the response contents """ - url = parameters.get('jps_url') + 'JSSResource' + parameters.get('endpoint') - if parameters.get('verbose'): - print('API URL: {}'.format(url)) + url = "{}JSSResource{}".format(parameters.get("jps_url"), parameters.get("endpoint")) + + if parameters.get("verbose"): + print("API URL: {}".format(url)) try: - if parameters.get('verbose'): - print('Trying urllib...') - headers = {'Accept': 'application/json', 'Authorization': 'Basic ' + parameters.get('jps_credentials')} - request = urllib.Request(url, headers=headers) - response = urllib.urlopen(request) - statusCode = response.code - json_response = json.loads(response.read()) + if parameters.get("verbose"): + print("Trying `requests`...") + headers = { + "Accept": "application/json", + "Authorization": "Basic {}".format(parameters.get("jps_credentials")) + } + response = requests.get(url, headers=headers) + statusCode = response.status_code + json_response = response.json() except Exception: - # If urllib fails, resort to using curl. + # If `requests` fails, resort to using curl. + + if parameters.get("verbose"): + print("Trying curl...") - if parameters.get('verbose'): - print('Trying curl...') # Build the command. - curl_cmd = '/usr/bin/curl --silent --show-error --no-buffer --fail --write-out "statusCode:%{{http_code}}" --location --header "Accept: application/json" --header "Authorization: Basic {jps_credentials}" --url {url} --request GET'.format(jps_credentials=parameters.get('jps_credentials'), url=url) + curl_cmd = "/usr/bin/curl --silent --show-error --no-buffer --fail --write-out \ + 'statusCode:%{{http_code}}' --location --header 'Accept: application/json' \ + --header 'Authorization: Basic {jps_credentials}' --url {url} --request GET".format( + jps_credentials=parameters.get("jps_credentials"), url=url) response = runUtility(curl_cmd) - json_content, statusCode = response.split(b'statusCode:') + json_content, statusCode = response.split(b"statusCode:") json_response = json.loads(json_content) return statusCode, json_response def apiPOST(**parameters): - """A helper function that performs a POST to the Jamf API. Attempts to first use the python urllib2 library, but if that fails, falls back to the system curl. + """A helper function that performs a POST to the Jamf API. + Attempts to first use the python `requests` library, + but if that fails, falls back to the system curl. + Args: jps_url: Jamf Pro Server URL jps_credentials: base64 encoded credentials @@ -206,62 +229,81 @@ def apiPOST(**parameters): stdout: the response contents """ - url = parameters.get('jps_url') + 'JSSResource' + parameters.get('endpoint') - if parameters.get('verbose'): - print('API URL: {}'.format(url)) - if parameters.get('verbose'): - print('Uploading file: {}'.format(parameters.get('file_to_upload'))) + url = "{}JSSResource{}".format(parameters.get("jps_url"), parameters.get("endpoint")) + + if parameters.get("verbose"): + print("API URL: {}".format(url)) + print("Uploading file: {}".format(parameters.get("file_to_upload"))) - ##### Unable to quite get urllib to work at the moment... # try: - # if parameters.get('verbose'): print('Trying urllib...') - # basename = os.path.basename(parameters.get('file_to_upload')) - # # headers = {'Content-Disposition': 'name="{0}"'.format(parameters.get('file_to_upload')), 'Authorization': 'Basic ' + parameters.get('jps_credentials'), 'Content-Type': 'multipart/form-data', 'Content-Length': parameters.get('archive_size')} - # headers = {'Authorization': 'Basic ' + parameters.get('jps_credentials'), "Content-type" : "application/zip", 'Content-Length': parameters.get('archive_size')} - # request = urllib.Request(url, open(parameters.get('file_to_upload'), "rb"), headers=headers) - # response = urllib.urlopen(request) - # statusCode = response.code - # content = response.read() + ##### Unable to get requests nor urllib to work... + # if parameters.get("verbose"): + # print("Trying `requests`...") + + # files = { + # "name": (None, open(parameters.get("file_to_upload"), "rb")) + # } + + # body, content_type = requests.models.RequestEncodingMixin._encode_files(files, {}) + # headers = { + # "Authorization": "Basic {}".format(parameters.get("jps_credentials")), + # "Content-Type": content_type + # } + # response = requests.post(url, data=body, headers=headers) + # statusCode = response.status_code + # content = response.text # except Exception: # If urllib fails, resort to using curl. - if parameters.get('verbose'): - print('Trying curl...') + if parameters.get("verbose"): + print("Trying curl...") + # Build the command. - curl_cmd = '/usr/bin/curl --silent --show-error --no-buffer --fail --write-out "statusCode:%{{http_code}}" --location --header "Accept: application/json" --header "Authorization: Basic {jps_credentials}" --url {url} --request POST --form name=@{file_to_upload}'.format(jps_credentials=parameters.get('jps_credentials'), url=url, file_to_upload=parameters.get('file_to_upload')) + curl_cmd = "/usr/bin/curl --silent --show-error --no-buffer --fail --write-out \ + 'statusCode:%{{http_code}}' --location --header 'Accept: application/json' \ + --header 'Authorization: Basic {jps_credentials}' --url {url} --request POST \ + --form name=@{file_to_upload}".format( + jps_credentials=parameters.get("jps_credentials"), url=url, + file_to_upload=parameters.get("file_to_upload") + ) + response = runUtility(curl_cmd) - content, statusCode = response.split(b'statusCode:') + content, statusCode = response.split(b"statusCode:") return statusCode, content def main(): - # print('All calling args: {}'.format(sys.argv)) + # print("All calling args: {}".format(sys.argv)) ################################################## # Define Script Parameters - parser = argparse.ArgumentParser(description="This script allows you to upload a compressed zip of specified files to a computers' inventory record") + parser = argparse.ArgumentParser( + description="This script allows you to upload a compressed zip \ + of specified files to a computers' inventory record") collection = parser.add_mutually_exclusive_group() - parser.add_argument('--api-username', '-u', help='Provide the encrypted string for the API Username', required=True) - parser.add_argument('--api-password', '-p', help='Provide the encrypted string for the API Password', required=True) - collection.add_argument('--defaults', default=True, help='Collects the default files.', required=False) - collection.add_argument('--file', '-f', type=str, nargs=1, help='Specify specific file to collect.', required=False) - collection.add_argument('--directory', '-d', metavar='/path/to/directory/', type=str, help='Specify a specific directory to collect.', required=False) - parser.add_argument('--quiet', '-q', action='store_true', help='Do not print verbose messages.', required=False) + parser.add_argument("--api-username", "-u", + help="Provide the encrypted string for the API Username", required=True) + parser.add_argument("--api-password", "-p", + help="Provide the encrypted string for the API Password", required=True) + collection.add_argument("--defaults", default=True, + help="Collects the default files.", required=False) + collection.add_argument("--file", "-f", type=str, nargs=1, + help="Specify specific file to collect.", required=False) + collection.add_argument("--directory", "-d", metavar="/path/to/directory/", type=str, + help="Specify a specific directory to collect.", required=False) + parser.add_argument("--quiet", "-q", action="store_true", + help="Do not print verbose messages.", required=False) args = parser.parse_known_args() args = args[0] - # args = parser.parse_known_args() - # print('args: {}'.format(args)) - # # args = args[0] + print("Argparse args: {}".format(args)) # sys.exit(0) - print('Argparse args: {}'.format(args)) - if len(sys.argv) > 1: if args.file: upload_items = [] @@ -269,13 +311,21 @@ def main(): elif args.directory: upload_items = (args.directory).strip() elif args.defaults: - upload_items = ['/private/var/log/jamf.log', '/private/var/log/install.log', '/private/var/log/system.log', '/private/var/log/jamf_RecoveryAgent.log', '/private/var/log/jamf_ReliableEnrollment.log', '/private/var/log/32bitApps_inventory.log' ] + upload_items = [ + "/private/var/log/jamf.log", + "/private/var/log/install.log", + "/private/var/log/system.log", + "/private/var/log/jamf_RecoveryAgent.log", + "/private/var/log/jamf_ReliableEnrollment.log", + "/private/var/log/32bitApps_inventory.log", + "/opt/ManagedFrameworks/EA_History.log" + ] # Setup databases that we want to collect info from db_kext = {} database_items = [] - db_kext['database'] = '/var/db/SystemPolicyConfiguration/KextPolicy' - db_kext['tables'] = [ 'kext_policy_mdm', 'kext_policy' ] + db_kext["database"] = "/var/db/SystemPolicyConfiguration/KextPolicy" + db_kext["tables"] = [ "kext_policy_mdm", "kext_policy" ] database_items.append(db_kext) if args.quiet: @@ -289,98 +339,126 @@ def main(): ################################################## # Define Variables - jamf_plist = '/Library/Preferences/com.jamfsoftware.jamf.plist' - jps_api_user = (DecryptString((args.api_username).strip(), '', '')).strip().decode() - jps_api_password = (DecryptString((args.api_password).strip(), '', '')).strip().decode() - jps_credentials = ( base64.b64encode( "{}:{}".format(jps_api_user, jps_api_password).encode() ) ).decode() + jamf_plist = "/Library/Preferences/com.jamfsoftware.jamf.plist" + jps_api_user = ( + DecryptString( + (args.api_username).strip(), + "", + "" + )).strip().decode() + jps_api_password = ( + DecryptString( + (args.api_password).strip(), + "", + "") + ).strip().decode() + jps_credentials = ( + base64.b64encode( + "{}:{}".format(jps_api_user, jps_api_password).encode() + )).decode() time_stamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S") - archive_file = '/private/tmp/{}_logs'.format(time_stamp) + archive_file = "/private/tmp/{}_logs".format(time_stamp) archive_max_size = 40000000 # Get the systems' Jamf Pro Server if os.path.exists(jamf_plist): jamf_plist_contents = plistReader(jamf_plist, verbose) - jps_url = jamf_plist_contents['jss_url'] + jps_url = jamf_plist_contents["jss_url"] if verbose: - print('Jamf Pro Server URL: {}'.format(jps_url)) + print("Jamf Pro Server URL: {}".format(jps_url)) else: - print('ERROR: Missing the Jamf Pro configuration file!') + print("ERROR: Missing the Jamf Pro configuration file!") sys.exit(1) # Get the system's UUID - hw_UUID = get_system('uuid') + hw_UUID = get_system("uuid") if verbose: - print('System UUID: {}'.format(hw_UUID)) + print("System UUID: {}".format(hw_UUID)) ################################################## # Bits staged... if verbose: - print('Requested files: {}'.format(upload_items)) + print("Requested files: {}".format(upload_items)) if database_items: - print('Requested databases: {}'.format(database_items)) + print("Requested databases: {}".format(database_items)) if args.directory: if os.path.exists(upload_items): parent_directory = os.path.abspath(os.path.join(upload_items, os.pardir)) - shutil.make_archive(archive_file, 'zip', parent_directory, upload_items ) + shutil.make_archive(archive_file, "zip", parent_directory, upload_items ) else: - print('ERROR: Unable to locate the provided directory!') + print("ERROR: Unable to locate the provided directory!") sys.exit(4) else: - zip_file = zipfile.ZipFile('{}.zip'.format(archive_file), 'w') + zip_file = zipfile.ZipFile("{}.zip".format(archive_file), "w") for upload_item in upload_items: if verbose: - print('Archiving file: {}'.format(os.path.abspath(upload_item))) + print("Archiving file: {}".format(os.path.abspath(upload_item))) if os.path.exists(upload_item): zip_file.write(os.path.abspath(upload_item), compress_type=zipfile.ZIP_DEFLATED) else: - print('WARNING: Unable to locate the specified file!') + print("WARNING: Unable to locate the specified file!") for database_item in database_items: - if os.path.exists(database_item['database']): + if os.path.exists(database_item["database"]): if verbose: - print('Archiving tables from database: {}'.format(os.path.abspath(database_item['database']))) - for table in database_item['tables']: + print( + "Archiving tables from database: {}".format( + os.path.abspath(database_item["database"]))) + for table in database_item["tables"]: if verbose: - print('Creating csv and archiving table: {}'.format(table)) - file_name = dbTableWriter(database_item['database'], table) + print("Creating csv and archiving table: {}".format(table)) + file_name = dbTableWriter(database_item["database"], table) zip_file.write(os.path.abspath(file_name), compress_type=zipfile.ZIP_DEFLATED) else: - print('WARNING: Unable to locate the specified file!') + print("WARNING: Unable to locate the specified file!") zip_file.close() - archive_size = os.path.getsize('{}.zip'.format(archive_file)) + archive_size = os.path.getsize("{}.zip".format(archive_file)) if verbose: - print('Archive name: {}.zip'.format(archive_file)) - print('Archive size: {}'.format(archive_size)) + print("Archive name: {}.zip".format(archive_file)) + print("Archive size: {}".format(archive_size)) if archive_size > archive_max_size: - print('Aborting: File size is larger than allowed!') + print("Aborting: File size is larger than allowed!") sys.exit(2) # Query the API to get the computer ID - status_code, json_data = apiGET(jps_url=jps_url, jps_credentials=jps_credentials, endpoint='/computers/udid/{uuid}'.format(uuid=hw_UUID), verbose=verbose) + status_code, json_data = apiGET( + jps_url=jps_url, + jps_credentials=jps_credentials, + endpoint="/computers/udid/{uuid}".format(uuid=hw_UUID), + verbose=verbose + ) if int(status_code) == 200: - computer_id = json_data.get('computer').get('general').get('id') + computer_id = json_data.get("computer").get("general").get("id") if verbose: - print('Computer ID: {}'.format(computer_id)) + print("Computer ID: {}".format(computer_id)) else: - print('ERROR: Failed to retrieve devices\' computer ID!') + print("ERROR: Failed to retrieve devices\' computer ID!") sys.exit(5) # Upload file via the API - status_code, content = apiPOST(jps_url=jps_url, jps_credentials=jps_credentials, endpoint='/fileuploads/computers/id/{id}'.format(id=computer_id), file_to_upload='{}.zip'.format(archive_file), archive_size=archive_size, verbose=verbose) + status_code, content = apiPOST( + jps_url=jps_url, + jps_credentials=jps_credentials, + endpoint="/fileuploads/computers/id/{id}".format(id=computer_id), + file_to_upload="{}.zip".format(archive_file), + archive_size=archive_size, + verbose=verbose + ) if int(status_code) == 204: if content: if verbose: - print('Response: {}'.format(content)) - print('Upload complete!') + print("Response: {}".format(content)) + print("Upload complete!") else: - print('ERROR: Failed to upload file to the JPS!') + print("ERROR: Failed to upload file to the JPS!") sys.exit(6) + if __name__ == "__main__": main()