black formatting

This commit is contained in:
Matt Keeley
2024-08-10 23:42:33 -07:00
parent 5bf8a19648
commit eeff00fcbd
8 changed files with 235 additions and 119 deletions

View File

@@ -1 +1 @@
# modules/__init__.py
# modules/__init__.py

View File

@@ -2,6 +2,7 @@
import dns.resolver
class BIMI:
def __init__(self, domain, dns_server=None):
self.domain = domain
@@ -22,9 +23,9 @@ class BIMI:
resolver = dns.resolver.Resolver()
if self.dns_server:
resolver.nameservers = [self.dns_server]
bimi = resolver.resolve(f'default._bimi.{self.domain}', 'TXT')
bimi = resolver.resolve(f"default._bimi.{self.domain}", "TXT")
for record in bimi:
if 'v=BIMI' in str(record):
if "v=BIMI" in str(record):
return record
return None
except Exception:
@@ -53,7 +54,9 @@ class BIMI:
return self.version, self.location, self.authority
def __str__(self):
return (f"BIMI Record: {self.bimi_record}\n"
f"Version: {self.version}\n"
f"Location: {self.location}\n"
f"Authority: {self.authority}")
return (
f"BIMI Record: {self.bimi_record}\n"
f"Version: {self.version}\n"
f"Location: {self.location}\n"
f"Authority: {self.authority}"
)

View File

@@ -3,6 +3,7 @@
import dns.resolver
import tldextract
class DMARC:
def __init__(self, domain, dns_server=None):
self.domain = domain
@@ -28,7 +29,7 @@ class DMARC:
subdomain = tldextract.extract(self.domain).registered_domain
if subdomain != self.domain:
return self.get_dmarc_record_for_domain(subdomain)
return self.get_dmarc_record_for_domain(self.domain)
def get_dmarc_record_for_domain(self, domain):
@@ -42,7 +43,7 @@ class DMARC:
for dns_data in dmarc:
if "DMARC1" in str(dns_data):
return str(dns_data).replace('"', '')
return str(dns_data).replace('"', "")
return None
def get_dmarc_policy(self):
@@ -82,10 +83,12 @@ class DMARC:
return None
def __str__(self):
return (f"DMARC Record: {self.dmarc_record}\n"
f"Policy: {self.policy}\n"
f"Pct: {self.pct}\n"
f"ASPF: {self.aspf}\n"
f"Subdomain Policy: {self.sp}\n"
f"Forensic Report URI: {self.fo}\n"
f"Aggregate Report URI: {self.rua}")
return (
f"DMARC Record: {self.dmarc_record}\n"
f"Policy: {self.policy}\n"
f"Pct: {self.pct}\n"
f"ASPF: {self.aspf}\n"
f"Subdomain Policy: {self.sp}\n"
f"Forensic Report URI: {self.fo}\n"
f"Aggregate Report URI: {self.rua}"
)

View File

@@ -6,6 +6,7 @@ from .spf import SPF
from .dmarc import DMARC
from .bimi import BIMI
class DNS:
def __init__(self, domain):
self.domain = domain
@@ -21,9 +22,9 @@ class DNS:
def get_soa_record(self):
"""Sets the SOA record and DNS server of a given domain."""
resolver = dns.resolver.Resolver()
resolver.nameservers = ['1.1.1.1']
resolver.nameservers = ["1.1.1.1"]
try:
query = resolver.resolve(self.domain, 'SOA')
query = resolver.resolve(self.domain, "SOA")
except Exception:
return
if query:
@@ -44,7 +45,7 @@ class DNS:
if self.spf_record.spf_record and self.dmarc_record.dmarc_record:
return
for ip_address in ['1.1.1.1', '8.8.8.8', '9.9.9.9']:
for ip_address in ["1.1.1.1", "8.8.8.8", "9.9.9.9"]:
self.spf_record = SPF(self.domain, ip_address)
self.dmarc_record = DMARC(self.domain, ip_address)
self.bimi_record = BIMI(self.domain, ip_address)
@@ -52,7 +53,7 @@ class DNS:
self.dns_server = ip_address
return
self.dns_server = '1.1.1.1'
self.dns_server = "1.1.1.1"
def get_txt_record(self, record_type):
"""Returns the TXT record of a given type for the domain."""
@@ -65,9 +66,11 @@ class DNS:
return None
def __str__(self):
return (f"Domain: {self.domain}\n"
f"SOA Record: {self.soa_record}\n"
f"DNS Server: {self.dns_server}\n"
f"SPF Record: {self.spf_record.spf_record}\n"
f"DMARC Record: {self.dmarc_record.dmarc_record}\n"
f"BIMI Record: {self.bimi_record.bimi_record}")
return (
f"Domain: {self.domain}\n"
f"SOA Record: {self.soa_record}\n"
f"DNS Server: {self.dns_server}\n"
f"SPF Record: {self.spf_record.spf_record}\n"
f"DMARC Record: {self.dmarc_record.dmarc_record}\n"
f"BIMI Record: {self.bimi_record.bimi_record}"
)

