Merge pull request #42 from MattKeeley/dkim-implementation

DKIM Implementation
This commit is contained in:
Matt Keeley
2025-12-07 16:07:06 -07:00
committed by GitHub
5 changed files with 120 additions and 14 deletions

View File

@@ -19,6 +19,7 @@ Well, Spoofy is different and here is why:
> 2. Accurate bulk lookups > 2. Accurate bulk lookups
> 3. Custom, manually tested spoof logic (No guessing or speculating, real world test results) > 3. Custom, manually tested spoof logic (No guessing or speculating, real world test results)
> 4. SPF DNS query counter > 4. SPF DNS query counter
> 5. Optional DKIM selector enumeration via API
## PASSING TESTS ## PASSING TESTS
@@ -30,19 +31,22 @@ Well, Spoofy is different and here is why:
```console ```console
Usage: Usage:
./spoofy.py -d [DOMAIN] -o [stdout or xls] -t [NUMBER_OF_THREADS] ./spoofy.py -d [DOMAIN] -o [stdout or xls] -t [NUMBER_OF_THREADS] [--dkim]
OR OR
./spoofy.py -iL [DOMAIN_LIST] -o [stdout or xls] -t [NUMBER_OF_THREADS] ./spoofy.py -iL [DOMAIN_LIST] -o [stdout or xls] -t [NUMBER_OF_THREADS] [--dkim]
Options: Options:
-d : Process a single domain. -d : Process a single domain.
-iL : Provide a file containing a list of domains to process. -iL : Provide a file containing a list of domains to process.
-o : Specify the output format: stdout (default) or xls. -o : Specify the output format: stdout (default), xls, or json.
-t : Set the number of threads to use (default: 4). -t : Set the number of threads to use (default: 4).
--dkim : Enable DKIM selector enumeration via API (optional).
Examples: Examples:
./spoofy.py -d example.com -t 10 ./spoofy.py -d example.com -t 10
./spoofy.py -d example.com --dkim
./spoofy.py -iL domains.txt -o xls ./spoofy.py -iL domains.txt -o xls
./spoofy.py -iL domains.txt -o json --dkim
Install Dependencies: Install Dependencies:
pip3 install -r requirements.txt pip3 install -r requirements.txt

86
modules/dkim.py Normal file
View File

@@ -0,0 +1,86 @@
# modules/dkim.py
import requests
class DKIM:
def __init__(self, domain, dns_server=None, api_base_url=None):
self.domain = domain
self.dns_server = dns_server
self.api_base_url = "https://archive.prove.email/api"
self.dkim_record = self.get_dkim_record()
def get_dkim_record(self):
"""Returns the DKIM records for a given domain using the API."""
try:
base_url = self.api_base_url.rstrip('/')
url = f"{base_url}/key"
params = {"domain": self.domain}
headers = {"accept": "application/json"}
response = requests.get(url, params=params, headers=headers, timeout=10)
if response.status_code == 200:
data = response.json()
return self.format_dkim_records(data)
elif response.status_code == 400:
return None
elif response.status_code == 429:
return None
elif response.status_code == 500:
return None
else:
return None
except requests.exceptions.RequestException:
return None
except (KeyError, ValueError, TypeError):
return None
def format_dkim_records(self, api_response):
"""Formats the API response into a readable string format."""
combined_txt_records = ""
if not isinstance(api_response, list):
return None
records_by_key = {}
for record in api_response:
if not isinstance(record, dict):
continue
selector = record.get("selector", "unknown")
domain = record.get("domain", self.domain)
value = record.get("value", "")
last_seen = record.get("lastSeenAt", "")
key = f"{selector}._domainkey.{domain}"
if key not in records_by_key:
records_by_key[key] = record
else:
existing_last_seen = records_by_key[key].get("lastSeenAt", "")
if last_seen > existing_last_seen:
records_by_key[key] = record
for key, record in records_by_key.items():
selector = record.get("selector", "unknown")
domain = record.get("domain", self.domain)
value = record.get("value", "")
if len(value) > 128:
trimmed_value = value[:128] + "...(trimmed)"
else:
trimmed_value = value
combined_txt_records += (
f"[*] {selector}._domainkey.{domain} -> {trimmed_value}\r\n"
)
if combined_txt_records:
return combined_txt_records.strip()
else:
return None
def __str__(self):
return f"DKIM Record: {self.dkim_record}"

View File

@@ -57,6 +57,7 @@ def printer(**kwargs):
sp = kwargs.get("DMARC_SP") sp = kwargs.get("DMARC_SP")
fo = kwargs.get("DMARC_FORENSIC_REPORT") fo = kwargs.get("DMARC_FORENSIC_REPORT")
rua = kwargs.get("DMARC_AGGREGATE_REPORT") rua = kwargs.get("DMARC_AGGREGATE_REPORT")
dkim_record = kwargs.get("DKIM")
bimi_record = kwargs.get("BIMI_RECORD") bimi_record = kwargs.get("BIMI_RECORD")
vbimi = kwargs.get("BIMI_VERSION") vbimi = kwargs.get("BIMI_VERSION")
location = kwargs.get("BIMI_LOCATION") location = kwargs.get("BIMI_LOCATION")
@@ -125,6 +126,11 @@ def printer(**kwargs):
else: else:
output_message("[?]", "No DMARC record found.", "warning") output_message("[?]", "No DMARC record found.", "warning")
if dkim_record:
output_message("[*]", f"DKIM selectors: \r\n{dkim_record}", "info")
else:
output_message("[?]", f"No known DKIM selectors enumerated on {domain}.", "warning")
if bimi_record: if bimi_record:
output_message("[*]", f"BIMI record: {bimi_record}", "info") output_message("[*]", f"BIMI record: {bimi_record}", "info")
output_message("[*]", f"BIMI version: {vbimi}", "info") output_message("[*]", f"BIMI version: {vbimi}", "info")

