fix: cleaning up code

This commit is contained in:
Matt Keeley
2023-02-22 12:43:07 -07:00
parent 01dcc6167e
commit e7472cdb66
4 changed files with 206 additions and 201 deletions

54
libs/Printer.py Normal file
View File

@@ -0,0 +1,54 @@
from libs.PrettyOutput import (
output_good,
output_bad,
output_warning,
output_info,
output_error,
output_indifferent
)
def printer(domain, dns_server, spf_record, spf_all, spf_includes,
dmarc_record, subdomain, p, pct, aspf, sp, fo, rua, is_spoofable):
output_indifferent(f"Domain: {domain}" )
output_indifferent(f"Is subdomain: {subdomain}" )
output_indifferent(f"DNS Server: {dns_server}")
if spf_record:
output_info(f"SPF record: {spf_record}")
if spf_all is None: output_info("SPF does not contain an `All` items.")
elif spf_all == "2many": output_warning("SPF record contains multiple `All` items.")
else: output_info(f"SPF all record: {spf_all}")
if spf_includes > 10: output_warning(f"Too many SPF include lookups {spf_includes}.")
else: output_info(f"SPF include count: {spf_includes}")
else: output_info("No SPF record found.")
if dmarc_record:
output_info(f"DMARC record: {dmarc_record}")
if p: output_info(f"Found DMARC policy: {p}")
else: output_info("No DMARC policy found.")
if pct: output_info(f"Found DMARC pct: {pct}")
else: output_info("No DMARC pct found.")
if aspf: output_info(f"Found DMARC aspf: {aspf}")
else: output_info("No DMARC aspf found.")
if sp: output_info(f"Found DMARC subdomain policy: {sp}")
else: output_info("No DMARC subdomain policy found.")
if fo: output_indifferent(f"Forensics reports will be sent: {fo}")
else: output_info("No DMARC foresnic report location found.")
if rua: output_indifferent(f"Aggregate reports will be sent to: {rua}")
else: output_info("No DMARC aggregate report location found.")
# Can change this to 3.10 case if you wanna. Im not going to.
if is_spoofable == 0: output_good("Spoofing possible for " + domain)
elif is_spoofable == 1: output_good("Subdomain spoofing possible for " + domain)
elif is_spoofable == 2: output_good("Organizational domain spoofing possible for " + domain)
elif is_spoofable == 3: output_warning("Spoofing might be possible for " + domain)
elif is_spoofable == 4: output_warning("Spoofing might be possible (Mailbox dependant) for " + domain)
elif is_spoofable == 5: output_warning("Organizational domain spoofing may be possible for " + domain)
elif is_spoofable == 6: output_warning("Subdomain spoofing might be possible (Mailbox dependant) for " + domain)
elif is_spoofable == 7: output_bad("Spoofing not possible for " + domain)
elif is_spoofable == 8: output_good("Subdomain spoofing possible for " + domain);output_good("Organizational domain spoofing possible for " + domain)
elif is_spoofable == 9: output_good("Subdomain spoofing possible for " + domain);output_warning("Organizational domain spoofing may be possible for " + domain)
else: output_error("Something went wrong.")
def error(msg):
output_error(msg)

42
libs/SpoofyLogic.py Normal file
View File