View File

@@ -7,6 +7,7 @@ from colorama import init, Fore, Style
# Initialize colorama
init()
def output_message(symbol, message, level="info"):
"""Generic function to print messages with different colors and symbols based on the level."""
colors = {
@@ -15,11 +16,12 @@ def output_message(symbol, message, level="info"):
"bad": Fore.RED + Style.BRIGHT,
"indifferent": Fore.BLUE + Style.BRIGHT,
"error": Fore.RED + Style.BRIGHT + "!!! ",
"info": Fore.WHITE + Style.BRIGHT
"info": Fore.WHITE + Style.BRIGHT,
}
color = colors.get(level, Fore.WHITE + Style.BRIGHT)
print(color + f"{symbol} {message}" + Style.RESET_ALL)
def write_to_excel(data, file_name="output.xlsx"):
"""Writes a DataFrame of data to an Excel file, appending if the file exists."""
if os.path.exists(file_name) and os.path.getsize(file_name) > 0:
@@ -30,27 +32,28 @@ def write_to_excel(data, file_name="output.xlsx"):
else:
pd.DataFrame(data).to_excel(file_name, index=False)
def printer(**kwargs):
"""Utility function to print the results of DMARC, SPF, and BIMI checks in the original format."""
domain = kwargs.get('DOMAIN')
subdomain = kwargs.get('DOMAIN_TYPE') == 'subdomain'
dns_server = kwargs.get('DNS_SERVER')
spf_record = kwargs.get('SPF')
spf_all = kwargs.get('SPF_MULTIPLE_ALLS')
spf_dns_query_count = kwargs.get('SPF_NUM_DNS_QUERIES')
dmarc_record = kwargs.get('DMARC')
p = kwargs.get('DMARC_POLICY')
pct = kwargs.get('DMARC_PCT')
aspf = kwargs.get('DMARC_ASPF')
sp = kwargs.get('DMARC_SP')
fo = kwargs.get('DMARC_FORENSIC_REPORT')
rua = kwargs.get('DMARC_AGGREGATE_REPORT')
bimi_record = kwargs.get('BIMI_RECORD')
vbimi = kwargs.get('BIMI_VERSION')
location = kwargs.get('BIMI_LOCATION')
authority = kwargs.get('BIMI_AUTHORITY')
spoofable = kwargs.get('SPOOFING_POSSIBLE')
spoofing_type = kwargs.get('SPOOFING_TYPE')
domain = kwargs.get("DOMAIN")
subdomain = kwargs.get("DOMAIN_TYPE") == "subdomain"
dns_server = kwargs.get("DNS_SERVER")
spf_record = kwargs.get("SPF")
spf_all = kwargs.get("SPF_MULTIPLE_ALLS")
spf_dns_query_count = kwargs.get("SPF_NUM_DNS_QUERIES")
dmarc_record = kwargs.get("DMARC")
p = kwargs.get("DMARC_POLICY")
pct = kwargs.get("DMARC_PCT")
aspf = kwargs.get("DMARC_ASPF")
sp = kwargs.get("DMARC_SP")
fo = kwargs.get("DMARC_FORENSIC_REPORT")
rua = kwargs.get("DMARC_AGGREGATE_REPORT")
bimi_record = kwargs.get("BIMI_RECORD")
vbimi = kwargs.get("BIMI_VERSION")
location = kwargs.get("BIMI_LOCATION")
authority = kwargs.get("BIMI_AUTHORITY")
spoofable = kwargs.get("SPOOFING_POSSIBLE")
spoofing_type = kwargs.get("SPOOFING_TYPE")
output_message("[*]", f"Domain: {domain}", "indifferent")
output_message("[*]", f"Is subdomain: {subdomain}", "indifferent")
@@ -61,21 +64,55 @@ def printer(**kwargs):
if spf_all is None:
output_message("[*]", "SPF does not contain an `All` item.", "info")
elif spf_all == "2many":
output_message("[?]", "SPF record contains multiple `All` items.", "warning")
output_message(
"[?]", "SPF record contains multiple `All` items.", "warning"
)
else:
output_message("[*]", f"SPF all record: {spf_all}", "info")
output_message("[*]", f"SPF DNS query count: {spf_dns_query_count}" if spf_dns_query_count <= 10 else f"Too many SPF DNS query lookups {spf_dns_query_count}.", "info")
output_message(
"[*]",
f"SPF DNS query count: {spf_dns_query_count}"
if spf_dns_query_count <= 10
else f"Too many SPF DNS query lookups {spf_dns_query_count}.",
"info",
)
else:
output_message("[?]", "No SPF record found.", "warning")
if dmarc_record:
output_message("[*]", f"DMARC record: {dmarc_record}", "info")
output_message("[*]", f"Found DMARC policy: {p}" if p else "No DMARC policy found.", "info")
output_message("[*]", f"Found DMARC pct: {pct}" if pct else "No DMARC pct found.", "info")
output_message("[*]", f"Found DMARC aspf: {aspf}" if aspf else "No DMARC aspf found.", "info")
output_message("[*]", f"Found DMARC subdomain policy: {sp}" if sp else "No DMARC subdomain policy found.", "info")
output_message("[*]", f"Forensics reports will be sent: {fo}" if fo else "No DMARC forensics report location found.", "indifferent")
output_message("[*]", f"Aggregate reports will be sent to: {rua}" if rua else "No DMARC aggregate report location found.", "indifferent")
output_message(
"[*]", f"Found DMARC policy: {p}" if p else "No DMARC policy found.", "info"
)
output_message(
"[*]", f"Found DMARC pct: {pct}" if pct else "No DMARC pct found.", "info"
)
output_message(
"[*]",
f"Found DMARC aspf: {aspf}" if aspf else "No DMARC aspf found.",
"info",
)
output_message(
"[*]",
f"Found DMARC subdomain policy: {sp}"
if sp
else "No DMARC subdomain policy found.",
"info",
)
output_message(
"[*]",
f"Forensics reports will be sent: {fo}"
if fo
else "No DMARC forensics report location found.",
"indifferent",
)
output_message(
"[*]",
f"Aggregate reports will be sent to: {rua}"
if rua
else "No DMARC aggregate report location found.",
"indifferent",
)
else:
output_message("[?]", "No DMARC record found.", "warning")

