Use shell=False in subprocess Function Calls

This commit is contained in:
pixeebot[bot] 2024-04-14 16:16:31 +00:00 committed by GitHub
parent 4e4c6172cd
commit 887e90cd06
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -1,478 +1,478 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from classes.parsezeeklogs import ParseZeekLogs from classes.parsezeeklogs import ParseZeekLogs
from netaddr import IPNetwork, IPAddress from netaddr import IPNetwork, IPAddress
from utils import get_iocs, get_config, get_whitelist from utils import get_iocs, get_config, get_whitelist
from datetime import datetime from datetime import datetime
import subprocess as sp import subprocess as sp
import json import json
import pydig import pydig
import os import os
import re import re
import sys import sys
import whois import whois
class ZeekEngine(object): class ZeekEngine(object):
def __init__(self, capture_directory): def __init__(self, capture_directory):
self.working_dir = capture_directory self.working_dir = capture_directory
self.alerts = [] self.alerts = []
self.conns = [] self.conns = []
self.ssl = [] self.ssl = []
self.http = [] self.http = []
self.dns = [] self.dns = []
self.files = [] self.files = []
self.whitelist = [] self.whitelist = []
# Get analysis and userlang configuration # Get analysis and userlang configuration
self.heuristics_analysis = get_config(("analysis", "heuristics")) self.heuristics_analysis = get_config(("analysis", "heuristics"))
self.iocs_analysis = get_config(("analysis", "iocs")) self.iocs_analysis = get_config(("analysis", "iocs"))
self.whitelist_analysis = get_config(("analysis", "whitelist")) self.whitelist_analysis = get_config(("analysis", "whitelist"))
self.active_analysis = get_config(("analysis", "active")) self.active_analysis = get_config(("analysis", "active"))
self.userlang = get_config(("frontend", "user_lang")) self.userlang = get_config(("frontend", "user_lang"))
# Retreive IOCs. # Retreive IOCs.
if self.iocs_analysis: if self.iocs_analysis:
self.bl_cidrs = [[IPNetwork(cidr[0]), cidr[1]] self.bl_cidrs = [[IPNetwork(cidr[0]), cidr[1]]
for cidr in get_iocs("cidr")] for cidr in get_iocs("cidr")]
self.bl_hosts = get_iocs("ip4addr") + get_iocs("ip6addr") self.bl_hosts = get_iocs("ip4addr") + get_iocs("ip6addr")
self.bl_domains = get_iocs("domain") self.bl_domains = get_iocs("domain")
self.bl_freedns = get_iocs("freedns") self.bl_freedns = get_iocs("freedns")
self.bl_nameservers = get_iocs("ns") self.bl_nameservers = get_iocs("ns")
self.bl_tlds = get_iocs("tld") self.bl_tlds = get_iocs("tld")
# Retreive whitelisted items. # Retreive whitelisted items.
if self.whitelist_analysis: if self.whitelist_analysis:
self.wl_cidrs = [IPNetwork(cidr) for cidr in get_whitelist("cidr")] self.wl_cidrs = [IPNetwork(cidr) for cidr in get_whitelist("cidr")]
self.wl_hosts = get_whitelist("ip4addr") + get_whitelist("ip6addr") self.wl_hosts = get_whitelist("ip4addr") + get_whitelist("ip6addr")
self.wl_domains = get_whitelist("domain") self.wl_domains = get_whitelist("domain")
# Load template language # Load template language
if not re.match("^[a-z]{2,3}$", self.userlang): if not re.match("^[a-z]{2,3}$", self.userlang):
self.userlang = "en" self.userlang = "en"
with open(os.path.join(os.path.dirname(os.path.realpath(sys.argv[0])), "locales/{}.json".format(self.userlang))) as f: with open(os.path.join(os.path.dirname(os.path.realpath(sys.argv[0])), "locales/{}.json".format(self.userlang))) as f:
self.template = json.load(f)["alerts"] self.template = json.load(f)["alerts"]
def fill_dns(self, dir): def fill_dns(self, dir):
""" """
Fill the DNS resolutions thanks to the dns.log. Fill the DNS resolutions thanks to the dns.log.
:return: nothing - all resolutions appended to self.dns. :return: nothing - all resolutions appended to self.dns.
""" """
if os.path.isfile(os.path.join(dir, "dns.log")): if os.path.isfile(os.path.join(dir, "dns.log")):
for record in ParseZeekLogs(os.path.join(dir, "dns.log"), output_format="json", safe_headers=False): for record in ParseZeekLogs(os.path.join(dir, "dns.log"), output_format="json", safe_headers=False):
if record is not None: if record is not None:
if record["qtype_name"] in ["A", "AAAA"]: if record["qtype_name"] in ["A", "AAAA"]:
d = {"domain": record["query"], d = {"domain": record["query"],
"answers": record["answers"].split(",")} "answers": record["answers"].split(",")}
if d not in self.dns: if d not in self.dns:
self.dns.append(d) self.dns.append(d)
def netflow_check(self, dir): def netflow_check(self, dir):
""" """
Enrich and check the netflow from the conn.log against whitelist and IOCs. Enrich and check the netflow from the conn.log against whitelist and IOCs.
:return: nothing - all stuff appended to self.alerts :return: nothing - all stuff appended to self.alerts
""" """
max_ports = get_config(("analysis", "max_ports")) max_ports = get_config(("analysis", "max_ports"))
http_default_port = get_config(("analysis", "http_default_port")) http_default_port = get_config(("analysis", "http_default_port"))
# Get the netflow from conn.log. # Get the netflow from conn.log.
if os.path.isfile(os.path.join(dir, "conn.log")): if os.path.isfile(os.path.join(dir, "conn.log")):
for record in ParseZeekLogs(os.path.join(dir, "conn.log"), output_format="json", safe_headers=False): for record in ParseZeekLogs(os.path.join(dir, "conn.log"), output_format="json", safe_headers=False):
if record is not None: if record is not None:
c = {"ip_dst": record["id.resp_h"], c = {"ip_dst": record["id.resp_h"],
"proto": record["proto"], "proto": record["proto"],
"port_dst": record["id.resp_p"], "port_dst": record["id.resp_p"],
"service": record["service"], "service": record["service"],
"alert_tiggered": False} "alert_tiggered": False}
if c not in self.conns: if c not in self.conns:
self.conns.append(c) self.conns.append(c)
# Let's add some dns resolutions. # Let's add some dns resolutions.
for c in self.conns: for c in self.conns:
c["resolution"] = self.resolve(c["ip_dst"]) c["resolution"] = self.resolve(c["ip_dst"])
# Order the conns list by the resolution field. # Order the conns list by the resolution field.
self.conns = sorted(self.conns, key=lambda c: c["resolution"]) self.conns = sorted(self.conns, key=lambda c: c["resolution"])
# Check for whitelisted assets, if any, delete the record. # Check for whitelisted assets, if any, delete the record.
if self.whitelist_analysis: if self.whitelist_analysis:
for i, c in enumerate(self.conns): for i, c in enumerate(self.conns):
if c["ip_dst"] in [ip for ip in self.wl_hosts]: if c["ip_dst"] in [ip for ip in self.wl_hosts]:
self.whitelist.append(self.conns[i]) self.whitelist.append(self.conns[i])
self.conns[i] = False self.conns[i] = False
elif c["resolution"] in self.wl_domains: elif c["resolution"] in self.wl_domains:
self.whitelist.append(self.conns[i]) self.whitelist.append(self.conns[i])
self.conns[i] = False self.conns[i] = False
elif True in [c["resolution"].endswith("." + dom) for dom in self.wl_domains]: elif True in [c["resolution"].endswith("." + dom) for dom in self.wl_domains]:
self.whitelist.append(self.conns[i]) self.whitelist.append(self.conns[i])
self.conns[i] = False self.conns[i] = False
elif True in [IPAddress(c["ip_dst"]) in cidr for cidr in self.wl_cidrs]: elif True in [IPAddress(c["ip_dst"]) in cidr for cidr in self.wl_cidrs]:
self.whitelist.append(self.conns[i]) self.whitelist.append(self.conns[i])
self.conns[i] = False self.conns[i] = False
# Let's delete whitelisted connections. # Let's delete whitelisted connections.
self.conns = list(filter(lambda c: c != False, self.conns)) self.conns = list(filter(lambda c: c != False, self.conns))
if self.heuristics_analysis: if self.heuristics_analysis:
for c in self.conns: for c in self.conns:
# Check for UDP / ICMP (strange from a smartphone.) # Check for UDP / ICMP (strange from a smartphone.)
if c["proto"] in ["UDP", "ICMP"]: if c["proto"] in ["UDP", "ICMP"]:
c["alert_tiggered"] = True c["alert_tiggered"] = True
self.alerts.append({"title": self.template["PROTO-01"]["title"].format(c["proto"].upper(), c["resolution"]), self.alerts.append({"title": self.template["PROTO-01"]["title"].format(c["proto"].upper(), c["resolution"]),
"description": self.template["PROTO-01"]["description"].format(c["proto"].upper(), c["resolution"]), "description": self.template["PROTO-01"]["description"].format(c["proto"].upper(), c["resolution"]),
"host": c["resolution"], "host": c["resolution"],
"level": "Moderate", "level": "Moderate",
"id": "PROTO-01"}) "id": "PROTO-01"})
# Check for use of ports over 1024. # Check for use of ports over 1024.
if c["port_dst"] >= max_ports: if c["port_dst"] >= max_ports:
c["alert_tiggered"] = True c["alert_tiggered"] = True
self.alerts.append({"title": self.template["PROTO-02"]["title"].format(c["proto"].upper(), c["resolution"], max_ports), self.alerts.append({"title": self.template["PROTO-02"]["title"].format(c["proto"].upper(), c["resolution"], max_ports),
"description": self.template["PROTO-02"]["description"].format(c["proto"].upper(), c["resolution"], c["port_dst"]), "description": self.template["PROTO-02"]["description"].format(c["proto"].upper(), c["resolution"], c["port_dst"]),
"host": c["resolution"], "host": c["resolution"],
"level": "Low", "level": "Low",
"id": "PROTO-02"}) "id": "PROTO-02"})
# Check for use of HTTP. # Check for use of HTTP.
if c["service"] == "http" and c["port_dst"] == http_default_port: if c["service"] == "http" and c["port_dst"] == http_default_port:
c["alert_tiggered"] = True c["alert_tiggered"] = True
self.alerts.append({"title": self.template["PROTO-03"]["title"].format(c["resolution"]), self.alerts.append({"title": self.template["PROTO-03"]["title"].format(c["resolution"]),
"description": self.template["PROTO-03"]["description"].format(c["resolution"]), "description": self.template["PROTO-03"]["description"].format(c["resolution"]),
"host": c["resolution"], "host": c["resolution"],
"level": "Low", "level": "Low",
"id": "PROTO-03"}) "id": "PROTO-03"})
# Check for use of HTTP on a non standard port. # Check for use of HTTP on a non standard port.
if c["service"] == "http" and c["port_dst"] != http_default_port: if c["service"] == "http" and c["port_dst"] != http_default_port:
c["alert_tiggered"] = True c["alert_tiggered"] = True
self.alerts.append({"title": self.template["PROTO-04"]["title"].format(c["resolution"], c["port_dst"]), self.alerts.append({"title": self.template["PROTO-04"]["title"].format(c["resolution"], c["port_dst"]),
"description": self.template["PROTO-04"]["description"].format(c["resolution"], c["port_dst"]), "description": self.template["PROTO-04"]["description"].format(c["resolution"], c["port_dst"]),
"host": c["resolution"], "host": c["resolution"],
"level": "Moderate", "level": "Moderate",
"id": "PROTO-04"}) "id": "PROTO-04"})
# Check for non-resolved IP address. # Check for non-resolved IP address.
if c["ip_dst"] == c["resolution"]: if c["ip_dst"] == c["resolution"]:
c["alert_tiggered"] = True c["alert_tiggered"] = True
self.alerts.append({"title": self.template["PROTO-05"]["title"].format(c["ip_dst"]), self.alerts.append({"title": self.template["PROTO-05"]["title"].format(c["ip_dst"]),
"description": self.template["PROTO-05"]["description"].format(c["ip_dst"]), "description": self.template["PROTO-05"]["description"].format(c["ip_dst"]),
"host": c["ip_dst"], "host": c["ip_dst"],
"level": "Low", "level": "Low",
"id": "PROTO-05"}) "id": "PROTO-05"})
if self.iocs_analysis: if self.iocs_analysis:
for c in self.conns: for c in self.conns:
# Check for blacklisted IP address. # Check for blacklisted IP address.
for host in self.bl_hosts: for host in self.bl_hosts:
if c["ip_dst"] == host[0]: if c["ip_dst"] == host[0]:
c["alert_tiggered"] = True c["alert_tiggered"] = True
self.alerts.append({"title": self.template["IOC-01"]["title"].format(c["resolution"], c["ip_dst"], host[1].upper()), self.alerts.append({"title": self.template["IOC-01"]["title"].format(c["resolution"], c["ip_dst"], host[1].upper()),
"description": self.template["IOC-01"]["description"].format(c["ip_dst"]), "description": self.template["IOC-01"]["description"].format(c["ip_dst"]),
"host": c["resolution"], "host": c["resolution"],
"level": "High", "level": "High",
"id": "IOC-01"}) "id": "IOC-01"})
break break
# Check for blacklisted CIDR. # Check for blacklisted CIDR.
for cidr in self.bl_cidrs: for cidr in self.bl_cidrs:
if IPAddress(c["ip_dst"]) in cidr[0]: if IPAddress(c["ip_dst"]) in cidr[0]:
c["alert_tiggered"] = True c["alert_tiggered"] = True
self.alerts.append({"title": self.template["IOC-02"]["title"].format(c["resolution"], cidr[0], cidr[1].upper()), self.alerts.append({"title": self.template["IOC-02"]["title"].format(c["resolution"], cidr[0], cidr[1].upper()),
"description": self.template["IOC-02"]["description"].format(c["resolution"]), "description": self.template["IOC-02"]["description"].format(c["resolution"]),
"host": c["resolution"], "host": c["resolution"],
"level": "Moderate", "level": "Moderate",
"id": "IOC-02"}) "id": "IOC-02"})
# Check for blacklisted domain. # Check for blacklisted domain.
for domain in self.bl_domains: for domain in self.bl_domains:
if c["resolution"].endswith(domain[0]): if c["resolution"].endswith(domain[0]):
if domain[1] != "tracker": if domain[1] != "tracker":
c["alert_tiggered"] = True c["alert_tiggered"] = True
self.alerts.append({"title": self.template["IOC-03"]["title"].format(c["resolution"], domain[1].upper()), self.alerts.append({"title": self.template["IOC-03"]["title"].format(c["resolution"], domain[1].upper()),
"description": self.template["IOC-03"]["description"].format(c["resolution"]), "description": self.template["IOC-03"]["description"].format(c["resolution"]),
"host": c["resolution"], "host": c["resolution"],
"level": "High", "level": "High",
"id": "IOC-03"}) "id": "IOC-03"})
else: else:
c["alert_tiggered"] = True c["alert_tiggered"] = True
self.alerts.append({"title": self.template["IOC-04"]["title"].format(c["resolution"], domain[1].upper()), self.alerts.append({"title": self.template["IOC-04"]["title"].format(c["resolution"], domain[1].upper()),
"description": self.template["IOC-04"]["description"].format(c["resolution"]), "description": self.template["IOC-04"]["description"].format(c["resolution"]),
"host": c["resolution"], "host": c["resolution"],
"level": "Moderate", "level": "Moderate",
"id": "IOC-04"}) "id": "IOC-04"})
# Check for blacklisted FreeDNS. # Check for blacklisted FreeDNS.
for domain in self.bl_freedns: for domain in self.bl_freedns:
if c["resolution"].endswith("." + domain[0]): if c["resolution"].endswith("." + domain[0]):
c["alert_tiggered"] = True c["alert_tiggered"] = True
self.alerts.append({"title": self.template["IOC-05"]["title"].format(c["resolution"]), self.alerts.append({"title": self.template["IOC-05"]["title"].format(c["resolution"]),
"description": self.template["IOC-05"]["description"].format(c["resolution"]), "description": self.template["IOC-05"]["description"].format(c["resolution"]),
"host": c["resolution"], "host": c["resolution"],
"level": "Moderate", "level": "Moderate",
"id": "IOC-05"}) "id": "IOC-05"})
# Check for suspect tlds. # Check for suspect tlds.
for tld in self.bl_tlds: for tld in self.bl_tlds:
if c["resolution"].endswith(tld[0]): if c["resolution"].endswith(tld[0]):
c["alert_tiggered"] = True c["alert_tiggered"] = True
self.alerts.append({"title": self.template["IOC-06"]["title"].format(c["resolution"]), self.alerts.append({"title": self.template["IOC-06"]["title"].format(c["resolution"]),
"description": self.template["IOC-06"]["description"].format(c["resolution"], tld[0]), "description": self.template["IOC-06"]["description"].format(c["resolution"], tld[0]),
"host": c["resolution"], "host": c["resolution"],
"level": "Low", "level": "Low",
"id": "IOC-06"}) "id": "IOC-06"})
if self.active_analysis: if self.active_analysis:
for c in self.conns: for c in self.conns:
try: # Domain nameservers check. try: # Domain nameservers check.
name_servers = pydig.query(c["resolution"], "NS") name_servers = pydig.query(c["resolution"], "NS")
if len(name_servers): if len(name_servers):
for ns in self.bl_nameservers: for ns in self.bl_nameservers:
if name_servers[0].endswith(".{}.".format(ns[0])): if name_servers[0].endswith(".{}.".format(ns[0])):
c["alert_tiggered"] = True c["alert_tiggered"] = True
self.alerts.append({"title": self.template["ACT-01"]["title"].format(c["resolution"], name_servers[0]), self.alerts.append({"title": self.template["ACT-01"]["title"].format(c["resolution"], name_servers[0]),
"description": self.template["ACT-01"]["description"].format(c["resolution"]), "description": self.template["ACT-01"]["description"].format(c["resolution"]),
"host": c["resolution"], "host": c["resolution"],
"level": "Moderate", "level": "Moderate",
"id": "ACT-01"}) "id": "ACT-01"})
except: except:
pass pass
try: # Domain history check. try: # Domain history check.
whois_record = whois.whois(c["resolution"]) whois_record = whois.whois(c["resolution"])
creation_date = whois_record.creation_date if type( creation_date = whois_record.creation_date if type(
whois_record.creation_date) is not list else whois_record.creation_date[0] whois_record.creation_date) is not list else whois_record.creation_date[0]
creation_days = abs((datetime.now() - creation_date).days) creation_days = abs((datetime.now() - creation_date).days)
if creation_days < 365: if creation_days < 365:
c["alert_tiggered"] = True c["alert_tiggered"] = True
self.alerts.append({"title": self.template["ACT-02"]["title"].format(c["resolution"], creation_days), self.alerts.append({"title": self.template["ACT-02"]["title"].format(c["resolution"], creation_days),
"description": self.template["ACT-02"]["description"].format(c["resolution"]), "description": self.template["ACT-02"]["description"].format(c["resolution"]),
"host": c["resolution"], "host": c["resolution"],
"level": "Moderate", "level": "Moderate",
"id": "ACT-02"}) "id": "ACT-02"})
except: except:
pass pass
def files_check(self, dir): def files_check(self, dir):
""" """
Check on the files.log: Check on the files.log:
* Check certificates sha1 * Check certificates sha1
* [todo] Check possible binary data or APKs? * [todo] Check possible binary data or APKs?
:return: nothing - all stuff appended to self.alerts :return: nothing - all stuff appended to self.alerts
""" """
if not self.iocs_analysis: if not self.iocs_analysis:
return return
bl_certs = get_iocs("sha1cert") bl_certs = get_iocs("sha1cert")
if os.path.isfile(os.path.join(dir, "files.log")): if os.path.isfile(os.path.join(dir, "files.log")):
for record in ParseZeekLogs(os.path.join(dir, "files.log"), output_format="json", safe_headers=False): for record in ParseZeekLogs(os.path.join(dir, "files.log"), output_format="json", safe_headers=False):
if record is not None: if record is not None:
f = {"filename": record["filename"], f = {"filename": record["filename"],
"ip_src": record["id.orig_h"], "ip_src": record["id.orig_h"],
"ip_dst": record["id.resp_h"], "ip_dst": record["id.resp_h"],
"mime_type": record["mime_type"], "mime_type": record["mime_type"],
"sha1": record["sha1"]} "sha1": record["sha1"]}
if f not in self.files: if f not in self.files:
self.files.append(f) self.files.append(f)
for f in self.files: for f in self.files:
if f["mime_type"] == "application/x-x509-user-cert": if f["mime_type"] == "application/x-x509-user-cert":
for cert in bl_certs: # Check for blacklisted certificate. for cert in bl_certs: # Check for blacklisted certificate.
if f["sha1"] == cert[0]: if f["sha1"] == cert[0]:
host = self.resolve(f["ip_src"]) host = self.resolve(f["ip_src"])
self.alerts.append({"title": self.template["IOC-07"]["title"].format(cert[1].upper(), host), self.alerts.append({"title": self.template["IOC-07"]["title"].format(cert[1].upper(), host),
"description": self.template["IOC-07"]["description"].format(f["sha1"], host), "description": self.template["IOC-07"]["description"].format(f["sha1"], host),
"host": host, "host": host,
"level": "High", "level": "High",
"id": "IOC-07"}) "id": "IOC-07"})
def http_check(self, dir): def http_check(self, dir):
""" """
Check on the http.log: Check on the http.log:
* Blacklisted domain/tld from the Host header field. * Blacklisted domain/tld from the Host header field.
Can be used when no DNS query have been done during the session (already cached by the device.) Can be used when no DNS query have been done during the session (already cached by the device.)
:return: nothing - all stuff appended to self.alerts :return: nothing - all stuff appended to self.alerts
""" """
if os.path.isfile(os.path.join(dir, "http.log")): if os.path.isfile(os.path.join(dir, "http.log")):
for record in ParseZeekLogs(os.path.join(dir, "http.log"), output_format="json", safe_headers=False): for record in ParseZeekLogs(os.path.join(dir, "http.log"), output_format="json", safe_headers=False):
if record is not None: if record is not None:
c = {"host": record['host']} c = {"host": record['host']}
if c not in self.http: if c not in self.http:
self.http.append(c) self.http.append(c)
if self.iocs_analysis: if self.iocs_analysis:
for c in self.http: for c in self.http:
# If we already know the host form DNS, let's continue. # If we already know the host form DNS, let's continue.
if c["host"] in [r["domain"] for r in self.dns]: if c["host"] in [r["domain"] for r in self.dns]:
continue continue
# Check for blacklisted domain. # Check for blacklisted domain.
for h in self.bl_domains: for h in self.bl_domains:
if h[1] != "tracker": if h[1] != "tracker":
if c["host"].endswith(h[0]): if c["host"].endswith(h[0]):
self.alerts.append({"title": self.template["IOC-08"]["title"].format(c["host"], h[1].upper()), self.alerts.append({"title": self.template["IOC-08"]["title"].format(c["host"], h[1].upper()),
"description": self.template["IOC-08"]["description"].format(c["host"]), "description": self.template["IOC-08"]["description"].format(c["host"]),
"host": c["host"], "host": c["host"],
"level": "High", "level": "High",
"id": "IOC-08"}) "id": "IOC-08"})
# Check for freedns. # Check for freedns.
for h in self.bl_freedns: for h in self.bl_freedns:
if c["host"].endswith(h[0]): if c["host"].endswith(h[0]):
self.alerts.append({"title": self.template["IOC-09"]["title"].format(c["host"]), self.alerts.append({"title": self.template["IOC-09"]["title"].format(c["host"]),
"description": self.template["IOC-09"]["description"].format(c["host"]), "description": self.template["IOC-09"]["description"].format(c["host"]),
"host": c["host"], "host": c["host"],
"level": "Moderate", "level": "Moderate",
"id": "IOC-09"}) "id": "IOC-09"})
# Check for fancy TLD. # Check for fancy TLD.
for h in self.bl_tlds: for h in self.bl_tlds:
if c["host"].endswith(h[0]): if c["host"].endswith(h[0]):
self.alerts.append({"title": self.template["IOC-10"]["title"].format(c["host"]), self.alerts.append({"title": self.template["IOC-10"]["title"].format(c["host"]),
"description": self.template["IOC-10"]["description"].format(c["host"], h[0]), "description": self.template["IOC-10"]["description"].format(c["host"], h[0]),
"host": c["host"], "host": c["host"],
"level": "Low", "level": "Low",
"id": "IOC-10"}) "id": "IOC-10"})
def ssl_check(self, dir): def ssl_check(self, dir):
""" """
Check on the ssl.log: Check on the ssl.log:
* SSL connections which doesn't use the 443. * SSL connections which doesn't use the 443.
* "Free" certificate issuer (taken from the config). * "Free" certificate issuer (taken from the config).
* Self-signed certificates. * Self-signed certificates.
* Blacklisted domain in the CN * Blacklisted domain in the CN
:return: nothing - all stuff appended to self.alerts :return: nothing - all stuff appended to self.alerts
""" """
ssl_default_ports = get_config(("analysis", "ssl_default_ports")) ssl_default_ports = get_config(("analysis", "ssl_default_ports"))
free_issuers = get_config(("analysis", "free_issuers")) free_issuers = get_config(("analysis", "free_issuers"))
if os.path.isfile(os.path.join(dir, "ssl.log")): if os.path.isfile(os.path.join(dir, "ssl.log")):
for record in ParseZeekLogs(os.path.join(dir, "ssl.log"), output_format="json", safe_headers=False): for record in ParseZeekLogs(os.path.join(dir, "ssl.log"), output_format="json", safe_headers=False):
if record is not None: if record is not None:
c = {"host": record['id.resp_h'], c = {"host": record['id.resp_h'],
"port": record['id.resp_p'], "port": record['id.resp_p'],
"issuer": record["issuer"] if "issuer" in record else "", "issuer": record["issuer"] if "issuer" in record else "",
"validation_status": record["validation_status"], "validation_status": record["validation_status"],
"cn": record["server_name"] if "server_name" in record else ""} "cn": record["server_name"] if "server_name" in record else ""}
if c not in self.ssl: if c not in self.ssl:
self.ssl.append(c) self.ssl.append(c)
if self.heuristics_analysis: if self.heuristics_analysis:
for cert in self.ssl: for cert in self.ssl:
host = self.resolve(cert["host"]) host = self.resolve(cert["host"])
# If the associated host has not whitelisted, check the cert. # If the associated host has not whitelisted, check the cert.
for c in self.conns: for c in self.conns:
if host in c["resolution"]: if host in c["resolution"]:
# Check for non generic SSL port. # Check for non generic SSL port.
if cert["port"] not in ssl_default_ports: if cert["port"] not in ssl_default_ports:
c["alert_tiggered"] = True c["alert_tiggered"] = True
self.alerts.append({"title": self.template["SSL-01"]["title"].format(cert["port"], host), self.alerts.append({"title": self.template["SSL-01"]["title"].format(cert["port"], host),
"description": self.template["SSL-01"]["description"].format(host), "description": self.template["SSL-01"]["description"].format(host),
"host": host, "host": host,
"level": "Moderate", "level": "Moderate",
"id": "SSL-01"}) "id": "SSL-01"})
# Check Free SSL certificates. # Check Free SSL certificates.
if cert["issuer"] in free_issuers: if cert["issuer"] in free_issuers:
c["alert_tiggered"] = True c["alert_tiggered"] = True
self.alerts.append({"title": self.template["SSL-02"]["title"].format(host), self.alerts.append({"title": self.template["SSL-02"]["title"].format(host),
"description": self.template["SSL-02"]["description"], "description": self.template["SSL-02"]["description"],
"host": host, "host": host,
"level": "Moderate", "level": "Moderate",
"id": "SSL-02"}) "id": "SSL-02"})
# Check for self-signed certificates. # Check for self-signed certificates.
if cert["validation_status"] == "self signed certificate in certificate chain": if cert["validation_status"] == "self signed certificate in certificate chain":
c["alert_tiggered"] = True c["alert_tiggered"] = True
self.alerts.append({"title": self.template["SSL-03"]["title"].format(host), self.alerts.append({"title": self.template["SSL-03"]["title"].format(host),
"description": self.template["SSL-03"]["description"].format(host), "description": self.template["SSL-03"]["description"].format(host),
"host": host, "host": host,
"level": "Moderate", "level": "Moderate",
"id": "SSL-03"}) "id": "SSL-03"})
if self.iocs_analysis: if self.iocs_analysis:
for cert in self.ssl: for cert in self.ssl:
# Check if the domain in the certificate haven't been blacklisted # Check if the domain in the certificate haven't been blacklisted
# This check can be good if the domain has already been cached by # This check can be good if the domain has already been cached by
# the device so it wont appear in self.dns. # the device so it wont appear in self.dns.
if any([cert["cn"].endswith(r["domain"]) for r in self.dns]): if any([cert["cn"].endswith(r["domain"]) for r in self.dns]):
continue continue
for domain in self.bl_domains: for domain in self.bl_domains:
if domain[1] != "tracker": if domain[1] != "tracker":
if cert["cn"].endswith(domain[0]): if cert["cn"].endswith(domain[0]):
self.alerts.append({"title": self.template["SSL-04"]["title"].format(domain[0], domain[1].upper()), self.alerts.append({"title": self.template["SSL-04"]["title"].format(domain[0], domain[1].upper()),
"description": self.template["SSL-04"]["description"].format(domain[0]), "description": self.template["SSL-04"]["description"].format(domain[0]),
"host": domain[0], "host": domain[0],
"level": "High", "level": "High",
"id": "SSL-04"}) "id": "SSL-04"})
def alerts_check(self): def alerts_check(self):
""" """
Leverage an advice to the user based on the trigered hosts Leverage an advice to the user based on the trigered hosts
:return: nothing - all generated alerts appended to self.alerts :return: nothing - all generated alerts appended to self.alerts
""" """
hosts = {} hosts = {}
for alert in [dict(t) for t in {tuple(d.items()) for d in self.alerts}]: for alert in [dict(t) for t in {tuple(d.items()) for d in self.alerts}]:
if alert["host"] not in hosts: if alert["host"] not in hosts:
hosts[alert["host"]] = 1 hosts[alert["host"]] = 1
else: else:
hosts[alert["host"]] += 1 hosts[alert["host"]] += 1
for host, nb in hosts.items(): for host, nb in hosts.items():
if nb >= get_config(("analysis", "max_alerts")): if nb >= get_config(("analysis", "max_alerts")):
self.alerts.append({"title": self.template["ADV-01"]["title"].format(host), self.alerts.append({"title": self.template["ADV-01"]["title"].format(host),
"description": self.template["ADV-01"]["description"].format(host, nb), "description": self.template["ADV-01"]["description"].format(host, nb),
"host": host, "host": host,
"level": "Moderate", "level": "Moderate",
"id": "ADV-01"}) "id": "ADV-01"})
def resolve(self, ip_addr): def resolve(self, ip_addr):
""" """
A simple method to retreive DNS names from IP addresses A simple method to retreive DNS names from IP addresses
in order to replace them in alerts. in order to replace them in alerts.
:return: String - DNS record or IP Address. :return: String - DNS record or IP Address.
""" """
for record in self.dns: for record in self.dns:
if ip_addr in record["answers"]: if ip_addr in record["answers"]:
return record["domain"] return record["domain"]
return ip_addr return ip_addr
def start_zeek(self): def start_zeek(self):
""" """
Start zeek and check the logs. Start zeek and check the logs.
""" """
sp.Popen("cd {} && /opt/zeek/bin/zeek -Cr capture.pcap protocols/ssl/validate-certs".format( sp.Popen("cd {} && /opt/zeek/bin/zeek -Cr capture.pcap protocols/ssl/validate-certs".format(
self.working_dir), shell=True).wait() self.working_dir), shell=False).wait()
sp.Popen("cd {} && mv *.log assets/".format(self.working_dir), sp.Popen("cd {} && mv *.log assets/".format(self.working_dir),
shell=True).wait() shell=False).wait()
self.fill_dns(self.working_dir + "/assets/") self.fill_dns(self.working_dir + "/assets/")
self.netflow_check(self.working_dir + "/assets/") self.netflow_check(self.working_dir + "/assets/")
self.ssl_check(self.working_dir + "/assets/") self.ssl_check(self.working_dir + "/assets/")
self.http_check(self.working_dir + "/assets/") self.http_check(self.working_dir + "/assets/")
self.files_check(self.working_dir + "/assets/") self.files_check(self.working_dir + "/assets/")
self.alerts_check() self.alerts_check()
def retrieve_alerts(self): def retrieve_alerts(self):
""" """
Retrieve alerts. Retrieve alerts.
:return: list - a list of alerts wihout duplicates. :return: list - a list of alerts wihout duplicates.
""" """
return [dict(t) for t in {tuple(d.items()) for d in self.alerts}] return [dict(t) for t in {tuple(d.items()) for d in self.alerts}]
def retrieve_whitelist(self): def retrieve_whitelist(self):
""" """
Retrieve whitelisted elements. Retrieve whitelisted elements.
:return: list - a list of whitelisted elements wihout duplicates. :return: list - a list of whitelisted elements wihout duplicates.
""" """
return [dict(t) for t in {tuple(d.items()) for d in self.whitelist}] return [dict(t) for t in {tuple(d.items()) for d in self.whitelist}]
def retrieve_conns(self): def retrieve_conns(self):
""" """
Retrieve not whitelisted elements. Retrieve not whitelisted elements.
:return: list - a list of non-whitelisted elements wihout duplicates. :return: list - a list of non-whitelisted elements wihout duplicates.
""" """
return self.conns return self.conns