mirror of
https://github.com/MattKeeley/Spoofy.git
synced 2026-02-03 05:23:24 +00:00
implement accurate multi-threadding
This commit is contained in:
48
spoofy.py
48
spoofy.py
@@ -1,6 +1,7 @@
|
||||
#! /usr/bin/env python3
|
||||
import argparse
|
||||
import threading
|
||||
from queue import Queue
|
||||
from modules.dns import DNS
|
||||
from modules.spf import SPF
|
||||
from modules.dmarc import DMARC
|
||||
@@ -8,6 +9,8 @@ from modules.bimi import BIMI
|
||||
from modules.spoofing import Spoofing
|
||||
from modules import report
|
||||
|
||||
print_lock = threading.Lock()
|
||||
|
||||
|
||||
def process_domain(domain):
|
||||
"""Process a domain to gather DNS, SPF, DMARC, and BIMI records, and evaluate spoofing potential."""
|
||||
@@ -74,26 +77,21 @@ def process_domain(domain):
|
||||
return result
|
||||
|
||||
|
||||
def worker(domain_queue, result_queue):
|
||||
"""Worker function to process domains and put results into the result queue."""
|
||||
while not domain_queue.empty():
|
||||
def worker(domain_queue, print_lock, output, results):
|
||||
"""Worker function to process domains and output results."""
|
||||
while True:
|
||||
domain = domain_queue.get()
|
||||
if domain is None:
|
||||
break
|
||||
result = process_domain(domain)
|
||||
result_queue.put(result)
|
||||
with print_lock:
|
||||
if output == "stdout":
|
||||
report.printer(**result)
|
||||
else:
|
||||
results.append(result)
|
||||
domain_queue.task_done()
|
||||
|
||||
|
||||
def process_domain_and_output(domain, output, results):
|
||||
"""Process a domain and handle output based on the specified format."""
|
||||
result = process_domain(domain)
|
||||
if output == "stdout":
|
||||
report.printer(**result)
|
||||
else:
|
||||
results.append(result)
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Process domains to gather DNS, SPF, DMARC, and BIMI records."
|
||||
@@ -110,37 +108,43 @@ def main():
|
||||
default="stdout",
|
||||
help="Output format: stdout or xls (default: stdout).",
|
||||
)
|
||||
parser.add_argument(
|
||||
"-t", type=int, default=4, help="Number of threads to use (default: 4)"
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
# Load domains
|
||||
if args.d:
|
||||
domains = [args.d]
|
||||
elif args.iL:
|
||||
with open(args.iL, "r") as file:
|
||||
domains = [line.strip() for line in file]
|
||||
|
||||
# Prepare for processing
|
||||
domain_queue = Queue()
|
||||
results = []
|
||||
threads = []
|
||||
|
||||
# Start threads to process each domain
|
||||
for domain in domains:
|
||||
domain_queue.put(domain)
|
||||
|
||||
threads = []
|
||||
for _ in range(min(args.t, len(domains))):
|
||||
thread = threading.Thread(
|
||||
target=process_domain_and_output, args=(domain, args.o, results)
|
||||
target=worker, args=(domain_queue, print_lock, args.o, results)
|
||||
)
|
||||
thread.start()
|
||||
threads.append(thread)
|
||||
|
||||
# Wait for all threads to complete
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
domain_queue.join()
|
||||
|
||||
# Handle output for xls format
|
||||
if args.o == "xls" and results:
|
||||
report.write_to_excel(results)
|
||||
print("Results written to output.xlsx")
|
||||
|
||||
for _ in range(len(threads)):
|
||||
domain_queue.put(None)
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
Reference in New Issue
Block a user