View File

@@ -3,6 +3,7 @@
import dns.resolver
import re
class SPF:
def __init__(self, domain, dns_server=None):
self.domain = domain
@@ -23,11 +24,11 @@ class SPF:
if not domain:
domain = self.domain
resolver = dns.resolver.Resolver()
resolver.nameservers = [self.dns_server, '1.1.1.1', '8.8.8.8']
query_result = resolver.resolve(domain, 'TXT')
resolver.nameservers = [self.dns_server, "1.1.1.1", "8.8.8.8"]
query_result = resolver.resolve(domain, "TXT")
for record in query_result:
if 'spf1' in str(record):
spf_record = str(record).replace('"', '')
if "spf1" in str(record):
spf_record = str(record).replace('"', "")
return spf_record
return None
except Exception:
@@ -35,18 +36,18 @@ class SPF:
def get_spf_all_string(self):
"""Returns the string value of the 'all' mechanism in the SPF record."""
spf_record = self.spf_record
visited_domains = set()
while spf_record:
all_matches = re.findall(r'[-~?+]all', spf_record)
all_matches = re.findall(r"[-~?+]all", spf_record)
if len(all_matches) == 1:
return all_matches[0]
elif len(all_matches) > 1:
return '2many'
return "2many"
redirect_match = re.search(r'redirect=([\w.-]+)', spf_record)
redirect_match = re.search(r"redirect=([\w.-]+)", spf_record)
if redirect_match:
redirect_domain = redirect_match.group(1)
if redirect_domain in visited_domains:
@@ -57,42 +58,45 @@ class SPF:
break
return None
def get_spf_dns_queries(self):
"""Returns the number of dns queries, redirects, and other mechanisms in the SPF record for a given domain."""
def count_dns_queries(spf_record):
count = 0
for item in spf_record.split():
if item.startswith("include:") or item.startswith("redirect="):
if item.startswith("include:"):
url = item.replace('include:', '')
url = item.replace("include:", "")
elif item.startswith("redirect="):
url = item.replace('redirect=', '')
url = item.replace("redirect=", "")
count += 1
try:
# Recursively fetch and count dns queries or redirects in the SPF record of the referenced domain
answers = dns.resolver.resolve(url, 'TXT')
answers = dns.resolver.resolve(url, "TXT")
for rdata in answers:
for txt_string in rdata.strings:
txt_record = txt_string.decode('utf-8')
if txt_record.startswith('v=spf1'):
txt_record = txt_string.decode("utf-8")
if txt_record.startswith("v=spf1"):
count += count_dns_queries(txt_record)
except Exception:
pass
# Count occurrences of 'a', 'mx', 'ptr', and 'exists' mechanisms
count += len(re.findall(r"[ ,+]a[ ,:]", spf_record))
count += len(re.findall(r"[ ,+]mx[ ,:]", spf_record))
count += len(re.findall(r"[ ]ptr[ ]", spf_record))
count += len(re.findall(r"exists[:]", spf_record))
return count
return count_dns_queries(self.spf_record)
def __str__(self):
return (f"SPF Record: {self.spf_record}\n"
f"All Mechanism: {self.all_mechanism}\n"
f"DNS Query Count: {self.spf_dns_query_count}\n"
f"Too Many DNS Queries: {self.too_many_dns_queries}")
return (
f"SPF Record: {self.spf_record}\n"
f"All Mechanism: {self.all_mechanism}\n"
f"DNS Query Count: {self.spf_dns_query_count}\n"
f"Too Many DNS Queries: {self.too_many_dns_queries}"
)