@@ -0,0 +1,42 @@
def is_spoofable(domain, p, aspf, spf_record, spf_all, spf_includes, sp, pct):
try:
if pct and int(pct) != 100: return 3
elif spf_record is None:
if p is None: return 0
else: return 7
elif spf_includes > 10 and p is None:
return 0
elif spf_all == "2many":
if p == "none": return 3
else: return 7
elif spf_all and p is None: return 0
elif spf_all == "-all":
if p and aspf and sp == "none": return 1
elif aspf is None and sp == "none": return 1
elif p == "none" and (aspf == "r" or aspf is None) and sp is None: return 4
elif p == "none" and aspf == "r" and (sp == "reject" or sp == "quarentine"): return 2
elif p == "none" and aspf is None and (sp == "reject" or sp == "quarentine"): return 5
elif p == "none" and aspf is None and sp == "none": return 8
else: return 7
elif spf_all == "~all":
if p == "none" and sp == "reject" or sp == "quarentine": return 2
elif p == "none" and sp is None: return 0
elif p == "none" and sp == "none": return 8
elif (p == "reject" or p == "quarentine") and aspf is None and sp == "none": return 1
elif (p == "reject" or p == "quarentine") and aspf and sp == "none": return 1
else: return 7
elif spf_all == "?all":
if (p == "reject" or p == "quarentine") and aspf and sp == "none": return 6
elif (p == "reject" or p == "quarentine") and aspf is None and sp == "none": return 6
elif p == "none" and aspf == "r" and sp is None: return 0
elif p == "none" and aspf == "r" and sp == "none": return 8
elif p == "none" and aspf == "s" or None and sp == "none": return 8
elif p == "none" and aspf == "s" or None and sp is None: return 6
elif p == "none" and aspf and (sp == "reject" or sp == "quarentine"):return 5
elif p == "none" and aspf is None and sp == "reject": return 5
else: return 7
else: return 7
except:
print("If you hit this error message, Open an issue with your testcase.")

View File

@@ -1,3 +1,5 @@
colorama
dnspython>= 2.2.1
tldextract
tldextract
pandas
openpyxl

307
spoofy.py
View File

