v1.6.0 = Pass multiple args to & removed mutual exclusions from switches

+ Added the ability to pass multiple files and/or directories to be collected
+ Removed mutual exclusion from the defaults, file, and directory switch
+ Added logic to parse switches/values passed from Jamf Pro when using multiple values in each passed Script Parameter
+ Moved archiving logic to a function/context manager
+ Using built-in plistlib to read plists now
+ Standardizing functions
This commit is contained in:
Zack T
2022-05-13 18:57:40 -07:00
parent b148edbf0c
commit 75d80ecdfd

View File

@@ -1,9 +1,9 @@
#!/opt/ManagedFrameworks/Python.framework/Versions/Current/bin/python3
"""
Script Name: jamf_CollectDiagnostics.py
Script Name: Collect-Diagnostics.py
By: Zack Thompson / Created: 8/22/2019
Version: 1.5.0 / Updated: 12/03/2021 By: ZT
Version: 1.6.0 / Updated: 5/2/2022 By: ZT
Description: This script allows you to upload a compressed
zip of specified files to a computers' inventory record.
@@ -14,18 +14,20 @@ import argparse
import base64
import csv
import datetime
from Foundation import NSBundle
import json
import objc
import os
import plistlib
import requests
import shutil
import re
import shlex
import sqlite3
import subprocess
import sys
import zipfile
from Foundation import NSBundle, NSString
import objc
import requests
# Jamf Function to obfuscate credentials.
def DecryptString(inputString, salt, passphrase):
@@ -33,66 +35,52 @@ def DecryptString(inputString, salt, passphrase):
Usage: >>> DecryptString("Encrypted String", "Salt", "Passphrase")
"""
return runUtility(
"echo '{inputString}' | /usr/bin/openssl enc -aes256 -d -a -A -S '{salt}' -k '{passphrase}'".format(
salt=salt, passphrase=passphrase, inputString=inputString))
return execute_process(
"/usr/bin/openssl enc -aes256 -d -a -A -S '{salt}' -k '{passphrase}'".format(
salt=salt, passphrase=passphrase), input=inputString)["stdout"]
def runUtility(command):
"""A helper function for subprocess.
def execute_process(command, input=None):
"""
A helper function for subprocess.
Args:
command: String containing the commands and arguments that will be passed to a shell.
command (str): The command line level syntax that would be written in a
shell script or a terminal window
Returns:
stdout: output of the command
dict: Results in a dictionary
"""
try:
process = subprocess.check_output(command, shell=True)
except subprocess.CalledProcessError as error:
print ("Error code: {}".format(error.returncode))
print ("Error: {}".format(error))
process = "error"
# Validate that command is not a string
if not isinstance(command, str):
raise TypeError("Command must be a str type")
return process
# Format the command
command = shlex.split(command)
# Run the command
process = subprocess.Popen(
command,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=False,
universal_newlines=True
)
def plistReader(plist_file_path, verbose):
"""A helper function to get the contents of a Property List.
Args:
plist_file_path: A .plist file to read in.
Returns:
stdout: Returns the contents of the plist file.
"""
if input:
(stdout, stderr) = process.communicate(input=input)
if os.path.exists(plist_file_path):
if verbose:
print("Opening plist: {}".format(plist_file_path))
try:
# Get the contents of the plist file.
with open(plist_file_path, "rb") as plist_file:
plist_Contents = plistlib.load(plist_file)
except Exception:
file_cmd = "/usr/bin/file --mime-encoding {}".format(plist_file_path)
file_response = runUtility(file_cmd)
file_type = file_response.split(": ")[1].strip()
if verbose:
print("File Type: {}".format(file_type))
if file_type == "binary":
if verbose:
print("Converting plist...")
plutil_cmd = "/usr/bin/plutil -convert xml1 {}".format(plist_file_path)
plutil_response = runUtility(plutil_cmd)
# Get the contents of the plist file.
with open(plist_file_path, "rb") as plist_file:
plist_Contents = plistlib.load(plist_file)
else:
print("ERROR: Unable to locate the specified plist file!")
sys.exit(3)
(stdout, stderr) = process.communicate()
return plist_Contents
return {
"stdout": (stdout).strip(),
"stderr": (stderr).strip() if stderr != None else None,
"exitcode": process.returncode,
"success": True if process.returncode == 0 else False
}
# Modified from: https://stackoverflow.com/a/36211470
@@ -126,46 +114,86 @@ def dbTableWriter(database, table):
return os.path.abspath(file_name)
# Credit to (Mikey Mike/Froger/Pudquick/etc) for this logic:
# https://gist.github.com/pudquick/c7dd1262bd81a32663f0
def get_system(attribute):
"""A helper function to get specific system attributes.
def get_system_info():
"""
A helper function to get specific system attributes.
Credit: Mikey Mike/Froger/Pudquick/etc
Source: https://gist.github.com/pudquick/c7dd1262bd81a32663f0
Notes: Modified from source
Args:
type: The system attribute desired.
attribute: The system attribute desired.
Returns:
stdout: The system attribute value.
"""
IOKit_bundle = NSBundle.bundleWithIdentifier_("com.apple.framework.IOKit")
functions = [
( "IOServiceGetMatchingService", b"II@" ),
( "IOServiceMatching", b"@*" ),
( "IORegistryEntryCreateCFProperty", b"@I@@I" )
("IORegistryEntryCreateCFProperty", b"@I@@I"),
("IOServiceGetMatchingService", b"II@"),
("IOServiceMatching", b"@*")
]
objc.loadBundleFunctions(IOKit_bundle, globals(), functions)
def io_key(keyname):
return IORegistryEntryCreateCFProperty(
int( IOServiceGetMatchingService(
0, IOServiceMatching(
"IOPlatformExpertDevice".encode("utf-8")))), keyname, None, 0)
def io_key(key_name, service_name="IOPlatformExpertDevice"):
service = IOServiceMatching(service_name.encode("utf-8"))
key = NSString.stringWithString_(key_name)
return IORegistryEntryCreateCFProperty(IOServiceGetMatchingService(0, service), key, None, 0)
def get_hardware_uuid():
return io_key("IOPlatformUUID")
# def get_hardware_serial():
# return io_key("IOPlatformSerialNumber".encode("utf-8"))
# return io_key("IOPlatformSerialNumber")
# def get_board_id():
# return str(io_key("board-id".encode("utf-8"))).rstrip("\x00")
# try:
# return bytes(io_key("board-id")).decode().rstrip("\x00")
# except TypeError:
# return ""
options = {"uuid" : get_hardware_uuid #,
# "serial" : get_hardware_serial,
# "boardID" : get_board_id
# def get_model_id():
# return bytes(io_key("model")).decode().rstrip("\x00")
# def lookup_model(lookup_code):
# xml = requests.get("https://support-sp.apple.com/sp/product?cc={}".format(lookup_code)).text
# try:
# tree = ElementTree.fromstringlist(xml)
# return tree.find(".//configCode").text
# except ElementTree.ParseError as err:
# print("Failed to retrieve model name: {}".format(err.strerror))
# return ""
# serial_number = get_hardware_serial()
# sn_length = len(serial_number)
# model = ""
# if sn_length == 10:
# results = execute_process("/usr/sbin/ioreg -arc IOPlatformDevice -k product-name")
# if results["success"]:
# plist_contents = plistlib.loads(results["stdout"].encode())
# model = plist_contents[0].get("product-name").decode().rstrip("\x00")
# elif sn_length == 12:
# model = lookup_model(serial_number[-4:])
# elif sn_length == 11:
# model = lookup_model(serial_number[-3:])
return {
# "serial_number": serial_number,
"uuid": get_hardware_uuid(),
# "board_id": get_board_id(),
# "model_id": get_model_id(),
# "model_friendly": model
}
return options[attribute]()
def apiGET(**parameters):
"""A helper function that performs a GET to the Jamf API.
@@ -196,7 +224,8 @@ def apiGET(**parameters):
statusCode = response.status_code
json_response = response.json()
except Exception:
except Exception as error:
print("Requests error:\n{}".format(error))
# If `requests` fails, resort to using curl.
if parameters.get("verbose"):
@@ -207,8 +236,8 @@ def apiGET(**parameters):
'statusCode:%{{http_code}}' --location --header 'Accept: application/json' \
--header 'Authorization: Basic {jps_credentials}' --url {url} --request GET".format(
jps_credentials=parameters.get("jps_credentials"), url=url)
response = runUtility(curl_cmd)
json_content, statusCode = response.split(b"statusCode:")
response = execute_process(curl_cmd)['stdout']
json_content, statusCode = response.split("statusCode:")
json_response = json.loads(json_content)
return statusCode, json_response
@@ -268,14 +297,67 @@ def apiPOST(**parameters):
file_to_upload=parameters.get("file_to_upload")
)
response = runUtility(curl_cmd)
content, statusCode = response.split(b"statusCode:")
response = execute_process(curl_cmd)['stdout']
content, statusCode = response.split("statusCode:")
return statusCode, content
def archiver(path, archive, mode="a", verbose=True):
"""A Context Manager for creating or modifying a compressed archive.
Args:
path (str): Path to a file or directory to include in the archive
archive (str): Path to the archive file; will be created if it does not exist
mode (str, optional): The mode that will be used to open the archive. Defaults to "a".
verbose (bool, optional): Print verbose messages. Defaults to True.
"""
if verbose:
print("Archiving: {}".format(os.path.abspath(path)))
if os.path.exists(path):
with zipfile.ZipFile(archive, 'a', zipfile.ZIP_DEFLATED) as zip_file:
if os.path.isdir(path):
for root, dirs, files in os.walk(path):
for file in files:
zip_file.write(
os.path.join(root, file),
os.path.relpath(
os.path.join(root, file),
os.path.join(path, '..')
)
)
else:
zip_file.write(os.path.abspath(path), compress_type=zipfile.ZIP_DEFLATED)
else:
print("WARNING: Unable to locate the specified file!")
def main():
# print("All calling args: {}".format(sys.argv))
# print("All calling args: {}\n".format(sys.argv))
parse_args = []
for arg in sys.argv:
if arg != "":
if re.match(r'.*("|\').*', arg):
parse_args.extend(shlex.split(arg))
else:
parse_args.append(arg)
# print("Parsed args: {}\n".format(parse_args))
##################################################
# Define Script Parameters
@@ -283,43 +365,48 @@ def main():
parser = argparse.ArgumentParser(
description="This script allows you to upload a compressed zip \
of specified files to a computers' inventory record")
collection = parser.add_mutually_exclusive_group()
parser.add_argument("--api-username", "-u",
help="Provide the encrypted string for the API Username", required=True)
parser.add_argument("--api-password", "-p",
help="Provide the encrypted string for the API Password", required=True)
collection.add_argument("--defaults", default=True,
parser.add_argument("--defaults", default=True,
help="Collects the default files.", required=False)
collection.add_argument("--file", "-f", type=str, nargs=1,
parser.add_argument("--file", "-f", type=str, nargs="*",
help="Specify specific file to collect.", required=False)
collection.add_argument("--directory", "-d", metavar="/path/to/directory/", type=str,
parser.add_argument("--directory", "-d", metavar="/path/to/directory/", type=str, nargs="*",
help="Specify a specific directory to collect.", required=False)
parser.add_argument("--quiet", "-q", action="store_true",
help="Do not print verbose messages.", required=False)
args = parser.parse_known_args()
args = parser.parse_known_args(args=parse_args)
args = args[0]
print("Argparse args: {}".format(args))
# sys.exit(0)
if len(sys.argv) > 1:
upload_items = []
if args.file:
upload_items = []
upload_items.append((args.file[0]).strip())
elif args.directory:
upload_items = (args.directory).strip()
elif args.defaults:
upload_items = [
"/private/var/log/jamf.log",
"/private/var/log/install.log",
"/private/var/log/system.log",
"/private/var/log/jamf_RecoveryAgent.log",
"/private/var/log/jamf_ReliableEnrollment.log",
"/private/var/log/32bitApps_inventory.log",
"/opt/ManagedFrameworks/EA_History.log"
for file in args.file:
upload_items.append((file).strip())
if args.directory:
for folder in args.directory:
upload_items.append((folder).strip())
if args.defaults:
upload_items.extend(
[
"/private/var/log/jamf.log",
"/private/var/log/install.log",
"/private/var/log/system.log",
"/private/var/log/jamf_RecoveryAgent.log",
"/private/var/log/jamf_ReliableEnrollment.log",
"/private/var/log/32bitApps_inventory.log",
"/opt/ManagedFrameworks/EA_History.log"
]
)
# Setup databases that we want to collect info from
db_kext = {}
@@ -345,25 +432,30 @@ def main():
(args.api_username).strip(),
"<SALT>",
"<PASSPHRASE>"
)).strip().decode()
)
).strip()
jps_api_password = (
DecryptString(
(args.api_password).strip(),
"<SALT>",
"<PASSPHRASE>")
).strip().decode()
)
).strip()
jps_credentials = (
base64.b64encode(
"{}:{}".format(jps_api_user, jps_api_password).encode()
)).decode()
time_stamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
archive_file = "/private/tmp/{}_logs".format(time_stamp)
archive_max_size = 40000000
archive_file = "/private/tmp/{}_logs.zip".format(time_stamp)
archive_max_size = 50000000 # 50MB
# Get the systems' Jamf Pro Server
if os.path.exists(jamf_plist):
jamf_plist_contents = plistReader(jamf_plist, verbose)
with open(jamf_plist, "rb") as plist:
jamf_plist_contents = plistlib.load(plist)
jps_url = jamf_plist_contents["jss_url"]
if verbose:
print("Jamf Pro Server URL: {}".format(jps_url))
else:
@@ -371,7 +463,7 @@ def main():
sys.exit(1)
# Get the system's UUID
hw_UUID = get_system("uuid")
hw_UUID = get_system_info().get("uuid")
if verbose:
print("System UUID: {}".format(hw_UUID))
@@ -383,41 +475,28 @@ def main():
if database_items:
print("Requested databases: {}".format(database_items))
if args.directory:
if os.path.exists(upload_items):
parent_directory = os.path.abspath(os.path.join(upload_items, os.pardir))
shutil.make_archive(archive_file, "zip", parent_directory, upload_items )
else:
print("ERROR: Unable to locate the provided directory!")
sys.exit(4)
else:
zip_file = zipfile.ZipFile("{}.zip".format(archive_file), "w")
for upload_item in upload_items:
for upload_item in upload_items:
archiver(upload_item, archive=archive_file, verbose=verbose)
for database_item in database_items:
if os.path.exists(database_item["database"]):
if verbose:
print("Archiving file: {}".format(os.path.abspath(upload_item)))
if os.path.exists(upload_item):
zip_file.write(os.path.abspath(upload_item), compress_type=zipfile.ZIP_DEFLATED)
else:
print("WARNING: Unable to locate the specified file!")
for database_item in database_items:
if os.path.exists(database_item["database"]):
print(
"Archiving tables from database: {}".format(
os.path.abspath(database_item["database"])))
for table in database_item["tables"]:
if verbose:
print(
"Archiving tables from database: {}".format(
os.path.abspath(database_item["database"])))
for table in database_item["tables"]:
if verbose:
print("Creating csv and archiving table: {}".format(table))
file_name = dbTableWriter(database_item["database"], table)
zip_file.write(os.path.abspath(file_name), compress_type=zipfile.ZIP_DEFLATED)
else:
print("WARNING: Unable to locate the specified file!")
print("Creating csv and archiving table: {}".format(table))
file_name = dbTableWriter(database_item["database"], table)
zip_file.close()
archiver(os.path.abspath(file_name), archive=archive_file, verbose=verbose)
else:
print("WARNING: Unable to locate the specified database!")
archive_size = os.path.getsize(archive_file)
archive_size = os.path.getsize("{}.zip".format(archive_file))
if verbose:
print("Archive name: {}.zip".format(archive_file))
print("Archive name: {}".format(archive_file))
print("Archive size: {}".format(archive_size))
if archive_size > archive_max_size:
@@ -445,7 +524,7 @@ def main():
jps_url=jps_url,
jps_credentials=jps_credentials,
endpoint="/fileuploads/computers/id/{id}".format(id=computer_id),
file_to_upload="{}.zip".format(archive_file),
file_to_upload=archive_file,
archive_size=archive_size,
verbose=verbose
)