View File

@@ -2,6 +2,7 @@
import tldextract
class Spoofing:
def __init__(self, domain, p, aspf, spf_record, spf_all, spf_dns_queries, sp, pct):
self.domain = domain
@@ -45,11 +46,23 @@ class Spoofing:
return 1
elif self.aspf is None and self.sp == "none":
return 1
elif self.p == "none" and (self.aspf == "r" or self.aspf is None) and self.sp is None:
elif (
self.p == "none"
and (self.aspf == "r" or self.aspf is None)
and self.sp is None
):
return 4
elif self.p == "none" and self.aspf == "r" and (self.sp == "reject" or self.sp == "quarantine"):
elif (
self.p == "none"
and self.aspf == "r"
and (self.sp == "reject" or self.sp == "quarantine")
):
return 2
elif self.p == "none" and self.aspf is None and (self.sp == "reject" or self.sp == "quarantine"):
elif (
self.p == "none"
and self.aspf is None
and (self.sp == "reject" or self.sp == "quarantine")
):
return 5
elif self.p == "none" and self.aspf is None and self.sp == "none":
return 7
@@ -62,26 +75,48 @@ class Spoofing:
return 0
elif self.p == "none" and self.sp == "none":
return 7
elif (self.p == "reject" or self.p == "quarantine") and self.aspf is None and self.sp == "none":
elif (
(self.p == "reject" or self.p == "quarantine")
and self.aspf is None
and self.sp == "none"
):
return 1
elif (self.p == "reject" or self.p == "quarantine") and self.aspf and self.sp == "none":
elif (
(self.p == "reject" or self.p == "quarantine")
and self.aspf
and self.sp == "none"
):
return 1
else:
return 8
elif self.spf_all == "?all":
if (self.p == "reject" or self.p == "quarantine") and self.aspf and self.sp == "none":
if (
(self.p == "reject" or self.p == "quarantine")
and self.aspf
and self.sp == "none"
):
return 6
elif (self.p == "reject" or self.p == "quarantine") and self.aspf is None and self.sp == "none":
elif (
(self.p == "reject" or self.p == "quarantine")
and self.aspf is None
and self.sp == "none"
):
return 6
elif self.p == "none" and self.aspf == "r" and self.sp is None:
return 0
elif self.p == "none" and self.aspf == "r" and self.sp == "none":
return 7
elif self.p == "none" and self.aspf == "s" or None and self.sp == "none":
elif (
self.p == "none" and self.aspf == "s" or None and self.sp == "none"
):
return 7
elif self.p == "none" and self.aspf == "s" or None and self.sp is None:
return 6
elif self.p == "none" and self.aspf and (self.sp == "reject" or self.sp == "quarantine"):
elif (
self.p == "none"
and self.aspf
and (self.sp == "reject" or self.sp == "quarantine")
):
return 5
elif self.p == "none" and self.aspf is None and self.sp == "reject":
return 5
@@ -104,10 +139,12 @@ class Spoofing:
5: f"Organizational domain spoofing might be possible for {self.domain}.",
6: f"Subdomain spoofing might be possible (Mailbox dependent) for {self.domain}.",
7: f"Subdomain spoofing is possible and organizational domain spoofing might be possible for {self.domain}.",
8: f"Spoofing is not possible for {self.domain}."
8: f"Spoofing is not possible for {self.domain}.",
}
spoofing_type = spoofing_types.get(self.spoofable, f"Unknown spoofing type for {self.domain}.")
spoofing_type = spoofing_types.get(
self.spoofable, f"Unknown spoofing type for {self.domain}."
)
if self.spoofable in {0, 1, 3, 7}:
spoofing_possible = True
@@ -119,7 +156,9 @@ class Spoofing:
return spoofing_possible, spoofing_type
def __str__(self):
return (f"Domain: {self.domain}\n"
f"Domain Type: {self.domain_type}\n"
f"Spoofing Possible: {self.spoofing_possible}\n"
f"Spoofing Type: {self.spoofing_type}")
return (
f"Domain: {self.domain}\n"
f"Domain Type: {self.domain_type}\n"
f"Spoofing Possible: {self.spoofing_possible}\n"
f"Spoofing Type: {self.spoofing_type}"
)

