v1.5.0 = Switched to requests and cleanup

+ Switched to using `requests` instead of `urllib`
+ Cleaned up formatting, etc.
+ Added additional file to collect
This commit is contained in:
Zack T
2021-12-03 17:00:42 -07:00
parent 7b852daad8
commit 84cc6d848a

View File

@@ -1,14 +1,13 @@
#!/opt/ManagedFrameworks/Python.framework/Versions/Current/bin/python3
"""
###################################################################################################
# Script Name: jamf_CollectDiagnostics.py
# By: Zack Thompson / Created: 8/22/2019
# Version: 1.4.1 / Updated: 12/03/2021 By: ZT
#
# Description: This script allows you to upload a compressed zip of specified files to a
# computers' inventory record.
#
###################################################################################################
Script Name: jamf_CollectDiagnostics.py
By: Zack Thompson / Created: 8/22/2019
Version: 1.5.0 / Updated: 12/03/2021 By: ZT
Description: This script allows you to upload a compressed
zip of specified files to a computers' inventory record.
"""
import argparse
@@ -20,20 +19,23 @@ import json
import objc
import os
import plistlib
import requests
import shutil
import sqlite3
import subprocess
import sys
import zipfile
from urllib import request as urllib
# Jamf Function to obfuscate credentials.
def DecryptString(inputString, salt, passphrase):
"""Usage: >>> DecryptString("Encrypted String", "Salt", "Passphrase")"""
result = runUtility('echo \'{inputString}\' | /usr/bin/openssl enc -aes256 -d -a -A -S \'{salt}\' -k \'{passphrase}\''.format(salt=salt, passphrase=passphrase, inputString=inputString))
return result
"""
Usage: >>> DecryptString("Encrypted String", "Salt", "Passphrase")
"""
return runUtility(
"echo '{inputString}' | /usr/bin/openssl enc -aes256 -d -a -A -S '{salt}' -k '{passphrase}'".format(
salt=salt, passphrase=passphrase, inputString=inputString))
def runUtility(command):
@@ -47,8 +49,8 @@ def runUtility(command):
try:
process = subprocess.check_output(command, shell=True)
except subprocess.CalledProcessError as error:
print ('Error code: {}'.format(error.returncode))
print ('Error: {}'.format(error))
print ("Error code: {}".format(error.returncode))
print ("Error: {}".format(error))
process = "error"
return process
@@ -64,30 +66,30 @@ def plistReader(plist_file_path, verbose):
if os.path.exists(plist_file_path):
if verbose:
print('Opening plist: {}'.format(plist_file_path))
print("Opening plist: {}".format(plist_file_path))
try:
# Get the contents of the plist file.
with open(plist_file_path, "rb") as plist_file:
plist_Contents = plistlib.load(plist_file)
except Exception:
file_cmd = '/usr/bin/file --mime-encoding {}'.format(plist_file_path)
file_cmd = "/usr/bin/file --mime-encoding {}".format(plist_file_path)
file_response = runUtility(file_cmd)
file_type = file_response.split(': ')[1].strip()
file_type = file_response.split(": ")[1].strip()
if verbose:
print('File Type: {}'.format(file_type))
print("File Type: {}".format(file_type))
if file_type == 'binary':
if file_type == "binary":
if verbose:
print('Converting plist...')
plutil_cmd = '/usr/bin/plutil -convert xml1 {}'.format(plist_file_path)
print("Converting plist...")
plutil_cmd = "/usr/bin/plutil -convert xml1 {}".format(plist_file_path)
plutil_response = runUtility(plutil_cmd)
# Get the contents of the plist file.
with open(plist_file_path, "rb") as plist_file:
plist_Contents = plistlib.load(plist_file)
else:
print('ERROR: Unable to locate the specified plist file!')
print("ERROR: Unable to locate the specified plist file!")
sys.exit(3)
return plist_Contents
@@ -103,7 +105,7 @@ def dbTableWriter(database, table):
file: Returns the abspath of the file.
"""
file_name = '/private/tmp/{}.csv'.format(table)
file_name = "/private/tmp/{}.csv".format(table)
# Setup database connection
db_connect = sqlite3.connect(database)
@@ -113,7 +115,7 @@ def dbTableWriter(database, table):
database.execute("select * from {}".format(table))
# Write to file
with open(file_name,'w') as table_csv:
with open(file_name,"w") as table_csv:
csv_out = csv.writer(table_csv)
# Write header
csv_out.writerow([description[0] for description in database.description])
@@ -124,7 +126,8 @@ def dbTableWriter(database, table):
return os.path.abspath(file_name)
# Credit to (Mikey Mike/Froger/Pudquick/etc) for this logic: https://gist.github.com/pudquick/c7dd1262bd81a32663f0
# Credit to (Mikey Mike/Froger/Pudquick/etc) for this logic:
# https://gist.github.com/pudquick/c7dd1262bd81a32663f0
def get_system(attribute):
"""A helper function to get specific system attributes.
Args:
@@ -133,12 +136,19 @@ def get_system(attribute):
stdout: The system attribute value.
"""
IOKit_bundle = NSBundle.bundleWithIdentifier_('com.apple.framework.IOKit')
functions = [("IOServiceGetMatchingService", b"II@"), ("IOServiceMatching", b"@*"), ("IORegistryEntryCreateCFProperty", b"@I@@I"),]
IOKit_bundle = NSBundle.bundleWithIdentifier_("com.apple.framework.IOKit")
functions = [
( "IOServiceGetMatchingService", b"II@" ),
( "IOServiceMatching", b"@*" ),
( "IORegistryEntryCreateCFProperty", b"@I@@I" )
]
objc.loadBundleFunctions(IOKit_bundle, globals(), functions)
def io_key(keyname):
return IORegistryEntryCreateCFProperty(int(IOServiceGetMatchingService(0, IOServiceMatching("IOPlatformExpertDevice".encode("utf-8")))), keyname, None, 0)
return IORegistryEntryCreateCFProperty(
int( IOServiceGetMatchingService(
0, IOServiceMatching(
"IOPlatformExpertDevice".encode("utf-8")))), keyname, None, 0)
def get_hardware_uuid():
return io_key("IOPlatformUUID")
@@ -147,18 +157,21 @@ def get_system(attribute):
# return io_key("IOPlatformSerialNumber".encode("utf-8"))
# def get_board_id():
# return str(io_key("board-id".encode("utf-8"))).rstrip('\x00')
# return str(io_key("board-id".encode("utf-8"))).rstrip("\x00")
options = {'uuid' : get_hardware_uuid #,
# 'serial' : get_hardware_serial,
# 'boardID' : get_board_id
options = {"uuid" : get_hardware_uuid #,
# "serial" : get_hardware_serial,
# "boardID" : get_board_id
}
return options[attribute]()
def apiGET(**parameters):
"""A helper function that performs a GET to the Jamf API. Attempts to first use the python urllib2 library, but if that fails, falls back to the system curl.
"""A helper function that performs a GET to the Jamf API.
Attempts to first use the Python `requests` library,
but if that fails, falls back to the system curl.
Args:
jps_url: Jamf Pro Server URL
jps_credentials: base64 encoded credentials
@@ -167,35 +180,45 @@ def apiGET(**parameters):
stdout: json data from the response contents
"""
url = parameters.get('jps_url') + 'JSSResource' + parameters.get('endpoint')
if parameters.get('verbose'):
print('API URL: {}'.format(url))
url = "{}JSSResource{}".format(parameters.get("jps_url"), parameters.get("endpoint"))
if parameters.get("verbose"):
print("API URL: {}".format(url))
try:
if parameters.get('verbose'):
print('Trying urllib...')
headers = {'Accept': 'application/json', 'Authorization': 'Basic ' + parameters.get('jps_credentials')}
request = urllib.Request(url, headers=headers)
response = urllib.urlopen(request)
statusCode = response.code
json_response = json.loads(response.read())
if parameters.get("verbose"):
print("Trying `requests`...")
headers = {
"Accept": "application/json",
"Authorization": "Basic {}".format(parameters.get("jps_credentials"))
}
response = requests.get(url, headers=headers)
statusCode = response.status_code
json_response = response.json()
except Exception:
# If urllib fails, resort to using curl.
# If `requests` fails, resort to using curl.
if parameters.get("verbose"):
print("Trying curl...")
if parameters.get('verbose'):
print('Trying curl...')
# Build the command.
curl_cmd = '/usr/bin/curl --silent --show-error --no-buffer --fail --write-out "statusCode:%{{http_code}}" --location --header "Accept: application/json" --header "Authorization: Basic {jps_credentials}" --url {url} --request GET'.format(jps_credentials=parameters.get('jps_credentials'), url=url)
curl_cmd = "/usr/bin/curl --silent --show-error --no-buffer --fail --write-out \
'statusCode:%{{http_code}}' --location --header 'Accept: application/json' \
--header 'Authorization: Basic {jps_credentials}' --url {url} --request GET".format(
jps_credentials=parameters.get("jps_credentials"), url=url)
response = runUtility(curl_cmd)
json_content, statusCode = response.split(b'statusCode:')
json_content, statusCode = response.split(b"statusCode:")
json_response = json.loads(json_content)
return statusCode, json_response
def apiPOST(**parameters):
"""A helper function that performs a POST to the Jamf API. Attempts to first use the python urllib2 library, but if that fails, falls back to the system curl.
"""A helper function that performs a POST to the Jamf API.
Attempts to first use the python `requests` library,
but if that fails, falls back to the system curl.
Args:
jps_url: Jamf Pro Server URL
jps_credentials: base64 encoded credentials
@@ -206,62 +229,81 @@ def apiPOST(**parameters):
stdout: the response contents
"""
url = parameters.get('jps_url') + 'JSSResource' + parameters.get('endpoint')
if parameters.get('verbose'):
print('API URL: {}'.format(url))
if parameters.get('verbose'):
print('Uploading file: {}'.format(parameters.get('file_to_upload')))
url = "{}JSSResource{}".format(parameters.get("jps_url"), parameters.get("endpoint"))
if parameters.get("verbose"):
print("API URL: {}".format(url))
print("Uploading file: {}".format(parameters.get("file_to_upload")))
##### Unable to quite get urllib to work at the moment...
# try:
# if parameters.get('verbose'): print('Trying urllib...')
# basename = os.path.basename(parameters.get('file_to_upload'))
# # headers = {'Content-Disposition': 'name="{0}"'.format(parameters.get('file_to_upload')), 'Authorization': 'Basic ' + parameters.get('jps_credentials'), 'Content-Type': 'multipart/form-data', 'Content-Length': parameters.get('archive_size')}
# headers = {'Authorization': 'Basic ' + parameters.get('jps_credentials'), "Content-type" : "application/zip", 'Content-Length': parameters.get('archive_size')}
# request = urllib.Request(url, open(parameters.get('file_to_upload'), "rb"), headers=headers)
# response = urllib.urlopen(request)
# statusCode = response.code
# content = response.read()
##### Unable to get requests nor urllib to work...
# if parameters.get("verbose"):
# print("Trying `requests`...")
# files = {
# "name": (None, open(parameters.get("file_to_upload"), "rb"))
# }
# body, content_type = requests.models.RequestEncodingMixin._encode_files(files, {})
# headers = {
# "Authorization": "Basic {}".format(parameters.get("jps_credentials")),
# "Content-Type": content_type
# }
# response = requests.post(url, data=body, headers=headers)
# statusCode = response.status_code
# content = response.text
# except Exception:
# If urllib fails, resort to using curl.
if parameters.get('verbose'):
print('Trying curl...')
if parameters.get("verbose"):
print("Trying curl...")
# Build the command.
curl_cmd = '/usr/bin/curl --silent --show-error --no-buffer --fail --write-out "statusCode:%{{http_code}}" --location --header "Accept: application/json" --header "Authorization: Basic {jps_credentials}" --url {url} --request POST --form name=@{file_to_upload}'.format(jps_credentials=parameters.get('jps_credentials'), url=url, file_to_upload=parameters.get('file_to_upload'))
curl_cmd = "/usr/bin/curl --silent --show-error --no-buffer --fail --write-out \
'statusCode:%{{http_code}}' --location --header 'Accept: application/json' \
--header 'Authorization: Basic {jps_credentials}' --url {url} --request POST \
--form name=@{file_to_upload}".format(
jps_credentials=parameters.get("jps_credentials"), url=url,
file_to_upload=parameters.get("file_to_upload")
)
response = runUtility(curl_cmd)
content, statusCode = response.split(b'statusCode:')
content, statusCode = response.split(b"statusCode:")
return statusCode, content
def main():
# print('All calling args: {}'.format(sys.argv))
# print("All calling args: {}".format(sys.argv))
##################################################
# Define Script Parameters
parser = argparse.ArgumentParser(description="This script allows you to upload a compressed zip of specified files to a computers' inventory record")
parser = argparse.ArgumentParser(
description="This script allows you to upload a compressed zip \
of specified files to a computers' inventory record")
collection = parser.add_mutually_exclusive_group()
parser.add_argument('--api-username', '-u', help='Provide the encrypted string for the API Username', required=True)
parser.add_argument('--api-password', '-p', help='Provide the encrypted string for the API Password', required=True)
collection.add_argument('--defaults', default=True, help='Collects the default files.', required=False)
collection.add_argument('--file', '-f', type=str, nargs=1, help='Specify specific file to collect.', required=False)
collection.add_argument('--directory', '-d', metavar='/path/to/directory/', type=str, help='Specify a specific directory to collect.', required=False)
parser.add_argument('--quiet', '-q', action='store_true', help='Do not print verbose messages.', required=False)
parser.add_argument("--api-username", "-u",
help="Provide the encrypted string for the API Username", required=True)
parser.add_argument("--api-password", "-p",
help="Provide the encrypted string for the API Password", required=True)
collection.add_argument("--defaults", default=True,
help="Collects the default files.", required=False)
collection.add_argument("--file", "-f", type=str, nargs=1,
help="Specify specific file to collect.", required=False)
collection.add_argument("--directory", "-d", metavar="/path/to/directory/", type=str,
help="Specify a specific directory to collect.", required=False)
parser.add_argument("--quiet", "-q", action="store_true",
help="Do not print verbose messages.", required=False)
args = parser.parse_known_args()
args = args[0]
# args = parser.parse_known_args()
# print('args: {}'.format(args))
# # args = args[0]
print("Argparse args: {}".format(args))
# sys.exit(0)
print('Argparse args: {}'.format(args))
if len(sys.argv) > 1:
if args.file:
upload_items = []
@@ -269,13 +311,21 @@ def main():
elif args.directory:
upload_items = (args.directory).strip()
elif args.defaults:
upload_items = ['/private/var/log/jamf.log', '/private/var/log/install.log', '/private/var/log/system.log', '/private/var/log/jamf_RecoveryAgent.log', '/private/var/log/jamf_ReliableEnrollment.log', '/private/var/log/32bitApps_inventory.log' ]
upload_items = [
"/private/var/log/jamf.log",
"/private/var/log/install.log",
"/private/var/log/system.log",
"/private/var/log/jamf_RecoveryAgent.log",
"/private/var/log/jamf_ReliableEnrollment.log",
"/private/var/log/32bitApps_inventory.log",
"/opt/ManagedFrameworks/EA_History.log"
]
# Setup databases that we want to collect info from
db_kext = {}
database_items = []
db_kext['database'] = '/var/db/SystemPolicyConfiguration/KextPolicy'
db_kext['tables'] = [ 'kext_policy_mdm', 'kext_policy' ]
db_kext["database"] = "/var/db/SystemPolicyConfiguration/KextPolicy"
db_kext["tables"] = [ "kext_policy_mdm", "kext_policy" ]
database_items.append(db_kext)
if args.quiet:
@@ -289,98 +339,126 @@ def main():
##################################################
# Define Variables
jamf_plist = '/Library/Preferences/com.jamfsoftware.jamf.plist'
jps_api_user = (DecryptString((args.api_username).strip(), '<SALT>', '<PASSPHRASE>')).strip().decode()
jps_api_password = (DecryptString((args.api_password).strip(), '<SALT>', '<PASSPHRASE>')).strip().decode()
jps_credentials = ( base64.b64encode( "{}:{}".format(jps_api_user, jps_api_password).encode() ) ).decode()
jamf_plist = "/Library/Preferences/com.jamfsoftware.jamf.plist"
jps_api_user = (
DecryptString(
(args.api_username).strip(),
"<SALT>",
"<PASSPHRASE>"
)).strip().decode()
jps_api_password = (
DecryptString(
(args.api_password).strip(),
"<SALT>",
"<PASSPHRASE>")
).strip().decode()
jps_credentials = (
base64.b64encode(
"{}:{}".format(jps_api_user, jps_api_password).encode()
)).decode()
time_stamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
archive_file = '/private/tmp/{}_logs'.format(time_stamp)
archive_file = "/private/tmp/{}_logs".format(time_stamp)
archive_max_size = 40000000
# Get the systems' Jamf Pro Server
if os.path.exists(jamf_plist):
jamf_plist_contents = plistReader(jamf_plist, verbose)
jps_url = jamf_plist_contents['jss_url']
jps_url = jamf_plist_contents["jss_url"]
if verbose:
print('Jamf Pro Server URL: {}'.format(jps_url))
print("Jamf Pro Server URL: {}".format(jps_url))
else:
print('ERROR: Missing the Jamf Pro configuration file!')
print("ERROR: Missing the Jamf Pro configuration file!")
sys.exit(1)
# Get the system's UUID
hw_UUID = get_system('uuid')
hw_UUID = get_system("uuid")
if verbose:
print('System UUID: {}'.format(hw_UUID))
print("System UUID: {}".format(hw_UUID))
##################################################
# Bits staged...
if verbose:
print('Requested files: {}'.format(upload_items))
print("Requested files: {}".format(upload_items))
if database_items:
print('Requested databases: {}'.format(database_items))
print("Requested databases: {}".format(database_items))
if args.directory:
if os.path.exists(upload_items):
parent_directory = os.path.abspath(os.path.join(upload_items, os.pardir))
shutil.make_archive(archive_file, 'zip', parent_directory, upload_items )
shutil.make_archive(archive_file, "zip", parent_directory, upload_items )
else:
print('ERROR: Unable to locate the provided directory!')
print("ERROR: Unable to locate the provided directory!")
sys.exit(4)
else:
zip_file = zipfile.ZipFile('{}.zip'.format(archive_file), 'w')
zip_file = zipfile.ZipFile("{}.zip".format(archive_file), "w")
for upload_item in upload_items:
if verbose:
print('Archiving file: {}'.format(os.path.abspath(upload_item)))
print("Archiving file: {}".format(os.path.abspath(upload_item)))
if os.path.exists(upload_item):
zip_file.write(os.path.abspath(upload_item), compress_type=zipfile.ZIP_DEFLATED)
else:
print('WARNING: Unable to locate the specified file!')
print("WARNING: Unable to locate the specified file!")
for database_item in database_items:
if os.path.exists(database_item['database']):
if os.path.exists(database_item["database"]):
if verbose:
print('Archiving tables from database: {}'.format(os.path.abspath(database_item['database'])))
for table in database_item['tables']:
print(
"Archiving tables from database: {}".format(
os.path.abspath(database_item["database"])))
for table in database_item["tables"]:
if verbose:
print('Creating csv and archiving table: {}'.format(table))
file_name = dbTableWriter(database_item['database'], table)
print("Creating csv and archiving table: {}".format(table))
file_name = dbTableWriter(database_item["database"], table)
zip_file.write(os.path.abspath(file_name), compress_type=zipfile.ZIP_DEFLATED)
else:
print('WARNING: Unable to locate the specified file!')
print("WARNING: Unable to locate the specified file!")
zip_file.close()
archive_size = os.path.getsize('{}.zip'.format(archive_file))
archive_size = os.path.getsize("{}.zip".format(archive_file))
if verbose:
print('Archive name: {}.zip'.format(archive_file))
print('Archive size: {}'.format(archive_size))
print("Archive name: {}.zip".format(archive_file))
print("Archive size: {}".format(archive_size))
if archive_size > archive_max_size:
print('Aborting: File size is larger than allowed!')
print("Aborting: File size is larger than allowed!")
sys.exit(2)
# Query the API to get the computer ID
status_code, json_data = apiGET(jps_url=jps_url, jps_credentials=jps_credentials, endpoint='/computers/udid/{uuid}'.format(uuid=hw_UUID), verbose=verbose)
status_code, json_data = apiGET(
jps_url=jps_url,
jps_credentials=jps_credentials,
endpoint="/computers/udid/{uuid}".format(uuid=hw_UUID),
verbose=verbose
)
if int(status_code) == 200:
computer_id = json_data.get('computer').get('general').get('id')
computer_id = json_data.get("computer").get("general").get("id")
if verbose:
print('Computer ID: {}'.format(computer_id))
print("Computer ID: {}".format(computer_id))
else:
print('ERROR: Failed to retrieve devices\' computer ID!')
print("ERROR: Failed to retrieve devices\' computer ID!")
sys.exit(5)
# Upload file via the API
status_code, content = apiPOST(jps_url=jps_url, jps_credentials=jps_credentials, endpoint='/fileuploads/computers/id/{id}'.format(id=computer_id), file_to_upload='{}.zip'.format(archive_file), archive_size=archive_size, verbose=verbose)
status_code, content = apiPOST(
jps_url=jps_url,
jps_credentials=jps_credentials,
endpoint="/fileuploads/computers/id/{id}".format(id=computer_id),
file_to_upload="{}.zip".format(archive_file),
archive_size=archive_size,
verbose=verbose
)
if int(status_code) == 204:
if content:
if verbose:
print('Response: {}'.format(content))
print('Upload complete!')
print("Response: {}".format(content))
print("Upload complete!")
else:
print('ERROR: Failed to upload file to the JPS!')
print("ERROR: Failed to upload file to the JPS!")
sys.exit(6)
if __name__ == "__main__":
main()