SpyGuard/analysis/classes/engine.py

749 lines
38 KiB
Python
Raw Permalink Normal View History

2022-11-06 15:51:33 +01:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
import os
import re
import subprocess as sp
import sys
2023-09-10 23:59:53 +02:00
import time
2022-11-06 15:51:33 +01:00
from datetime import datetime
from ipaddress import IPv4Address, IPv6Address
import ssl
import socket
import OpenSSL
import requests
import pydig
import whois
from publicsuffix2 import get_sld
from netaddr import IPAddress, IPNetwork
from classes.jarm import get_jarm
from utils import get_config, get_iocs, get_whitelist
class Engine():
def __init__(self, capture_directory):
# Set some vars.
self.analysis_start = datetime.now()
self.connected = self.check_internet()
self.working_dir = capture_directory
self.assets_dir = f"{capture_directory}/assets/"
self.rules_file = "/tmp/rules.rules"
self.pcap_path = os.path.join(self.working_dir, "capture.pcap")
self.records = []
self.alerts = []
self.dns = []
self.files = []
self.whitelist = []
self.uncategorized = []
self.analysed = []
self.dns_failed = []
self.dns_checked = []
self.cert_checked = []
self.errors = []
self.analysis_end = None
# Get configuration
self.heuristics_analysis = get_config(("analysis", "heuristics"))
self.iocs_analysis = get_config(("analysis", "iocs"))
self.whitelist_analysis = get_config(("analysis", "whitelist"))
self.active_analysis = get_config(("analysis", "active"))
self.userlang = get_config(("frontend", "user_lang"))
self.max_ports = get_config(("analysis", "max_ports"))
self.http_default_ports = get_config(("analysis", "http_default_ports"))
self.tls_default_ports = get_config(("analysis", "tls_default_ports"))
self.free_issuers = get_config(("analysis", "free_issuers"))
self.max_alerts = get_config(("analysis", "max_alerts"))
self.indicators_types = get_config(("analysis", "indicators_types"))
# Save detection methods used.
self.detection_methods = { "iocs" : self.iocs_analysis,
"heuristics" : self.heuristics_analysis,
"active" : self.active_analysis }
# Retreive IOCs.
if self.iocs_analysis:
self.bl_cidrs = [[IPNetwork(cidr[0]), cidr[1]] for cidr in get_iocs("cidr")]
self.bl_hosts = get_iocs("ip4addr") + get_iocs("ip6addr")
self.tor_nodes = self.get_tor_nodes()
self.bl_domains = get_iocs("domain")
self.bl_freedns = get_iocs("freedns")
self.bl_certs = get_iocs("sha1cert")
self.bl_jarms = get_iocs("jarm")
self.bl_nameservers = get_iocs("ns")
self.bl_tlds = get_iocs("tld")
# Retreive whitelisted items.
if self.whitelist_analysis:
self.wl_cidrs = [IPNetwork(cidr) for cidr in get_whitelist("cidr")]
self.wl_hosts = get_whitelist("ip4addr") + get_whitelist("ip6addr") + self.get_public_ip()
self.wl_domains = get_whitelist("domain")
# Load template language
if not re.match("^[a-z]{2,3}$", self.userlang): self.userlang = "en"
with open(os.path.join(os.path.dirname(os.path.realpath(sys.argv[0])), "locales/{}.json".format(self.userlang))) as f:
self.template = json.load(f)["alerts"]
def check_internet(self) -> bool:
"""Check the internet link just with a small http request
2023-09-10 23:59:53 +02:00
to an URL present in the configuration. If the link is down,
retry 3 times.
2022-11-06 15:51:33 +01:00
Returns:
bool: True if everything works.
"""
2023-09-10 23:59:53 +02:00
attempts = 3
while True:
try:
url = get_config(("network", "internet_check"))
requests.get(url, timeout=3)
return True
except:
if attempts == 0:
return False
else:
time.sleep(5)
attempts -= 1
2022-11-06 15:51:33 +01:00
def get_public_ip(self) -> list:
"""Get the public IP address
Returns:
list: list containing the public IP address.
"""
if self.connected:
try:
return [requests.get("https://api.ipify.org", timeout=3).text]
except:
return []
else:
return []
def start_engine(self):
""" This method starts suricata and then launch the
parsers to analyse the output logs.
"""
# Parse the eve.json file.
self.parse_eve_file()
# For each type of records, check it against heuristics.
for record in self.records:
if self.whitelist_analysis: self.check_whitelist(record)
self.check_domains(record)
self.check_flow(record)
self.check_tls(record)
self.check_http(record)
# Check for failed DNS answers (if spyguard not connected)
for dnsname in list(set(self.dns_failed)):
self.check_dnsname(dnsname)
def parse_eve_file(self):
"""This method parses the eve.json file produced by suricata.
For each record, it look at the record type and then append the self.record
dictionnary which contains valuable data to look at suspicious stuff.
"""
for record in open(f"{self.assets_dir}eve.json", "r").readlines():
record = json.loads(record)
try:
if "flow" in record:
if "app_proto" not in record: record["app_proto"] = "failed"
proto = { "name" : record["app_proto"].upper() if record["app_proto"] != "failed" else record["proto"].upper(), "port" : record["dest_port"] if "dest_port" in record else -1 }
if record["dest_ip"] not in [r["ip_dst"] for r in self.records]:
self.records.append({
"ip_dst" : record["dest_ip"],
"whitelisted" : False,
"suspicious" : False,
"protocols" : [proto],
"domains" : [],
"certificates" : []
})
else:
for rec in self.records:
if record["dest_ip"] == rec["ip_dst"]:
if proto not in rec["protocols"]:
rec["protocols"].append(proto)
except Exception as e:
self.errors.append(f"Issue when processing the following eve record (flow): {json.dumps(record)}")
for record in open(f"{self.assets_dir}eve.json", "r").readlines():
record = json.loads(record)
try:
if "tls" in record:
for rec in self.records:
if record["dest_ip"] == rec["ip_dst"]:
if "version" in record["tls"]:
if float(record["tls"]["version"].split(" ")[1]) < 1.3 and not "session_resumed" in record["tls"]:
if record["tls"] not in rec["certificates"]:
record["tls"]["port"] = record["dest_port"]
rec["certificates"].append(record["tls"])
else:
if "sni" in record["tls"] and record["tls"]["sni"] not in [c["sni"] for c in rec["certificates"]]:
rec["certificates"].append({ "sni" : record["tls"]["sni"], "version" : record["tls"]["version"], "port" : record["dest_port"] })
else:
rec["certificates"].append({ "version" : record["tls"]["version"], "port" : record["dest_port"] })
except Exception as e:
self.errors.append(f"Issue when processing the following eve record (tls): {json.dumps(record)}")
for record in open(f"{self.assets_dir}eve.json", "r").readlines():
record = json.loads(record)
try:
if "http" in record:
for rec in self.records:
if record["dest_ip"] == rec["ip_dst"]:
d = { "hostname" : record["http"]["hostname"] }
if "http_user_agent" in record["http"]:
d["user-agent"] = record["http"]["http_user_agent"]
if "http" in rec:
if not d in rec["http"]:
rec["http"].append(d)
else:
rec["http"] = [d]
except Exception as e:
self.errors.append(f"Issue when processing the following eve record (http): {json.dumps(record)}")
for record in open(f"{self.assets_dir}eve.json", "r").readlines():
record = json.loads(record)
try:
if "dns" in record:
if record["dns"]["type"] == "answer":
for rec in self.records:
if record["dns"]["rcode"] == "NOERROR":
if "grouped" in record["dns"]:
if "A" in record["dns"]["grouped"] and rec["ip_dst"] in record["dns"]["grouped"]["A"]:
if record["dns"]["rrname"] not in rec["domains"]:
rec["domains"].append(record["dns"]["rrname"])
elif "AAAA" in record["dns"]["grouped"] and rec["ip_dst"] in record["dns"]["grouped"]["AAAA"]:
if record["dns"]["rrname"] not in rec["domains"]:
rec["domains"].append(record["dns"]["rrname"])
elif record["dns"]["rcode"] == "SERVFAIL":
self.dns_failed.append(record["dns"]["rrname"])
except Exception as e:
self.errors.append(f"Issue when processing the following eve record (dns answer): {json.dumps(record)}")
# This pass is if SpyGuard is not connected to Internet.
# We still analyze the un answered DNS queries.
for record in open(f"{self.assets_dir}eve.json", "r").readlines():
record = json.loads(record)
try:
if "dns" in record:
if record["dns"]["type"] == "query":
if record["dns"]["rrname"] not in sum([r["domains"] for r in self.records], []):
self.records.append({
"ip_dst" : "--",
"whitelisted" : False,
"suspicious" : False,
"protocols" : [{"name" : "DNS", "port" : "53"}],
"domains" : [record["dns"]["rrname"]],
"certificates" : []
})
except Exception as e:
self.errors.append(f"Issue when processing the following eve record (dns query): {json.dumps(record)}")
for record in open(f"{self.assets_dir}eve.json", "r").readlines():
record = json.loads(record)
try:
if "alert" in record and record["event_type"] == "alert":
for rec in self.records:
if record["dest_ip"] == rec["ip_dst"]:
rec["suspicious"] = True
self.alerts.append({"title": self.template["SNORT-01"]["title"].format(record["alert"]["signature"]),
"description": self.template["SNORT-01"]["description"].format(rec["ip_dst"]),
"host": rec["ip_dst"],
"level": "High",
"id": "SNORT-01"})
except Exception as e:
self.errors.append(f"Issue when processing the following eve record (dns answer): {json.dumps(record)}")
def check_whitelist(self, record):
""" This method is asked on each record. It:
1. Check if the associated IP(v4/6) Address can be whitelisted
2. Check if one of the associated domain names can be whitelisted.
If its the case, the "whitelisted" key of the record is set to True.
Therefore, the record will be ignored for the rest of the analysis.
Args:
record (dict): record to be processed.
"""
try:
assert IPv4Address(record["ip_dst"])
if IPv4Address('224.0.0.0') <= IPv4Address(record["ip_dst"]) <= IPv4Address('239.255.255.255'):
record["whitelisted"] = True
return
for cidr in self.wl_cidrs:
if IPAddress(record["ip_dst"]) in cidr:
record["whitelisted"] = True
return
for ip in self.wl_hosts:
if record["ip_dst"] == ip:
record["whitelisted"] = True
return
except:
pass
try:
assert IPv6Address(record["ip_dst"])
if [record["ip_dst"].startswith(prefix) for prefix in ["fe80", "fc00", "ff02"]]:
record["whitelisted"] = True
return
for ip in self.wl_hosts:
if record["ip_dst"] == ip:
record["whitelisted"] = True
return
except:
pass
# We check if at least one of the associated
# domains is whitelisted
for dom in self.wl_domains:
for domain in record["domains"]:
if domain.endswith(dom):
record["whitelisted"] = True
return
def check_domains(self, record):
"""Check the domains associated to each record.
First this method checks if the record is whitelisted. If not:
1. Leverage a low alert if the record don't have any associated DNSName
2. Check each domain associated to the record by calling check_dnsname.
Args:
record (dict): record to be processed.
"""
if record["whitelisted"]: return
if self.heuristics_analysis:
# Otherwise, we alert the user that an IP haven't been resolved by
# a DNS answer during the session...
if record["domains"] == []:
record["suspicious"] = True
self.alerts.append({"title": self.template["PROTO-05"]["title"].format(record["ip_dst"]),
"description": self.template["PROTO-05"]["description"].format(record["ip_dst"]),
"host": record["ip_dst"],
"level": "Low",
"id": "PROTO-05"})
# Check each associated domain.
for domain in record["domains"]:
if self.check_dnsname(domain):
record["suspicious"] = True
def check_dnsname(self, dnsname):
"""Check a domain name against a set of IOCs / heuristics.
1. Check if the parent domain is blacklisted.
2. Check if the parent domain is a Free DNS.
3. Check if the domain extension is a suspicious TLD.
4. Check if the name servers associated to the domain are suspicious.
5. Check if the domain have been registered recently - less than one year.
Args:
record (dict): record to be processed.
Returns:
supicious (bool) : if an alert has been leveraged.
"""
suspicious = False
if self.iocs_analysis:
for domain in self.bl_domains:
if dnsname.endswith(domain[0]) and any(t in self.indicators_types for t in [domain[1], "all"]):
if domain[1] == "dual":
suspicious = True
self.alerts.append({"title": self.template["IOC-12"]["title"],
"description": self.template["IOC-12"]["description"].format(domain[0]),
"host": domain[0],
"level": "Low",
"id": "IOC-12"})
elif domain[1] == "tracker":
suspicious = True
self.alerts.append({"title": self.template["IOC-04"]["title"].format(domain[0], "tracker"),
"description": self.template["IOC-04"]["description"].format(domain[0], "tracker"),
"host": domain[0],
"level": "Low",
"id": "IOC-04"})
elif domain[1] == "doh":
suspicious = True
self.alerts.append({"title": self.template["IOC-13"]["title"].format(f"{dnsname}"),
"description": self.template["IOC-13"]["description"].format(f"{dnsname}"),
"host": dnsname,
"level": "Low",
"id": "IOC-13"})
else:
suspicious = True
self.alerts.append({"title": self.template["IOC-03"]["title"].format(dnsname, domain[1].upper()),
"description": self.template["IOC-03"]["description"].format(dnsname),
"host": dnsname,
"level": "High",
"id": "IOC-03"})
for domain in self.bl_freedns:
if dnsname.endswith(domain[0]) and any(t in self.indicators_types for t in [domain[1], "all"]):
suspicious = True
self.alerts.append({"title": self.template["IOC-05"]["title"].format(dnsname),
"description": self.template["IOC-05"]["description"].format(dnsname),
"host": dnsname,
"level": "Moderate",
"id": "IOC-05"})
if self.heuristics_analysis:
for domain in self.bl_tlds:
if dnsname.endswith(domain[0]) and any(t in self.indicators_types for t in [domain[1], "all"]):
suspicious = True
self.alerts.append({"title": self.template["IOC-06"]["title"].format(dnsname),
"description": self.template["IOC-06"]["description"].format(dnsname, domain[0]),
"host": dnsname,
"level": "Low",
"id": "IOC-06"})
if self.active_analysis and self.connected:
domain = get_sld(dnsname)
if domain not in self.dns_checked:
self.dns_checked.append(domain)
try:
name_servers = pydig.query(domain, "NS")
if len(name_servers):
for ns in self.bl_nameservers:
if name_servers[0].endswith(".{}.".format(ns[0])) and any(t in self.indicators_types for t in [ns[1], "all"]):
suspicious = True
self.alerts.append({"title": self.template["ACT-01"]["title"].format(dnsname, name_servers[0]),
"description": self.template["ACT-01"]["description"].format(dnsname),
"host": dnsname,
"level": "Moderate",
"id": "ACT-01"})
except Exception as e:
self.errors.append(f"Issue when doing a dig NS query to {domain}, are you connected? Error: {str(e)}")
try:
whois_record = whois.whois(domain)
creation_date = whois_record.creation_date if type(whois_record.creation_date) is not list else whois_record.creation_date[0]
creation_days = abs((datetime.now() - creation_date).days)
if creation_days < 365:
suspicious = True
self.alerts.append({"title": self.template["ACT-02"]["title"].format(dnsname, creation_days),
"description": self.template["ACT-02"]["description"].format(dnsname),
"host": dnsname,
"level": "Moderate",
"id": "ACT-02"})
except Exception as e:
self.errors.append(f"Issue when doing a WHOIS query to {domain}, are you connected? Error: {str(e)}")
return suspicious
def check_flow(self, record):
"""Check a network flow against a set of IOCs / heuristics.
1. Check if the IP Address is blacklisted
2. Check if the IP Address is inside a blacklisted CIDR
3. Check if the UDP or ICMP protocol is going outside of the local network.
4. Check if the HTTP protocol is not using default HTTP ports.
5. Check if the network flow is using a port > 1024.
Args:
record (dict): record to be processed.
Returns:
supicious (bool) : if an alert has been leveraged.
"""
if record["whitelisted"]: return
resolved_host = record["domains"][0] if len(record["domains"]) else record["ip_dst"]
if self.iocs_analysis:
for host in self.bl_hosts:
if record["ip_dst"] == host[0] and any(t in self.indicators_types for t in [host[1], "all"]):
if host[1] == "dual":
record["suspicious"] = True
self.alerts.append({"title": self.template["IOC-12"]["title"],
"description": self.template["IOC-12"]["description"].format(resolved_host),
"host": resolved_host,
"level": "Low",
"id": "IOC-12"})
if host[1] == "tracker":
record["suspicious"] = True
self.alerts.append({"title": self.template["IOC-04"]["title"].format(resolved_host, "tracker"),
"description": self.template["IOC-04"]["description"].format(resolved_host, "tracker"),
"host": resolved_host,
"level": "Low",
"id": "IOC-04"})
elif host[1] == "doh":
if 443 in [p["port"] for p in record["protocols"]]:
record["suspicious"] = True
self.alerts.append({"title": self.template["IOC-13"]["title"].format(f"{resolved_host}"),
"description": self.template["IOC-13"]["description"].format(f"{resolved_host}"),
"host": resolved_host,
"level": "Low",
"id": "IOC-13"})
else:
record["suspicious"] = True
self.alerts.append({"title": self.template["IOC-01"]["title"].format(resolved_host, record["ip_dst"], host[1].upper()),
"description": self.template["IOC-01"]["description"].format(f"{resolved_host} ({record['ip_dst']})"),
"host": resolved_host,
"level": "High",
"id": "IOC-01"})
break
for host in self.tor_nodes:
if record["ip_dst"] == host:
record["suspicious"] = True
self.alerts.append({"title": self.template["IOC-11"]["title"].format(resolved_host, record["ip_dst"]),
"description": self.template["IOC-11"]["description"].format(f"{resolved_host} ({record['ip_dst']})"),
"host": resolved_host,
"level": "High",
"id": "IOC-11"})
break
for cidr in self.bl_cidrs:
try:
if IPAddress(record["ip_dst"]) in cidr[0] and any(t in self.indicators_types for t in [cidr[1], "all"]):
record["suspicious"] = True
self.alerts.append({"title": self.template["IOC-02"]["title"].format(resolved_host, cidr[0], cidr[1].upper()),
"description": self.template["IOC-02"]["description"].format(record["ip_dst"]),
"host": resolved_host,
"level": "Moderate",
"id": "IOC-02"})
except:
continue
if self.heuristics_analysis:
for protocol in record["protocols"]:
if protocol["name"] in ["UDP", "ICMP", "IPV6-ICMP"]:
record["suspicious"] = True
self.alerts.append({"title": self.template["PROTO-01"]["title"].format(protocol["name"], resolved_host),
"description": self.template["PROTO-01"]["description"].format(protocol["name"], resolved_host),
"host": resolved_host,
"level": "Moderate",
"id": "PROTO-01"})
try:
if protocol["port"] >= int(self.max_ports):
record["suspicious"] = True
self.alerts.append({"title": self.template["PROTO-02"]["title"].format("", resolved_host, self.max_ports),
"description": self.template["PROTO-02"]["description"].format("", resolved_host, protocol["port"]),
"host": resolved_host,
"level": "Low",
"id": "PROTO-02"})
except:
pass
if protocol["name"] == "HTTP":
record["suspicious"] = True
self.alerts.append({"title": self.template["PROTO-03"]["title"].format(resolved_host),
"description": self.template["PROTO-03"]["description"].format(resolved_host),
"host": resolved_host,
"level": "Low",
"id": "PROTO-03"})
if protocol["name"] == "HTTP" and protocol["port"] not in self.http_default_ports:
record["suspicious"] = True
self.alerts.append({"title": self.template["PROTO-04"]["title"].format(resolved_host, protocol["port"]),
"description": self.template["PROTO-04"]["description"].format(resolved_host, protocol["port"]),
"host": resolved_host,
"level": "Moderate",
"id": "PROTO-04"})
def check_tls(self, record):
"""Check a TLS protocol and certificates against a set of IOCs / heuristics.
Note since TLS 1.3, the certificate is not exchanged in clear text, therefore
we need to check it "actively" via the method active_check_ssl.
1. Check if the TLS record is not using default TLS ports.
2. Check if one of the certificates is a free one, like Let's Encrypt.
3. Check if the certificate is auto-signed.
4. If the certificate has an SNI, check the domain by calling check_dnsname.
Args:
record (dict): record to be processed.
Returns:
supicious (bool) : if an alert has been leveraged.
"""
if record["whitelisted"]: return
resolved_host = record["domains"][0] if len(record["domains"]) else record["ip_dst"]
for certificate in record["certificates"]:
try:
if "sni" in certificate and certificate["sni"] not in record["domains"]:
if certificate["sni"]:
if self.check_dnsname(certificate["sni"]):
record["suspicious"] = True
if certificate["port"] not in self.tls_default_ports:
record["suspicious"] = True
self.alerts.append({"title": self.template["SSL-01"]["title"].format(certificate["port"], resolved_host),
"description": self.template["SSL-01"]["description"].format(resolved_host),
"host": resolved_host,
"level": "Moderate",
"id": "SSL-01"})
if float(certificate["version"].split(" ")[1]) < 1.3 and "issuerdn" in certificate:
if certificate["issuerdn"] in self.free_issuers:
record["suspicious"] = True
self.alerts.append({"title": self.template["SSL-02"]["title"].format(resolved_host),
"description": self.template["SSL-02"]["description"],
"host": resolved_host,
"level": "Moderate",
"id": "SSL-02"})
elif certificate["issuerdn"] == certificate["subject"]:
record["suspicious"] = True
self.alerts.append({"title": self.template["SSL-03"]["title"].format(resolved_host),
"description": self.template["SSL-03"]["description"].format(resolved_host),
"host": resolved_host,
"level": "Moderate",
"id": "SSL-03"})
else:
if self.active_analysis and self.connected:
if "sni" in certificate:
if certificate["sni"] not in self.cert_checked:
self.cert_checked.append(certificate["sni"])
if self.active_check_ssl(certificate["sni"], certificate["port"]):
record["suspicious"] = True
break
else:
if resolved_host not in self.cert_checked:
self.cert_checked.append(resolved_host)
if self.active_check_ssl(resolved_host, certificate["port"]):
record["suspicious"] = True
break
except Exception as e:
self.errors.append(f"Issue when processing the following certificate (check_tls): {json.dumps(certificate)}")
def get_tor_nodes(self) -> list:
"""Get a list of TOR nodes from dan.me.uk.
Returns:
list: list of TOR nodes
"""
nodes = []
if os.path.exists("/tmp/tor_nodes.lst"):
with open("/tmp/tor_nodes.lst", "r") as f:
for l in f.readlines():
nodes.append(l.strip())
else:
if self.connected:
try:
nodes_list = requests.get("https://www.dan.me.uk/torlist/", timeout=10).text
with open("/tmp/tor_nodes.lst", "w+") as f:
f.write(nodes_list)
for l in nodes_list.splitlines():
nodes.append(l.strip())
except:
self.errors.append(f"Issue when trying to get TOR nodes from dan.me.uk")
return nodes
def check_http(self, record):
"""Check the HTTP hostname against a set of IOCs / heuristics.
Args:
record (dict): record to be processed.
Returns:
supicious (bool) : if an alert has been leveraged.
"""
if record["whitelisted"]: return
if "http" in record:
for http in record["http"]:
if http["hostname"] not in record["domains"]:
if re.match("^[a-z\.0-9\-]+\.[a-z\-]{2,}$", http["hostname"]):
if http["hostname"]:
if self.check_dnsname(http["hostname"]):
record["suspicious"] = True
def active_check_ssl(self, host, port):
"""This method:
1. Check the issuer and subject of a certificate directly by connecting
to the remote server in order to bypass TLS 1.3+ restrictions.
Most of this method was been taken from: https://tinyurl.com/3vsvhu79
2. Get the JARM of the remote server by using the standard poc library
from sales force.
Args:
host (str): Host to connect to
port (int): Port to connect to
"""
try:
suspect = False
context = ssl.create_default_context()
conn = socket.create_connection((host, port))
sock = context.wrap_socket(conn, server_hostname=host)
sock.settimeout(5)
try:
der_cert = sock.getpeercert(True)
finally:
sock.close()
if "der_cert" in locals():
certificate = ssl.DER_cert_to_PEM_cert(der_cert)
x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, certificate)
issuer = dict(x509.get_issuer().get_components())
subject = dict(x509.get_subject().get_components())
certhash = x509.digest("sha1").decode("utf8").replace(":", "").lower()
issuer = ", ".join(f"{k.decode('utf8')}={v.decode('utf8')}" for k, v in issuer.items())
subject = ", ".join(f"{k.decode('utf8')}={v.decode('utf8')}" for k, v in subject.items())
if issuer in self.free_issuers:
self.alerts.append({"title": self.template["SSL-02"]["title"].format(host),
"description": self.template["SSL-02"]["description"],
"host": host,
"level": "Moderate",
"id": "SSL-02"})
suspect = True
if issuer == subject:
self.alerts.append({"title": self.template["SSL-03"]["title"].format(host),
"description": self.template["SSL-03"]["description"].format(host),
"host": host,
"level": "Moderate",
"id": "SSL-03"})
suspect = True
if self.iocs_analysis:
for cert in self.bl_certs:
if cert[0] == certhash and any(t in self.indicators_types for t in [cert[1], "all"]):
self.alerts.append({"title": self.template["SSL-04"]["title"].format(host, cert[1].upper()),
"description": self.template["SSL-04"]["description"].format(host),
"host": host,
"level": "High",
"id": "SSL-04"})
suspect = True
if self.bl_jarms:
host_jarm = get_jarm(host, port)
for jarm in self.bl_jarms:
if jarm[0] == host_jarm and any(t in self.indicators_types for t in [jarm[1], "all"]):
self.alerts.append({"title": self.template["SSL-05"]["title"].format(host, cert[1].upper()),
"description": self.template["SSL-05"]["description"].format(host),
"host": host,
"level": "High",
"id": "SSL-05"})
suspect = True
return suspect
except:
self.errors.append(f"Issue when trying to grab the SSL certificate located at {host}:{port}")
return False
def get_alerts(self):
"""Retrieves the alerts triggered during the analysis
Returns:
list: list of the alerts.
"""
self.analysis_end = datetime.now()
return [dict(t) for t in {tuple(d.items()) for d in self.alerts}]