mirror of
https://github.com/MattKeeley/Spoofy.git
synced 2026-02-03 05:23:24 +00:00
Update spoofy.py
This commit is contained in:
108
spoofy.py
108
spoofy.py
@@ -1,16 +1,16 @@
|
||||
#! /usr/bin/env python3
|
||||
import argparse
|
||||
import threading
|
||||
from modules.dns import DNS
|
||||
from modules.spf import SPF
|
||||
from modules.dmarc import DMARC
|
||||
from modules.bimi import BIMI
|
||||
from modules.spoofing import Spoofing
|
||||
|
||||
import argparse
|
||||
import tldextract
|
||||
import threading
|
||||
from modules import report
|
||||
from queue import Queue
|
||||
|
||||
def process_domain(domain):
|
||||
|
||||
"""Process a domain to gather DNS, SPF, DMARC, and BIMI records, and evaluate spoofing potential."""
|
||||
dns_info = DNS(domain)
|
||||
spf = SPF(domain, dns_info.dns_server)
|
||||
dmarc = DMARC(domain, dns_info.dns_server)
|
||||
@@ -34,39 +34,87 @@ def process_domain(domain):
|
||||
bimi_location = bimi_info.location
|
||||
bimi_authority = bimi_info.authority
|
||||
|
||||
|
||||
spoofing_info = Spoofing(domain, dmarc_p, dmarc_aspf, spf_record, spf_all, spf_num_includes, dmarc_sp, dmarc_pct)
|
||||
|
||||
domain_type = spoofing_info.domain_type
|
||||
spoofing_possible = spoofing_info.spoofing_possible
|
||||
spoofing_type = spoofing_info.spoofing_type
|
||||
|
||||
|
||||
result = {
|
||||
'DOMAIN_TYPE': domain_type,
|
||||
'DNS_SERVER': dns_info.dns_server,
|
||||
'SPF': spf_record,
|
||||
'SPF_MULTIPLE_ALLS': spf_all,
|
||||
'SPF_NUM_INCLUDES': spf_num_includes,
|
||||
'SPF_TOO_MANY_INCLUDES': spf_too_many_includes,
|
||||
'DMARC': dmarc_record,
|
||||
'DMARC_POLICY': dmarc_p,
|
||||
'DMARC_PCT': dmarc_pct,
|
||||
'DMARC_ASPF': dmarc_aspf,
|
||||
'DMARC_SP': dmarc_sp,
|
||||
'DMARC_FORENSIC_REPORT': dmarc_fo,
|
||||
'DMARC_AGGREGATE_REPORT': dmarc_rua,
|
||||
'BIMI_RECORD': bimi_record,
|
||||
'BIMI_VERSION': bimi_version,
|
||||
'BIMI_LOCATION': bimi_location,
|
||||
'BIMI_AUTHORITY': bimi_authority,
|
||||
'SPOOFING_POSSIBLE': spoofing_possible,
|
||||
'SPOOFING_TYPE': spoofing_type
|
||||
'DOMAIN': domain,
|
||||
'DOMAIN_TYPE': domain_type,
|
||||
'DNS_SERVER': dns_info.dns_server,
|
||||
'SPF': spf_record,
|
||||
'SPF_MULTIPLE_ALLS': spf_all,
|
||||
'SPF_NUM_INCLUDES': spf_num_includes,
|
||||
'SPF_TOO_MANY_INCLUDES': spf_too_many_includes,
|
||||
'DMARC': dmarc_record,
|
||||
'DMARC_POLICY': dmarc_p,
|
||||
'DMARC_PCT': dmarc_pct,
|
||||
'DMARC_ASPF': dmarc_aspf,
|
||||
'DMARC_SP': dmarc_sp,
|
||||
'DMARC_FORENSIC_REPORT': dmarc_fo,
|
||||
'DMARC_AGGREGATE_REPORT': dmarc_rua,
|
||||
'BIMI_RECORD': bimi_record,
|
||||
'BIMI_VERSION': bimi_version,
|
||||
'BIMI_LOCATION': bimi_location,
|
||||
'BIMI_AUTHORITY': bimi_authority,
|
||||
'SPOOFING_POSSIBLE': spoofing_possible,
|
||||
'SPOOFING_TYPE': spoofing_type
|
||||
}
|
||||
return result
|
||||
|
||||
def worker(domain_queue, result_queue):
|
||||
"""Worker function to process domains and put results into the result queue."""
|
||||
while not domain_queue.empty():
|
||||
domain = domain_queue.get()
|
||||
if domain is None:
|
||||
break
|
||||
result = process_domain(domain)
|
||||
result_queue.put(result)
|
||||
domain_queue.task_done()
|
||||
def process_domain_and_output(domain, output, results):
|
||||
"""Process a domain and handle output based on the specified format."""
|
||||
result = process_domain(domain)
|
||||
if output == 'stdout':
|
||||
report.printer(**result)
|
||||
else:
|
||||
results.append(result)
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Process domains to gather DNS, SPF, DMARC, and BIMI records.")
|
||||
group = parser.add_mutually_exclusive_group(required=True)
|
||||
group.add_argument("-d", type=str, help="Single domain to process.")
|
||||
group.add_argument("-iL", type=str, help="File containing a list of domains to process.")
|
||||
parser.add_argument("-o", type=str, choices=['stdout', 'xls'], default='stdout', help="Output format: stdout or xls (default: stdout).")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
# Load domains
|
||||
if args.d:
|
||||
domains = [args.d]
|
||||
elif args.iL:
|
||||
with open(args.iL, "r") as file:
|
||||
domains = [line.strip() for line in file]
|
||||
|
||||
# Prepare for processing
|
||||
results = []
|
||||
threads = []
|
||||
|
||||
# Start threads to process each domain
|
||||
for domain in domains:
|
||||
thread = threading.Thread(target=process_domain_and_output, args=(domain, args.o, results))
|
||||
thread.start()
|
||||
threads.append(thread)
|
||||
|
||||
# Wait for all threads to complete
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
|
||||
# Handle output for xls format
|
||||
if args.o == 'xls' and results:
|
||||
report.write_to_excel(results)
|
||||
print("Results written to output.xlsx")
|
||||
|
||||
if __name__ == "__main__":
|
||||
# for domain in domains
|
||||
# process the domain, get the results. Store those results in batches of 100
|
||||
# if the 100 mark is hit, write it to stdout, clear the stoage and work on next batch
|
||||
pass
|
||||
main()
|
||||
Reference in New Issue
Block a user