mirror of
https://github.com/MattKeeley/Spoofy.git
synced 2026-02-03 13:33:24 +00:00
restructure to a class
This commit is contained in:
137
modules/dmarc.py
137
modules/dmarc.py
@@ -1,82 +1,91 @@
|
||||
# modules/dmarc.py
|
||||
|
||||
import dns.resolver
|
||||
import tldextract
|
||||
|
||||
class DMARC:
|
||||
def __init__(self, domain, dns_server=None):
|
||||
self.domain = domain
|
||||
self.dns_server = dns_server
|
||||
self.dmarc_record = self.get_dmarc_record()
|
||||
self.policy = None
|
||||
self.pct = None
|
||||
self.aspf = None
|
||||
self.sp = None
|
||||
self.fo = None
|
||||
self.rua = None
|
||||
|
||||
def get_dmarc_record(domain, dns_server):
|
||||
"""Returns the DMARC record for a given domain."""
|
||||
subdomain = tldextract.extract(domain).registered_domain
|
||||
if subdomain != domain:
|
||||
return get_dmarc_record(subdomain, dns_server)
|
||||
try:
|
||||
resolver = dns.resolver.Resolver()
|
||||
resolver.nameservers = [dns_server]
|
||||
dmarc = resolver.resolve(f"_dmarc.{domain}", "TXT")
|
||||
except:
|
||||
if self.dmarc_record:
|
||||
self.policy = self.get_dmarc_policy()
|
||||
self.pct = self.get_dmarc_pct()
|
||||
self.aspf = self.get_dmarc_aspf()
|
||||
self.sp = self.get_dmarc_subdomain_policy()
|
||||
self.fo = self.get_dmarc_forensic_reports()
|
||||
self.rua = self.get_dmarc_aggregate_reports()
|
||||
|
||||
def get_dmarc_record(self):
|
||||
"""Returns the DMARC record for the domain."""
|
||||
subdomain = tldextract.extract(self.domain).registered_domain
|
||||
if subdomain != self.domain:
|
||||
return self.get_dmarc_record_for_domain(subdomain)
|
||||
|
||||
return self.get_dmarc_record_for_domain(self.domain)
|
||||
|
||||
def get_dmarc_record_for_domain(self, domain):
|
||||
try:
|
||||
resolver = dns.resolver.Resolver()
|
||||
if self.dns_server:
|
||||
resolver.nameservers = [self.dns_server]
|
||||
dmarc = resolver.resolve(f"_dmarc.{domain}", "TXT")
|
||||
except:
|
||||
return None
|
||||
|
||||
for dns_data in dmarc:
|
||||
if "DMARC1" in str(dns_data):
|
||||
return str(dns_data).replace('"', '')
|
||||
return None
|
||||
|
||||
for dns_data in dmarc:
|
||||
if "DMARC1" in str(dns_data):
|
||||
dmarc_record = str(dns_data).replace('"', '')
|
||||
return dmarc_record
|
||||
return None
|
||||
|
||||
|
||||
def get_dmarc_details(dmarc_record):
|
||||
"""Returns a tuple containing policy, pct, aspf, subdomain policy,
|
||||
forensic report uri, and aggregate report uri from a DMARC record"""
|
||||
p = get_dmarc_policy(dmarc_record)
|
||||
pct = get_dmarc_pct(dmarc_record)
|
||||
aspf = get_dmarc_aspf(dmarc_record)
|
||||
sp = get_dmarc_subdomain_policy(dmarc_record)
|
||||
fo = get_dmarc_forensic_reports(dmarc_record)
|
||||
rua = get_dmarc_aggregate_reports(dmarc_record)
|
||||
return p, pct, aspf, sp, fo, rua
|
||||
|
||||
|
||||
def get_dmarc_policy(dmarc_record):
|
||||
"""Returns the policy value from a DMARC record."""
|
||||
if "p=" in str(dmarc_record):
|
||||
return str(dmarc_record).split("p=")[1].split(";")[0]
|
||||
else:
|
||||
def get_dmarc_policy(self):
|
||||
"""Returns the policy value from a DMARC record."""
|
||||
if "p=" in str(self.dmarc_record):
|
||||
return str(self.dmarc_record).split("p=")[1].split(";")[0]
|
||||
return None
|
||||
|
||||
|
||||
def get_dmarc_pct(dmarc_record):
|
||||
"""Returns the pct value from a DMARC record."""
|
||||
if "pct=" in str(dmarc_record):
|
||||
return str(dmarc_record).split("pct=")[1].split(";")[0]
|
||||
else:
|
||||
def get_dmarc_pct(self):
|
||||
"""Returns the pct value from a DMARC record."""
|
||||
if "pct=" in str(self.dmarc_record):
|
||||
return str(self.dmarc_record).split("pct=")[1].split(";")[0]
|
||||
return None
|
||||
|
||||
|
||||
def get_dmarc_aspf(dmarc_record):
|
||||
"""Returns the aspf value from a DMARC record"""
|
||||
if "aspf=" in str(dmarc_record):
|
||||
return str(dmarc_record).split("aspf=")[1].split(";")[0]
|
||||
else:
|
||||
def get_dmarc_aspf(self):
|
||||
"""Returns the aspf value from a DMARC record"""
|
||||
if "aspf=" in str(self.dmarc_record):
|
||||
return str(self.dmarc_record).split("aspf=")[1].split(";")[0]
|
||||
return None
|
||||
|
||||
|
||||
def get_dmarc_subdomain_policy(dmarc_record):
|
||||
"""Returns the policy to apply for subdomains from a DMARC record."""
|
||||
if "sp=" in str(dmarc_record):
|
||||
return str(dmarc_record).split("sp=")[1].split(";")[0]
|
||||
else:
|
||||
def get_dmarc_subdomain_policy(self):
|
||||
"""Returns the policy to apply for subdomains from a DMARC record."""
|
||||
if "sp=" in str(self.dmarc_record):
|
||||
return str(self.dmarc_record).split("sp=")[1].split(";")[0]
|
||||
return None
|
||||
|
||||
|
||||
def get_dmarc_forensic_reports(dmarc_record):
|
||||
"""Returns the email addresses to which forensic reports should be sent."""
|
||||
if "ruf=" in str(dmarc_record) and "fo=1" in str(dmarc_record):
|
||||
return str(dmarc_record).split("ruf=")[1].split(";")[0]
|
||||
else:
|
||||
def get_dmarc_forensic_reports(self):
|
||||
"""Returns the email addresses to which forensic reports should be sent."""
|
||||
if "ruf=" in str(self.dmarc_record) and "fo=1" in str(self.dmarc_record):
|
||||
return str(self.dmarc_record).split("ruf=")[1].split(";")[0]
|
||||
return None
|
||||
|
||||
|
||||
def get_dmarc_aggregate_reports(dmarc_record):
|
||||
"""Returns the email addresses to which aggregate reports should be sent."""
|
||||
if "rua=" in str(dmarc_record):
|
||||
return str(dmarc_record).split("rua=")[1].split(";")[0]
|
||||
else:
|
||||
def get_dmarc_aggregate_reports(self):
|
||||
"""Returns the email addresses to which aggregate reports should be sent."""
|
||||
if "rua=" in str(self.dmarc_record):
|
||||
return str(self.dmarc_record).split("rua=")[1].split(";")[0]
|
||||
return None
|
||||
|
||||
def __str__(self):
|
||||
return (f"DMARC Record: {self.dmarc_record}\n"
|
||||
f"Policy: {self.policy}\n"
|
||||
f"Pct: {self.pct}\n"
|
||||
f"ASPF: {self.aspf}\n"
|
||||
f"Subdomain Policy: {self.sp}\n"
|
||||
f"Forensic Report URI: {self.fo}\n"
|
||||
f"Aggregate Report URI: {self.rua}")
|
||||
|
||||
Reference in New Issue
Block a user