View File

@@ -2,4 +2,5 @@ colorama
dnspython>= 2.2.1 dnspython>= 2.2.1
tldextract tldextract
pandas pandas
openpyxl openpyxl
requests

View File

@@ -7,6 +7,7 @@ from queue import Queue
from modules.dns import DNS from modules.dns import DNS
from modules.spf import SPF from modules.spf import SPF
from modules.dmarc import DMARC from modules.dmarc import DMARC
from modules.dkim import DKIM
from modules.bimi import BIMI from modules.bimi import BIMI
from modules.spoofing import Spoofing from modules.spoofing import Spoofing
from modules import report from modules import report
@@ -14,8 +15,8 @@ from modules import report
print_lock = threading.Lock() print_lock = threading.Lock()
def process_domain(domain): def process_domain(domain, enable_dkim=False):
"""Process a domain to gather DNS, SPF, DMARC, and BIMI records, and evaluate spoofing potential.""" """Process a domain to gather DNS, SPF, DMARC, and BIMI records. Optionally enumerate DKIM selectors if enabled."""
dns_info = DNS(domain) dns_info = DNS(domain)
spf = SPF(domain, dns_info.dns_server) spf = SPF(domain, dns_info.dns_server)
dmarc = DMARC(domain, dns_info.dns_server) dmarc = DMARC(domain, dns_info.dns_server)
@@ -34,6 +35,11 @@ def process_domain(domain):
dmarc_fo = dmarc.fo dmarc_fo = dmarc.fo
dmarc_rua = dmarc.rua dmarc_rua = dmarc.rua
dkim_record = None
if enable_dkim:
dkim = DKIM(domain, dns_info.dns_server)
dkim_record = dkim.dkim_record
bimi_record = bimi_info.bimi_record bimi_record = bimi_info.bimi_record
bimi_version = bimi_info.version bimi_version = bimi_info.version
bimi_location = bimi_info.location bimi_location = bimi_info.location
@@ -70,6 +76,7 @@ def process_domain(domain):
"DMARC_SP": dmarc_sp, "DMARC_SP": dmarc_sp,
"DMARC_FORENSIC_REPORT": dmarc_fo, "DMARC_FORENSIC_REPORT": dmarc_fo,
"DMARC_AGGREGATE_REPORT": dmarc_rua, "DMARC_AGGREGATE_REPORT": dmarc_rua,
"DKIM": dkim_record,
"BIMI_RECORD": bimi_record, "BIMI_RECORD": bimi_record,
"BIMI_VERSION": bimi_version, "BIMI_VERSION": bimi_version,
"BIMI_LOCATION": bimi_location, "BIMI_LOCATION": bimi_location,
@@ -80,13 +87,13 @@ def process_domain(domain):
return result return result
def worker(domain_queue, print_lock, output, results): def worker(domain_queue, print_lock, output, results, enable_dkim=False):
"""Worker function to process domains and output results.""" """Worker function to process domains and output results."""
while True: while True:
domain = domain_queue.get() domain = domain_queue.get()
if domain is None: if domain is None:
break break
result = process_domain(domain) result = process_domain(domain, enable_dkim=enable_dkim)
with print_lock: with print_lock:
if output == "stdout": if output == "stdout":
report.printer(**result) report.printer(**result)
@@ -97,7 +104,7 @@ def worker(domain_queue, print_lock, output, results):
def main(): def main():
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description="Process domains to gather DNS, SPF, DMARC, and BIMI records." description="Process domains to gather DNS, SPF, DMARC, and BIMI records. Use --dkim to enable DKIM selector enumeration."
) )
group = parser.add_mutually_exclusive_group(required=True) group = parser.add_mutually_exclusive_group(required=True)
group.add_argument("-d", type=str, help="Single domain to process.") group.add_argument("-d", type=str, help="Single domain to process.")
@@ -114,7 +121,9 @@ def main():
parser.add_argument( parser.add_argument(
"-t", type=int, default=4, help="Number of threads to use (default: 4)" "-t", type=int, default=4, help="Number of threads to use (default: 4)"
) )
parser.add_argument(
"--dkim", action="store_true", help="Enable DKIM selector enumeration via API"
)
args = parser.parse_args() args = parser.parse_args()
@@ -133,7 +142,7 @@ def main():
threads = [] threads = []
for _ in range(min(args.t, len(domains))): for _ in range(min(args.t, len(domains))):
thread = threading.Thread( thread = threading.Thread(
target=worker, args=(domain_queue, print_lock, args.o, results) target=worker, args=(domain_queue, print_lock, args.o, results, args.dkim)
) )
thread.start() thread.start()
threads.append(thread) threads.append(thread)