@@ -3,44 +3,29 @@
import argparse, dns.resolver, tldextract, socket, re
from concurrent.futures import ThreadPoolExecutor, as_completed
from colorama import init as color_init
import pandas as pd
from libs.Printer import printer, error
from libs.SpoofyLogic import is_spoofable
from libs.PrettyOutput import (
output_good,
output_bad,
output_warning,
output_info,
output_error,
output_indifferent
)
temp = dns.resolver.Resolver()
# Changes on every lookup to the DNS server specified in the SOA record of the domain.
spoofy_resolver = dns.resolver.Resolver()
spoofy_resolver.nameservers = ['1.1.1.1']
def get_soa_record(domain):
temp.nameservers[0] = '1.1.1.1'
query = temp.resolve(domain, 'SOA')
dns_server = ""
if query:
for data in query: dns_server = str(data.mname)
return socket.gethostbyname(dns_server)
return None
def get_dns_server(domain):
try:
dns_server = ""
spoofy_resolver.nameservers = ['1.1.1.1']
query = spoofy_resolver.resolve(domain, 'SOA')
if query:
for data in query: dns_server = str(data.mname)
return socket.gethostbyname(dns_server)
else:
output_error("DNS Server was not found from SOA Record. Using default 1.1.1.1!")
except:
output_error("Failed to find SOA for domain.")
return "1.1.1.1"
def get_spf_record(domain):
def get_spf_record(domain, dns_server):
spf = None
try:
try: spf = spoofy_resolver.resolve(domain , 'TXT')
try:
temp.nameservers[0] = dns_server
spf = temp.resolve(domain , 'TXT')
except:
spoofy_resolver.nameservers[0] = '1.1.1.1'
spf = spoofy_resolver.resolve(domain , 'TXT')
spf_record = ""
return None
for dns_data in spf:
if 'spf1' in str(dns_data):
spf_record = str(dns_data).replace('"','')
@@ -50,209 +35,131 @@ def get_spf_record(domain):
except:
return None
def get_spf_all_string(spf_record):
count = spf_record.count(" ~all") + spf_record.count(" ?all") + spf_record.count(" -all")
if count == 1:
record = re.search("[-,~,?]all", spf_record).group(0)
output_info(f"Found SPF all record: {record}")
return record
elif count == 0:
output_info("SPF does not contain an `All` items.")
return None
else:
output_warning("SPF record contains multiple `All` items.")
return "2many"
elif count == 0: return None
else: return "2many"
def get_spf_includes(domain):
count = get_includes_for_domain(domain, [])
if count > 10:
output_warning(f"Too many SPF include lookups {count}.")
return count
else: return count
def get_list_of_includes(domain):
spf_record = ""
includes = []
def get_spf_includes(domain, count=0):
if count > 10: return count
try:
try:
spf_record = get_spf_record(domain)
except:
output_error("Could not find SPF record for " + domain)
spf_record = get_spf_record(domain, '1.1.1.1')
if spf_record:
count = len(re.compile("[ ,+]a[ , :]").findall(spf_record))
count += len(re.compile("[ ,+]a[ ,:]").findall(spf_record))
count += len(re.compile("[ ,+]mx[ ,:]").findall(spf_record))
count += len(re.compile("[ ]ptr[ ]").findall(spf_record))
count += len(re.compile("exists[:]").findall(spf_record))
for i in range(0, count): # Since other function is recursive and I dont want to figure out how to count, add a domain with 1 include.
includes.append("18f.gov")
for item in spf_record.split(' '):
url = item.replace('include:', '')
if "include:" in item:
includes.append(url)
except:
print("HIT EXCEPTION")
return includes
url = item.replace('include:', '')
count = get_spf_includes(url, count + 1)
except:
pass
# uncomment for debug. Shows if cannot find SPF record for a domain
#print("Could not find SPF record for " + domain)
return count
def get_includes_for_domain(domain, list):
# Recursively check the includes for a given domain
for i in get_list_of_includes(domain):
if i == "18f.gov": # this is scuffed but it works.
list.append(i)
get_includes_for_domain(i, list)
if i not in list:
list.append(i)
get_includes_for_domain(i, list)
return len(list)
def get_dmarc_record(domain):
if bool(tldextract.extract(domain).subdomain): return get_dmarc_org_policy(domain)
else:
try:
try: dmarc = spoofy_resolver.resolve('_dmarc.' + domain , 'TXT')
except:
spoofy_resolver.nameservers[0] = '1.1.1.1'
dmarc = spoofy_resolver.resolve('_dmarc.' + domain , 'TXT')
dmarc_record = ""
for dns_data in dmarc:
if 'DMARC1' in str(dns_data):
dmarc_record = str(dns_data).replace('"','')
output_info(f"Found DMARC record: {dmarc_record}")
break
return dmarc_record
def get_dmarc_record(domain, dns_server):
def dmarc(domain):
try:
temp.nameservers[0] = dns_server
dmarc = temp.resolve('_dmarc.' + domain , 'TXT')
except:
output_info("No DMARC record found")
return None
def get_dmarc_org_policy(subdomain):
domain = ".".join(subdomain.split('.')[1:])
try:
spoofy_resolver.nameservers = ['1.1.1.1']
dmarc = spoofy_resolver.resolve('_dmarc.' + domain , 'TXT')
return None
dmarc_record = ""
for dns_data in dmarc:
if 'DMARC1' in str(dns_data):
dmarc_record = str(dns_data).replace('"','')
output_info(f"Found DMARC record: {dmarc_record}")
break
return dmarc_record
except:
output_bad("No organizational record.")
dmarc_record = dmarc(domain)
if dmarc_record:
return dmarc_record
else:
subdomain = tldextract.extract(domain).registered_domain
if subdomain != domain:
sub_dmarc = dmarc(subdomain)
if sub_dmarc:
return sub_dmarc
return None
def get_dmarc_policy(dmarc_record):
if "p=" in str(dmarc_record):
policy = str(dmarc_record).split("p=")[1].split(";")[0]
output_info(f"Found DMARC policy: {policy}")
return policy
if "p=" in str(dmarc_record): return str(dmarc_record).split("p=")[1].split(";")[0]
else: return None
def get_dmarc_reports(dmarc_record):
if "ruf=" in str(dmarc_record) and "fo=1" in str(dmarc_record):
fo = str(dmarc_record).split("ruf=")[1].split(";")[0]
output_indifferent(f"Forensics reports will be sent: {fo}")
if "rua=" in str(dmarc_record):
rua = str(dmarc_record).split("rua=")[1].split(";")[0]
output_indifferent(f"Aggregate reports will be sent to: {rua}")
def get_dmarc_pct(dmarc_record):
if "pct" in str(dmarc_record):
pct = str(dmarc_record).split("pct=")[1].split(";")[0]
output_info(f"Found DMARC pct: {pct}")
return pct
if "pct" in str(dmarc_record): return str(dmarc_record).split("pct=")[1].split(";")[0]
else: return None
def get_dmarc_aspf(dmarc_record):
if "aspf=" in str(dmarc_record):
aspf = str(dmarc_record).split("aspf=")[1].split(";")[0]
output_info(f"Found DMARC aspf: {aspf}")
return aspf
if "aspf=" in str(dmarc_record): return str(dmarc_record).split("aspf=")[1].split(";")[0]
else: return None
def get_dmarc_subdomain_policy(dmarc_record):
if "sp=" in str(dmarc_record):
subdomain_policy = str(dmarc_record).split("sp=")[1].split(";")[0]
output_info(f"Found DMARC subdomain policy: {subdomain_policy}")
return subdomain_policy
if "sp=" in str(dmarc_record):return str(dmarc_record).split("sp=")[1].split(";")[0]
else: return None
def get_forensics_report(dmarc_record):
if "ruf=" in str(dmarc_record) and "fo=1" in str(dmarc_record): return str(dmarc_record).split("ruf=")[1].split(";")[0]
else: return None
# Table thanks to @Calamity
def is_spoofable(domain, p, aspf, spf_record, spf_all, spf_includes, sp, pct):
try:
if pct and int(pct) != 100:
output_warning("Spoofing might be possible for " + domain)
elif spf_record is None:
if p is None: output_good("Spoofing possible for " + domain)
else: output_bad("Spoofing not possible for " + domain)
elif spf_includes > 10 and p is None:
output_good("Spoofing possible for " + domain)
elif spf_all == "2many":
if p == "none": output_warning("Spoofing might be possible for " + domain)
else: output_bad("Spoofing not possible for " + domain)
elif spf_all and p is None: output_good("Spoofing possible for " + domain)
elif spf_all == "-all":
if p and aspf and sp == "none": output_good("Subdomain spoofing possible for " + domain)
elif aspf is None and sp == "none": output_good("Subdomain spoofing possible for " + domain)
elif p == "none" and (aspf == "r" or aspf is None) and sp is None: output_warning("Spoofing might be possible (Mailbox dependant) for " + domain)
elif p == "none" and aspf == "r" and (sp == "reject" or sp == "quarentine"): output_good("Organizational domain spoofing possible for " + domain)
elif p == "none" and aspf is None and (sp == "reject" or sp == "quarentine"): output_warning("Organizational domain spoofing may be possible for " + domain)
elif p == "none" and aspf is None and sp == "none": output_good("Subdomain spoofing possible for " + domain);output_warning("Organizational domain spoofing may be possible for " + domain)
else: output_bad("Spoofing not possible for " + domain)
elif spf_all == "~all":
if p == "none" and sp == "reject" or sp == "quarentine": output_good("Organizational domain spoofing possible for " + domain)
elif p == "none" and sp is None: output_good("Spoofing possible for " + domain)
elif p == "none" and sp == "none": output_good("Subdomain spoofing possible for " + domain);output_good("Organizational domain spoofing possible for " + domain)
elif (p == "reject" or p == "quarentine") and aspf is None and sp == "none": output_good("Subdomain spoofing possible for " + domain)
elif (p == "reject" or p == "quarentine") and aspf and sp == "none": output_good("Subdomain spoofing possible for " + domain)
else: output_bad("Spoofing not possible for " + domain)
elif spf_all == "?all":
if (p == "reject" or p == "quarentine") and aspf and sp == "none": output_warning("Subdomain spoofing might be possible (Mailbox dependant) for " + domain)
elif (p == "reject" or p == "quarentine") and aspf is None and sp == "none": output_warning("Subdomain spoofing might be possible (Mailbox dependant) for " + domain)
elif p == "none" and aspf == "r" and sp is None: output_good("Spoofing possible for " + domain)
elif p == "none" and aspf == "r" and sp == "none": output_good("Subdomain spoofing possible for " + domain);output_good("Organizational domain spoofing possible for " + domain)
elif p == "none" and aspf == "s" or None and sp == "none": output_good("Subdomain spoofing possible for " + domain);output_warning("Organizational domain spoofing may be possible for " + domain)
elif p == "none" and aspf == "s" or None and sp is None: output_warning("Subdomain spoofing might be possible (Mailbox dependant) for " + domain)
elif p == "none" and aspf and (sp == "reject" or sp == "quarentine"):output_warning("Organizational domain spoofing may be possible for " + domain)
elif p == "none" and aspf is None and sp == "reject": output_warning("Organizational domain spoofing may be possible for " + domain)
else: output_bad("Spoofing not possible for " + domain)
else: output_bad("Spoofing not possible for " + domain)
except:
output_error("If you hit this error message, something is really messed up.")
def get_aggregate_report(dmarc_record):
if "rua=" in str(dmarc_record): return str(dmarc_record).split("rua=")[1].split(";")[0]
else: return None
def find_dns_server(domain):
SOA = get_soa_record(domain)
if SOA:
temp.nameservers[0] = SOA
spf = get_spf_record(domain, SOA)
dmarc = get_dmarc_record(domain, SOA)
if (spf is not None) or (dmarc is not None):
return SOA, spf, dmarc
spf = get_spf_record(domain, '1.1.1.1')
dmarc = get_dmarc_record(domain, '1.1.1.1')
if (spf is not None) or (dmarc is not None):
return '1.1.1.1', spf, dmarc
spf = get_spf_record(domain, '8.8.8.8')
dmarc = get_dmarc_record(domain, '8.8.8.8')
if (spf is not None) or (dmarc is not None):
return '8.8.8.8', spf, dmarc
return None
def orchestrator(domains):
for domain in domains:
try:
# Initiate Variables
dns_server=None;
spf_record=None; spf_all=None; spf_includes=None;
dmarc_record=None; subdomain=bool(tldextract.extract(domain).subdomain);
p=None; pct=None; aspf=None; sp=None; forensics_report=None;
dns_server, spf_record, dmarc_record = find_dns_server(domain)
# If subdomain, find the org policy
# if not, find the dmarc record
if spf_record:
spf_all = get_spf_all_string(spf_record)
spf_includes = get_spf_includes(domain)
if dmarc_record:
p = get_dmarc_policy(dmarc_record)
pct = get_dmarc_pct(dmarc_record)
aspf = get_dmarc_aspf(dmarc_record)
sp = get_dmarc_subdomain_policy(dmarc_record)
fo = get_forensics_report(dmarc_record)
rua = get_aggregate_report(dmarc_record)
spoofable = is_spoofable(domain, p, aspf, spf_record, spf_all, spf_includes, sp, pct)
print(spoofable)
printer(domain, dns_server, spf_record, spf_all, spf_includes,
dmarc_record, subdomain, p, pct, aspf, sp, fo, rua, spoofable)
except: error("Domain format cannot be interpreted.")
def check_domains(domains):
for domain in domains:
try:
p=None; aspf=None; spf_record=None; spf_all=None;
spf_includes=None; sp=None; pct=None;
spoofy_resolver.nameservers[0] = get_dns_server(domain)
output_indifferent("Domain: " + domain)
spf_record = get_spf_record(domain)
if spf_record:
output_info(f"Found SPF record: {spf_record}")
spf_all = get_spf_all_string(spf_record)
spf_includes = get_spf_includes(domain)
else: output_info("No SPF record found.")
spoofy_resolver.nameservers[0] = get_dns_server(domain)
dmarc_record = get_dmarc_record(domain)
if dmarc_record:
p = get_dmarc_policy(dmarc_record)
aspf = get_dmarc_aspf(dmarc_record)
sp = get_dmarc_subdomain_policy(dmarc_record)
pct = get_dmarc_pct(dmarc_record)
is_spoofable(domain, p, aspf, spf_record, spf_all, spf_includes, sp, pct)
print("\n")
except: output_error("Domain format cannot be interpreted.")
if __name__ == "__main__":
@@ -268,7 +175,7 @@ if __name__ == "__main__":
try:
with open(options.iL, "r") as f:
for line in f: domains.append(line.strip('\n'))
except IOError: output_error("File doesnt exist or cannot be read.")
except IOError: error("File doesnt exist or cannot be read.")
if options.d:
domains.append(options.d)
check_domains(domains)
orchestrator(domains)