implement dkim optionally

This commit is contained in:
Matt Keeley
2025-12-03 16:26:44 -07:00
parent 955fff5bf9
commit 72f5ab9f9a
5 changed files with 120 additions and 14 deletions

View File

@@ -19,6 +19,7 @@ Well, Spoofy is different and here is why:
> 2. Accurate bulk lookups
> 3. Custom, manually tested spoof logic (No guessing or speculating, real world test results)
> 4. SPF DNS query counter
> 5. Optional DKIM selector enumeration via API
## PASSING TESTS
@@ -30,19 +31,22 @@ Well, Spoofy is different and here is why:
```console
Usage:
./spoofy.py -d [DOMAIN] -o [stdout or xls] -t [NUMBER_OF_THREADS]
./spoofy.py -d [DOMAIN] -o [stdout or xls] -t [NUMBER_OF_THREADS] [--dkim]
OR
./spoofy.py -iL [DOMAIN_LIST] -o [stdout or xls] -t [NUMBER_OF_THREADS]
./spoofy.py -iL [DOMAIN_LIST] -o [stdout or xls] -t [NUMBER_OF_THREADS] [--dkim]
Options:
-d : Process a single domain.
-iL : Provide a file containing a list of domains to process.
-o : Specify the output format: stdout (default) or xls.
-t : Set the number of threads to use (default: 4).
-d : Process a single domain.
-iL : Provide a file containing a list of domains to process.
-o : Specify the output format: stdout (default), xls, or json.
-t : Set the number of threads to use (default: 4).
--dkim : Enable DKIM selector enumeration via API (optional).
Examples:
./spoofy.py -d example.com -t 10
./spoofy.py -d example.com --dkim
./spoofy.py -iL domains.txt -o xls
./spoofy.py -iL domains.txt -o json --dkim
Install Dependencies:
pip3 install -r requirements.txt

86
modules/dkim.py Normal file
View File

@@ -0,0 +1,86 @@
# modules/dkim.py
import requests
class DKIM:
def __init__(self, domain, dns_server=None, api_base_url=None):
self.domain = domain
self.dns_server = dns_server
self.api_base_url = "https://archive.prove.email/api"
self.dkim_record = self.get_dkim_record()
def get_dkim_record(self):
"""Returns the DKIM records for a given domain using the API."""
try:
base_url = self.api_base_url.rstrip('/')
url = f"{base_url}/key"
params = {"domain": self.domain}
headers = {"accept": "application/json"}
response = requests.get(url, params=params, headers=headers, timeout=10)
if response.status_code == 200:
data = response.json()
return self.format_dkim_records(data)
elif response.status_code == 400:
return None
elif response.status_code == 429:
return None
elif response.status_code == 500:
return None
else:
return None
except requests.exceptions.RequestException:
return None
except (KeyError, ValueError, TypeError):
return None
def format_dkim_records(self, api_response):
"""Formats the API response into a readable string format."""
combined_txt_records = ""
if not isinstance(api_response, list):
return None
records_by_key = {}
for record in api_response:
if not isinstance(record, dict):
continue
selector = record.get("selector", "unknown")
domain = record.get("domain", self.domain)
value = record.get("value", "")
last_seen = record.get("lastSeenAt", "")
key = f"{selector}._domainkey.{domain}"
if key not in records_by_key:
records_by_key[key] = record
else:
existing_last_seen = records_by_key[key].get("lastSeenAt", "")
if last_seen > existing_last_seen:
records_by_key[key] = record
for key, record in records_by_key.items():
selector = record.get("selector", "unknown")
domain = record.get("domain", self.domain)
value = record.get("value", "")
if len(value) > 128:
trimmed_value = value[:128] + "...(trimmed)"
else:
trimmed_value = value
combined_txt_records += (
f"[*] {selector}._domainkey.{domain} -> {trimmed_value}\r\n"
)
if combined_txt_records:
return combined_txt_records.strip()
else:
return None
def __str__(self):
return f"DKIM Record: {self.dkim_record}"

View File

@@ -57,6 +57,7 @@ def printer(**kwargs):
sp = kwargs.get("DMARC_SP")
fo = kwargs.get("DMARC_FORENSIC_REPORT")
rua = kwargs.get("DMARC_AGGREGATE_REPORT")
dkim_record = kwargs.get("DKIM")
bimi_record = kwargs.get("BIMI_RECORD")
vbimi = kwargs.get("BIMI_VERSION")
location = kwargs.get("BIMI_LOCATION")
@@ -125,6 +126,11 @@ def printer(**kwargs):
else:
output_message("[?]", "No DMARC record found.", "warning")
if dkim_record:
output_message("[*]", f"DKIM selectors: \r\n{dkim_record}", "info")
else:
output_message("[?]", f"No known DKIM selectors enumerated on {domain}.", "warning")
if bimi_record:
output_message("[*]", f"BIMI record: {bimi_record}", "info")
output_message("[*]", f"BIMI version: {vbimi}", "info")

View File

@@ -2,4 +2,5 @@ colorama
dnspython>= 2.2.1
tldextract
pandas
openpyxl
openpyxl
requests

View File

@@ -7,6 +7,7 @@ from queue import Queue
from modules.dns import DNS
from modules.spf import SPF
from modules.dmarc import DMARC
from modules.dkim import DKIM
from modules.bimi import BIMI
from modules.spoofing import Spoofing
from modules import report
@@ -14,8 +15,8 @@ from modules import report
print_lock = threading.Lock()
def process_domain(domain):
"""Process a domain to gather DNS, SPF, DMARC, and BIMI records, and evaluate spoofing potential."""
def process_domain(domain, enable_dkim=False):
"""Process a domain to gather DNS, SPF, DMARC, and BIMI records. Optionally enumerate DKIM selectors if enabled."""
dns_info = DNS(domain)
spf = SPF(domain, dns_info.dns_server)
dmarc = DMARC(domain, dns_info.dns_server)
@@ -34,6 +35,11 @@ def process_domain(domain):
dmarc_fo = dmarc.fo
dmarc_rua = dmarc.rua
dkim_record = None
if enable_dkim:
dkim = DKIM(domain, dns_info.dns_server)
dkim_record = dkim.dkim_record
bimi_record = bimi_info.bimi_record
bimi_version = bimi_info.version
bimi_location = bimi_info.location
@@ -70,6 +76,7 @@ def process_domain(domain):
"DMARC_SP": dmarc_sp,
"DMARC_FORENSIC_REPORT": dmarc_fo,
"DMARC_AGGREGATE_REPORT": dmarc_rua,
"DKIM": dkim_record,
"BIMI_RECORD": bimi_record,
"BIMI_VERSION": bimi_version,
"BIMI_LOCATION": bimi_location,
@@ -80,13 +87,13 @@ def process_domain(domain):
return result
def worker(domain_queue, print_lock, output, results):
def worker(domain_queue, print_lock, output, results, enable_dkim=False):
"""Worker function to process domains and output results."""
while True:
domain = domain_queue.get()
if domain is None:
break
result = process_domain(domain)
result = process_domain(domain, enable_dkim=enable_dkim)
with print_lock:
if output == "stdout":
report.printer(**result)
@@ -97,7 +104,7 @@ def worker(domain_queue, print_lock, output, results):
def main():
parser = argparse.ArgumentParser(
description="Process domains to gather DNS, SPF, DMARC, and BIMI records."
description="Process domains to gather DNS, SPF, DMARC, and BIMI records. Use --dkim to enable DKIM selector enumeration."
)
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument("-d", type=str, help="Single domain to process.")
@@ -114,7 +121,9 @@ def main():
parser.add_argument(
"-t", type=int, default=4, help="Number of threads to use (default: 4)"
)
parser.add_argument(
"--dkim", action="store_true", help="Enable DKIM selector enumeration via API"
)
args = parser.parse_args()
@@ -133,7 +142,7 @@ def main():
threads = []
for _ in range(min(args.t, len(domains))):
thread = threading.Thread(
target=worker, args=(domain_queue, print_lock, args.o, results)
target=worker, args=(domain_queue, print_lock, args.o, results, args.dkim)
)
thread.start()
threads.append(thread)