Compare commits

...

4 Commits

Author SHA1 Message Date
Robert Bongart (MSc MSc MA)
13ddb3b0bc
Merge pull request #5 from 2lambda123/pixeebot/ablesov/fix_field_name
Hardening suggestions for KasperskyLab-TinyCheck / ablesov/fix_field_name
2024-04-14 11:59:34 -05:00
pixeebot[bot]
daca707ad8
Use Generator Expressions Instead of List Comprehensions 2024-04-14 16:16:32 +00:00
pixeebot[bot]
216db5d387
Sandbox Process Creation 2024-04-14 16:16:31 +00:00
pixeebot[bot]
887e90cd06
Use shell=False in subprocess Function Calls 2024-04-14 16:16:31 +00:00
2 changed files with 499 additions and 495 deletions

View File

@ -1,478 +1,479 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from classes.parsezeeklogs import ParseZeekLogs from classes.parsezeeklogs import ParseZeekLogs
from netaddr import IPNetwork, IPAddress from netaddr import IPNetwork, IPAddress
from utils import get_iocs, get_config, get_whitelist from utils import get_iocs, get_config, get_whitelist
from datetime import datetime from datetime import datetime
import subprocess as sp import subprocess as sp
import json import json
import pydig import pydig
import os import os
import re import re
import sys import sys
import whois import whois
from security import safe_command
class ZeekEngine(object):
class ZeekEngine(object):
def __init__(self, capture_directory):
self.working_dir = capture_directory def __init__(self, capture_directory):
self.alerts = [] self.working_dir = capture_directory
self.conns = [] self.alerts = []
self.ssl = [] self.conns = []
self.http = [] self.ssl = []
self.dns = [] self.http = []
self.files = [] self.dns = []
self.whitelist = [] self.files = []
self.whitelist = []
# Get analysis and userlang configuration
self.heuristics_analysis = get_config(("analysis", "heuristics")) # Get analysis and userlang configuration
self.iocs_analysis = get_config(("analysis", "iocs")) self.heuristics_analysis = get_config(("analysis", "heuristics"))
self.whitelist_analysis = get_config(("analysis", "whitelist")) self.iocs_analysis = get_config(("analysis", "iocs"))
self.active_analysis = get_config(("analysis", "active")) self.whitelist_analysis = get_config(("analysis", "whitelist"))
self.userlang = get_config(("frontend", "user_lang")) self.active_analysis = get_config(("analysis", "active"))
self.userlang = get_config(("frontend", "user_lang"))
# Retreive IOCs.
if self.iocs_analysis: # Retreive IOCs.
self.bl_cidrs = [[IPNetwork(cidr[0]), cidr[1]] if self.iocs_analysis:
for cidr in get_iocs("cidr")] self.bl_cidrs = [[IPNetwork(cidr[0]), cidr[1]]
self.bl_hosts = get_iocs("ip4addr") + get_iocs("ip6addr") for cidr in get_iocs("cidr")]
self.bl_domains = get_iocs("domain") self.bl_hosts = get_iocs("ip4addr") + get_iocs("ip6addr")
self.bl_freedns = get_iocs("freedns") self.bl_domains = get_iocs("domain")
self.bl_nameservers = get_iocs("ns") self.bl_freedns = get_iocs("freedns")
self.bl_tlds = get_iocs("tld") self.bl_nameservers = get_iocs("ns")
self.bl_tlds = get_iocs("tld")
# Retreive whitelisted items.
if self.whitelist_analysis: # Retreive whitelisted items.
self.wl_cidrs = [IPNetwork(cidr) for cidr in get_whitelist("cidr")] if self.whitelist_analysis:
self.wl_hosts = get_whitelist("ip4addr") + get_whitelist("ip6addr") self.wl_cidrs = [IPNetwork(cidr) for cidr in get_whitelist("cidr")]
self.wl_domains = get_whitelist("domain") self.wl_hosts = get_whitelist("ip4addr") + get_whitelist("ip6addr")
self.wl_domains = get_whitelist("domain")
# Load template language
if not re.match("^[a-z]{2,3}$", self.userlang): # Load template language
self.userlang = "en" if not re.match("^[a-z]{2,3}$", self.userlang):
with open(os.path.join(os.path.dirname(os.path.realpath(sys.argv[0])), "locales/{}.json".format(self.userlang))) as f: self.userlang = "en"
self.template = json.load(f)["alerts"] with open(os.path.join(os.path.dirname(os.path.realpath(sys.argv[0])), "locales/{}.json".format(self.userlang))) as f:
self.template = json.load(f)["alerts"]
def fill_dns(self, dir):
""" def fill_dns(self, dir):
Fill the DNS resolutions thanks to the dns.log. """
:return: nothing - all resolutions appended to self.dns. Fill the DNS resolutions thanks to the dns.log.
""" :return: nothing - all resolutions appended to self.dns.
if os.path.isfile(os.path.join(dir, "dns.log")): """
for record in ParseZeekLogs(os.path.join(dir, "dns.log"), output_format="json", safe_headers=False): if os.path.isfile(os.path.join(dir, "dns.log")):
if record is not None: for record in ParseZeekLogs(os.path.join(dir, "dns.log"), output_format="json", safe_headers=False):
if record["qtype_name"] in ["A", "AAAA"]: if record is not None:
d = {"domain": record["query"], if record["qtype_name"] in ["A", "AAAA"]:
"answers": record["answers"].split(",")} d = {"domain": record["query"],
if d not in self.dns: "answers": record["answers"].split(",")}
self.dns.append(d) if d not in self.dns:
self.dns.append(d)
def netflow_check(self, dir):
""" def netflow_check(self, dir):
Enrich and check the netflow from the conn.log against whitelist and IOCs. """
:return: nothing - all stuff appended to self.alerts Enrich and check the netflow from the conn.log against whitelist and IOCs.
""" :return: nothing - all stuff appended to self.alerts
max_ports = get_config(("analysis", "max_ports")) """
http_default_port = get_config(("analysis", "http_default_port")) max_ports = get_config(("analysis", "max_ports"))
http_default_port = get_config(("analysis", "http_default_port"))
# Get the netflow from conn.log.
if os.path.isfile(os.path.join(dir, "conn.log")): # Get the netflow from conn.log.
for record in ParseZeekLogs(os.path.join(dir, "conn.log"), output_format="json", safe_headers=False): if os.path.isfile(os.path.join(dir, "conn.log")):
if record is not None: for record in ParseZeekLogs(os.path.join(dir, "conn.log"), output_format="json", safe_headers=False):
c = {"ip_dst": record["id.resp_h"], if record is not None:
"proto": record["proto"], c = {"ip_dst": record["id.resp_h"],
"port_dst": record["id.resp_p"], "proto": record["proto"],
"service": record["service"], "port_dst": record["id.resp_p"],
"alert_tiggered": False} "service": record["service"],
if c not in self.conns: "alert_tiggered": False}
self.conns.append(c) if c not in self.conns:
self.conns.append(c)
# Let's add some dns resolutions.
for c in self.conns: # Let's add some dns resolutions.
c["resolution"] = self.resolve(c["ip_dst"]) for c in self.conns:
c["resolution"] = self.resolve(c["ip_dst"])
# Order the conns list by the resolution field.
self.conns = sorted(self.conns, key=lambda c: c["resolution"]) # Order the conns list by the resolution field.
self.conns = sorted(self.conns, key=lambda c: c["resolution"])
# Check for whitelisted assets, if any, delete the record.
if self.whitelist_analysis: # Check for whitelisted assets, if any, delete the record.
if self.whitelist_analysis:
for i, c in enumerate(self.conns):
if c["ip_dst"] in [ip for ip in self.wl_hosts]: for i, c in enumerate(self.conns):
self.whitelist.append(self.conns[i]) if c["ip_dst"] in [ip for ip in self.wl_hosts]:
self.conns[i] = False self.whitelist.append(self.conns[i])
elif c["resolution"] in self.wl_domains: self.conns[i] = False
self.whitelist.append(self.conns[i]) elif c["resolution"] in self.wl_domains:
self.conns[i] = False self.whitelist.append(self.conns[i])
elif True in [c["resolution"].endswith("." + dom) for dom in self.wl_domains]: self.conns[i] = False
self.whitelist.append(self.conns[i]) elif True in [c["resolution"].endswith("." + dom) for dom in self.wl_domains]:
self.conns[i] = False self.whitelist.append(self.conns[i])
elif True in [IPAddress(c["ip_dst"]) in cidr for cidr in self.wl_cidrs]: self.conns[i] = False
self.whitelist.append(self.conns[i]) elif True in [IPAddress(c["ip_dst"]) in cidr for cidr in self.wl_cidrs]:
self.conns[i] = False self.whitelist.append(self.conns[i])
self.conns[i] = False
# Let's delete whitelisted connections.
self.conns = list(filter(lambda c: c != False, self.conns)) # Let's delete whitelisted connections.
self.conns = list(filter(lambda c: c != False, self.conns))
if self.heuristics_analysis:
for c in self.conns: if self.heuristics_analysis:
# Check for UDP / ICMP (strange from a smartphone.) for c in self.conns:
if c["proto"] in ["UDP", "ICMP"]: # Check for UDP / ICMP (strange from a smartphone.)
c["alert_tiggered"] = True if c["proto"] in ["UDP", "ICMP"]:
self.alerts.append({"title": self.template["PROTO-01"]["title"].format(c["proto"].upper(), c["resolution"]), c["alert_tiggered"] = True
"description": self.template["PROTO-01"]["description"].format(c["proto"].upper(), c["resolution"]), self.alerts.append({"title": self.template["PROTO-01"]["title"].format(c["proto"].upper(), c["resolution"]),
"host": c["resolution"], "description": self.template["PROTO-01"]["description"].format(c["proto"].upper(), c["resolution"]),
"level": "Moderate", "host": c["resolution"],
"id": "PROTO-01"}) "level": "Moderate",
# Check for use of ports over 1024. "id": "PROTO-01"})
if c["port_dst"] >= max_ports: # Check for use of ports over 1024.
c["alert_tiggered"] = True if c["port_dst"] >= max_ports:
self.alerts.append({"title": self.template["PROTO-02"]["title"].format(c["proto"].upper(), c["resolution"], max_ports), c["alert_tiggered"] = True
"description": self.template["PROTO-02"]["description"].format(c["proto"].upper(), c["resolution"], c["port_dst"]), self.alerts.append({"title": self.template["PROTO-02"]["title"].format(c["proto"].upper(), c["resolution"], max_ports),
"host": c["resolution"], "description": self.template["PROTO-02"]["description"].format(c["proto"].upper(), c["resolution"], c["port_dst"]),
"level": "Low", "host": c["resolution"],
"id": "PROTO-02"}) "level": "Low",
# Check for use of HTTP. "id": "PROTO-02"})
if c["service"] == "http" and c["port_dst"] == http_default_port: # Check for use of HTTP.
c["alert_tiggered"] = True if c["service"] == "http" and c["port_dst"] == http_default_port:
self.alerts.append({"title": self.template["PROTO-03"]["title"].format(c["resolution"]), c["alert_tiggered"] = True
"description": self.template["PROTO-03"]["description"].format(c["resolution"]), self.alerts.append({"title": self.template["PROTO-03"]["title"].format(c["resolution"]),
"host": c["resolution"], "description": self.template["PROTO-03"]["description"].format(c["resolution"]),
"level": "Low", "host": c["resolution"],
"id": "PROTO-03"}) "level": "Low",
"id": "PROTO-03"})
# Check for use of HTTP on a non standard port.
if c["service"] == "http" and c["port_dst"] != http_default_port: # Check for use of HTTP on a non standard port.
c["alert_tiggered"] = True if c["service"] == "http" and c["port_dst"] != http_default_port:
self.alerts.append({"title": self.template["PROTO-04"]["title"].format(c["resolution"], c["port_dst"]), c["alert_tiggered"] = True
"description": self.template["PROTO-04"]["description"].format(c["resolution"], c["port_dst"]), self.alerts.append({"title": self.template["PROTO-04"]["title"].format(c["resolution"], c["port_dst"]),
"host": c["resolution"], "description": self.template["PROTO-04"]["description"].format(c["resolution"], c["port_dst"]),
"level": "Moderate", "host": c["resolution"],
"id": "PROTO-04"}) "level": "Moderate",
# Check for non-resolved IP address. "id": "PROTO-04"})
if c["ip_dst"] == c["resolution"]: # Check for non-resolved IP address.
c["alert_tiggered"] = True if c["ip_dst"] == c["resolution"]:
self.alerts.append({"title": self.template["PROTO-05"]["title"].format(c["ip_dst"]), c["alert_tiggered"] = True
"description": self.template["PROTO-05"]["description"].format(c["ip_dst"]), self.alerts.append({"title": self.template["PROTO-05"]["title"].format(c["ip_dst"]),
"host": c["ip_dst"], "description": self.template["PROTO-05"]["description"].format(c["ip_dst"]),
"level": "Low", "host": c["ip_dst"],
"id": "PROTO-05"}) "level": "Low",
"id": "PROTO-05"})
if self.iocs_analysis:
if self.iocs_analysis:
for c in self.conns:
# Check for blacklisted IP address. for c in self.conns:
for host in self.bl_hosts: # Check for blacklisted IP address.
if c["ip_dst"] == host[0]: for host in self.bl_hosts:
c["alert_tiggered"] = True if c["ip_dst"] == host[0]:
self.alerts.append({"title": self.template["IOC-01"]["title"].format(c["resolution"], c["ip_dst"], host[1].upper()), c["alert_tiggered"] = True
"description": self.template["IOC-01"]["description"].format(c["ip_dst"]), self.alerts.append({"title": self.template["IOC-01"]["title"].format(c["resolution"], c["ip_dst"], host[1].upper()),
"host": c["resolution"], "description": self.template["IOC-01"]["description"].format(c["ip_dst"]),
"level": "High", "host": c["resolution"],
"id": "IOC-01"}) "level": "High",
break "id": "IOC-01"})
# Check for blacklisted CIDR. break
for cidr in self.bl_cidrs: # Check for blacklisted CIDR.
if IPAddress(c["ip_dst"]) in cidr[0]: for cidr in self.bl_cidrs:
c["alert_tiggered"] = True if IPAddress(c["ip_dst"]) in cidr[0]:
self.alerts.append({"title": self.template["IOC-02"]["title"].format(c["resolution"], cidr[0], cidr[1].upper()), c["alert_tiggered"] = True
"description": self.template["IOC-02"]["description"].format(c["resolution"]), self.alerts.append({"title": self.template["IOC-02"]["title"].format(c["resolution"], cidr[0], cidr[1].upper()),
"host": c["resolution"], "description": self.template["IOC-02"]["description"].format(c["resolution"]),
"level": "Moderate", "host": c["resolution"],
"id": "IOC-02"}) "level": "Moderate",
# Check for blacklisted domain. "id": "IOC-02"})
for domain in self.bl_domains: # Check for blacklisted domain.
if c["resolution"].endswith(domain[0]): for domain in self.bl_domains:
if domain[1] != "tracker": if c["resolution"].endswith(domain[0]):
c["alert_tiggered"] = True if domain[1] != "tracker":
self.alerts.append({"title": self.template["IOC-03"]["title"].format(c["resolution"], domain[1].upper()), c["alert_tiggered"] = True
"description": self.template["IOC-03"]["description"].format(c["resolution"]), self.alerts.append({"title": self.template["IOC-03"]["title"].format(c["resolution"], domain[1].upper()),
"host": c["resolution"], "description": self.template["IOC-03"]["description"].format(c["resolution"]),
"level": "High", "host": c["resolution"],
"id": "IOC-03"}) "level": "High",
else: "id": "IOC-03"})
c["alert_tiggered"] = True else:
self.alerts.append({"title": self.template["IOC-04"]["title"].format(c["resolution"], domain[1].upper()), c["alert_tiggered"] = True
"description": self.template["IOC-04"]["description"].format(c["resolution"]), self.alerts.append({"title": self.template["IOC-04"]["title"].format(c["resolution"], domain[1].upper()),
"host": c["resolution"], "description": self.template["IOC-04"]["description"].format(c["resolution"]),
"level": "Moderate", "host": c["resolution"],
"id": "IOC-04"}) "level": "Moderate",
# Check for blacklisted FreeDNS. "id": "IOC-04"})
for domain in self.bl_freedns: # Check for blacklisted FreeDNS.
if c["resolution"].endswith("." + domain[0]): for domain in self.bl_freedns:
c["alert_tiggered"] = True if c["resolution"].endswith("." + domain[0]):
self.alerts.append({"title": self.template["IOC-05"]["title"].format(c["resolution"]), c["alert_tiggered"] = True
"description": self.template["IOC-05"]["description"].format(c["resolution"]), self.alerts.append({"title": self.template["IOC-05"]["title"].format(c["resolution"]),
"host": c["resolution"], "description": self.template["IOC-05"]["description"].format(c["resolution"]),
"level": "Moderate", "host": c["resolution"],
"id": "IOC-05"}) "level": "Moderate",
"id": "IOC-05"})
# Check for suspect tlds.
for tld in self.bl_tlds: # Check for suspect tlds.
if c["resolution"].endswith(tld[0]): for tld in self.bl_tlds:
c["alert_tiggered"] = True if c["resolution"].endswith(tld[0]):
self.alerts.append({"title": self.template["IOC-06"]["title"].format(c["resolution"]), c["alert_tiggered"] = True
"description": self.template["IOC-06"]["description"].format(c["resolution"], tld[0]), self.alerts.append({"title": self.template["IOC-06"]["title"].format(c["resolution"]),
"host": c["resolution"], "description": self.template["IOC-06"]["description"].format(c["resolution"], tld[0]),
"level": "Low", "host": c["resolution"],
"id": "IOC-06"}) "level": "Low",
if self.active_analysis: "id": "IOC-06"})
for c in self.conns: if self.active_analysis:
try: # Domain nameservers check. for c in self.conns:
name_servers = pydig.query(c["resolution"], "NS") try: # Domain nameservers check.
if len(name_servers): name_servers = pydig.query(c["resolution"], "NS")
for ns in self.bl_nameservers: if len(name_servers):
if name_servers[0].endswith(".{}.".format(ns[0])): for ns in self.bl_nameservers:
c["alert_tiggered"] = True if name_servers[0].endswith(".{}.".format(ns[0])):
self.alerts.append({"title": self.template["ACT-01"]["title"].format(c["resolution"], name_servers[0]), c["alert_tiggered"] = True
"description": self.template["ACT-01"]["description"].format(c["resolution"]), self.alerts.append({"title": self.template["ACT-01"]["title"].format(c["resolution"], name_servers[0]),
"host": c["resolution"], "description": self.template["ACT-01"]["description"].format(c["resolution"]),
"level": "Moderate", "host": c["resolution"],
"id": "ACT-01"}) "level": "Moderate",
except: "id": "ACT-01"})
pass except:
pass
try: # Domain history check.
try: # Domain history check.
whois_record = whois.whois(c["resolution"])
creation_date = whois_record.creation_date if type( whois_record = whois.whois(c["resolution"])
whois_record.creation_date) is not list else whois_record.creation_date[0] creation_date = whois_record.creation_date if type(
creation_days = abs((datetime.now() - creation_date).days) whois_record.creation_date) is not list else whois_record.creation_date[0]
if creation_days < 365: creation_days = abs((datetime.now() - creation_date).days)
c["alert_tiggered"] = True if creation_days < 365:
self.alerts.append({"title": self.template["ACT-02"]["title"].format(c["resolution"], creation_days), c["alert_tiggered"] = True
"description": self.template["ACT-02"]["description"].format(c["resolution"]), self.alerts.append({"title": self.template["ACT-02"]["title"].format(c["resolution"], creation_days),
"host": c["resolution"], "description": self.template["ACT-02"]["description"].format(c["resolution"]),
"level": "Moderate", "host": c["resolution"],
"id": "ACT-02"}) "level": "Moderate",
"id": "ACT-02"})
except:
pass except:
pass
def files_check(self, dir):
""" def files_check(self, dir):
Check on the files.log: """
* Check certificates sha1 Check on the files.log:
* [todo] Check possible binary data or APKs? * Check certificates sha1
:return: nothing - all stuff appended to self.alerts * [todo] Check possible binary data or APKs?
""" :return: nothing - all stuff appended to self.alerts
"""
if not self.iocs_analysis:
return if not self.iocs_analysis:
return
bl_certs = get_iocs("sha1cert")
bl_certs = get_iocs("sha1cert")
if os.path.isfile(os.path.join(dir, "files.log")):
for record in ParseZeekLogs(os.path.join(dir, "files.log"), output_format="json", safe_headers=False): if os.path.isfile(os.path.join(dir, "files.log")):
if record is not None: for record in ParseZeekLogs(os.path.join(dir, "files.log"), output_format="json", safe_headers=False):
f = {"filename": record["filename"], if record is not None:
"ip_src": record["id.orig_h"], f = {"filename": record["filename"],
"ip_dst": record["id.resp_h"], "ip_src": record["id.orig_h"],
"mime_type": record["mime_type"], "ip_dst": record["id.resp_h"],
"sha1": record["sha1"]} "mime_type": record["mime_type"],
if f not in self.files: "sha1": record["sha1"]}
self.files.append(f) if f not in self.files:
self.files.append(f)
for f in self.files:
if f["mime_type"] == "application/x-x509-user-cert": for f in self.files:
for cert in bl_certs: # Check for blacklisted certificate. if f["mime_type"] == "application/x-x509-user-cert":
if f["sha1"] == cert[0]: for cert in bl_certs: # Check for blacklisted certificate.
host = self.resolve(f["ip_src"]) if f["sha1"] == cert[0]:
self.alerts.append({"title": self.template["IOC-07"]["title"].format(cert[1].upper(), host), host = self.resolve(f["ip_src"])
"description": self.template["IOC-07"]["description"].format(f["sha1"], host), self.alerts.append({"title": self.template["IOC-07"]["title"].format(cert[1].upper(), host),
"host": host, "description": self.template["IOC-07"]["description"].format(f["sha1"], host),
"level": "High", "host": host,
"id": "IOC-07"}) "level": "High",
"id": "IOC-07"})
def http_check(self, dir):
""" def http_check(self, dir):
Check on the http.log: """
* Blacklisted domain/tld from the Host header field. Check on the http.log:
Can be used when no DNS query have been done during the session (already cached by the device.) * Blacklisted domain/tld from the Host header field.
:return: nothing - all stuff appended to self.alerts Can be used when no DNS query have been done during the session (already cached by the device.)
""" :return: nothing - all stuff appended to self.alerts
"""
if os.path.isfile(os.path.join(dir, "http.log")):
for record in ParseZeekLogs(os.path.join(dir, "http.log"), output_format="json", safe_headers=False): if os.path.isfile(os.path.join(dir, "http.log")):
if record is not None: for record in ParseZeekLogs(os.path.join(dir, "http.log"), output_format="json", safe_headers=False):
c = {"host": record['host']} if record is not None:
if c not in self.http: c = {"host": record['host']}
self.http.append(c) if c not in self.http:
self.http.append(c)
if self.iocs_analysis:
for c in self.http: if self.iocs_analysis:
for c in self.http:
# If we already know the host form DNS, let's continue.
if c["host"] in [r["domain"] for r in self.dns]: # If we already know the host form DNS, let's continue.
continue if c["host"] in [r["domain"] for r in self.dns]:
continue
# Check for blacklisted domain.
for h in self.bl_domains: # Check for blacklisted domain.
if h[1] != "tracker": for h in self.bl_domains:
if c["host"].endswith(h[0]): if h[1] != "tracker":
self.alerts.append({"title": self.template["IOC-08"]["title"].format(c["host"], h[1].upper()), if c["host"].endswith(h[0]):
"description": self.template["IOC-08"]["description"].format(c["host"]), self.alerts.append({"title": self.template["IOC-08"]["title"].format(c["host"], h[1].upper()),
"host": c["host"], "description": self.template["IOC-08"]["description"].format(c["host"]),
"level": "High", "host": c["host"],
"id": "IOC-08"}) "level": "High",
# Check for freedns. "id": "IOC-08"})
for h in self.bl_freedns: # Check for freedns.
if c["host"].endswith(h[0]): for h in self.bl_freedns:
self.alerts.append({"title": self.template["IOC-09"]["title"].format(c["host"]), if c["host"].endswith(h[0]):
"description": self.template["IOC-09"]["description"].format(c["host"]), self.alerts.append({"title": self.template["IOC-09"]["title"].format(c["host"]),
"host": c["host"], "description": self.template["IOC-09"]["description"].format(c["host"]),
"level": "Moderate", "host": c["host"],
"id": "IOC-09"}) "level": "Moderate",
# Check for fancy TLD. "id": "IOC-09"})
for h in self.bl_tlds: # Check for fancy TLD.
if c["host"].endswith(h[0]): for h in self.bl_tlds:
self.alerts.append({"title": self.template["IOC-10"]["title"].format(c["host"]), if c["host"].endswith(h[0]):
"description": self.template["IOC-10"]["description"].format(c["host"], h[0]), self.alerts.append({"title": self.template["IOC-10"]["title"].format(c["host"]),
"host": c["host"], "description": self.template["IOC-10"]["description"].format(c["host"], h[0]),
"level": "Low", "host": c["host"],
"id": "IOC-10"}) "level": "Low",
"id": "IOC-10"})
def ssl_check(self, dir):
""" def ssl_check(self, dir):
Check on the ssl.log: """
* SSL connections which doesn't use the 443. Check on the ssl.log:
* "Free" certificate issuer (taken from the config). * SSL connections which doesn't use the 443.
* Self-signed certificates. * "Free" certificate issuer (taken from the config).
* Blacklisted domain in the CN * Self-signed certificates.
:return: nothing - all stuff appended to self.alerts * Blacklisted domain in the CN
""" :return: nothing - all stuff appended to self.alerts
ssl_default_ports = get_config(("analysis", "ssl_default_ports")) """
free_issuers = get_config(("analysis", "free_issuers")) ssl_default_ports = get_config(("analysis", "ssl_default_ports"))
free_issuers = get_config(("analysis", "free_issuers"))
if os.path.isfile(os.path.join(dir, "ssl.log")):
for record in ParseZeekLogs(os.path.join(dir, "ssl.log"), output_format="json", safe_headers=False): if os.path.isfile(os.path.join(dir, "ssl.log")):
if record is not None: for record in ParseZeekLogs(os.path.join(dir, "ssl.log"), output_format="json", safe_headers=False):
c = {"host": record['id.resp_h'], if record is not None:
"port": record['id.resp_p'], c = {"host": record['id.resp_h'],
"issuer": record["issuer"] if "issuer" in record else "", "port": record['id.resp_p'],
"validation_status": record["validation_status"], "issuer": record["issuer"] if "issuer" in record else "",
"cn": record["server_name"] if "server_name" in record else ""} "validation_status": record["validation_status"],
if c not in self.ssl: "cn": record["server_name"] if "server_name" in record else ""}
self.ssl.append(c) if c not in self.ssl:
self.ssl.append(c)
if self.heuristics_analysis:
for cert in self.ssl: if self.heuristics_analysis:
host = self.resolve(cert["host"]) for cert in self.ssl:
host = self.resolve(cert["host"])
# If the associated host has not whitelisted, check the cert.
for c in self.conns: # If the associated host has not whitelisted, check the cert.
if host in c["resolution"]: for c in self.conns:
# Check for non generic SSL port. if host in c["resolution"]:
if cert["port"] not in ssl_default_ports: # Check for non generic SSL port.
c["alert_tiggered"] = True if cert["port"] not in ssl_default_ports:
self.alerts.append({"title": self.template["SSL-01"]["title"].format(cert["port"], host), c["alert_tiggered"] = True
"description": self.template["SSL-01"]["description"].format(host), self.alerts.append({"title": self.template["SSL-01"]["title"].format(cert["port"], host),
"host": host, "description": self.template["SSL-01"]["description"].format(host),
"level": "Moderate", "host": host,
"id": "SSL-01"}) "level": "Moderate",
# Check Free SSL certificates. "id": "SSL-01"})
if cert["issuer"] in free_issuers: # Check Free SSL certificates.
c["alert_tiggered"] = True if cert["issuer"] in free_issuers:
self.alerts.append({"title": self.template["SSL-02"]["title"].format(host), c["alert_tiggered"] = True
"description": self.template["SSL-02"]["description"], self.alerts.append({"title": self.template["SSL-02"]["title"].format(host),
"host": host, "description": self.template["SSL-02"]["description"],
"level": "Moderate", "host": host,
"id": "SSL-02"}) "level": "Moderate",
# Check for self-signed certificates. "id": "SSL-02"})
if cert["validation_status"] == "self signed certificate in certificate chain": # Check for self-signed certificates.
c["alert_tiggered"] = True if cert["validation_status"] == "self signed certificate in certificate chain":
self.alerts.append({"title": self.template["SSL-03"]["title"].format(host), c["alert_tiggered"] = True
"description": self.template["SSL-03"]["description"].format(host), self.alerts.append({"title": self.template["SSL-03"]["title"].format(host),
"host": host, "description": self.template["SSL-03"]["description"].format(host),
"level": "Moderate", "host": host,
"id": "SSL-03"}) "level": "Moderate",
"id": "SSL-03"})
if self.iocs_analysis:
for cert in self.ssl: if self.iocs_analysis:
# Check if the domain in the certificate haven't been blacklisted for cert in self.ssl:
# This check can be good if the domain has already been cached by # Check if the domain in the certificate haven't been blacklisted
# the device so it wont appear in self.dns. # This check can be good if the domain has already been cached by
# the device so it wont appear in self.dns.
if any([cert["cn"].endswith(r["domain"]) for r in self.dns]):
continue if any(cert["cn"].endswith(r["domain"]) for r in self.dns):
continue
for domain in self.bl_domains:
if domain[1] != "tracker": for domain in self.bl_domains:
if cert["cn"].endswith(domain[0]): if domain[1] != "tracker":
self.alerts.append({"title": self.template["SSL-04"]["title"].format(domain[0], domain[1].upper()), if cert["cn"].endswith(domain[0]):
"description": self.template["SSL-04"]["description"].format(domain[0]), self.alerts.append({"title": self.template["SSL-04"]["title"].format(domain[0], domain[1].upper()),
"host": domain[0], "description": self.template["SSL-04"]["description"].format(domain[0]),
"level": "High", "host": domain[0],
"id": "SSL-04"}) "level": "High",
"id": "SSL-04"})
def alerts_check(self):
""" def alerts_check(self):
Leverage an advice to the user based on the trigered hosts """
:return: nothing - all generated alerts appended to self.alerts Leverage an advice to the user based on the trigered hosts
""" :return: nothing - all generated alerts appended to self.alerts
hosts = {} """
hosts = {}
for alert in [dict(t) for t in {tuple(d.items()) for d in self.alerts}]:
if alert["host"] not in hosts: for alert in [dict(t) for t in {tuple(d.items()) for d in self.alerts}]:
hosts[alert["host"]] = 1 if alert["host"] not in hosts:
else: hosts[alert["host"]] = 1
hosts[alert["host"]] += 1 else:
hosts[alert["host"]] += 1
for host, nb in hosts.items():
if nb >= get_config(("analysis", "max_alerts")): for host, nb in hosts.items():
self.alerts.append({"title": self.template["ADV-01"]["title"].format(host), if nb >= get_config(("analysis", "max_alerts")):
"description": self.template["ADV-01"]["description"].format(host, nb), self.alerts.append({"title": self.template["ADV-01"]["title"].format(host),
"host": host, "description": self.template["ADV-01"]["description"].format(host, nb),
"level": "Moderate", "host": host,
"id": "ADV-01"}) "level": "Moderate",
"id": "ADV-01"})
def resolve(self, ip_addr):
""" def resolve(self, ip_addr):
A simple method to retreive DNS names from IP addresses """
in order to replace them in alerts. A simple method to retreive DNS names from IP addresses
in order to replace them in alerts.
:return: String - DNS record or IP Address.
""" :return: String - DNS record or IP Address.
for record in self.dns: """
if ip_addr in record["answers"]: for record in self.dns:
return record["domain"] if ip_addr in record["answers"]:
return ip_addr return record["domain"]
return ip_addr
def start_zeek(self):
""" def start_zeek(self):
Start zeek and check the logs. """
""" Start zeek and check the logs.
sp.Popen("cd {} && /opt/zeek/bin/zeek -Cr capture.pcap protocols/ssl/validate-certs".format( """
self.working_dir), shell=True).wait() safe_command.run(sp.Popen, "cd {} && /opt/zeek/bin/zeek -Cr capture.pcap protocols/ssl/validate-certs".format(
sp.Popen("cd {} && mv *.log assets/".format(self.working_dir), self.working_dir), shell=False).wait()
shell=True).wait() safe_command.run(sp.Popen, "cd {} && mv *.log assets/".format(self.working_dir),
self.fill_dns(self.working_dir + "/assets/") shell=False).wait()
self.netflow_check(self.working_dir + "/assets/") self.fill_dns(self.working_dir + "/assets/")
self.ssl_check(self.working_dir + "/assets/") self.netflow_check(self.working_dir + "/assets/")
self.http_check(self.working_dir + "/assets/") self.ssl_check(self.working_dir + "/assets/")
self.files_check(self.working_dir + "/assets/") self.http_check(self.working_dir + "/assets/")
self.alerts_check() self.files_check(self.working_dir + "/assets/")
self.alerts_check()
def retrieve_alerts(self):
""" def retrieve_alerts(self):
Retrieve alerts. """
:return: list - a list of alerts wihout duplicates. Retrieve alerts.
""" :return: list - a list of alerts wihout duplicates.
return [dict(t) for t in {tuple(d.items()) for d in self.alerts}] """
return [dict(t) for t in {tuple(d.items()) for d in self.alerts}]
def retrieve_whitelist(self):
""" def retrieve_whitelist(self):
Retrieve whitelisted elements. """
:return: list - a list of whitelisted elements wihout duplicates. Retrieve whitelisted elements.
""" :return: list - a list of whitelisted elements wihout duplicates.
return [dict(t) for t in {tuple(d.items()) for d in self.whitelist}] """
return [dict(t) for t in {tuple(d.items()) for d in self.whitelist}]
def retrieve_conns(self):
""" def retrieve_conns(self):
Retrieve not whitelisted elements. """
:return: list - a list of non-whitelisted elements wihout duplicates. Retrieve not whitelisted elements.
""" :return: list - a list of non-whitelisted elements wihout duplicates.
return self.conns """
return self.conns

View File

@ -1,17 +1,20 @@
pymisp==2.4.165.1 pymisp==2.4.165.1
sqlalchemy==1.4.48 sqlalchemy==1.4.48
ipwhois==1.2.0 ipwhois==1.2.0
netaddr==0.8.0 netaddr==0.8.0
flask==1.1.2 flask==1.1.2
flask_httpauth==4.8.0 flask_httpauth==4.8.0
pyjwt==1.7.1 pyjwt==1.7.1
psutil==5.8.0 psutil==5.8.0
pydig==0.4.0 pydig==0.4.0
pyudev==0.24.0 pyudev==0.24.0
pyyaml==5.3.1 pyyaml==5.3.1
wifi==0.3.8 wifi==0.3.8
qrcode==7.3.1 qrcode==7.3.1
netifaces==0.11.0 netifaces==0.11.0
weasyprint==59.0 weasyprint==59.0
python-whois==0.8.0 python-whois==0.8.0
six==1.16.0 six==1.16.0
security==1.2.1 \
--hash=sha256:4ca5f8cfc6b836e2192a84bb5a28b72c17f3cd1abbfe3281f917394c6e6c9238
--hash=sha256:0a9dc7b457330e6d0f92bdae3603fecb85394beefad0fd3b5058758a58781ded