From 72f5ab9f9af34c0a8affd2ee5335bc0f1cf60128 Mon Sep 17 00:00:00 2001 From: Matt Keeley Date: Wed, 3 Dec 2025 16:26:44 -0700 Subject: [PATCH] implement dkim optionally --- README.md | 16 +++++---- modules/dkim.py | 86 +++++++++++++++++++++++++++++++++++++++++++++++ modules/report.py | 6 ++++ requirements.txt | 3 +- spoofy.py | 23 +++++++++---- 5 files changed, 120 insertions(+), 14 deletions(-) create mode 100644 modules/dkim.py diff --git a/README.md b/README.md index 23c79d8..c3bcbb4 100644 --- a/README.md +++ b/README.md @@ -19,6 +19,7 @@ Well, Spoofy is different and here is why: > 2. Accurate bulk lookups > 3. Custom, manually tested spoof logic (No guessing or speculating, real world test results) > 4. SPF DNS query counter +> 5. Optional DKIM selector enumeration via API ## PASSING TESTS @@ -30,19 +31,22 @@ Well, Spoofy is different and here is why: ```console Usage: - ./spoofy.py -d [DOMAIN] -o [stdout or xls] -t [NUMBER_OF_THREADS] + ./spoofy.py -d [DOMAIN] -o [stdout or xls] -t [NUMBER_OF_THREADS] [--dkim] OR - ./spoofy.py -iL [DOMAIN_LIST] -o [stdout or xls] -t [NUMBER_OF_THREADS] + ./spoofy.py -iL [DOMAIN_LIST] -o [stdout or xls] -t [NUMBER_OF_THREADS] [--dkim] Options: - -d : Process a single domain. - -iL : Provide a file containing a list of domains to process. - -o : Specify the output format: stdout (default) or xls. - -t : Set the number of threads to use (default: 4). + -d : Process a single domain. + -iL : Provide a file containing a list of domains to process. + -o : Specify the output format: stdout (default), xls, or json. + -t : Set the number of threads to use (default: 4). + --dkim : Enable DKIM selector enumeration via API (optional). Examples: ./spoofy.py -d example.com -t 10 + ./spoofy.py -d example.com --dkim ./spoofy.py -iL domains.txt -o xls + ./spoofy.py -iL domains.txt -o json --dkim Install Dependencies: pip3 install -r requirements.txt diff --git a/modules/dkim.py b/modules/dkim.py new file mode 100644 index 0000000..a99a5e2 --- /dev/null +++ b/modules/dkim.py @@ -0,0 +1,86 @@ +# modules/dkim.py + +import requests + + +class DKIM: + def __init__(self, domain, dns_server=None, api_base_url=None): + self.domain = domain + self.dns_server = dns_server + self.api_base_url = "https://archive.prove.email/api" + self.dkim_record = self.get_dkim_record() + + def get_dkim_record(self): + """Returns the DKIM records for a given domain using the API.""" + try: + base_url = self.api_base_url.rstrip('/') + url = f"{base_url}/key" + params = {"domain": self.domain} + headers = {"accept": "application/json"} + + response = requests.get(url, params=params, headers=headers, timeout=10) + + if response.status_code == 200: + data = response.json() + return self.format_dkim_records(data) + elif response.status_code == 400: + return None + elif response.status_code == 429: + return None + elif response.status_code == 500: + return None + else: + return None + except requests.exceptions.RequestException: + return None + except (KeyError, ValueError, TypeError): + return None + + def format_dkim_records(self, api_response): + """Formats the API response into a readable string format.""" + combined_txt_records = "" + + if not isinstance(api_response, list): + return None + + records_by_key = {} + for record in api_response: + if not isinstance(record, dict): + continue + + selector = record.get("selector", "unknown") + domain = record.get("domain", self.domain) + value = record.get("value", "") + last_seen = record.get("lastSeenAt", "") + + key = f"{selector}._domainkey.{domain}" + + if key not in records_by_key: + records_by_key[key] = record + else: + existing_last_seen = records_by_key[key].get("lastSeenAt", "") + if last_seen > existing_last_seen: + records_by_key[key] = record + + for key, record in records_by_key.items(): + selector = record.get("selector", "unknown") + domain = record.get("domain", self.domain) + value = record.get("value", "") + + if len(value) > 128: + trimmed_value = value[:128] + "...(trimmed)" + else: + trimmed_value = value + + combined_txt_records += ( + f"[*] {selector}._domainkey.{domain} -> {trimmed_value}\r\n" + ) + + if combined_txt_records: + return combined_txt_records.strip() + else: + return None + + def __str__(self): + return f"DKIM Record: {self.dkim_record}" + diff --git a/modules/report.py b/modules/report.py index 12a318c..d0c4587 100644 --- a/modules/report.py +++ b/modules/report.py @@ -57,6 +57,7 @@ def printer(**kwargs): sp = kwargs.get("DMARC_SP") fo = kwargs.get("DMARC_FORENSIC_REPORT") rua = kwargs.get("DMARC_AGGREGATE_REPORT") + dkim_record = kwargs.get("DKIM") bimi_record = kwargs.get("BIMI_RECORD") vbimi = kwargs.get("BIMI_VERSION") location = kwargs.get("BIMI_LOCATION") @@ -125,6 +126,11 @@ def printer(**kwargs): else: output_message("[?]", "No DMARC record found.", "warning") + if dkim_record: + output_message("[*]", f"DKIM selectors: \r\n{dkim_record}", "info") + else: + output_message("[?]", f"No known DKIM selectors enumerated on {domain}.", "warning") + if bimi_record: output_message("[*]", f"BIMI record: {bimi_record}", "info") output_message("[*]", f"BIMI version: {vbimi}", "info") diff --git a/requirements.txt b/requirements.txt index db4c3a2..aa0c4fd 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,4 +2,5 @@ colorama dnspython>= 2.2.1 tldextract pandas -openpyxl \ No newline at end of file +openpyxl +requests \ No newline at end of file diff --git a/spoofy.py b/spoofy.py index 14ee76b..57df96b 100755 --- a/spoofy.py +++ b/spoofy.py @@ -7,6 +7,7 @@ from queue import Queue from modules.dns import DNS from modules.spf import SPF from modules.dmarc import DMARC +from modules.dkim import DKIM from modules.bimi import BIMI from modules.spoofing import Spoofing from modules import report @@ -14,8 +15,8 @@ from modules import report print_lock = threading.Lock() -def process_domain(domain): - """Process a domain to gather DNS, SPF, DMARC, and BIMI records, and evaluate spoofing potential.""" +def process_domain(domain, enable_dkim=False): + """Process a domain to gather DNS, SPF, DMARC, and BIMI records. Optionally enumerate DKIM selectors if enabled.""" dns_info = DNS(domain) spf = SPF(domain, dns_info.dns_server) dmarc = DMARC(domain, dns_info.dns_server) @@ -34,6 +35,11 @@ def process_domain(domain): dmarc_fo = dmarc.fo dmarc_rua = dmarc.rua + dkim_record = None + if enable_dkim: + dkim = DKIM(domain, dns_info.dns_server) + dkim_record = dkim.dkim_record + bimi_record = bimi_info.bimi_record bimi_version = bimi_info.version bimi_location = bimi_info.location @@ -70,6 +76,7 @@ def process_domain(domain): "DMARC_SP": dmarc_sp, "DMARC_FORENSIC_REPORT": dmarc_fo, "DMARC_AGGREGATE_REPORT": dmarc_rua, + "DKIM": dkim_record, "BIMI_RECORD": bimi_record, "BIMI_VERSION": bimi_version, "BIMI_LOCATION": bimi_location, @@ -80,13 +87,13 @@ def process_domain(domain): return result -def worker(domain_queue, print_lock, output, results): +def worker(domain_queue, print_lock, output, results, enable_dkim=False): """Worker function to process domains and output results.""" while True: domain = domain_queue.get() if domain is None: break - result = process_domain(domain) + result = process_domain(domain, enable_dkim=enable_dkim) with print_lock: if output == "stdout": report.printer(**result) @@ -97,7 +104,7 @@ def worker(domain_queue, print_lock, output, results): def main(): parser = argparse.ArgumentParser( - description="Process domains to gather DNS, SPF, DMARC, and BIMI records." + description="Process domains to gather DNS, SPF, DMARC, and BIMI records. Use --dkim to enable DKIM selector enumeration." ) group = parser.add_mutually_exclusive_group(required=True) group.add_argument("-d", type=str, help="Single domain to process.") @@ -114,7 +121,9 @@ def main(): parser.add_argument( "-t", type=int, default=4, help="Number of threads to use (default: 4)" ) - + parser.add_argument( + "--dkim", action="store_true", help="Enable DKIM selector enumeration via API" + ) args = parser.parse_args() @@ -133,7 +142,7 @@ def main(): threads = [] for _ in range(min(args.t, len(domains))): thread = threading.Thread( - target=worker, args=(domain_queue, print_lock, args.o, results) + target=worker, args=(domain_queue, print_lock, args.o, results, args.dkim) ) thread.start() threads.append(thread)