View File

@@ -8,6 +8,7 @@ from modules.bimi import BIMI
from modules.spoofing import Spoofing
from modules import report
def process_domain(domain):
"""Process a domain to gather DNS, SPF, DMARC, and BIMI records, and evaluate spoofing potential."""
dns_info = DNS(domain)
@@ -33,36 +34,46 @@ def process_domain(domain):
bimi_location = bimi_info.location
bimi_authority = bimi_info.authority
spoofing_info = Spoofing(domain, dmarc_p, dmarc_aspf, spf_record, spf_all, spf_dns_query_count, dmarc_sp, dmarc_pct)
spoofing_info = Spoofing(
domain,
dmarc_p,
dmarc_aspf,
spf_record,
spf_all,
spf_dns_query_count,
dmarc_sp,
dmarc_pct,
)
domain_type = spoofing_info.domain_type
spoofing_possible = spoofing_info.spoofing_possible
spoofing_type = spoofing_info.spoofing_type
result = {
'DOMAIN': domain,
'DOMAIN_TYPE': domain_type,
'DNS_SERVER': dns_info.dns_server,
'SPF': spf_record,
'SPF_MULTIPLE_ALLS': spf_all,
'SPF_NUM_DNS_QUERIES': spf_dns_query_count,
'SPF_TOO_MANY_DNS_QUERIES': spf_too_many_dns_queries,
'DMARC': dmarc_record,
'DMARC_POLICY': dmarc_p,
'DMARC_PCT': dmarc_pct,
'DMARC_ASPF': dmarc_aspf,
'DMARC_SP': dmarc_sp,
'DMARC_FORENSIC_REPORT': dmarc_fo,
'DMARC_AGGREGATE_REPORT': dmarc_rua,
'BIMI_RECORD': bimi_record,
'BIMI_VERSION': bimi_version,
'BIMI_LOCATION': bimi_location,
'BIMI_AUTHORITY': bimi_authority,
'SPOOFING_POSSIBLE': spoofing_possible,
'SPOOFING_TYPE': spoofing_type
"DOMAIN": domain,
"DOMAIN_TYPE": domain_type,
"DNS_SERVER": dns_info.dns_server,
"SPF": spf_record,
"SPF_MULTIPLE_ALLS": spf_all,
"SPF_NUM_DNS_QUERIES": spf_dns_query_count,
"SPF_TOO_MANY_DNS_QUERIES": spf_too_many_dns_queries,
"DMARC": dmarc_record,
"DMARC_POLICY": dmarc_p,
"DMARC_PCT": dmarc_pct,
"DMARC_ASPF": dmarc_aspf,
"DMARC_SP": dmarc_sp,
"DMARC_FORENSIC_REPORT": dmarc_fo,
"DMARC_AGGREGATE_REPORT": dmarc_rua,
"BIMI_RECORD": bimi_record,
"BIMI_VERSION": bimi_version,
"BIMI_LOCATION": bimi_location,
"BIMI_AUTHORITY": bimi_authority,
"SPOOFING_POSSIBLE": spoofing_possible,
"SPOOFING_TYPE": spoofing_type,
}
return result
def worker(domain_queue, result_queue):
"""Worker function to process domains and put results into the result queue."""
while not domain_queue.empty():
@@ -72,20 +83,33 @@ def worker(domain_queue, result_queue):
result = process_domain(domain)
result_queue.put(result)
domain_queue.task_done()
def process_domain_and_output(domain, output, results):
"""Process a domain and handle output based on the specified format."""
result = process_domain(domain)
if output == 'stdout':
if output == "stdout":
report.printer(**result)
else:
results.append(result)
def main():
parser = argparse.ArgumentParser(description="Process domains to gather DNS, SPF, DMARC, and BIMI records.")
parser = argparse.ArgumentParser(
description="Process domains to gather DNS, SPF, DMARC, and BIMI records."
)
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument("-d", type=str, help="Single domain to process.")
group.add_argument("-iL", type=str, help="File containing a list of domains to process.")
parser.add_argument("-o", type=str, choices=['stdout', 'xls'], default='stdout', help="Output format: stdout or xls (default: stdout).")
group.add_argument(
"-iL", type=str, help="File containing a list of domains to process."
)
parser.add_argument(
"-o",
type=str,
choices=["stdout", "xls"],
default="stdout",
help="Output format: stdout or xls (default: stdout).",
)
args = parser.parse_args()
@@ -102,7 +126,9 @@ def main():
# Start threads to process each domain
for domain in domains:
thread = threading.Thread(target=process_domain_and_output, args=(domain, args.o, results))
thread = threading.Thread(
target=process_domain_and_output, args=(domain, args.o, results)
)
thread.start()
threads.append(thread)
@@ -111,9 +137,10 @@ def main():
thread.join()
# Handle output for xls format
if args.o == 'xls' and results:
if args.o == "xls" and results:
report.write_to_excel(results)
print("Results written to output.xlsx")
if __name__ == "__main__":
main()
main()