#!/usr/bin/env python3 # filename: generate_guidance.py # description: Process a given baseline, and output guidance files import sys import plistlib import glob import os import yaml import re import argparse import subprocess import logging import tempfile import base64 import shutil import json import hashlib from datetime import date from xlwt import Workbook, easyxf from string import Template from itertools import groupby from uuid import uuid4 from zipfile import ZipFile class MacSecurityRule: def __init__( self, title, rule_id, severity, discussion, check, fix, cci, cce, nist_controls, nist_171, disa_stig, srg, sfr, cis, cmmc, indigo, custom_refs, odv, tags, result_value, mobileconfig, mobileconfig_info, customized, ): self.rule_title = title self.rule_id = rule_id self.rule_severity = severity self.rule_discussion = discussion self.rule_check = check self.rule_fix = fix self.rule_cci = cci self.rule_cce = cce self.rule_80053r5 = nist_controls self.rule_800171 = nist_171 self.rule_disa_stig = disa_stig self.rule_srg = srg self.rule_sfr = sfr self.rule_cis = cis self.rule_cmmc = cmmc self.rule_indigo = indigo self.rule_custom_refs = custom_refs self.rule_odv = odv self.rule_result_value = result_value self.rule_tags = tags self.rule_mobileconfig = mobileconfig self.rule_mobileconfig_info = mobileconfig_info self.rule_customized = customized def create_asciidoc(self, adoc_rule_template): """Pass an AsciiDoc template as file object to return formatted AsciiDOC""" rule_adoc = "" rule_adoc = adoc_rule_template.substitute( rule_title=self.rule_title, rule_id=self.rule_id, rule_severity=self.rule_severity, rule_discussion=self.rule_discussion, rule_check=self.rule_check, rule_fix=self.rule_fix, rule_cci=self.rule_cci, rule_80053r5=self.rule_80053r5, rule_disa_stig=self.rule_disa_stig, rule_cis=self.rule_cis, rule_cmmc=self.rule_cmmc, rule_indigo=self.rule_indigo, rule_srg=self.rule_srg, rule_result=self.rule_result_value, ) return rule_adoc def create_mobileconfig(self): pass def ulify(elements): string = "\n" for s in elements: string += "* " + str(s) + "\n" return string def group_ulify(elements): string = "\n * " for s in elements: string += str(s) + ", " return string[:-2] def group_ulify_comment(elements): string = "\n * " for s in elements: string += str(s) + ", " return string[:-2] def get_check_code(check_yaml): try: check_string = check_yaml.split("[source,bash]")[1] except: return check_yaml # print check_string check_code = re.search(r"----\n?(.*?)\n?----", check_string, re.DOTALL) # print(check_code.group(1).rstrip()) return check_code.group(1).strip() def quotify(fix_code): string = fix_code.replace("'", "'\"'\"'") string = string.replace("%", "%%") return string def get_fix_code(fix_yaml): fix_string = fix_yaml.split("[source,bash]")[1] fix_code = re.search(r"----\n?(.*?)\n?----", fix_string, re.DOTALL) return fix_code.group(1) def format_mobileconfig_fix(mobileconfig): """Takes a list of domains and setting from a mobileconfig, and reformats it for the output of the fix section of the guide.""" rulefix = "" for domain, settings in mobileconfig.items(): if domain == "com.apple.ManagedClient.preferences": rulefix = rulefix + ( f"NOTE: The following settings are in the ({domain}) payload. This payload requires the additional settings to be sub-payloads within, containing their defined payload types.\n\n" ) rulefix = rulefix + format_mobileconfig_fix(settings) else: rulefix = rulefix + ( f"Create a configuration profile containing the following keys in the ({domain}) payload type:\n\n" ) rulefix = rulefix + "[source,xml]\n----\n" for item in settings.items(): rulefix = rulefix + (f"{item[0]}\n") if type(item[1]) == bool: rulefix = rulefix + (f"<{str(item[1]).lower()}/>\n") elif type(item[1]) == list: rulefix = rulefix + "\n" for setting in item[1]: rulefix = rulefix + (f" {setting}\n") rulefix = rulefix + "\n" elif type(item[1]) == int: rulefix = rulefix + (f"{item[1]}\n") elif type(item[1]) == str: rulefix = rulefix + (f"{item[1]}\n") elif type(item[1]) == dict: rulefix = rulefix + "\n" for k, v in item[1].items(): if type(v) == dict: rulefix = rulefix + (f" {k}\n") rulefix = rulefix + (f" \n") for x, y in v.items(): rulefix = rulefix + (f" {x}\n") rulefix = rulefix + (f" {y}\n") rulefix = rulefix + (f" \n") break if isinstance(v, list): rulefix = rulefix + " \n" for setting in v: rulefix = rulefix + ( f" {setting}\n" ) rulefix = rulefix + " \n" else: rulefix = rulefix + (f" {k}\n") rulefix = rulefix + (f" {v}\n") rulefix = rulefix + "\n" rulefix = rulefix + "----\n\n" return rulefix class AdocTemplate: def __init__(self, name, path, template_file): self.name = name self.path = path self.template_file = template_file class PayloadDict: """Class to create and manipulate Configuration Profiles. The actual plist content can be accessed as a dictionary via the 'data' attribute. """ def __init__( self, identifier, uuid=False, description="", organization="", displayname="" ): self.data = {} self.data["PayloadVersion"] = 1 self.data["PayloadOrganization"] = organization if uuid: self.data["PayloadUUID"] = uuid else: self.data["PayloadUUID"] = makeNewUUID() self.data["PayloadType"] = "Configuration" self.data["PayloadScope"] = "System" self.data["PayloadDescription"] = description self.data["PayloadDisplayName"] = displayname self.data["PayloadIdentifier"] = identifier self.data["ConsentText"] = { "default": "THE SOFTWARE IS PROVIDED 'AS IS' WITHOUT ANY WARRANTY OF ANY KIND, EITHER EXPRESSED, IMPLIED, OR STATUTORY, INCLUDING, BUT NOT LIMITED TO, ANY WARRANTY THAT THE SOFTWARE WILL CONFORM TO SPECIFICATIONS, ANY IMPLIED WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND FREEDOM FROM INFRINGEMENT, AND ANY WARRANTY THAT THE DOCUMENTATION WILL CONFORM TO THE SOFTWARE, OR ANY WARRANTY THAT THE SOFTWARE WILL BE ERROR FREE. IN NO EVENT SHALL NIST BE LIABLE FOR ANY DAMAGES, INCLUDING, BUT NOT LIMITED TO, DIRECT, INDIRECT, SPECIAL OR CONSEQUENTIAL DAMAGES, ARISING OUT OF, RESULTING FROM, OR IN ANY WAY CONNECTED WITH THIS SOFTWARE, WHETHER OR NOT BASED UPON WARRANTY, CONTRACT, TORT, OR OTHERWISE, WHETHER OR NOT INJURY WAS SUSTAINED BY PERSONS OR PROPERTY OR OTHERWISE, AND WHETHER OR NOT LOSS WAS SUSTAINED FROM, OR AROSE OUT OF THE RESULTS OF, OR USE OF, THE SOFTWARE OR SERVICES PROVIDED HEREUNDER." } # An empty list for 'sub payloads' that we'll fill later self.data["PayloadContent"] = [] def _updatePayload(self, payload_content_dict, baseline_name): """Update the profile with the payload settings. Takes the settings dictionary which will be the PayloadContent dict within the payload. Handles the boilerplate, naming and descriptive elements. """ # description = "Configuration settings for the {} preference domain.".format(payload_type) payload_dict = {} # Boilerplate payload_dict["PayloadVersion"] = 1 payload_dict["PayloadUUID"] = makeNewUUID() payload_dict["PayloadType"] = payload_content_dict["PayloadType"] payload_dict["PayloadIdentifier"] = ( f"mscp.{payload_content_dict['PayloadType']}.{payload_dict['PayloadUUID']}" ) payload_dict["PayloadContent"] = payload_content_dict # Add the payload to the profile self.data.update(payload_dict) def _addPayload(self, payload_content_dict, baseline_name): """Add a payload to the profile. Takes the settings dictionary which will be the PayloadContent dict within the payload. Handles the boilerplate, naming and descriptive elements. """ # description = "Configuration settings for the {} preference domain.".format(payload_type) payload_dict = {} # Boilerplate payload_dict["PayloadVersion"] = 1 payload_dict["PayloadUUID"] = makeNewUUID() payload_dict["PayloadType"] = payload_content_dict["PayloadType"] payload_dict["PayloadIdentifier"] = ( f"mscp.{payload_content_dict['PayloadType']}.{payload_dict['PayloadUUID']}" ) payload_dict["PayloadContent"] = payload_content_dict # Add the payload to the profile # print payload_dict del payload_dict["PayloadContent"]["PayloadType"] self.data["PayloadContent"].append(payload_dict) def addNewPayload(self, payload_type, settings, baseline_name): """Add a payload to the profile. Takes the settings dictionary which will be the PayloadContent dict within the payload. Handles the boilerplate, naming and descriptive elements. """ # description = "Configuration settings for the {} preference domain.".format(payload_type) payload_dict = {} # Boilerplate payload_dict["PayloadVersion"] = 1 payload_dict["PayloadUUID"] = makeNewUUID() payload_dict["PayloadType"] = payload_type payload_dict["PayloadIdentifier"] = ( f"mscp.{payload_type}.{payload_dict['PayloadUUID']}" ) # Add the settings to the payload for setting in settings: for k, v in setting.items(): payload_dict[k] = v # Add the payload to the profile self.data["PayloadContent"].append(payload_dict) def addMCXPayload(self, settings, baseline_name): """Add a payload to the profile. Takes the settings dictionary which will be the PayloadContent dict within the payload. Handles the boilerplate, naming and descriptive elements. """ keys = settings[1] plist_dict = {} for key in keys.split(): plist_dict[key] = settings[2] # description = "Configuration settings for the {} preference domain.".format(payload_type) payload_dict = {} state = "Forced" domain = settings[0] # Boilerplate payload_dict[domain] = {} payload_dict[domain][state] = [] payload_dict[domain][state].append({}) payload_dict[domain][state][0]["mcx_preference_settings"] = plist_dict payload_dict["PayloadType"] = "com.apple.ManagedClient.preferences" self._addPayload(payload_dict, baseline_name) def finalizeAndSave(self, output_path): """Perform last modifications and save to configuration profile.""" plistlib.dump(self.data, output_path) print(f"Configuration profile written to {output_path.name}") def finalizeAndSavePlist(self, output_path): """Perform last modifications and save to an output plist.""" output_file_path = output_path.name preferences_path = os.path.dirname(output_file_path) settings_dict = {} for i in self.data["PayloadContent"]: if i["PayloadType"] == "com.apple.ManagedClient.preferences": for key, value in i["PayloadContent"].items(): domain = key preferences_output_file = os.path.join( preferences_path, domain + ".plist" ) if not os.path.exists(preferences_output_file): with open(preferences_output_file, "w"): pass with open(preferences_output_file, "rb") as fp: try: settings_dict = plistlib.load(fp) except: settings_dict = {} with open(preferences_output_file, "wb") as fp: for setting in value["Forced"]: for key, value in setting[ "mcx_preference_settings" ].items(): settings_dict[key] = value # preferences_output_path = open(preferences_output_file, 'wb') plistlib.dump(settings_dict, fp) print(f"Settings plist written to {preferences_output_file}") settings_dict.clear() try: os.unlink(output_file_path) except: continue else: if os.path.exists(output_file_path): with open(output_file_path, "rb") as fp: try: settings_dict = plistlib.load(fp) except: settings_dict = {} for key, value in i.items(): if not key.startswith("Payload"): settings_dict[key] = value plistlib.dump(settings_dict, output_path) print(f"Settings plist written to {output_path.name}") def makeNewUUID(): return str(uuid4()) def concatenate_payload_settings(settings): """Takes a list of dictionaries, removed duplicate entries and concatenates an array of settings for the same key""" settings_list = [] settings_dict = {} for item in settings: for key, value in item.items(): if isinstance(value, list): settings_dict.setdefault(key, []).append(value[0]) else: settings_dict.setdefault(key, value) if item not in settings_list: settings_list.append(item) return [settings_dict] def generate_profiles( baseline_name, build_path, parent_dir, baseline_yaml, signing, hash="", generate_domain=True, generate_consolidated=True ): """Generate the configuration profiles for the rules in the provided baseline YAML file""" # import profile_manifests.plist manifests_file = os.path.join(parent_dir, "includes", "supported_payloads.yaml") with open(manifests_file) as r: manifests = yaml.load(r, Loader=yaml.SafeLoader) # Output folder unsigned_mobileconfig_output_path = os.path.join( f"{build_path}", "mobileconfigs", "unsigned" ) if not (os.path.isdir(unsigned_mobileconfig_output_path)): try: os.makedirs(unsigned_mobileconfig_output_path) except OSError: print( "Creation of the directory %s failed" % unsigned_mobileconfig_output_path ) if signing: signed_mobileconfig_output_path = os.path.join( f"{build_path}", "mobileconfigs", "signed" ) if not (os.path.isdir(signed_mobileconfig_output_path)): try: os.makedirs(signed_mobileconfig_output_path) except OSError: print( "Creation of the directory %s failed" % signed_mobileconfig_output_path ) settings_plist_output_path = os.path.join( f"{build_path}", "mobileconfigs", "preferences" ) if not (os.path.isdir(settings_plist_output_path)): try: os.makedirs(settings_plist_output_path) except OSError: print("Creation of the directory %s failed" % settings_plist_output_path) # setup lists and dictionaries profile_errors = [] profile_types = {} mount_controls = {} for sections in baseline_yaml["profile"]: for profile_rule in sections["rules"]: logging.debug(f"checking for rule file for {profile_rule}") if glob.glob( "../custom/rules/**/{}.y*ml".format(profile_rule), recursive=True ): rule = glob.glob( "../custom/rules/**/{}.y*ml".format(profile_rule), recursive=True )[0] custom = True logging.debug(f"{rule}") elif glob.glob("../rules/*/{}.y*ml".format(profile_rule)): rule = glob.glob("../rules/*/{}.y*ml".format(profile_rule))[0] custom = False logging.debug(f"{rule}") # for rule in glob.glob('../rules/*/{}.y*ml'.format(profile_rule)) + glob.glob('../custom/rules/**/{}.y*ml'.format(profile_rule),recursive=True): rule_yaml = get_rule_yaml(rule, baseline_yaml, custom) if rule_yaml["mobileconfig"]: for payload_type, info in rule_yaml["mobileconfig_info"].items(): valid = True try: if payload_type not in manifests["payloads_types"]: profile_errors.append(rule) raise ValueError( "{}: Payload Type is not supported".format(payload_type) ) else: pass except (KeyError, ValueError) as e: profile_errors.append(rule) logging.debug(e) valid = False try: if isinstance(info, list): raise ValueError("Payload key is non-conforming") else: pass except (KeyError, ValueError) as e: profile_errors.append(rule) logging.debug(e) valid = False if valid: if payload_type == "com.apple.systemuiserver": for setting_key, setting_value in info[ "mount-controls" ].items(): mount_controls[setting_key] = setting_value payload_settings = {"mount-controls": mount_controls} profile_types.setdefault(payload_type, []).append( payload_settings ) elif payload_type == "com.apple.ManagedClient.preferences": for payload_domain, settings in info.items(): for key, value in settings.items(): payload_settings = (payload_domain, key, value) profile_types.setdefault(payload_type, []).append( payload_settings ) else: for profile_key, key_value in info.items(): payload_settings = {profile_key: key_value} profile_types.setdefault(payload_type, []).append( payload_settings ) if len(profile_errors) > 0: print( "There are errors in the following files, please correct the .yaml file(s)!" ) for error in profile_errors: print(error) consolidated_profile = PayloadDict( identifier="consolidated." + baseline_name, uuid=False, organization="macOS Security Compliance Project", displayname=f"{baseline_name} settings", description=f"Consolidated configuration settings for {baseline_name}." ) # process the payloads from the yaml file and generate new config profile for each type for payload, settings in profile_types.items(): if payload.startswith("."): unsigned_mobileconfig_file_path = os.path.join( unsigned_mobileconfig_output_path, "com.apple" + payload + ".mobileconfig", ) settings_plist_file_path = os.path.join( settings_plist_output_path, "com.apple" + payload + ".plist" ) if signing: signed_mobileconfig_file_path = os.path.join( signed_mobileconfig_output_path, "com.apple" + payload + ".mobileconfig", ) else: unsigned_mobileconfig_file_path = os.path.join( unsigned_mobileconfig_output_path, payload + ".mobileconfig" ) settings_plist_file_path = os.path.join( settings_plist_output_path, payload + ".plist" ) if signing: signed_mobileconfig_file_path = os.path.join( signed_mobileconfig_output_path, payload + ".mobileconfig" ) identifier = payload + f".{baseline_name}" created = date.today() description = ( "Created: {}\nConfiguration settings for the {} preference domain.".format( created, payload ) ) organization = "macOS Security Compliance Project" displayname = f"[{baseline_name}] {payload} settings" newProfile = PayloadDict( identifier=identifier, uuid=False, organization=organization, displayname=displayname, description=description, ) if payload == "com.apple.ManagedClient.preferences": for item in settings: newProfile.addMCXPayload(item, baseline_name) consolidated_profile.addMCXPayload(item, baseline_name) # handle these payloads for array settings elif ( (payload == "com.apple.applicationaccess.new") or (payload == "com.apple.systempreferences") or (payload == "com.apple.SetupAssistant.managed") ): newProfile.addNewPayload(payload, concatenate_payload_settings(settings), baseline_name) consolidated_profile.addNewPayload(payload, concatenate_payload_settings(settings), baseline_name) else: newProfile.addNewPayload(payload, settings, baseline_name) consolidated_profile.addNewPayload(payload, settings, baseline_name) if generate_domain: with open(settings_plist_file_path, "wb") as settings_plist_file: newProfile.finalizeAndSavePlist(settings_plist_file) with open(unsigned_mobileconfig_file_path, "wb") as unsigned_mobileconfig_file: newProfile.finalizeAndSave(unsigned_mobileconfig_file) if signing: sign_config_profile(unsigned_mobileconfig_file_path, signed_mobileconfig_file_path, hash) if generate_consolidated: consolidated_mobileconfig_file_path = os.path.join(unsigned_mobileconfig_output_path, f"{baseline_name}.mobileconfig") with open(consolidated_mobileconfig_file_path, "wb") as consolidated_mobileconfig_file: consolidated_profile.finalizeAndSave(consolidated_mobileconfig_file) if signing: signed_consolidated_mobileconfig_path = os.path.join(signed_mobileconfig_output_path, f"{baseline_name}.mobileconfig") sign_config_profile(consolidated_mobileconfig_file_path, signed_consolidated_mobileconfig_path, hash) print( f""" CAUTION: These configuration profiles are intended for evaluation in a TEST environment. Certain configuration profiles (Smartcards), when applied could leave a system in a state where a user can no longer login with a password. Please use caution when applying configuration settings to a system. NOTE: If an MDM is already being leveraged, many of these profile settings may be available through the vendor. """ ) def zip_folder(folder_to_zip): with ZipFile(folder_to_zip + ".zip", "w") as zip_object: for folder_name, sub_folders, file_names in os.walk(folder_to_zip): for filename in file_names: # Create filepath of files in directory file_path = os.path.join(folder_name, filename) arcname = os.path.join(folder_name[len(folder_to_zip) :], filename) # Add files to zip file zip_object.write(file_path, arcname) return zip_object.filename def create_ddm_activation(identifier, ddm_output_path): ddm_output_path = f"{ddm_output_path}/activations" ddm_identifier = ( f"{identifier.replace('config', 'activation').replace('asset', 'activation')}" ) ddm_json = {} ddm_json["Identifier"] = ddm_identifier ddm_json["Type"] = "com.apple.activation.simple" ddm_json["Payload"] = {"StandardConfigurations": [identifier]} ddm_object = json.dumps(ddm_json, indent=4) logging.debug(f"Building declarative activation for {ddm_identifier}...") # Writing the .json to disk if not (os.path.isdir(ddm_output_path)): try: os.makedirs(ddm_output_path) except OSError: print("Creation of the directory %s failed" % ddm_output_path) with open(ddm_output_path + "/" + ddm_identifier + ".json", "w") as outfile: outfile.write(ddm_object) return def create_ddm_conf(identifier, service, ddm_output_path): ddm_output_path = f"{ddm_output_path}/configurations" ddm_identifier = f"{identifier.replace('asset', 'config')}" ddm_json = {} ddm_json["Identifier"] = ddm_identifier ddm_json["Type"] = "com.apple.configuration.services.configuration-files" ddm_json["Payload"] = {"ServiceType": service, "DataAssetReference": identifier} ddm_object = json.dumps(ddm_json, indent=4) logging.debug(f"Building declarative configuration for {ddm_identifier}...") # Writing the .json to disk if not (os.path.isdir(ddm_output_path)): try: os.makedirs(ddm_output_path) except OSError: print("Creation of the directory %s failed" % ddm_output_path) with open(ddm_output_path + "/" + ddm_identifier + ".json", "w") as outfile: outfile.write(ddm_object) return def generate_ddm(baseline_name, build_path, parent_dir, baseline_yaml): """Generate the declarative management artifacts for the rules in the provided baseline YAML file""" # import mscp-data mscp_data_file = os.path.join(parent_dir, "includes", "mscp-data.yaml") with open(mscp_data_file) as r: mscp_data_yaml = yaml.load(r, Loader=yaml.SafeLoader) # Output folder ddm_output_path = os.path.join(f"{build_path}", "declarative") if not (os.path.isdir(ddm_output_path)): try: os.makedirs(ddm_output_path) except OSError: print("Creation of the directory %s failed" % ddm_output_path) # setup lists and dictionaries ddm_rules = [] ddm_dict = {} for sections in baseline_yaml["profile"]: for profile_rule in sections["rules"]: logging.debug(f"checking for rule file for {profile_rule}") if glob.glob( "../custom/rules/**/{}.y*ml".format(profile_rule), recursive=True ): rule = glob.glob( "../custom/rules/**/{}.y*ml".format(profile_rule), recursive=True )[0] custom = True logging.debug(f"{rule}") elif glob.glob("../rules/*/{}.y*ml".format(profile_rule)): rule = glob.glob("../rules/*/{}.y*ml".format(profile_rule))[0] custom = False logging.debug(f"{rule}") rule_yaml = get_rule_yaml(rule, baseline_yaml, custom) if "ddm_info" in rule_yaml.keys(): if rule_yaml["ddm_info"]: logging.debug(f"adding {rule_yaml['id']}") ddm_rules.append(rule_yaml) for ddm_rule in ddm_rules: if ( ddm_rule["ddm_info"]["declarationtype"] == "com.apple.configuration.services.configuration-files" ): # verify the ddm service is supported if ddm_rule["ddm_info"]["service"] in mscp_data_yaml["ddm"]["services"]: config_file_path = mscp_data_yaml["ddm"]["services"][ ddm_rule["ddm_info"]["service"] ] else: print(f"{ddm_rule['ddm_info']['service']} service NOT found") # set up configuration file config_file_output_path = os.path.join( f"{ddm_output_path}/" + ddm_rule["ddm_info"]["service"] + config_file_path ) if not (os.path.isdir(config_file_output_path)): try: os.makedirs(config_file_output_path) except OSError: print( "Creation of the directory %s failed" % config_file_output_path ) # write the configuration file service_config_file = open( config_file_output_path + ddm_rule["ddm_info"]["config_file"], "a" ) if ddm_rule["ddm_info"]["configuration_key"] == "file": service_config_file.write( f"{ddm_rule['ddm_info']['configuration_value']}\n" ) else: service_config_file.write( f"{ddm_rule['ddm_info']['configuration_key']} {ddm_rule['ddm_info']['configuration_value']}\n" ) # add configuration-files type to ddm_dict ddm_dict.setdefault(ddm_rule["ddm_info"]["declarationtype"], {}).update({}) service_config_file.close() else: ddm_key = ddm_rule["ddm_info"]["ddm_key"] ddm_key_value = ddm_rule["ddm_info"]["ddm_value"] if ddm_key in ddm_dict.get(ddm_rule["ddm_info"]["declarationtype"], ""): ddm_dict[ddm_rule["ddm_info"]["declarationtype"]][ddm_key].update( ddm_key_value ) else: ddm_dict.setdefault(ddm_rule["ddm_info"]["declarationtype"], {}).update( {ddm_key: ddm_key_value} ) for ddm_type in mscp_data_yaml["ddm"]["supported_types"]: if ddm_type not in ddm_dict.keys(): continue if ddm_type == "com.apple.configuration.services.configuration-files": # build zip files for configs for service in mscp_data_yaml["ddm"]["services"]: for root, dirs, files in os.walk(ddm_output_path): for folder in dirs: if folder == service: logging.debug( f"Found Configuration files for {service}, zipping..." ) service_path = os.path.join(ddm_output_path, service) zip_file = zip_folder(service_path) shutil.rmtree(service_path) # get SHA hash of file sha256_hash = hashlib.sha256() with open(zip_file, "rb") as f: # Read and update hash string value in blocks of 4K for byte_block in iter(lambda: f.read(4096), b""): sha256_hash.update(byte_block) zip_sha = sha256_hash.hexdigest() ddm_identifier = f"org.mscp.{baseline_name}.asset.{service.split('.')[2]}" # create declaration for asset created ddm_json = {} ddm_json["Identifier"] = ddm_identifier ddm_json["Type"] = "com.apple.asset.data" ddm_json["Payload"] = {} ddm_json["Payload"]["Reference"] = {} ddm_json["Payload"]["Reference"]["ContentType"] = ( "application/zip" ) ddm_json["Payload"]["Reference"]["DataURL"] = ( f"https://hostname.site.com/{service}.zip" ) ddm_json["Payload"]["Reference"]["Hash-SHA-256"] = zip_sha ddm_json["Authentication"] = {} ddm_json["Authentication"]["Type"] = "None" ddm_object = json.dumps(ddm_json, indent=4) # Writing the .json to disk ddm_asset_output_path = f"{ddm_output_path}/assets" if not (os.path.isdir(ddm_asset_output_path)): try: os.makedirs(ddm_asset_output_path) except OSError: print( "Creation of the directory %s failed" % ddm_asset_output_path ) with open( ddm_asset_output_path + "/" + ddm_identifier + ".json", "w", ) as outfile: outfile.write(ddm_object) # move .zips to assets shutil.move(zip_file, ddm_asset_output_path) # create activation create_ddm_activation(ddm_identifier, ddm_output_path) # create configuration declaration for assets create_ddm_conf(ddm_identifier, service, ddm_output_path) else: logging.debug(f"Building any declarations for {ddm_type}...") ddm_identifier = f"org.mscp.{baseline_name}.config.{ddm_type.replace('com.apple.configuration.', '')}" ddm_json = {} ddm_json["Identifier"] = ddm_identifier ddm_json["Type"] = ddm_type ddm_json["Payload"] = ddm_dict[ddm_type] ddm_object = json.dumps(ddm_json, indent=4) # Writing the .json to disk ddm_config_output_path = f"{ddm_output_path}/configurations" if not (os.path.isdir(ddm_config_output_path)): try: os.makedirs(ddm_config_output_path) except OSError: print( "Creation of the directory %s failed" % ddm_config_output_path ) with open( ddm_config_output_path + "/" + ddm_identifier + ".json", "w" ) as outfile: outfile.write(ddm_object) # create activation create_ddm_activation(ddm_identifier, ddm_output_path) def default_audit_plist(baseline_name, build_path, baseline_yaml): """ "Generate the default audit plist file to define exemptions""" # Output folder plist_output_path = os.path.join(f"{build_path}", "preferences") if not (os.path.isdir(plist_output_path)): try: os.makedirs(plist_output_path) except OSError: print("Creation of the directory %s failed" % plist_output_path) plist_file_path = os.path.join( plist_output_path, "org." + baseline_name + ".audit.plist" ) with open(plist_file_path, "wb") as plist_file: plist_dict = {} for sections in baseline_yaml["profile"]: for profile_rule in sections["rules"]: if profile_rule.startswith("supplemental"): continue plist_dict[profile_rule] = {"exempt": False} plistlib.dump(plist_dict, plist_file) def generate_script(baseline_name, audit_name, build_path, baseline_yaml, reference): """Generates the zsh script from the rules in the baseline YAML""" compliance_script_file = open( build_path + "/" + baseline_name + "_compliance.sh", "w" ) check_function_string = "" fix_function_string = "" # create header of fix zsh script check_zsh_header = f"""#!/bin/zsh --no-rcs ## This script will attempt to audit all of the settings based on the installed profile. ## This script is provided as-is and should be fully tested on a system that is not in a production environment. ################### Variables ################### pwpolicy_file="" ################### DEBUG MODE - hold shift when running the script ################### shiftKeyDown=$(osascript -l JavaScript -e "ObjC.import('Cocoa'); ($.NSEvent.modifierFlags & $.NSEventModifierFlagShift) > 1") if [[ $shiftKeyDown == "true" ]]; then echo "-----DEBUG-----" set -o xtrace -o verbose fi ################### COMMANDS START BELOW THIS LINE ################### # Check if the current shell is Zsh if [[ -z "$ZSH_NAME" ]]; then echo "ERROR: This script must be run in Zsh." exit 1 fi ## Must be run as root if [[ $EUID -ne 0 ]]; then echo "ERROR: This script must be run as root" exit 1 fi # path to PlistBuddy plb="/usr/libexec/PlistBuddy" # get the currently logged in user CURRENT_USER=$(/usr/bin/defaults read /Library/Preferences/com.apple.loginwindow lastUserName) CURR_USER_UID=$(/usr/bin/id -u $CURRENT_USER) # get system architecture arch=$(/usr/bin/arch) # configure colors for text RED='\\e[31m' STD='\\e[39m' GREEN='\\e[32m' YELLOW='\\e[33m' audit_plist="/Library/Preferences/org.{audit_name}.audit.plist" audit_log="/Library/Logs/{audit_name}_baseline.log" # pause function pause(){{ vared -p "Press [Enter] key to continue..." -c fackEnterKey }} # logging function logmessage(){{ if [[ ! $quiet ]];then echo "$(date -u) $1" | /usr/bin/tee -a "$audit_log" elif [[ ${{quiet[2][2]}} == 1 ]];then if [[ $1 == *" failed"* ]] || [[ $1 == *"exemption"* ]] ;then echo "$(date -u) $1" | /usr/bin/tee -a "$audit_log" else echo "$(date -u) $1" | /usr/bin/tee -a "$audit_log" > /dev/null fi else echo "$(date -u) $1" | /usr/bin/tee -a "$audit_log" > /dev/null fi }} ask() {{ # if fix flag is passed, assume YES for everything if [[ $fix ]] || [[ $cfc ]]; then return 0 fi while true; do if [ "${{2:-}}" = "Y" ]; then prompt="Y/n" default=Y elif [ "${{2:-}}" = "N" ]; then prompt="y/N" default=N else prompt="y/n" default= fi # Ask the question - use /dev/tty in case stdin is redirected from somewhere else printf "${{YELLOW}} $1 [$prompt] ${{STD}}" read REPLY # Default? if [ -z "$REPLY" ]; then REPLY=$default fi # Check if the reply is valid case "$REPLY" in Y*|y*) return 0 ;; N*|n*) return 1 ;; esac done }} # function to display menus show_menus() {{ lastComplianceScan=$(defaults read /Library/Preferences/org.{baseline_name}.audit.plist lastComplianceCheck) if [[ $lastComplianceScan == "" ]];then lastComplianceScan="No scans have been run" fi /usr/bin/clear echo "~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" echo " M A I N - M E N U" echo " macOS Security Compliance Tool" echo "~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" echo "Last compliance scan: $lastComplianceScan\n" echo "1. View Last Compliance Report" echo "2. Run New Compliance Scan" echo "3. Run Commands to remediate non-compliant settings" echo "4. Exit" }} # function to read options read_options(){{ local choice vared -p "Enter choice [ 1 - 4 ] " -c choice case $choice in 1) view_report ;; 2) run_scan ;; 3) run_fix ;; 4) exit 0;; *) echo -e "${{RED}}Error: please choose an option 1-4...${{STD}}" && sleep 1 esac }} # function to reset and remove plist file. Used to clear out any previous findings reset_plist(){{ if [[ $reset_all ]];then echo "Clearing results from all MSCP baselines" find /Library/Preferences -name "org.*.audit.plist" -exec rm -f '{{}}' \\; find /Library/Logs -name "*_baseline.log" -exec rm -f '{{}}' \\; else echo "Clearing results from /Library/Preferences/org.{baseline_name}.audit.plist" rm -f /Library/Preferences/org.{audit_name}.audit.plist rm -f /Library/Logs/{audit_name}_baseline.log fi }} # Generate the Compliant and Non-Compliant counts. Returns: Array (Compliant, Non-Compliant) compliance_count(){{ compliant=0 non_compliant=0 exempt_count=0 rule_names=($(/usr/libexec/PlistBuddy -c "Print" $audit_plist | awk '/= Dict/ {{print $1}}')) for rule in ${{rule_names[@]}}; do finding=$(/usr/libexec/PlistBuddy -c "Print $rule:finding" $audit_plist) if [[ $finding == "false" ]];then compliant=$((compliant+1)) elif [[ $finding == "true" ]];then is_exempt=$(/usr/bin/osascript -l JavaScript << EOS 2>/dev/null ObjC.unwrap($.NSUserDefaults.alloc.initWithSuiteName('org.{baseline_name}.audit').objectForKey("$rule"))["exempt"] EOS ) if [[ $is_exempt == "1" ]]; then exempt_count=$((exempt_count+1)) non_compliant=$((non_compliant+1)) else non_compliant=$((non_compliant+1)) fi fi done # Enable output of just the compliant or non-compliant numbers. if [[ $1 = "compliant" ]] then echo $compliant elif [[ $1 = "non-compliant" ]] then echo $non_compliant else # no matching args output the array array=($compliant $non_compliant $exempt_count) echo ${{array[@]}} fi }} generate_report(){{ count=($(compliance_count)) compliant=${{count[1]}} non_compliant=${{count[2]}} exempt_rules=${{count[3]}} total=$((non_compliant + compliant)) percentage=$(printf %.2f $(( (compliant + exempt_rules) * 100. / total )) ) echo echo "Number of tests passed: ${{GREEN}}$compliant${{STD}}" echo "Number of test FAILED: ${{RED}}$non_compliant${{STD}}" echo "Number of exempt rules: ${{YELLOW}}$exempt_rules${{STD}}" echo "You are ${{YELLOW}}$percentage%${{STD}} percent compliant!" pause }} view_report(){{ if [[ $lastComplianceScan == "No scans have been run" ]];then echo "no report to run, please run new scan" pause else generate_report fi }} # Designed for use with MDM - single unformatted output of the Compliance Report generate_stats(){{ count=($(compliance_count)) compliant=${{count[1]}} non_compliant=${{count[2]}} total=$((non_compliant + compliant)) percentage=$(printf %.2f $(( compliant * 100. / total )) ) echo "PASSED: $compliant FAILED: $non_compliant, $percentage percent compliant!" }} run_scan(){{ # append to existing logfile if [[ $(/usr/bin/tail -n 1 "$audit_log" 2>/dev/null) = *"Remediation complete" ]]; then echo "$(date -u) Beginning {baseline_name} baseline scan" >> "$audit_log" else echo "$(date -u) Beginning {baseline_name} baseline scan" > "$audit_log" fi # run mcxrefresh /usr/bin/mcxrefresh -u $CURR_USER_UID # write timestamp of last compliance check /usr/bin/defaults write "$audit_plist" lastComplianceCheck "$(date +"%Y-%m-%d %H:%M:%S%z")" """ # Read all rules in the section and output the check functions for sections in baseline_yaml["profile"]: for profile_rule in sections["rules"]: logging.debug(f"checking for rule file for {profile_rule}") if glob.glob( "../custom/rules/**/{}.y*ml".format(profile_rule), recursive=True ): rule = glob.glob( "../custom/rules/**/{}.y*ml".format(profile_rule), recursive=True )[0] custom = True logging.debug(f"{rule}") elif glob.glob("../rules/*/{}.y*ml".format(profile_rule)): rule = glob.glob("../rules/*/{}.y*ml".format(profile_rule))[0] custom = False logging.debug(f"{rule}") rule_yaml = get_rule_yaml(rule, baseline_yaml, custom) if rule_yaml["id"].startswith("supplemental"): continue arch = "" try: if "manual" in rule_yaml["tags"]: continue if "arm64" in rule_yaml["tags"]: arch = "arm64" elif "i386" in rule_yaml["tags"]: arch = "i386" except: pass # grab the 800-53 controls try: rule_yaml["references"]["800-53r5"] except KeyError: nist_80053r5 = "N/A" else: nist_80053r5 = rule_yaml["references"]["800-53r5"] cis_ref = ["cis", "cis_lvl1", "cis_lvl2", "cisv8"] if reference == "default": log_reference_id = [rule_yaml["id"]] elif reference in cis_ref: if "v8" in reference: log_reference_id = [ f"CIS Controls-{', '.join(map(str, rule_yaml['references']['cis']['controls v8']))}" ] else: log_reference_id = [ f"CIS-{rule_yaml['references']['cis']['benchmark'][0]}" ] else: try: rule_yaml["references"][reference] except KeyError: try: rule_yaml["references"]["custom"][reference] except KeyError: log_reference_id = [rule_yaml["id"]] else: if isinstance( rule_yaml["references"]["custom"][reference], list ): log_reference_id = rule_yaml["references"]["custom"][ reference ] + [rule_yaml["id"]] else: log_reference_id = [ rule_yaml["references"]["custom"][reference] ] + [rule_yaml["id"]] else: if isinstance(rule_yaml["references"][reference], list): log_reference_id = rule_yaml["references"][reference] + [ rule_yaml["id"] ] else: log_reference_id = [rule_yaml["references"][reference]] + [ rule_yaml["id"] ] # group the controls if not nist_80053r5 == "N/A": nist_80053r5.sort() res = [ list(i) for j, i in groupby(nist_80053r5, lambda a: a.split("(")[0]) ] nist_controls = "" for i in res: nist_controls += group_ulify(i) else: nist_controls = "N/A" # print checks and result try: check = rule_yaml["check"] except KeyError: print("no check found for {}".format(rule_yaml["id"])) continue try: result = rule_yaml["result"] except KeyError: continue if "integer" in result: result_value = result["integer"] elif "boolean" in result: result_value = str(result["boolean"]).lower() elif "string" in result: result_value = result["string"] elif "base64" in result: result_string_bytes = f"{result['base64']}\n".encode("UTF-8") result_encoded = base64.b64encode(result_string_bytes) result["base64"] = result_encoded.decode() result_value = result["base64"] else: continue # write the checks zsh_check_text = """ #####----- Rule: {0} -----##### ## Addresses the following NIST 800-53 controls: {1} rule_arch="{6}" if [[ "$arch" == "$rule_arch" ]] || [[ -z "$rule_arch" ]]; then unset result_value result_value=$({2}\n) # expected result {3} # check to see if rule is exempt unset exempt unset exempt_reason exempt=$(/usr/bin/osascript -l JavaScript << EOS 2>/dev/null ObjC.unwrap($.NSUserDefaults.alloc.initWithSuiteName('org.{7}.audit').objectForKey('{0}'))["exempt"] EOS ) exempt_reason=$(/usr/bin/osascript -l JavaScript << EOS 2>/dev/null ObjC.unwrap($.NSUserDefaults.alloc.initWithSuiteName('org.{7}.audit').objectForKey('{0}'))["exempt_reason"] EOS ) customref="$(echo "{5}" | rev | cut -d ' ' -f 2- | rev)" customref="$(echo "$customref" | tr " " ",")" if [[ $result_value == "{4}" ]]; then logmessage "{5} passed (Result: $result_value, Expected: \\"{3}\\")" /usr/bin/defaults write "$audit_plist" {0} -dict-add finding -bool NO if [[ ! "$customref" == "{0}" ]]; then /usr/bin/defaults write "$audit_plist" {0} -dict-add reference -string "$customref" fi /usr/bin/logger "mSCP: {7} - {5} passed (Result: $result_value, Expected: "{3}")" else if [[ ! $exempt == "1" ]] || [[ -z $exempt ]];then logmessage "{5} failed (Result: $result_value, Expected: \\"{3}\\")" /usr/bin/defaults write "$audit_plist" {0} -dict-add finding -bool YES if [[ ! "$customref" == "{0}" ]]; then /usr/bin/defaults write "$audit_plist" {0} -dict-add reference -string "$customref" fi /usr/bin/logger "mSCP: {7} - {5} failed (Result: $result_value, Expected: "{3}")" else logmessage "{5} failed (Result: $result_value, Expected: \\"{3}\\") - Exemption Allowed (Reason: \\"$exempt_reason\\")" /usr/bin/defaults write "$audit_plist" {0} -dict-add finding -bool YES if [[ ! "$customref" == "{0}" ]]; then /usr/bin/defaults write "$audit_plist" {0} -dict-add reference -string "$customref" fi /usr/bin/logger "mSCP: {7} - {5} failed (Result: $result_value, Expected: "{3}") - Exemption Allowed (Reason: "$exempt_reason")" /bin/sleep 1 fi fi else logmessage "{5} does not apply to this architecture" /usr/bin/defaults write "$audit_plist" {0} -dict-add finding -bool NO fi """.format( rule_yaml["id"], nist_controls.replace("\n", "\n#"), check.strip(), str(result), result_value, " ".join(log_reference_id), arch, baseline_name, ) check_function_string = check_function_string + zsh_check_text # print fix and result try: rule_yaml["fix"] except KeyError: fix_text = "N/A" else: fix_text = rule_yaml["fix"] or ["n/a"] # write the fixes if "[source,bash]" in fix_text: nist_controls_commented = nist_controls.replace("\n", "\n#") zsh_fix_text = f""" #####----- Rule: {rule_yaml["id"]} -----##### ## Addresses the following NIST 800-53 controls: {nist_controls_commented} # check to see if rule is exempt unset exempt unset exempt_reason exempt=$(/usr/bin/osascript -l JavaScript << EOS 2>/dev/null ObjC.unwrap($.NSUserDefaults.alloc.initWithSuiteName('org.{baseline_name}.audit').objectForKey('{rule_yaml["id"]}'))["exempt"] EOS ) exempt_reason=$(/usr/bin/osascript -l JavaScript << EOS 2>/dev/null ObjC.unwrap($.NSUserDefaults.alloc.initWithSuiteName('org.{baseline_name}.audit').objectForKey('{rule_yaml["id"]}'))["exempt_reason"] EOS ) {rule_yaml["id"]}_audit_score=$($plb -c "print {rule_yaml["id"]}:finding" $audit_plist) if [[ ! $exempt == "1" ]] || [[ -z $exempt ]];then if [[ ${rule_yaml["id"]}_audit_score == "true" ]]; then ask '{rule_yaml["id"]} - Run the command(s)-> {quotify(get_fix_code(rule_yaml["fix"]).strip())} ' N if [[ $? == 0 ]]; then logmessage "Running the command to configure the settings for: {rule_yaml["id"]} ..." {get_fix_code(rule_yaml["fix"]).strip()} fi else logmessage "Settings for: {rule_yaml["id"]} already configured, continuing..." fi elif [[ ! -z "$exempt_reason" ]];then logmessage "{rule_yaml["id"]} has an exemption, remediation skipped (Reason: \"$exempt_reason\")" fi """ fix_function_string = fix_function_string + zsh_fix_text # write the footer for the check functions zsh_check_footer = """ lastComplianceScan=$(defaults read "$audit_plist" lastComplianceCheck) echo "Results written to $audit_plist" if [[ ! $check ]] && [[ ! $cfc ]];then pause fi } 2>/dev/null run_fix(){ if [[ ! -e "$audit_plist" ]]; then echo "Audit plist doesn't exist, please run Audit Check First" | tee -a "$audit_log" if [[ ! $fix ]]; then pause show_menus read_options else exit 1 fi fi if [[ ! $fix ]] && [[ ! $cfc ]]; then ask 'THE SOFTWARE IS PROVIDED "AS IS" WITHOUT ANY WARRANTY OF ANY KIND, EITHER EXPRESSED, IMPLIED, OR STATUTORY, INCLUDING, BUT NOT LIMITED TO, ANY WARRANTY THAT THE SOFTWARE WILL CONFORM TO SPECIFICATIONS, ANY IMPLIED WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND FREEDOM FROM INFRINGEMENT, AND ANY WARRANTY THAT THE DOCUMENTATION WILL CONFORM TO THE SOFTWARE, OR ANY WARRANTY THAT THE SOFTWARE WILL BE ERROR FREE. IN NO EVENT SHALL NIST BE LIABLE FOR ANY DAMAGES, INCLUDING, BUT NOT LIMITED TO, DIRECT, INDIRECT, SPECIAL OR CONSEQUENTIAL DAMAGES, ARISING OUT OF, RESULTING FROM, OR IN ANY WAY CONNECTED WITH THIS SOFTWARE, WHETHER OR NOT BASED UPON WARRANTY, CONTRACT, TORT, OR OTHERWISE, WHETHER OR NOT INJURY WAS SUSTAINED BY PERSONS OR PROPERTY OR OTHERWISE, AND WHETHER OR NOT LOSS WAS SUSTAINED FROM, OR AROSE OUT OF THE RESULTS OF, OR USE OF, THE SOFTWARE OR SERVICES PROVIDED HEREUNDER. WOULD YOU LIKE TO CONTINUE? ' N if [[ $? != 0 ]]; then show_menus read_options fi fi # append to existing logfile echo "$(date -u) Beginning remediation of non-compliant settings" >> "$audit_log" # remove uchg on audit_control /usr/bin/chflags nouchg /etc/security/audit_control # run mcxrefresh /usr/bin/mcxrefresh -u $CURR_USER_UID """ # write the footer for the script zsh_fix_footer = f""" echo "$(date -u) Remediation complete" >> "$audit_log" }} 2>/dev/null usage=( "$0 Usage" "$0 [--check] [--fix] [--cfc] [--stats] [--compliant] [--non_compliant] [--reset] [--reset-all] [--quiet=]" " " "Optional parameters:" "--check : run the compliance checks without interaction" "--fix : run the remediation commands without interaction" "--cfc : runs a check, fix, check without interaction" "--stats : display the statistics from last compliance check" "--compliant : reports the number of compliant checks" "--non_compliant : reports the number of non_compliant checks" "--reset : clear out all results for current baseline" "--reset-all : clear out all results for ALL MSCP baselines" "--quiet= : 1 - show only failed and exempted checks in output" " 2 - show minimal output" ) # Look for managed arguments for compliance script if [[ $# -eq 0 ]];then compliance_args=$(/usr/bin/osascript -l JavaScript << 'EOS' var defaults = $.NSUserDefaults.alloc.initWithSuiteName('org.{audit_name}.audit'); var args = defaults.objectForKey('compliance_args'); if (args && args.count > 0) {{ var result = []; for (var i = 0; i < args.count; i++) {{ result.push(ObjC.unwrap(args.objectAtIndex(i))); }} result.join(' '); }} EOS ) if [[ -n "$compliance_args" ]]; then logmessage "Managed arguments found for compliance script, setting: $compliance_args" set -- ${{(z)compliance_args}} fi fi zparseopts -D -E -help=flag_help -check=check -fix=fix -stats=stats -compliant=compliant_opt -non_compliant=non_compliant_opt -reset=reset -reset-all=reset_all -cfc=cfc -quiet:=quiet || {{ print -l $usage && return }} [[ -z "$flag_help" ]] || {{ print -l $usage && return }} if [[ ! -z $quiet ]];then [[ ! -z ${{quiet[2][2]}} ]] || {{ print -l $usage && return }} fi if [[ $reset ]] || [[ $reset_all ]]; then reset_plist; fi if [[ $check ]] || [[ $fix ]] || [[ $cfc ]] || [[ $stats ]] || [[ $compliant_opt ]] || [[ $non_compliant_opt ]]; then if [[ $fix ]]; then run_fix; fi if [[ $check ]]; then run_scan; fi if [[ $cfc ]]; then run_scan; run_fix; run_scan; fi if [[ $stats ]];then generate_stats; fi if [[ $compliant_opt ]];then compliance_count "compliant"; fi if [[ $non_compliant_opt ]];then compliance_count "non-compliant"; fi else while true; do show_menus read_options done fi """ # write out the compliance script compliance_script_file.write(check_zsh_header) compliance_script_file.write(check_function_string) compliance_script_file.write(zsh_check_footer) compliance_script_file.write(fix_function_string) compliance_script_file.write(zsh_fix_footer) print(f"Finished building {compliance_script_file.name}") # make the compliance script executable os.chmod(compliance_script_file.name, 0o755) # fix_script_file.close() compliance_script_file.close() def fill_in_odv(resulting_yaml, parent_values): fields_to_process = ["title", "discussion", "check", "fix"] _has_odv = False if "odv" in resulting_yaml: try: if type(resulting_yaml["odv"][parent_values]) == int: odv = resulting_yaml["odv"][parent_values] else: odv = str(resulting_yaml["odv"][parent_values]) _has_odv = True except KeyError: try: if type(resulting_yaml["odv"]["custom"]) == int: odv = resulting_yaml["odv"]["custom"] else: odv = str(resulting_yaml["odv"]["custom"]) _has_odv = True except KeyError: if type(resulting_yaml["odv"]["recommended"]) == int: odv = resulting_yaml["odv"]["recommended"] else: odv = str(resulting_yaml["odv"]["recommended"]) _has_odv = True else: pass if _has_odv: for field in fields_to_process: if "$ODV" in resulting_yaml[field]: resulting_yaml[field] = resulting_yaml[field].replace("$ODV", str(odv)) if "result" in resulting_yaml.keys(): for result_value in resulting_yaml["result"]: if "$ODV" in str(resulting_yaml["result"][result_value]): resulting_yaml["result"][result_value] = odv if resulting_yaml["mobileconfig_info"]: for mobileconfig_type in resulting_yaml["mobileconfig_info"]: if isinstance( resulting_yaml["mobileconfig_info"][mobileconfig_type], dict ): for mobileconfig_value in resulting_yaml["mobileconfig_info"][ mobileconfig_type ]: if "$ODV" in str( resulting_yaml["mobileconfig_info"][mobileconfig_type][ mobileconfig_value ] ): if ( type( resulting_yaml["mobileconfig_info"][ mobileconfig_type ][mobileconfig_value] ) == dict ): for k, v in resulting_yaml["mobileconfig_info"][ mobileconfig_type ][mobileconfig_value].items(): if v == "$ODV": resulting_yaml["mobileconfig_info"][ mobileconfig_type ][mobileconfig_value][k] = odv else: resulting_yaml["mobileconfig_info"][mobileconfig_type][ mobileconfig_value ] = odv if "ddm_info" in resulting_yaml.keys(): for ddm_type, value in resulting_yaml["ddm_info"].items(): if isinstance(value, dict): for _value in value: if "$ODV" in str(value[_value]): resulting_yaml["ddm_info"][ddm_type][_value] = odv if "$ODV" in value: resulting_yaml["ddm_info"][ddm_type] = odv def get_rule_yaml( rule_file, baseline_yaml, custom=False, ): """Takes a rule file, checks for a custom version, and returns the yaml for the rule""" global resulting_yaml resulting_yaml = {} names = [ os.path.basename(x) for x in glob.glob("../custom/rules/**/*.y*ml", recursive=True) ] file_name = os.path.basename(rule_file) # get parent values try: parent_values = baseline_yaml["parent_values"] except KeyError: parent_values = "recommended" if custom: print(f"Custom settings found for rule: {rule_file}") try: override_path = glob.glob( "../custom/rules/**/{}".format(file_name), recursive=True )[0] except IndexError: override_path = glob.glob( "../custom/rules/{}".format(file_name), recursive=True )[0] with open(override_path) as r: rule_yaml = yaml.load(r, Loader=yaml.SafeLoader) else: with open(rule_file) as r: rule_yaml = yaml.load(r, Loader=yaml.SafeLoader) try: og_rule_path = glob.glob("../rules/**/{}".format(file_name), recursive=True)[0] except IndexError: # assume this is a completely new rule og_rule_path = glob.glob( "../custom/rules/**/{}".format(file_name), recursive=True )[0] resulting_yaml["customized"] = ["customized rule"] # get original/default rule yaml for comparison with open(og_rule_path) as og: og_rule_yaml = yaml.load(og, Loader=yaml.SafeLoader) for yaml_field in og_rule_yaml: # print('processing field {} for rule {}'.format(yaml_field, file_name)) if yaml_field == "references": if not "references" in resulting_yaml: resulting_yaml["references"] = {} for ref in og_rule_yaml["references"]: try: if og_rule_yaml["references"][ref] == rule_yaml["references"][ref]: resulting_yaml["references"][ref] = og_rule_yaml["references"][ ref ] else: resulting_yaml["references"][ref] = rule_yaml["references"][ref] except KeyError: # reference not found in original rule yaml, trying to use reference from custom rule try: resulting_yaml["references"][ref] = rule_yaml["references"][ref] except KeyError: resulting_yaml["references"][ref] = og_rule_yaml["references"][ ref ] try: if "custom" in rule_yaml["references"]: resulting_yaml["references"]["custom"] = rule_yaml[ "references" ]["custom"] if "customized" in resulting_yaml: if ( "customized references" not in resulting_yaml["customized"] ): resulting_yaml["customized"].append( "customized references" ) else: resulting_yaml["customized"] = ["customized references"] except: pass elif yaml_field == "tags": # try to concatenate tags from both original yaml and custom yaml try: if og_rule_yaml["tags"] == rule_yaml["tags"]: # print("using default data in yaml field {}".format("tags")) resulting_yaml["tags"] = og_rule_yaml["tags"] else: # print("Found custom tags... concatenating them") resulting_yaml["tags"] = og_rule_yaml["tags"] + rule_yaml["tags"] except KeyError: resulting_yaml["tags"] = og_rule_yaml["tags"] else: try: if og_rule_yaml[yaml_field] == rule_yaml[yaml_field]: # print("using default data in yaml field {}".format(yaml_field)) resulting_yaml[yaml_field] = og_rule_yaml[yaml_field] else: # print('using CUSTOM value for yaml field {} in rule {}'.format(yaml_field, file_name)) resulting_yaml[yaml_field] = rule_yaml[yaml_field] if "customized" in resulting_yaml: resulting_yaml["customized"].append( "customized {}".format(yaml_field) ) else: resulting_yaml["customized"] = [ "customized {}".format(yaml_field) ] except KeyError: resulting_yaml[yaml_field] = og_rule_yaml[yaml_field] fill_in_odv(resulting_yaml, parent_values) return resulting_yaml def generate_xls(baseline_name, build_path, baseline_yaml): """Using the baseline yaml file, create an XLS document containing the YAML fields""" baseline_rules = create_rules(baseline_yaml) # File path setup file_dir = os.path.dirname(os.path.abspath(__file__)) parent_dir = os.path.dirname(file_dir) # Output files xls_output_file = f"{build_path}/{baseline_name}.xls" wb = Workbook() sheet1 = wb.add_sheet("Sheet 1", cell_overwrite_ok=True) topWrap = easyxf("align: vert top; alignment: wrap True") top = easyxf("align: vert top") headers = easyxf("font: bold on") counter = 1 column_counter = 18 custom_ref_column = {} sheet1.write(0, 0, "CCE", headers) sheet1.write(0, 1, "Rule ID", headers) sheet1.write(0, 2, "Title", headers) sheet1.write(0, 3, "Discussion", headers) sheet1.write(0, 4, "Mechanism", headers) sheet1.write(0, 5, "Check", headers) sheet1.write(0, 6, "Check Result", headers) sheet1.write(0, 7, "Fix", headers) sheet1.write(0, 8, "800-53r5", headers) sheet1.write(0, 9, "800-171", headers) sheet1.write(0, 10, "SRG", headers) sheet1.write(0, 11, "SFR", headers) sheet1.write(0, 12, "DISA STIG", headers) sheet1.write(0, 13, "CIS Benchmark", headers) sheet1.write(0, 14, "CIS v8", headers) sheet1.write(0, 15, "CMMC", headers) sheet1.write(0, 16, "indigo", headers) sheet1.write(0, 17, "CCI", headers) sheet1.write(0, 18, "Severity", headers) sheet1.write(0, 19, "Modified Rule", headers) sheet1.set_panes_frozen(True) sheet1.set_horz_split_pos(1) sheet1.set_vert_split_pos(2) for rule in baseline_rules: if rule.rule_id.startswith("supplemental") or rule.rule_id.startswith("srg"): continue sheet1.write(counter, 0, rule.rule_cce, top) sheet1.col(0).width = 256 * 15 sheet1.write(counter, 1, rule.rule_id, top) sheet1.col(1).width = 512 * 25 sheet1.write(counter, 2, rule.rule_title, top) sheet1.col(2).width = 600 * 30 sheet1.write(counter, 3, str(rule.rule_discussion), topWrap) sheet1.col(3).width = 700 * 35 mechanism = "Manual" if "[source,bash]" in rule.rule_fix: mechanism = "Script" if "This is implemented by a Configuration Profile." in rule.rule_fix: mechanism = "Configuration Profile" if "inherent" in rule.rule_tags: mechanism = "The control cannot be configured out of compliance." if "permanent" in rule.rule_tags: mechanism = "The control is not able to be configure to meet the requirement. It is recommended to implement a third-party solution to meet the control." if "not_applicable" in rule.rule_tags: mechanism = ( " The control is not applicable when configuring a macOS system." ) sheet1.write(counter, 4, mechanism, top) sheet1.col(4).width = 256 * 25 sheet1.write(counter, 5, rule.rule_check.replace(r"\|", "|"), topWrap) sheet1.col(5).width = 750 * 50 sheet1.write(counter, 6, str(rule.rule_result_value), topWrap) sheet1.col(6).width = 256 * 25 if rule.rule_mobileconfig: sheet1.write( counter, 7, format_mobileconfig_fix(rule.rule_mobileconfig_info), topWrap, ) # print(format_mobileconfig_fix(rule.rule_mobileconfig_info)) # sheet1.write(counter, 7, str( # configProfile(rule_file)), topWrap) else: sheet1.write(counter, 7, str(rule.rule_fix.replace(r"\|", "|")), topWrap) sheet1.col(7).width = 1000 * 50 baseline_refs = (str(rule.rule_80053r5)).strip("[]'") baseline_refs = baseline_refs.replace(", ", "\n").replace("'", "") sheet1.write(counter, 8, baseline_refs, topWrap) sheet1.col(8).width = 256 * 15 nist171_refs = (str(rule.rule_800171)).strip("[]'") nist171_refs = nist171_refs.replace(", ", "\n").replace("'", "") sheet1.write(counter, 9, nist171_refs, topWrap) sheet1.col(9).width = 256 * 15 srg_refs = (str(rule.rule_srg)).strip("[]'") srg_refs = srg_refs.replace(", ", "\n").replace("'", "") sheet1.write(counter, 10, srg_refs, topWrap) sheet1.col(10).width = 500 * 15 sfr_refs = (str(rule.rule_sfr)).strip("[]'") sfr_refs = sfr_refs.replace(", ", "\n").replace("'", "") sheet1.write(counter, 11, sfr_refs, topWrap) sheet1.col(11).width = 500 * 15 disa_refs = (str(rule.rule_disa_stig)).strip("[]'") disa_refs = disa_refs.replace(", ", "\n").replace("'", "") sheet1.write(counter, 12, disa_refs, topWrap) sheet1.col(12).width = 500 * 15 cis = "" if rule.rule_cis != ["None"]: for title, ref in rule.rule_cis.items(): if title.lower() == "benchmark": sheet1.write(counter, 13, ref, topWrap) sheet1.col(13).width = 500 * 15 if title.lower() == "controls v8": cis = str(ref).strip("[]'") cis = cis.replace(", ", "\n") sheet1.write(counter, 14, cis, topWrap) sheet1.col(14).width = 500 * 15 cmmc_refs = (str(rule.rule_cmmc)).strip("[]'") cmmc_refs = cmmc_refs.replace(", ", "\n").replace("'", "") sheet1.write(counter, 15, cmmc_refs, topWrap) sheet1.col(15).width = 500 * 15 indigo_refs = (str(rule.rule_indigo)).strip("[]'") indigo_refs = indigo_refs.replace(", ", "\n").replace("'", "") sheet1.write(counter, 16, indigo_refs, topWrap) sheet1.col(16).width = 500 * 15 cci = (str(rule.rule_cci)).strip("[]'") cci = cci.replace(", ", "\n").replace("'", "") sheet1.write(counter, 17, cci, topWrap) sheet1.col(17).width = 400 * 15 # determine severity # uses 'parent_values' from baseline.yaml file to determine which/if any severity to use # while support for a dictionary will work for generating the excel sheet, having a dictionary in severity may break third-party apps severity = "" if isinstance(rule.rule_severity, dict): try: severity = f"{rule.rule_severity[baseline_yaml['parent_values']]}" except KeyError: severity = "" elif isinstance(rule.rule_severity, str): severity = f"{rule.rule_severity}" sheet1.write(counter, 18, severity, topWrap) sheet1.col(18).width = 400 * 15 customized = (str(rule.rule_customized)).strip("[]'") customized = customized.replace(", ", "\n").replace("'", "") sheet1.write(counter, 19, customized, topWrap) sheet1.col(19).width = 400 * 15 if rule.rule_custom_refs != ["None"]: for title, ref in rule.rule_custom_refs.items(): if title not in custom_ref_column: custom_ref_column[title] = column_counter column_counter = column_counter + 1 sheet1.write(0, custom_ref_column[title], title, headers) sheet1.col(custom_ref_column[title]).width = 512 * 25 added_ref = (str(ref)).strip("[]'") added_ref = added_ref.replace(", ", "\n").replace("'", "") sheet1.write(counter, custom_ref_column[title], added_ref, topWrap) tall_style = easyxf("font:height 640;") # 36pt sheet1.row(counter).set_style(tall_style) counter = counter + 1 wb.save(xls_output_file) print(f"Finished building {xls_output_file}") def create_rules(baseline_yaml): """Takes a baseline yaml file and parses the rules, returns a list of containing rules""" all_rules = [] # expected keys and references keys = [ "mobileconfig", "macOS", "severity", "title", "check", "fix", "tags", "id", "references", "odv", "result", "discussion", "customized", ] references = [ "disa_stig", "cci", "cce", "800-53r5", "800-171r3", "cis", "cmmc", "indigo", "srg", "sfr", "custom", ] for sections in baseline_yaml["profile"]: for profile_rule in sections["rules"]: if glob.glob( "../custom/rules/**/{}.y*ml".format(profile_rule), recursive=True ): rule = glob.glob( "../custom/rules/**/{}.y*ml".format(profile_rule), recursive=True )[0] custom = True elif glob.glob("../rules/*/{}.y*ml".format(profile_rule)): rule = glob.glob("../rules/*/{}.y*ml".format(profile_rule))[0] custom = False # for rule in glob.glob('../rules/*/{}.y*ml'.format(profile_rule)) + glob.glob('../custom/rules/**/{}.y*ml'.format(profile_rule),recursive=True): rule_yaml = get_rule_yaml(rule, baseline_yaml, custom) for key in keys: try: rule_yaml[key] except: # print("{} key missing ..for {}".format(key, rule)) rule_yaml.update({key: ""}) if key == "references": for reference in references: try: rule_yaml[key][reference] # print("FOUND reference {} for key {} for rule {}".format(reference, key, rule)) except: # print("expected reference '{}' is missing in key '{}' for rule{}".format(reference, key, rule)) rule_yaml[key].update({reference: ["None"]}) all_rules.append( MacSecurityRule( rule_yaml["title"].replace("|", r"\|"), rule_yaml["id"].replace("|", r"\|"), rule_yaml["severity"], rule_yaml["discussion"], # .replace('|', r'\|'), rule_yaml["check"].replace("|", r"\|"), rule_yaml["fix"].replace("|", r"\|"), rule_yaml["references"]["cci"], rule_yaml["references"]["cce"], rule_yaml["references"]["800-53r5"], rule_yaml["references"]["800-171r3"], rule_yaml["references"]["disa_stig"], rule_yaml["references"]["srg"], rule_yaml["references"]["sfr"], rule_yaml["references"]["cis"], rule_yaml["references"]["cmmc"], rule_yaml["references"]["indigo"], rule_yaml["references"]["custom"], rule_yaml["odv"], rule_yaml["tags"], rule_yaml["result"], rule_yaml["mobileconfig"], rule_yaml["mobileconfig_info"], rule_yaml["customized"], ) ) return all_rules def create_args(): """configure the arguments used in the script, returns the parsed arguements""" parser = argparse.ArgumentParser( description="Given a baseline, create guidance documents and files." ) parser.add_argument( "baseline", default=None, help="Baseline YAML file used to create the guide.", type=argparse.FileType("rt"), ) parser.add_argument( "-c", "--clean", default=None, help=argparse.SUPPRESS, action="store_true" ) parser.add_argument( "-d", "--debug", default=None, help=argparse.SUPPRESS, action="store_true" ) parser.add_argument( "-D", "--ddm", default=None, help="Generate declarative management artifacts for the rules.", action="store_true", ) parser.add_argument( "-l", "--logo", default=None, help="Full path to logo file to be included in the guide.", action="store", ) parser.add_argument( "-p", "--profiles", default=None, help="Generate domain-specific configuration profiles for the rules.", action="store_true", ) parser.add_argument( "-P", "--consolidated-profile", default=None, help="Generate consolidated configuration profile for all rules.", action="store_true", ) parser.add_argument( "-r", "--reference", default=None, help="Use the reference ID instead of rule ID for identification.", ) parser.add_argument( "-s", "--script", default=None, help="Generate the compliance script for the rules.", action="store_true", ) # add gary argument to include tags for XCCDF generation, with a nod to Gary the SCAP guru parser.add_argument( "-g", "--gary", default=None, help=argparse.SUPPRESS, action="store_true" ) parser.add_argument( "-x", "--xls", default=None, help="Generate the excel (xls) document for the rules.", action="store_true", ) parser.add_argument( "-H", "--hash", default=None, help="sign the configuration profiles with subject key ID (hash value without spaces)", ) parser.add_argument( "-a", "--audit_name", default=None, help="name of audit plist and log - defaults to baseline name", ) return parser.parse_args() def is_asciidoctor_installed(): """Checks to see if the ruby gem for asciidoctor is installed""" # cmd = "gem list asciidoctor -i" cmd = "which asciidoctor" process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE) output, error = process.communicate() # return path to asciidoctor return output.decode("utf-8").strip() def is_asciidoctor_pdf_installed(): """Checks to see if the ruby gem for asciidoctor-pdf is installed""" # cmd = "gem list asciidoctor-pdf -i" cmd = "which asciidoctor-pdf" process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE) output, error = process.communicate() return output.decode("utf-8").strip() def verify_signing_hash(hash): """Attempts to validate the existence of the certificate provided by the hash""" with tempfile.NamedTemporaryFile(mode="w") as in_file: unsigned_tmp_file_path = in_file.name in_file.write("temporary file for signing") cmd = f"security cms -S -Z {hash} -i {unsigned_tmp_file_path}" FNULL = open(os.devnull, "w") process = subprocess.Popen(cmd.split(), stdout=FNULL, stderr=FNULL) output, error = process.communicate() if process.returncode == 0: return True else: return False def sign_config_profile(in_file, out_file, hash): """Signs the configuration profile using the identity associated with the provided hash""" cmd = f"security cms -S -Z {hash} -i {in_file} -o {out_file}" process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE) output, error = process.communicate() print(f"Signed Configuration profile written to {out_file}") return output.decode("utf-8") def parse_custom_references(reference): string = "\n" for item in reference: if isinstance(reference[item], list): string += "!" + str(item) + "\n!\n" for i in reference[item]: string += "* " + str(i) + "\n" else: string += "!" + str(item) + "!* " + str(reference[item]) + "\n" return string def parse_cis_references(reference): string = "\n" for item in reference: if isinstance(reference[item], list): string += "!CIS " + str(item).title() + "\n!\n" string += "* " for i in reference[item]: string += str(i) + "\n * " string = string[:-2] + "\n" else: string += "!" + str(item) + "!* " + str(reference[item]) + "\n" return string # Might have to do something similar to above for cmmc def main(): args = create_args() if args.debug: logging.basicConfig(level=logging.DEBUG) else: logging.basicConfig(level=logging.WARNING) output_basename = os.path.basename(args.baseline.name) output_filename = os.path.splitext(output_basename)[0] baseline_name = os.path.splitext(output_basename)[0] # .capitalize() file_dir = os.path.dirname(os.path.abspath(__file__)) parent_dir = os.path.dirname(file_dir) # stash current working directory original_working_directory = os.getcwd() # switch to the scripts directory os.chdir(file_dir) audit_name = args.audit_name if args.logo: logo = args.logo pdf_logo_path = logo else: logo = "../../templates/images/mscp_banner.png" pdf_logo_path = "../templates/images/mscp_banner.png" # convert logo to base64 for inline processing b64logo = base64.b64encode(open(pdf_logo_path, "rb").read()) build_path = os.path.join(parent_dir, "build", f"{baseline_name}") if not (os.path.isdir(build_path)): try: os.makedirs(build_path) except OSError: print(f"Creation of the directory {build_path} failed") else: for filename in os.listdir(build_path): file_path = os.path.join(build_path, filename) try: if os.path.isfile(file_path) or os.path.islink(file_path): os.unlink(file_path) elif os.path.isdir(file_path): shutil.rmtree(file_path) except Exception as e: print("Failed to delete %s. Reason: %s" % (file_path, e)) adoc_output_file = open(f"{build_path}/{output_filename}.adoc", "w") print("Profile YAML:", args.baseline.name) print("Output path:", adoc_output_file.name) if args.hash: signing = True if not verify_signing_hash(args.hash): sys.exit( "Cannot use the provided hash to sign. Please make sure you provide the subject key ID hash from an installed certificate" ) else: signing = False if args.reference: use_custom_reference = True log_reference = args.reference else: log_reference = "default" use_custom_reference = False baseline_yaml = yaml.load(args.baseline, Loader=yaml.SafeLoader) version_file = os.path.join(parent_dir, "VERSION.yaml") with open(version_file) as r: version_yaml = yaml.load(r, Loader=yaml.SafeLoader) adoc_templates = [ "adoc_rule_ios", "adoc_rule", "adoc_supplemental", "adoc_rule_no_setting", "adoc_rule_custom_refs", "adoc_section", "adoc_header", "adoc_footer", "adoc_foreword", "adoc_scope", "adoc_authors", "adoc_acronyms", "adoc_additional_docs", ] adoc_templates_dict = {} for template in adoc_templates: # custom template exists if template + ".adoc" in glob.glob1("../custom/templates/", "*.adoc"): print(f"Custom template found for : {template}") adoc_templates_dict[template] = f"../custom/templates/{template}.adoc" else: adoc_templates_dict[template] = f"../templates/{template}.adoc" # check for custom PDF theme (must have theme in the name and end with .yml) pdf_theme = "mscp-theme.yml" themes = glob.glob("../custom/templates/*theme*.yml") if len(themes) > 1: print( "Found multiple custom themes in directory, only one can exist, using default" ) elif len(themes) == 1: print(f"Found custom PDF theme: {themes[0]}") pdf_theme = themes[0] # Setup AsciiDoc templates with open(adoc_templates_dict["adoc_rule_ios"]) as adoc_rule_ios_file: adoc_rule_ios_template = Template(adoc_rule_ios_file.read()) with open(adoc_templates_dict["adoc_rule"]) as adoc_rule_file: adoc_rule_template = Template(adoc_rule_file.read()) with open(adoc_templates_dict["adoc_supplemental"]) as adoc_supplemental_file: adoc_supplemental_template = Template(adoc_supplemental_file.read()) with open(adoc_templates_dict["adoc_rule_no_setting"]) as adoc_rule_no_setting_file: adoc_rule_no_setting_template = Template(adoc_rule_no_setting_file.read()) with open( adoc_templates_dict["adoc_rule_custom_refs"] ) as adoc_rule_custom_refs_file: adoc_rule_custom_refs_template = Template(adoc_rule_custom_refs_file.read()) with open(adoc_templates_dict["adoc_section"]) as adoc_section_file: adoc_section_template = Template(adoc_section_file.read()) with open(adoc_templates_dict["adoc_header"]) as adoc_header_file: adoc_header_template = Template(adoc_header_file.read()) with open(adoc_templates_dict["adoc_footer"]) as adoc_footer_file: adoc_footer_template = Template(adoc_footer_file.read()) with open(adoc_templates_dict["adoc_foreword"]) as adoc_foreword_file: adoc_foreword_template = adoc_foreword_file.read() + "\n" with open(adoc_templates_dict["adoc_scope"]) as adoc_scope_file: adoc_scope_template = Template(adoc_scope_file.read() + "\n") with open(adoc_templates_dict["adoc_authors"]) as adoc_authors_file: adoc_authors_template = Template(adoc_authors_file.read() + "\n") with open(adoc_templates_dict["adoc_acronyms"]) as adoc_acronyms_file: adoc_acronyms_template = adoc_acronyms_file.read() + "\n" with open(adoc_templates_dict["adoc_additional_docs"]) as adoc_additional_docs_file: adoc_additional_docs_template = adoc_additional_docs_file.read() + "\n" # set tag attribute if "STIG" in baseline_yaml["title"].upper(): adoc_STIG_show = ":show_STIG:" else: adoc_STIG_show = ":show_STIG!:" if "CIS" in baseline_yaml["title"].upper(): adoc_cis_show = ":show_cis:" else: adoc_cis_show = ":show_cis!:" if "CMMC" in baseline_yaml["title"].upper(): adoc_cmmc_show = ":show_CMMC:" else: adoc_cmmc_show = ":show_CMMC!:" if "indigo" in baseline_yaml["title"]: adoc_indigo_show = ":show_indigo:" else: adoc_indigo_show = ":show_indigo!:" if "800" in baseline_yaml["title"]: adoc_171_show = ":show_171:" else: adoc_171_show = ":show_171!:" if args.gary: adoc_tag_show = ":show_tags:" adoc_STIG_show = ":show_STIG:" adoc_cis_show = ":show_cis:" adoc_cmmc_show = ":show_CMMC:" adoc_indigo_show = ":show_indigo:" adoc_171_show = ":show_171:" else: adoc_tag_show = ":show_tags!:" if "Tailored from" in baseline_yaml["title"]: s = baseline_yaml["title"].split(":")[1] adoc_html_subtitle = s.split("(")[0] adoc_html_subtitle2 = s[s.find("(") + 1 : s.find(")")] adoc_document_subtitle2 = f":document-subtitle2: {adoc_html_subtitle2}" else: adoc_html_subtitle = baseline_yaml["title"].split(":")[1] adoc_document_subtitle2 = ":document-subtitle2:" # Create header header_adoc = adoc_header_template.substitute( description=baseline_yaml["description"], html_header_title=baseline_yaml["title"], html_title=baseline_yaml["title"].split(":")[0], html_subtitle=adoc_html_subtitle, document_subtitle2=adoc_document_subtitle2, logo=logo, pdflogo=b64logo.decode("ascii"), pdf_theme=pdf_theme, tag_attribute=adoc_tag_show, nist171_attribute=adoc_171_show, stig_attribute=adoc_STIG_show, cis_attribute=adoc_cis_show, cmmc_attribute=adoc_cmmc_show, indigo_attribute=adoc_indigo_show, version=version_yaml["version"], os_version=version_yaml["os"], release_date=version_yaml["date"], ) # Create scope scope_adoc = adoc_scope_template.substitute( scope_description=baseline_yaml["description"] ) # Create author authors_adoc = adoc_authors_template.substitute( authors_list=baseline_yaml["authors"] ) # Output header adoc_output_file.write(header_adoc) # write foreword, authors, acronyms, supporting docs adoc_output_file.write(adoc_foreword_template) adoc_output_file.write(scope_adoc) adoc_output_file.write(authors_adoc) adoc_output_file.write(adoc_acronyms_template) adoc_output_file.write(adoc_additional_docs_template) # Create sections and rules for sections in baseline_yaml["profile"]: section_yaml_file = sections["section"].lower() + ".yaml" # check for custom section if section_yaml_file in glob.glob1("../custom/sections/", "*.y*ml"): # print(f"Custom settings found for section: {sections['section']}") override_section = os.path.join(f"../custom/sections/{section_yaml_file}") with open(override_section) as r: section_yaml = yaml.load(r, Loader=yaml.SafeLoader) else: with open(f"../sections/{section_yaml_file}") as s: section_yaml = yaml.load(s, Loader=yaml.SafeLoader) # Read section info and output it section_adoc = adoc_section_template.substitute( section_name=section_yaml["name"], description=section_yaml["description"] ) adoc_output_file.write(section_adoc) # Read all rules in the section and output them for rule in sections["rules"]: logging.debug(f"processing rule id: {rule}") rule_path = glob.glob("../rules/*/{}.y*ml".format(rule)) if not rule_path: print( f"Rule file not found in library, checking in custom folder for rule: {rule}" ) rule_path = glob.glob( "../custom/rules/**/{}.y*ml".format(rule), recursive=True ) try: rule_file = os.path.basename(rule_path[0]) except IndexError: logging.debug( f"defined rule {rule} does not have valid yaml file, check that rule ID and filename match." ) # check for custom rule if glob.glob("../custom/rules/**/{}.y*ml".format(rule), recursive=True): print(f"Custom settings found for rule: {rule}") # override_rule = glob.glob('../custom/rules/**/{}'.format(rule_file), recursive=True)[0] rule_location = glob.glob( "../custom/rules/**/{}.y*ml".format(rule), recursive=True )[0] custom = True else: rule_location = rule_path[0] custom = False rule_yaml = get_rule_yaml(rule_location, baseline_yaml, custom) # Determine if the references exist and set accordingly try: rule_yaml["references"]["cci"] except KeyError: cci = "N/A" else: cci = ulify(rule_yaml["references"]["cci"]) try: rule_yaml["references"]["cce"] except KeyError: cce = "- N/A" else: cce = ulify(rule_yaml["references"]["cce"]) try: rule_yaml["references"]["800-53r5"] except KeyError: nist_80053r5 = "N/A" else: nist_80053r5 = rule_yaml["references"]["800-53r5"] try: rule_yaml["references"]["800-171r3"] except KeyError: nist_800171 = "- N/A" else: nist_800171 = ulify(rule_yaml["references"]["800-171r3"]) try: rule_yaml["references"]["disa_stig"] except KeyError: disa_stig = "- N/A" else: disa_stig = ulify(rule_yaml["references"]["disa_stig"]) try: rule_yaml["references"]["cis"] except KeyError: cis = "" else: cis = parse_cis_references(rule_yaml["references"]["cis"]) try: rule_yaml["references"]["cmmc"] except KeyError: cmmc = "" else: cmmc = ulify(rule_yaml["references"]["cmmc"]) try: rule_yaml["references"]["indigo"] except KeyError: indigo = "" else: indigo = ulify(rule_yaml["references"]["indigo"]) try: rule_yaml["references"]["srg"] except KeyError: srg = "- N/A" else: srg = ulify(rule_yaml["references"]["srg"]) try: rule_yaml["references"]["sfr"] except KeyError: sfr = "- N/A" else: sfr = ulify(rule_yaml["references"]["sfr"]) try: rule_yaml["references"]["custom"] except KeyError: custom_refs = "" else: custom_refs = parse_custom_references(rule_yaml["references"]["custom"]) try: rule_yaml["fix"] except KeyError: rulefix = "No fix Found" else: rulefix = rule_yaml["fix"] # .replace('|', r'\|') try: rule_yaml["tags"] except KeyError: tags = "none" else: tags = ulify(rule_yaml["tags"]) try: result = rule_yaml["result"] except KeyError: result = "N/A" if "integer" in result: result_value = result["integer"] result_type = "integer" elif "boolean" in result: result_value = result["boolean"] result_type = "boolean" elif "string" in result: result_value = result["string"] result_type = "string" elif "base64" in result: result_value = result["base64"] else: result_value = "N/A" # determine severity, if severity is determined, build asciidoc table row for references # uses 'parent_values' from baseline.yaml file to determine which/if any severity to use severity = "" if "severity" in rule_yaml.keys(): if isinstance(rule_yaml["severity"], dict): try: severity = f"|Severity\n|{rule_yaml['severity'][baseline_yaml['parent_values']]}" except KeyError: severity = "" # determine if configprofile try: rule_yaml["mobileconfig"] except KeyError: pass else: if rule_yaml["mobileconfig"]: rulefix = format_mobileconfig_fix(rule_yaml["mobileconfig_info"]) # process nist controls for grouping if not nist_80053r5 == "N/A": nist_80053r5.sort() res = [ list(i) for j, i in groupby(nist_80053r5, lambda a: a.split("(")[0]) ] nist_controls = "" for i in res: nist_controls += group_ulify(i) else: nist_controls = "- N/A" if "manual" in tags: discussion = ( rule_yaml["discussion"] + "\n\nNOTE: This rule is marked as manual and may not be able to be automated. It is also excluded in the compliance scan and will not report any results.\n" ) else: discussion = rule_yaml["discussion"] if "supplemental" in tags: rule_adoc = adoc_supplemental_template.substitute( rule_title=rule_yaml["title"].replace("|", r"\|"), rule_id=rule_yaml["id"].replace("|", r"\|"), rule_discussion=discussion, ) elif custom_refs: rule_adoc = adoc_rule_custom_refs_template.substitute( rule_title=rule_yaml["title"].replace("|", r"\|"), rule_id=rule_yaml["id"].replace("|", r"\|"), rule_discussion=discussion, # .replace('|', r'\|'), rule_check=rule_yaml["check"], # .replace('|', r'\|'), rule_fix=rulefix, rule_cci=cci, rule_80053r5=nist_controls, rule_800171=nist_800171, rule_disa_stig=disa_stig, rule_cis=cis, rule_cmmc=cmmc, rule_indigo=indigo, rule_cce=cce, rule_custom_refs=custom_refs, rule_tags=tags, rule_srg=srg, rule_sfr=sfr, rule_result=result_value, severity=severity, ) elif ("permanent" in tags) or ("inherent" in tags) or ("n_a" in tags): rule_adoc = adoc_rule_no_setting_template.substitute( rule_title=rule_yaml["title"].replace("|", r"\|"), rule_id=rule_yaml["id"].replace("|", r"\|"), rule_discussion=discussion, # .replace('|', r'\|'), rule_check=rule_yaml["check"], # .replace('|', r'\|'), rule_fix=rulefix, rule_80053r5=nist_controls, rule_800171=nist_800171, rule_disa_stig=disa_stig, rule_cis=cis, rule_cmmc=cmmc, rule_indigo=indigo, rule_cce=cce, rule_tags=tags, rule_srg=srg, ) else: # using the same rule template for ios/ipados/visionos if ( version_yaml["platform"] == "iOS/iPadOS" or version_yaml["platform"] == "visionOS" ): rule_adoc = adoc_rule_ios_template.substitute( rule_title=rule_yaml["title"].replace("|", r"\|"), rule_id=rule_yaml["id"].replace("|", r"\|"), rule_discussion=discussion, # .replace('|', r'\|'), rule_check=rule_yaml["check"], # .replace('|', r'\|'), rule_fix=rulefix, rule_cci=cci, rule_80053r5=nist_controls, rule_800171=nist_800171, rule_disa_stig=disa_stig, rule_cis=cis, rule_cmmc=cmmc, rule_indigo=indigo, rule_cce=cce, rule_tags=tags, rule_srg=srg, rule_sfr=sfr, rule_result=result_value, severity=severity, ) else: rule_adoc = adoc_rule_template.substitute( rule_title=rule_yaml["title"].replace("|", r"\|"), rule_id=rule_yaml["id"].replace("|", r"\|"), rule_discussion=discussion, # .replace('|', r'\|'), rule_check=rule_yaml["check"], # .replace('|', r'\|'), rule_fix=rulefix, rule_cci=cci, rule_80053r5=nist_controls, rule_800171=nist_800171, rule_disa_stig=disa_stig, rule_cis=cis, rule_cmmc=cmmc, rule_indigo=indigo, rule_cce=cce, rule_tags=tags, rule_srg=srg, rule_sfr=sfr, rule_result=result_value, severity=severity, ) adoc_output_file.write(rule_adoc) # Create footer footer_adoc = adoc_footer_template.substitute() # Output footer adoc_output_file.write(footer_adoc) adoc_output_file.close() if args.audit_name: audit_name = args.audit_name else: audit_name = baseline_name if args.profiles or args.consolidated_profile: # Build message based on what's being generated messages = [] if args.profiles: messages.append("domain-specific") if args.consolidated_profile: messages.append("consolidated") print(f"Generating {' and '.join(messages)} configuration profiles...") # Single call to generate_profiles with both parameters generate_profiles( baseline_name, build_path, parent_dir, baseline_yaml, signing, args.hash, generate_domain=args.profiles, generate_consolidated=args.consolidated_profile ) if args.ddm: print("Generating declarative components...") generate_ddm(baseline_name, build_path, parent_dir, baseline_yaml) if args.script: print("Generating compliance script...") generate_script( baseline_name, audit_name, build_path, baseline_yaml, log_reference ) default_audit_plist(baseline_name, build_path, baseline_yaml) if args.xls: print("Generating excel document...") generate_xls(baseline_name, build_path, baseline_yaml) asciidoctor_path = is_asciidoctor_installed() if asciidoctor_path != "": print("Generating HTML file from AsciiDoc...") cmd = f"{asciidoctor_path} '{adoc_output_file.name}'" process = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True) process.communicate() elif os.path.exists("../bin/asciidoctor"): print("Generating HTML file from AsciiDoc...") cmd = f"'../bin/asciidoctor' '{adoc_output_file.name}'" process = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True) process.communicate() elif not os.path.exists("../bin/asciidoctor"): print( "Installing gem requirements - asciidoctor, asciidoctor-pdf, and rouge..." ) cmd = [ "/usr/bin/bundle", "install", "--gemfile", "../Gemfile", "--binstubs", "--path", "mscp_gems", ] subprocess.run(cmd) print("Generating HTML file from AsciiDoc...") cmd = f"'../bin/asciidoctor' '{adoc_output_file.name}'" process = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True) process.communicate() else: print( "If you would like to generate the PDF file from the AsciiDoc file, install the ruby gem for asciidoctor" ) # Don't create PDF if we are generating SCAP if not args.gary: asciidoctorPDF_path = is_asciidoctor_pdf_installed() if asciidoctorPDF_path != "": print("Generating PDF file from AsciiDoc...") cmd = f"{asciidoctorPDF_path} '{adoc_output_file.name}'" process = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True) process.communicate() elif os.path.exists("../bin/asciidoctor-pdf"): print("Generating PDF file from AsciiDoc...") cmd = f"'../bin/asciidoctor-pdf' '{adoc_output_file.name}'" process = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True) process.communicate() else: print( "If you would like to generate the PDF file from the AsciiDoc file, install the ruby gem for asciidoctor-pdf" ) # finally revert back to the prior directory os.chdir(original_working_directory) if __name__ == "__main__": main()