diff --git a/modules/__init__.py b/modules/__init__.py index 5a3656f..f307358 100644 --- a/modules/__init__.py +++ b/modules/__init__.py @@ -1 +1 @@ -# modules/__init__.py \ No newline at end of file +# modules/__init__.py diff --git a/modules/bimi.py b/modules/bimi.py index 80e8c44..240c604 100644 --- a/modules/bimi.py +++ b/modules/bimi.py @@ -2,6 +2,7 @@ import dns.resolver + class BIMI: def __init__(self, domain, dns_server=None): self.domain = domain @@ -22,9 +23,9 @@ class BIMI: resolver = dns.resolver.Resolver() if self.dns_server: resolver.nameservers = [self.dns_server] - bimi = resolver.resolve(f'default._bimi.{self.domain}', 'TXT') + bimi = resolver.resolve(f"default._bimi.{self.domain}", "TXT") for record in bimi: - if 'v=BIMI' in str(record): + if "v=BIMI" in str(record): return record return None except Exception: @@ -53,7 +54,9 @@ class BIMI: return self.version, self.location, self.authority def __str__(self): - return (f"BIMI Record: {self.bimi_record}\n" - f"Version: {self.version}\n" - f"Location: {self.location}\n" - f"Authority: {self.authority}") + return ( + f"BIMI Record: {self.bimi_record}\n" + f"Version: {self.version}\n" + f"Location: {self.location}\n" + f"Authority: {self.authority}" + ) diff --git a/modules/dmarc.py b/modules/dmarc.py index 226ff22..63144f8 100644 --- a/modules/dmarc.py +++ b/modules/dmarc.py @@ -3,6 +3,7 @@ import dns.resolver import tldextract + class DMARC: def __init__(self, domain, dns_server=None): self.domain = domain @@ -28,7 +29,7 @@ class DMARC: subdomain = tldextract.extract(self.domain).registered_domain if subdomain != self.domain: return self.get_dmarc_record_for_domain(subdomain) - + return self.get_dmarc_record_for_domain(self.domain) def get_dmarc_record_for_domain(self, domain): @@ -42,7 +43,7 @@ class DMARC: for dns_data in dmarc: if "DMARC1" in str(dns_data): - return str(dns_data).replace('"', '') + return str(dns_data).replace('"', "") return None def get_dmarc_policy(self): @@ -82,10 +83,12 @@ class DMARC: return None def __str__(self): - return (f"DMARC Record: {self.dmarc_record}\n" - f"Policy: {self.policy}\n" - f"Pct: {self.pct}\n" - f"ASPF: {self.aspf}\n" - f"Subdomain Policy: {self.sp}\n" - f"Forensic Report URI: {self.fo}\n" - f"Aggregate Report URI: {self.rua}") + return ( + f"DMARC Record: {self.dmarc_record}\n" + f"Policy: {self.policy}\n" + f"Pct: {self.pct}\n" + f"ASPF: {self.aspf}\n" + f"Subdomain Policy: {self.sp}\n" + f"Forensic Report URI: {self.fo}\n" + f"Aggregate Report URI: {self.rua}" + ) diff --git a/modules/dns.py b/modules/dns.py index 600def8..68a75b5 100644 --- a/modules/dns.py +++ b/modules/dns.py @@ -6,6 +6,7 @@ from .spf import SPF from .dmarc import DMARC from .bimi import BIMI + class DNS: def __init__(self, domain): self.domain = domain @@ -21,9 +22,9 @@ class DNS: def get_soa_record(self): """Sets the SOA record and DNS server of a given domain.""" resolver = dns.resolver.Resolver() - resolver.nameservers = ['1.1.1.1'] + resolver.nameservers = ["1.1.1.1"] try: - query = resolver.resolve(self.domain, 'SOA') + query = resolver.resolve(self.domain, "SOA") except Exception: return if query: @@ -44,7 +45,7 @@ class DNS: if self.spf_record.spf_record and self.dmarc_record.dmarc_record: return - for ip_address in ['1.1.1.1', '8.8.8.8', '9.9.9.9']: + for ip_address in ["1.1.1.1", "8.8.8.8", "9.9.9.9"]: self.spf_record = SPF(self.domain, ip_address) self.dmarc_record = DMARC(self.domain, ip_address) self.bimi_record = BIMI(self.domain, ip_address) @@ -52,7 +53,7 @@ class DNS: self.dns_server = ip_address return - self.dns_server = '1.1.1.1' + self.dns_server = "1.1.1.1" def get_txt_record(self, record_type): """Returns the TXT record of a given type for the domain.""" @@ -65,9 +66,11 @@ class DNS: return None def __str__(self): - return (f"Domain: {self.domain}\n" - f"SOA Record: {self.soa_record}\n" - f"DNS Server: {self.dns_server}\n" - f"SPF Record: {self.spf_record.spf_record}\n" - f"DMARC Record: {self.dmarc_record.dmarc_record}\n" - f"BIMI Record: {self.bimi_record.bimi_record}") + return ( + f"Domain: {self.domain}\n" + f"SOA Record: {self.soa_record}\n" + f"DNS Server: {self.dns_server}\n" + f"SPF Record: {self.spf_record.spf_record}\n" + f"DMARC Record: {self.dmarc_record.dmarc_record}\n" + f"BIMI Record: {self.bimi_record.bimi_record}" + ) diff --git a/modules/report.py b/modules/report.py index a277739..4116fa5 100644 --- a/modules/report.py +++ b/modules/report.py @@ -7,6 +7,7 @@ from colorama import init, Fore, Style # Initialize colorama init() + def output_message(symbol, message, level="info"): """Generic function to print messages with different colors and symbols based on the level.""" colors = { @@ -15,11 +16,12 @@ def output_message(symbol, message, level="info"): "bad": Fore.RED + Style.BRIGHT, "indifferent": Fore.BLUE + Style.BRIGHT, "error": Fore.RED + Style.BRIGHT + "!!! ", - "info": Fore.WHITE + Style.BRIGHT + "info": Fore.WHITE + Style.BRIGHT, } color = colors.get(level, Fore.WHITE + Style.BRIGHT) print(color + f"{symbol} {message}" + Style.RESET_ALL) + def write_to_excel(data, file_name="output.xlsx"): """Writes a DataFrame of data to an Excel file, appending if the file exists.""" if os.path.exists(file_name) and os.path.getsize(file_name) > 0: @@ -30,27 +32,28 @@ def write_to_excel(data, file_name="output.xlsx"): else: pd.DataFrame(data).to_excel(file_name, index=False) + def printer(**kwargs): """Utility function to print the results of DMARC, SPF, and BIMI checks in the original format.""" - domain = kwargs.get('DOMAIN') - subdomain = kwargs.get('DOMAIN_TYPE') == 'subdomain' - dns_server = kwargs.get('DNS_SERVER') - spf_record = kwargs.get('SPF') - spf_all = kwargs.get('SPF_MULTIPLE_ALLS') - spf_dns_query_count = kwargs.get('SPF_NUM_DNS_QUERIES') - dmarc_record = kwargs.get('DMARC') - p = kwargs.get('DMARC_POLICY') - pct = kwargs.get('DMARC_PCT') - aspf = kwargs.get('DMARC_ASPF') - sp = kwargs.get('DMARC_SP') - fo = kwargs.get('DMARC_FORENSIC_REPORT') - rua = kwargs.get('DMARC_AGGREGATE_REPORT') - bimi_record = kwargs.get('BIMI_RECORD') - vbimi = kwargs.get('BIMI_VERSION') - location = kwargs.get('BIMI_LOCATION') - authority = kwargs.get('BIMI_AUTHORITY') - spoofable = kwargs.get('SPOOFING_POSSIBLE') - spoofing_type = kwargs.get('SPOOFING_TYPE') + domain = kwargs.get("DOMAIN") + subdomain = kwargs.get("DOMAIN_TYPE") == "subdomain" + dns_server = kwargs.get("DNS_SERVER") + spf_record = kwargs.get("SPF") + spf_all = kwargs.get("SPF_MULTIPLE_ALLS") + spf_dns_query_count = kwargs.get("SPF_NUM_DNS_QUERIES") + dmarc_record = kwargs.get("DMARC") + p = kwargs.get("DMARC_POLICY") + pct = kwargs.get("DMARC_PCT") + aspf = kwargs.get("DMARC_ASPF") + sp = kwargs.get("DMARC_SP") + fo = kwargs.get("DMARC_FORENSIC_REPORT") + rua = kwargs.get("DMARC_AGGREGATE_REPORT") + bimi_record = kwargs.get("BIMI_RECORD") + vbimi = kwargs.get("BIMI_VERSION") + location = kwargs.get("BIMI_LOCATION") + authority = kwargs.get("BIMI_AUTHORITY") + spoofable = kwargs.get("SPOOFING_POSSIBLE") + spoofing_type = kwargs.get("SPOOFING_TYPE") output_message("[*]", f"Domain: {domain}", "indifferent") output_message("[*]", f"Is subdomain: {subdomain}", "indifferent") @@ -61,21 +64,55 @@ def printer(**kwargs): if spf_all is None: output_message("[*]", "SPF does not contain an `All` item.", "info") elif spf_all == "2many": - output_message("[?]", "SPF record contains multiple `All` items.", "warning") + output_message( + "[?]", "SPF record contains multiple `All` items.", "warning" + ) else: output_message("[*]", f"SPF all record: {spf_all}", "info") - output_message("[*]", f"SPF DNS query count: {spf_dns_query_count}" if spf_dns_query_count <= 10 else f"Too many SPF DNS query lookups {spf_dns_query_count}.", "info") + output_message( + "[*]", + f"SPF DNS query count: {spf_dns_query_count}" + if spf_dns_query_count <= 10 + else f"Too many SPF DNS query lookups {spf_dns_query_count}.", + "info", + ) else: output_message("[?]", "No SPF record found.", "warning") if dmarc_record: output_message("[*]", f"DMARC record: {dmarc_record}", "info") - output_message("[*]", f"Found DMARC policy: {p}" if p else "No DMARC policy found.", "info") - output_message("[*]", f"Found DMARC pct: {pct}" if pct else "No DMARC pct found.", "info") - output_message("[*]", f"Found DMARC aspf: {aspf}" if aspf else "No DMARC aspf found.", "info") - output_message("[*]", f"Found DMARC subdomain policy: {sp}" if sp else "No DMARC subdomain policy found.", "info") - output_message("[*]", f"Forensics reports will be sent: {fo}" if fo else "No DMARC forensics report location found.", "indifferent") - output_message("[*]", f"Aggregate reports will be sent to: {rua}" if rua else "No DMARC aggregate report location found.", "indifferent") + output_message( + "[*]", f"Found DMARC policy: {p}" if p else "No DMARC policy found.", "info" + ) + output_message( + "[*]", f"Found DMARC pct: {pct}" if pct else "No DMARC pct found.", "info" + ) + output_message( + "[*]", + f"Found DMARC aspf: {aspf}" if aspf else "No DMARC aspf found.", + "info", + ) + output_message( + "[*]", + f"Found DMARC subdomain policy: {sp}" + if sp + else "No DMARC subdomain policy found.", + "info", + ) + output_message( + "[*]", + f"Forensics reports will be sent: {fo}" + if fo + else "No DMARC forensics report location found.", + "indifferent", + ) + output_message( + "[*]", + f"Aggregate reports will be sent to: {rua}" + if rua + else "No DMARC aggregate report location found.", + "indifferent", + ) else: output_message("[?]", "No DMARC record found.", "warning") diff --git a/modules/spf.py b/modules/spf.py index 0c6b68c..2365e96 100644 --- a/modules/spf.py +++ b/modules/spf.py @@ -3,6 +3,7 @@ import dns.resolver import re + class SPF: def __init__(self, domain, dns_server=None): self.domain = domain @@ -23,11 +24,11 @@ class SPF: if not domain: domain = self.domain resolver = dns.resolver.Resolver() - resolver.nameservers = [self.dns_server, '1.1.1.1', '8.8.8.8'] - query_result = resolver.resolve(domain, 'TXT') + resolver.nameservers = [self.dns_server, "1.1.1.1", "8.8.8.8"] + query_result = resolver.resolve(domain, "TXT") for record in query_result: - if 'spf1' in str(record): - spf_record = str(record).replace('"', '') + if "spf1" in str(record): + spf_record = str(record).replace('"', "") return spf_record return None except Exception: @@ -35,18 +36,18 @@ class SPF: def get_spf_all_string(self): """Returns the string value of the 'all' mechanism in the SPF record.""" - + spf_record = self.spf_record visited_domains = set() while spf_record: - all_matches = re.findall(r'[-~?+]all', spf_record) + all_matches = re.findall(r"[-~?+]all", spf_record) if len(all_matches) == 1: return all_matches[0] elif len(all_matches) > 1: - return '2many' + return "2many" - redirect_match = re.search(r'redirect=([\w.-]+)', spf_record) + redirect_match = re.search(r"redirect=([\w.-]+)", spf_record) if redirect_match: redirect_domain = redirect_match.group(1) if redirect_domain in visited_domains: @@ -57,42 +58,45 @@ class SPF: break return None - + def get_spf_dns_queries(self): """Returns the number of dns queries, redirects, and other mechanisms in the SPF record for a given domain.""" + def count_dns_queries(spf_record): count = 0 for item in spf_record.split(): if item.startswith("include:") or item.startswith("redirect="): if item.startswith("include:"): - url = item.replace('include:', '') + url = item.replace("include:", "") elif item.startswith("redirect="): - url = item.replace('redirect=', '') - + url = item.replace("redirect=", "") + count += 1 try: # Recursively fetch and count dns queries or redirects in the SPF record of the referenced domain - answers = dns.resolver.resolve(url, 'TXT') + answers = dns.resolver.resolve(url, "TXT") for rdata in answers: for txt_string in rdata.strings: - txt_record = txt_string.decode('utf-8') - if txt_record.startswith('v=spf1'): + txt_record = txt_string.decode("utf-8") + if txt_record.startswith("v=spf1"): count += count_dns_queries(txt_record) except Exception: pass - + # Count occurrences of 'a', 'mx', 'ptr', and 'exists' mechanisms count += len(re.findall(r"[ ,+]a[ ,:]", spf_record)) count += len(re.findall(r"[ ,+]mx[ ,:]", spf_record)) count += len(re.findall(r"[ ]ptr[ ]", spf_record)) count += len(re.findall(r"exists[:]", spf_record)) - + return count return count_dns_queries(self.spf_record) - + def __str__(self): - return (f"SPF Record: {self.spf_record}\n" - f"All Mechanism: {self.all_mechanism}\n" - f"DNS Query Count: {self.spf_dns_query_count}\n" - f"Too Many DNS Queries: {self.too_many_dns_queries}") \ No newline at end of file + return ( + f"SPF Record: {self.spf_record}\n" + f"All Mechanism: {self.all_mechanism}\n" + f"DNS Query Count: {self.spf_dns_query_count}\n" + f"Too Many DNS Queries: {self.too_many_dns_queries}" + ) diff --git a/modules/spoofing.py b/modules/spoofing.py index 747f310..3a4ace8 100644 --- a/modules/spoofing.py +++ b/modules/spoofing.py @@ -2,6 +2,7 @@ import tldextract + class Spoofing: def __init__(self, domain, p, aspf, spf_record, spf_all, spf_dns_queries, sp, pct): self.domain = domain @@ -45,11 +46,23 @@ class Spoofing: return 1 elif self.aspf is None and self.sp == "none": return 1 - elif self.p == "none" and (self.aspf == "r" or self.aspf is None) and self.sp is None: + elif ( + self.p == "none" + and (self.aspf == "r" or self.aspf is None) + and self.sp is None + ): return 4 - elif self.p == "none" and self.aspf == "r" and (self.sp == "reject" or self.sp == "quarantine"): + elif ( + self.p == "none" + and self.aspf == "r" + and (self.sp == "reject" or self.sp == "quarantine") + ): return 2 - elif self.p == "none" and self.aspf is None and (self.sp == "reject" or self.sp == "quarantine"): + elif ( + self.p == "none" + and self.aspf is None + and (self.sp == "reject" or self.sp == "quarantine") + ): return 5 elif self.p == "none" and self.aspf is None and self.sp == "none": return 7 @@ -62,26 +75,48 @@ class Spoofing: return 0 elif self.p == "none" and self.sp == "none": return 7 - elif (self.p == "reject" or self.p == "quarantine") and self.aspf is None and self.sp == "none": + elif ( + (self.p == "reject" or self.p == "quarantine") + and self.aspf is None + and self.sp == "none" + ): return 1 - elif (self.p == "reject" or self.p == "quarantine") and self.aspf and self.sp == "none": + elif ( + (self.p == "reject" or self.p == "quarantine") + and self.aspf + and self.sp == "none" + ): return 1 else: return 8 elif self.spf_all == "?all": - if (self.p == "reject" or self.p == "quarantine") and self.aspf and self.sp == "none": + if ( + (self.p == "reject" or self.p == "quarantine") + and self.aspf + and self.sp == "none" + ): return 6 - elif (self.p == "reject" or self.p == "quarantine") and self.aspf is None and self.sp == "none": + elif ( + (self.p == "reject" or self.p == "quarantine") + and self.aspf is None + and self.sp == "none" + ): return 6 elif self.p == "none" and self.aspf == "r" and self.sp is None: return 0 elif self.p == "none" and self.aspf == "r" and self.sp == "none": return 7 - elif self.p == "none" and self.aspf == "s" or None and self.sp == "none": + elif ( + self.p == "none" and self.aspf == "s" or None and self.sp == "none" + ): return 7 elif self.p == "none" and self.aspf == "s" or None and self.sp is None: return 6 - elif self.p == "none" and self.aspf and (self.sp == "reject" or self.sp == "quarantine"): + elif ( + self.p == "none" + and self.aspf + and (self.sp == "reject" or self.sp == "quarantine") + ): return 5 elif self.p == "none" and self.aspf is None and self.sp == "reject": return 5 @@ -104,10 +139,12 @@ class Spoofing: 5: f"Organizational domain spoofing might be possible for {self.domain}.", 6: f"Subdomain spoofing might be possible (Mailbox dependent) for {self.domain}.", 7: f"Subdomain spoofing is possible and organizational domain spoofing might be possible for {self.domain}.", - 8: f"Spoofing is not possible for {self.domain}." + 8: f"Spoofing is not possible for {self.domain}.", } - spoofing_type = spoofing_types.get(self.spoofable, f"Unknown spoofing type for {self.domain}.") + spoofing_type = spoofing_types.get( + self.spoofable, f"Unknown spoofing type for {self.domain}." + ) if self.spoofable in {0, 1, 3, 7}: spoofing_possible = True @@ -119,7 +156,9 @@ class Spoofing: return spoofing_possible, spoofing_type def __str__(self): - return (f"Domain: {self.domain}\n" - f"Domain Type: {self.domain_type}\n" - f"Spoofing Possible: {self.spoofing_possible}\n" - f"Spoofing Type: {self.spoofing_type}") + return ( + f"Domain: {self.domain}\n" + f"Domain Type: {self.domain_type}\n" + f"Spoofing Possible: {self.spoofing_possible}\n" + f"Spoofing Type: {self.spoofing_type}" + ) diff --git a/spoofy.py b/spoofy.py index 9e25553..db534d9 100755 --- a/spoofy.py +++ b/spoofy.py @@ -8,6 +8,7 @@ from modules.bimi import BIMI from modules.spoofing import Spoofing from modules import report + def process_domain(domain): """Process a domain to gather DNS, SPF, DMARC, and BIMI records, and evaluate spoofing potential.""" dns_info = DNS(domain) @@ -33,36 +34,46 @@ def process_domain(domain): bimi_location = bimi_info.location bimi_authority = bimi_info.authority - spoofing_info = Spoofing(domain, dmarc_p, dmarc_aspf, spf_record, spf_all, spf_dns_query_count, dmarc_sp, dmarc_pct) + spoofing_info = Spoofing( + domain, + dmarc_p, + dmarc_aspf, + spf_record, + spf_all, + spf_dns_query_count, + dmarc_sp, + dmarc_pct, + ) domain_type = spoofing_info.domain_type spoofing_possible = spoofing_info.spoofing_possible spoofing_type = spoofing_info.spoofing_type result = { - 'DOMAIN': domain, - 'DOMAIN_TYPE': domain_type, - 'DNS_SERVER': dns_info.dns_server, - 'SPF': spf_record, - 'SPF_MULTIPLE_ALLS': spf_all, - 'SPF_NUM_DNS_QUERIES': spf_dns_query_count, - 'SPF_TOO_MANY_DNS_QUERIES': spf_too_many_dns_queries, - 'DMARC': dmarc_record, - 'DMARC_POLICY': dmarc_p, - 'DMARC_PCT': dmarc_pct, - 'DMARC_ASPF': dmarc_aspf, - 'DMARC_SP': dmarc_sp, - 'DMARC_FORENSIC_REPORT': dmarc_fo, - 'DMARC_AGGREGATE_REPORT': dmarc_rua, - 'BIMI_RECORD': bimi_record, - 'BIMI_VERSION': bimi_version, - 'BIMI_LOCATION': bimi_location, - 'BIMI_AUTHORITY': bimi_authority, - 'SPOOFING_POSSIBLE': spoofing_possible, - 'SPOOFING_TYPE': spoofing_type + "DOMAIN": domain, + "DOMAIN_TYPE": domain_type, + "DNS_SERVER": dns_info.dns_server, + "SPF": spf_record, + "SPF_MULTIPLE_ALLS": spf_all, + "SPF_NUM_DNS_QUERIES": spf_dns_query_count, + "SPF_TOO_MANY_DNS_QUERIES": spf_too_many_dns_queries, + "DMARC": dmarc_record, + "DMARC_POLICY": dmarc_p, + "DMARC_PCT": dmarc_pct, + "DMARC_ASPF": dmarc_aspf, + "DMARC_SP": dmarc_sp, + "DMARC_FORENSIC_REPORT": dmarc_fo, + "DMARC_AGGREGATE_REPORT": dmarc_rua, + "BIMI_RECORD": bimi_record, + "BIMI_VERSION": bimi_version, + "BIMI_LOCATION": bimi_location, + "BIMI_AUTHORITY": bimi_authority, + "SPOOFING_POSSIBLE": spoofing_possible, + "SPOOFING_TYPE": spoofing_type, } return result + def worker(domain_queue, result_queue): """Worker function to process domains and put results into the result queue.""" while not domain_queue.empty(): @@ -72,20 +83,33 @@ def worker(domain_queue, result_queue): result = process_domain(domain) result_queue.put(result) domain_queue.task_done() + + def process_domain_and_output(domain, output, results): """Process a domain and handle output based on the specified format.""" result = process_domain(domain) - if output == 'stdout': + if output == "stdout": report.printer(**result) else: results.append(result) + def main(): - parser = argparse.ArgumentParser(description="Process domains to gather DNS, SPF, DMARC, and BIMI records.") + parser = argparse.ArgumentParser( + description="Process domains to gather DNS, SPF, DMARC, and BIMI records." + ) group = parser.add_mutually_exclusive_group(required=True) group.add_argument("-d", type=str, help="Single domain to process.") - group.add_argument("-iL", type=str, help="File containing a list of domains to process.") - parser.add_argument("-o", type=str, choices=['stdout', 'xls'], default='stdout', help="Output format: stdout or xls (default: stdout).") + group.add_argument( + "-iL", type=str, help="File containing a list of domains to process." + ) + parser.add_argument( + "-o", + type=str, + choices=["stdout", "xls"], + default="stdout", + help="Output format: stdout or xls (default: stdout).", + ) args = parser.parse_args() @@ -102,7 +126,9 @@ def main(): # Start threads to process each domain for domain in domains: - thread = threading.Thread(target=process_domain_and_output, args=(domain, args.o, results)) + thread = threading.Thread( + target=process_domain_and_output, args=(domain, args.o, results) + ) thread.start() threads.append(thread) @@ -111,9 +137,10 @@ def main(): thread.join() # Handle output for xls format - if args.o == 'xls' and results: + if args.o == "xls" and results: report.write_to_excel(results) print("Results written to output.xlsx") + if __name__ == "__main__": - main() \ No newline at end of file + main()