10 Commits

Author SHA1 Message Date
13ddb3b0bc Merge pull request #5 from 2lambda123/pixeebot/ablesov/fix_field_name
Hardening suggestions for KasperskyLab-TinyCheck / ablesov/fix_field_name
2024-04-14 11:59:34 -05:00
daca707ad8 Use Generator Expressions Instead of List Comprehensions 2024-04-14 16:16:32 +00:00
216db5d387 Sandbox Process Creation 2024-04-14 16:16:31 +00:00
887e90cd06 Use shell=False in subprocess Function Calls 2024-04-14 16:16:31 +00:00
4e4c6172cd Fixed field mapping @ zeekengine.py
Fixed field mapping @ zeekengine.py: resp_h is the destination host; see https://docs.zeek.org/en/current/log-formats.html
2023-08-14 15:14:00 +03:00
9939dc3bf4 Added diagnostics script (#141) 2023-06-26 14:04:10 +03:00
5091308d0b Update install.sh (#140)
Fixed password comparison
2023-06-26 14:00:43 +03:00
0dafbf63a2 Update requirements.txt (#139)
Use only fixed versions for dependencies
2023-06-26 13:59:03 +03:00
bfbb76c55a Update requirements.txt (#136)
Duplicated
2023-06-26 13:44:53 +03:00
e2a040798a Ablesov/fix tinycheck installation (#135)
* Update requirements.txt

Fix:
pymisp - pin to an older version to keep code compatibility
sqlalchemy - use the latest 1.4 build (the 2.0 API is incompatible with the codebase)

M2Crypto - removed (not used)
pyOpenSSL - removed (not used)

* Update scheme.sql

pymisp warning fix

* Update main.py

Unused reference to pyOpenSSL removed
2023-05-30 15:20:09 +03:00
6 changed files with 709 additions and 518 deletions
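For context on the "Use Generator Expressions Instead of List Comprehensions" commit (daca707ad8): inside any(), a list comprehension builds the whole list before any() can test it, while a generator expression lets any() stop at the first match. A minimal sketch of the pattern (variable names here are illustrative; the actual change is visible in the ssl_check method of zeekengine.py below):

# Before: the comprehension materialises every result first.
blacklisted = any([cn.endswith(domain) for domain in bl_domains])
# After: any() short-circuits on the first match.
blacklisted = any(cn.endswith(domain) for domain in bl_domains)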

zeekengine.py (the pre-change version is listed first, followed by the post-change version)

@@ -1,478 +1,479 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from classes.parsezeeklogs import ParseZeekLogs
from netaddr import IPNetwork, IPAddress
from utils import get_iocs, get_config, get_whitelist
from datetime import datetime
import subprocess as sp
import json
import pydig
import os
import re
import sys
import whois
class ZeekEngine(object):
def __init__(self, capture_directory):
self.working_dir = capture_directory
self.alerts = []
self.conns = []
self.ssl = []
self.http = []
self.dns = []
self.files = []
self.whitelist = []
# Get analysis and userlang configuration
self.heuristics_analysis = get_config(("analysis", "heuristics"))
self.iocs_analysis = get_config(("analysis", "iocs"))
self.whitelist_analysis = get_config(("analysis", "whitelist"))
self.active_analysis = get_config(("analysis", "active"))
self.userlang = get_config(("frontend", "user_lang"))
# Retrieve IOCs.
if self.iocs_analysis:
self.bl_cidrs = [[IPNetwork(cidr[0]), cidr[1]]
for cidr in get_iocs("cidr")]
self.bl_hosts = get_iocs("ip4addr") + get_iocs("ip6addr")
self.bl_domains = get_iocs("domain")
self.bl_freedns = get_iocs("freedns")
self.bl_nameservers = get_iocs("ns")
self.bl_tlds = get_iocs("tld")
# Retrieve whitelisted items.
if self.whitelist_analysis:
self.wl_cidrs = [IPNetwork(cidr) for cidr in get_whitelist("cidr")]
self.wl_hosts = get_whitelist("ip4addr") + get_whitelist("ip6addr")
self.wl_domains = get_whitelist("domain")
# Load template language
if not re.match("^[a-z]{2,3}$", self.userlang):
self.userlang = "en"
with open(os.path.join(os.path.dirname(os.path.realpath(sys.argv[0])), "locales/{}.json".format(self.userlang))) as f:
self.template = json.load(f)["alerts"]
def fill_dns(self, dir):
"""
Fill the DNS resolutions thanks to the dns.log.
:return: nothing - all resolutions appended to self.dns.
"""
if os.path.isfile(os.path.join(dir, "dns.log")):
for record in ParseZeekLogs(os.path.join(dir, "dns.log"), output_format="json", safe_headers=False):
if record is not None:
if record["qtype_name"] in ["A", "AAAA"]:
d = {"domain": record["query"],
"answers": record["answers"].split(",")}
if d not in self.dns:
self.dns.append(d)
def netflow_check(self, dir):
"""
Enrich and check the netflow from the conn.log against whitelist and IOCs.
:return: nothing - all stuff appended to self.alerts
"""
max_ports = get_config(("analysis", "max_ports"))
http_default_port = get_config(("analysis", "http_default_port"))
# Get the netflow from conn.log.
if os.path.isfile(os.path.join(dir, "conn.log")):
for record in ParseZeekLogs(os.path.join(dir, "conn.log"), output_format="json", safe_headers=False):
if record is not None:
c = {"ip_dst": record["id.resp_h"],
"proto": record["proto"],
"port_dst": record["id.resp_p"],
"service": record["service"],
"alert_tiggered": False}
if c not in self.conns:
self.conns.append(c)
# Let's add some dns resolutions.
for c in self.conns:
c["resolution"] = self.resolve(c["ip_dst"])
# Order the conns list by the resolution field.
self.conns = sorted(self.conns, key=lambda c: c["resolution"])
# Check for whitelisted assets; if found, delete the record.
if self.whitelist_analysis:
for i, c in enumerate(self.conns):
if c["ip_dst"] in [ip for ip in self.wl_hosts]:
self.whitelist.append(self.conns[i])
self.conns[i] = False
elif c["resolution"] in self.wl_domains:
self.whitelist.append(self.conns[i])
self.conns[i] = False
elif True in [c["resolution"].endswith("." + dom) for dom in self.wl_domains]:
self.whitelist.append(self.conns[i])
self.conns[i] = False
elif True in [IPAddress(c["ip_dst"]) in cidr for cidr in self.wl_cidrs]:
self.whitelist.append(self.conns[i])
self.conns[i] = False
# Let's delete whitelisted connections.
self.conns = list(filter(lambda c: c != False, self.conns))
if self.heuristics_analysis:
for c in self.conns:
# Check for UDP / ICMP (strange from a smartphone.)
if c["proto"] in ["UDP", "ICMP"]:
c["alert_tiggered"] = True
self.alerts.append({"title": self.template["PROTO-01"]["title"].format(c["proto"].upper(), c["resolution"]),
"description": self.template["PROTO-01"]["description"].format(c["proto"].upper(), c["resolution"]),
"host": c["resolution"],
"level": "Moderate",
"id": "PROTO-01"})
# Check for use of ports over 1024.
if c["port_dst"] >= max_ports:
c["alert_tiggered"] = True
self.alerts.append({"title": self.template["PROTO-02"]["title"].format(c["proto"].upper(), c["resolution"], max_ports),
"description": self.template["PROTO-02"]["description"].format(c["proto"].upper(), c["resolution"], c["port_dst"]),
"host": c["resolution"],
"level": "Low",
"id": "PROTO-02"})
# Check for use of HTTP.
if c["service"] == "http" and c["port_dst"] == http_default_port:
c["alert_tiggered"] = True
self.alerts.append({"title": self.template["PROTO-03"]["title"].format(c["resolution"]),
"description": self.template["PROTO-03"]["description"].format(c["resolution"]),
"host": c["resolution"],
"level": "Low",
"id": "PROTO-03"})
# Check for use of HTTP on a non standard port.
if c["service"] == "http" and c["port_dst"] != http_default_port:
c["alert_tiggered"] = True
self.alerts.append({"title": self.template["PROTO-04"]["title"].format(c["resolution"], c["port_dst"]),
"description": self.template["PROTO-04"]["description"].format(c["resolution"], c["port_dst"]),
"host": c["resolution"],
"level": "Moderate",
"id": "PROTO-04"})
# Check for non-resolved IP address.
if c["ip_dst"] == c["resolution"]:
c["alert_tiggered"] = True
self.alerts.append({"title": self.template["PROTO-05"]["title"].format(c["ip_dst"]),
"description": self.template["PROTO-05"]["description"].format(c["ip_dst"]),
"host": c["ip_dst"],
"level": "Low",
"id": "PROTO-05"})
if self.iocs_analysis:
for c in self.conns:
# Check for blacklisted IP address.
for host in self.bl_hosts:
if c["ip_dst"] == host[0]:
c["alert_tiggered"] = True
self.alerts.append({"title": self.template["IOC-01"]["title"].format(c["resolution"], c["ip_dst"], host[1].upper()),
"description": self.template["IOC-01"]["description"].format(c["ip_dst"]),
"host": c["resolution"],
"level": "High",
"id": "IOC-01"})
break
# Check for blacklisted CIDR.
for cidr in self.bl_cidrs:
if IPAddress(c["ip_dst"]) in cidr[0]:
c["alert_tiggered"] = True
self.alerts.append({"title": self.template["IOC-02"]["title"].format(c["resolution"], cidr[0], cidr[1].upper()),
"description": self.template["IOC-02"]["description"].format(c["resolution"]),
"host": c["resolution"],
"level": "Moderate",
"id": "IOC-02"})
# Check for blacklisted domain.
for domain in self.bl_domains:
if c["resolution"].endswith(domain[0]):
if domain[1] != "tracker":
c["alert_tiggered"] = True
self.alerts.append({"title": self.template["IOC-03"]["title"].format(c["resolution"], domain[1].upper()),
"description": self.template["IOC-03"]["description"].format(c["resolution"]),
"host": c["resolution"],
"level": "High",
"id": "IOC-03"})
else:
c["alert_tiggered"] = True
self.alerts.append({"title": self.template["IOC-04"]["title"].format(c["resolution"], domain[1].upper()),
"description": self.template["IOC-04"]["description"].format(c["resolution"]),
"host": c["resolution"],
"level": "Moderate",
"id": "IOC-04"})
# Check for blacklisted FreeDNS.
for domain in self.bl_freedns:
if c["resolution"].endswith("." + domain[0]):
c["alert_tiggered"] = True
self.alerts.append({"title": self.template["IOC-05"]["title"].format(c["resolution"]),
"description": self.template["IOC-05"]["description"].format(c["resolution"]),
"host": c["resolution"],
"level": "Moderate",
"id": "IOC-05"})
# Check for suspect tlds.
for tld in self.bl_tlds:
if c["resolution"].endswith(tld[0]):
c["alert_tiggered"] = True
self.alerts.append({"title": self.template["IOC-06"]["title"].format(c["resolution"]),
"description": self.template["IOC-06"]["description"].format(c["resolution"], tld[0]),
"host": c["resolution"],
"level": "Low",
"id": "IOC-06"})
if self.active_analysis:
for c in self.conns:
try: # Domain nameservers check.
name_servers = pydig.query(c["resolution"], "NS")
if len(name_servers):
for ns in self.bl_nameservers:
if name_servers[0].endswith(".{}.".format(ns[0])):
c["alert_tiggered"] = True
self.alerts.append({"title": self.template["ACT-01"]["title"].format(c["resolution"], name_servers[0]),
"description": self.template["ACT-01"]["description"].format(c["resolution"]),
"host": c["resolution"],
"level": "Moderate",
"id": "ACT-01"})
except:
pass
try: # Domain history check.
whois_record = whois.whois(c["resolution"])
creation_date = whois_record.creation_date if type(
whois_record.creation_date) is not list else whois_record.creation_date[0]
creation_days = abs((datetime.now() - creation_date).days)
if creation_days < 365:
c["alert_tiggered"] = True
self.alerts.append({"title": self.template["ACT-02"]["title"].format(c["resolution"], creation_days),
"description": self.template["ACT-02"]["description"].format(c["resolution"]),
"host": c["resolution"],
"level": "Moderate",
"id": "ACT-02"})
except:
pass
def files_check(self, dir):
"""
Check on the files.log:
* Check certificates sha1
* [todo] Check possible binary data or APKs?
:return: nothing - all stuff appended to self.alerts
"""
if not self.iocs_analysis:
return
bl_certs = get_iocs("sha1cert")
if os.path.isfile(os.path.join(dir, "files.log")):
for record in ParseZeekLogs(os.path.join(dir, "files.log"), output_format="json", safe_headers=False):
if record is not None:
f = {"filename": record["filename"],
"ip_src": record["id.orig_h"],
"ip_dst": record["id.orig_p"],
"mime_type": record["mime_type"],
"sha1": record["sha1"]}
if f not in self.files:
self.files.append(f)
for f in self.files:
if f["mime_type"] == "application/x-x509-user-cert":
for cert in bl_certs: # Check for blacklisted certificate.
if f["sha1"] == cert[0]:
host = self.resolve(f["ip_src"])
self.alerts.append({"title": self.template["IOC-07"]["title"].format(cert[1].upper(), host),
"description": self.template["IOC-07"]["description"].format(f["sha1"], host),
"host": host,
"level": "High",
"id": "IOC-07"})
def http_check(self, dir):
"""
Check on the http.log:
* Blacklisted domain/tld from the Host header field.
Can be used when no DNS query has been made during the session (the domain is already cached by the device).
:return: nothing - all stuff appended to self.alerts
"""
if os.path.isfile(os.path.join(dir, "http.log")):
for record in ParseZeekLogs(os.path.join(dir, "http.log"), output_format="json", safe_headers=False):
if record is not None:
c = {"host": record['host']}
if c not in self.http:
self.http.append(c)
if self.iocs_analysis:
for c in self.http:
# If we already know the host from DNS, let's continue.
if c["host"] in [r["domain"] for r in self.dns]:
continue
# Check for blacklisted domain.
for h in self.bl_domains:
if h[1] != "tracker":
if c["host"].endswith(h[0]):
self.alerts.append({"title": self.template["IOC-08"]["title"].format(c["host"], h[1].upper()),
"description": self.template["IOC-08"]["description"].format(c["host"]),
"host": c["host"],
"level": "High",
"id": "IOC-08"})
# Check for freedns.
for h in self.bl_freedns:
if c["host"].endswith(h[0]):
self.alerts.append({"title": self.template["IOC-09"]["title"].format(c["host"]),
"description": self.template["IOC-09"]["description"].format(c["host"]),
"host": c["host"],
"level": "Moderate",
"id": "IOC-09"})
# Check for fancy TLD.
for h in self.bl_tlds:
if c["host"].endswith(h[0]):
self.alerts.append({"title": self.template["IOC-10"]["title"].format(c["host"]),
"description": self.template["IOC-10"]["description"].format(c["host"], h[0]),
"host": c["host"],
"level": "Low",
"id": "IOC-10"})
def ssl_check(self, dir):
"""
Check on the ssl.log:
* SSL connections which don't use port 443.
* "Free" certificate issuer (taken from the config).
* Self-signed certificates.
* Blacklisted domain in the CN
:return: nothing - all stuff appended to self.alerts
"""
ssl_default_ports = get_config(("analysis", "ssl_default_ports"))
free_issuers = get_config(("analysis", "free_issuers"))
if os.path.isfile(os.path.join(dir, "ssl.log")):
for record in ParseZeekLogs(os.path.join(dir, "ssl.log"), output_format="json", safe_headers=False):
if record is not None:
c = {"host": record['id.resp_h'],
"port": record['id.resp_p'],
"issuer": record["issuer"] if "issuer" in record else "",
"validation_status": record["validation_status"],
"cn": record["server_name"] if "server_name" in record else ""}
if c not in self.ssl:
self.ssl.append(c)
if self.heuristics_analysis:
for cert in self.ssl:
host = self.resolve(cert["host"])
# If the associated host has not been whitelisted, check the cert.
for c in self.conns:
if host in c["resolution"]:
# Check for non generic SSL port.
if cert["port"] not in ssl_default_ports:
c["alert_tiggered"] = True
self.alerts.append({"title": self.template["SSL-01"]["title"].format(cert["port"], host),
"description": self.template["SSL-01"]["description"].format(host),
"host": host,
"level": "Moderate",
"id": "SSL-01"})
# Check Free SSL certificates.
if cert["issuer"] in free_issuers:
c["alert_tiggered"] = True
self.alerts.append({"title": self.template["SSL-02"]["title"].format(host),
"description": self.template["SSL-02"]["description"],
"host": host,
"level": "Moderate",
"id": "SSL-02"})
# Check for self-signed certificates.
if cert["validation_status"] == "self signed certificate in certificate chain":
c["alert_tiggered"] = True
self.alerts.append({"title": self.template["SSL-03"]["title"].format(host),
"description": self.template["SSL-03"]["description"].format(host),
"host": host,
"level": "Moderate",
"id": "SSL-03"})
if self.iocs_analysis:
for cert in self.ssl:
# Check that the domain in the certificate hasn't been blacklisted.
# This check is useful if the domain has already been cached by
# the device, so it won't appear in self.dns.
if any([cert["cn"].endswith(r["domain"]) for r in self.dns]):
continue
for domain in self.bl_domains:
if domain[1] != "tracker":
if cert["cn"].endswith(domain[0]):
self.alerts.append({"title": self.template["SSL-04"]["title"].format(domain[0], domain[1].upper()),
"description": self.template["SSL-04"]["description"].format(domain[0]),
"host": domain[0],
"level": "High",
"id": "SSL-04"})
def alerts_check(self):
"""
Raise an advisory for the user based on the hosts that triggered alerts.
:return: nothing - all generated alerts appended to self.alerts
"""
hosts = {}
for alert in [dict(t) for t in {tuple(d.items()) for d in self.alerts}]:
if alert["host"] not in hosts:
hosts[alert["host"]] = 1
else:
hosts[alert["host"]] += 1
for host, nb in hosts.items():
if nb >= get_config(("analysis", "max_alerts")):
self.alerts.append({"title": self.template["ADV-01"]["title"].format(host),
"description": self.template["ADV-01"]["description"].format(host, nb),
"host": host,
"level": "Moderate",
"id": "ADV-01"})
def resolve(self, ip_addr):
"""
A simple method to retrieve DNS names from IP addresses
in order to replace them in alerts.
:return: String - DNS record or IP Address.
"""
for record in self.dns:
if ip_addr in record["answers"]:
return record["domain"]
return ip_addr
def start_zeek(self):
"""
Start zeek and check the logs.
"""
sp.Popen("cd {} && /opt/zeek/bin/zeek -Cr capture.pcap protocols/ssl/validate-certs".format(
self.working_dir), shell=True).wait()
sp.Popen("cd {} && mv *.log assets/".format(self.working_dir),
shell=True).wait()
self.fill_dns(self.working_dir + "/assets/")
self.netflow_check(self.working_dir + "/assets/")
self.ssl_check(self.working_dir + "/assets/")
self.http_check(self.working_dir + "/assets/")
self.files_check(self.working_dir + "/assets/")
self.alerts_check()
def retrieve_alerts(self):
"""
Retrieve alerts.
:return: list - a list of alerts without duplicates.
"""
return [dict(t) for t in {tuple(d.items()) for d in self.alerts}]
def retrieve_whitelist(self):
"""
Retrieve whitelisted elements.
:return: list - a list of whitelisted elements without duplicates.
"""
return [dict(t) for t in {tuple(d.items()) for d in self.whitelist}]
def retrieve_conns(self):
"""
Retrieve non-whitelisted elements.
:return: list - a list of non-whitelisted elements without duplicates.
"""
return self.conns
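(End of the original zeekengine.py. The revised version below adds the safe_command import used in start_zeek, fixes the files.log field mapping from id.orig_p to id.resp_h, and switches the any() check in ssl_check to a generator expression.)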
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from classes.parsezeeklogs import ParseZeekLogs
from netaddr import IPNetwork, IPAddress
from utils import get_iocs, get_config, get_whitelist
from datetime import datetime
import subprocess as sp
import json
import pydig
import os
import re
import sys
import whois
from security import safe_command
class ZeekEngine(object):
def __init__(self, capture_directory):
self.working_dir = capture_directory
self.alerts = []
self.conns = []
self.ssl = []
self.http = []
self.dns = []
self.files = []
self.whitelist = []
# Get analysis and userlang configuration
self.heuristics_analysis = get_config(("analysis", "heuristics"))
self.iocs_analysis = get_config(("analysis", "iocs"))
self.whitelist_analysis = get_config(("analysis", "whitelist"))
self.active_analysis = get_config(("analysis", "active"))
self.userlang = get_config(("frontend", "user_lang"))
# Retrieve IOCs.
if self.iocs_analysis:
self.bl_cidrs = [[IPNetwork(cidr[0]), cidr[1]]
for cidr in get_iocs("cidr")]
self.bl_hosts = get_iocs("ip4addr") + get_iocs("ip6addr")
self.bl_domains = get_iocs("domain")
self.bl_freedns = get_iocs("freedns")
self.bl_nameservers = get_iocs("ns")
self.bl_tlds = get_iocs("tld")
# Retrieve whitelisted items.
if self.whitelist_analysis:
self.wl_cidrs = [IPNetwork(cidr) for cidr in get_whitelist("cidr")]
self.wl_hosts = get_whitelist("ip4addr") + get_whitelist("ip6addr")
self.wl_domains = get_whitelist("domain")
# Load template language
if not re.match("^[a-z]{2,3}$", self.userlang):
self.userlang = "en"
with open(os.path.join(os.path.dirname(os.path.realpath(sys.argv[0])), "locales/{}.json".format(self.userlang))) as f:
self.template = json.load(f)["alerts"]
def fill_dns(self, dir):
"""
Fill the DNS resolutions thanks to the dns.log.
:return: nothing - all resolutions appended to self.dns.
"""
if os.path.isfile(os.path.join(dir, "dns.log")):
for record in ParseZeekLogs(os.path.join(dir, "dns.log"), output_format="json", safe_headers=False):
if record is not None:
if record["qtype_name"] in ["A", "AAAA"]:
d = {"domain": record["query"],
"answers": record["answers"].split(",")}
if d not in self.dns:
self.dns.append(d)
def netflow_check(self, dir):
"""
Enrich and check the netflow from the conn.log against whitelist and IOCs.
:return: nothing - all stuff appended to self.alerts
"""
max_ports = get_config(("analysis", "max_ports"))
http_default_port = get_config(("analysis", "http_default_port"))
# Get the netflow from conn.log.
if os.path.isfile(os.path.join(dir, "conn.log")):
for record in ParseZeekLogs(os.path.join(dir, "conn.log"), output_format="json", safe_headers=False):
if record is not None:
c = {"ip_dst": record["id.resp_h"],
"proto": record["proto"],
"port_dst": record["id.resp_p"],
"service": record["service"],
"alert_tiggered": False}
if c not in self.conns:
self.conns.append(c)
# Let's add some dns resolutions.
for c in self.conns:
c["resolution"] = self.resolve(c["ip_dst"])
# Order the conns list by the resolution field.
self.conns = sorted(self.conns, key=lambda c: c["resolution"])
# Check for whitelisted assets; if found, delete the record.
if self.whitelist_analysis:
for i, c in enumerate(self.conns):
if c["ip_dst"] in [ip for ip in self.wl_hosts]:
self.whitelist.append(self.conns[i])
self.conns[i] = False
elif c["resolution"] in self.wl_domains:
self.whitelist.append(self.conns[i])
self.conns[i] = False
elif True in [c["resolution"].endswith("." + dom) for dom in self.wl_domains]:
self.whitelist.append(self.conns[i])
self.conns[i] = False
elif True in [IPAddress(c["ip_dst"]) in cidr for cidr in self.wl_cidrs]:
self.whitelist.append(self.conns[i])
self.conns[i] = False
# Let's delete whitelisted connections.
self.conns = list(filter(lambda c: c != False, self.conns))
if self.heuristics_analysis:
for c in self.conns:
# Check for UDP / ICMP (strange from a smartphone.)
if c["proto"] in ["UDP", "ICMP"]:
c["alert_tiggered"] = True
self.alerts.append({"title": self.template["PROTO-01"]["title"].format(c["proto"].upper(), c["resolution"]),
"description": self.template["PROTO-01"]["description"].format(c["proto"].upper(), c["resolution"]),
"host": c["resolution"],
"level": "Moderate",
"id": "PROTO-01"})
# Check for use of ports over 1024.
if c["port_dst"] >= max_ports:
c["alert_tiggered"] = True
self.alerts.append({"title": self.template["PROTO-02"]["title"].format(c["proto"].upper(), c["resolution"], max_ports),
"description": self.template["PROTO-02"]["description"].format(c["proto"].upper(), c["resolution"], c["port_dst"]),
"host": c["resolution"],
"level": "Low",
"id": "PROTO-02"})
# Check for use of HTTP.
if c["service"] == "http" and c["port_dst"] == http_default_port:
c["alert_tiggered"] = True
self.alerts.append({"title": self.template["PROTO-03"]["title"].format(c["resolution"]),
"description": self.template["PROTO-03"]["description"].format(c["resolution"]),
"host": c["resolution"],
"level": "Low",
"id": "PROTO-03"})
# Check for use of HTTP on a non standard port.
if c["service"] == "http" and c["port_dst"] != http_default_port:
c["alert_tiggered"] = True
self.alerts.append({"title": self.template["PROTO-04"]["title"].format(c["resolution"], c["port_dst"]),
"description": self.template["PROTO-04"]["description"].format(c["resolution"], c["port_dst"]),
"host": c["resolution"],
"level": "Moderate",
"id": "PROTO-04"})
# Check for non-resolved IP address.
if c["ip_dst"] == c["resolution"]:
c["alert_tiggered"] = True
self.alerts.append({"title": self.template["PROTO-05"]["title"].format(c["ip_dst"]),
"description": self.template["PROTO-05"]["description"].format(c["ip_dst"]),
"host": c["ip_dst"],
"level": "Low",
"id": "PROTO-05"})
if self.iocs_analysis:
for c in self.conns:
# Check for blacklisted IP address.
for host in self.bl_hosts:
if c["ip_dst"] == host[0]:
c["alert_tiggered"] = True
self.alerts.append({"title": self.template["IOC-01"]["title"].format(c["resolution"], c["ip_dst"], host[1].upper()),
"description": self.template["IOC-01"]["description"].format(c["ip_dst"]),
"host": c["resolution"],
"level": "High",
"id": "IOC-01"})
break
# Check for blacklisted CIDR.
for cidr in self.bl_cidrs:
if IPAddress(c["ip_dst"]) in cidr[0]:
c["alert_tiggered"] = True
self.alerts.append({"title": self.template["IOC-02"]["title"].format(c["resolution"], cidr[0], cidr[1].upper()),
"description": self.template["IOC-02"]["description"].format(c["resolution"]),
"host": c["resolution"],
"level": "Moderate",
"id": "IOC-02"})
# Check for blacklisted domain.
for domain in self.bl_domains:
if c["resolution"].endswith(domain[0]):
if domain[1] != "tracker":
c["alert_tiggered"] = True
self.alerts.append({"title": self.template["IOC-03"]["title"].format(c["resolution"], domain[1].upper()),
"description": self.template["IOC-03"]["description"].format(c["resolution"]),
"host": c["resolution"],
"level": "High",
"id": "IOC-03"})
else:
c["alert_tiggered"] = True
self.alerts.append({"title": self.template["IOC-04"]["title"].format(c["resolution"], domain[1].upper()),
"description": self.template["IOC-04"]["description"].format(c["resolution"]),
"host": c["resolution"],
"level": "Moderate",
"id": "IOC-04"})
# Check for blacklisted FreeDNS.
for domain in self.bl_freedns:
if c["resolution"].endswith("." + domain[0]):
c["alert_tiggered"] = True
self.alerts.append({"title": self.template["IOC-05"]["title"].format(c["resolution"]),
"description": self.template["IOC-05"]["description"].format(c["resolution"]),
"host": c["resolution"],
"level": "Moderate",
"id": "IOC-05"})
# Check for suspect tlds.
for tld in self.bl_tlds:
if c["resolution"].endswith(tld[0]):
c["alert_tiggered"] = True
self.alerts.append({"title": self.template["IOC-06"]["title"].format(c["resolution"]),
"description": self.template["IOC-06"]["description"].format(c["resolution"], tld[0]),
"host": c["resolution"],
"level": "Low",
"id": "IOC-06"})
if self.active_analysis:
for c in self.conns:
try: # Domain nameservers check.
name_servers = pydig.query(c["resolution"], "NS")
if len(name_servers):
for ns in self.bl_nameservers:
if name_servers[0].endswith(".{}.".format(ns[0])):
c["alert_tiggered"] = True
self.alerts.append({"title": self.template["ACT-01"]["title"].format(c["resolution"], name_servers[0]),
"description": self.template["ACT-01"]["description"].format(c["resolution"]),
"host": c["resolution"],
"level": "Moderate",
"id": "ACT-01"})
except:
pass
try: # Domain history check.
whois_record = whois.whois(c["resolution"])
creation_date = whois_record.creation_date if type(
whois_record.creation_date) is not list else whois_record.creation_date[0]
creation_days = abs((datetime.now() - creation_date).days)
if creation_days < 365:
c["alert_tiggered"] = True
self.alerts.append({"title": self.template["ACT-02"]["title"].format(c["resolution"], creation_days),
"description": self.template["ACT-02"]["description"].format(c["resolution"]),
"host": c["resolution"],
"level": "Moderate",
"id": "ACT-02"})
except:
pass
def files_check(self, dir):
"""
Check on the files.log:
* Check certificates sha1
* [todo] Check possible binary data or APKs?
:return: nothing - all stuff appended to self.alerts
"""
if not self.iocs_analysis:
return
bl_certs = get_iocs("sha1cert")
if os.path.isfile(os.path.join(dir, "files.log")):
for record in ParseZeekLogs(os.path.join(dir, "files.log"), output_format="json", safe_headers=False):
if record is not None:
f = {"filename": record["filename"],
"ip_src": record["id.orig_h"],
"ip_dst": record["id.resp_h"],
"mime_type": record["mime_type"],
"sha1": record["sha1"]}
if f not in self.files:
self.files.append(f)
for f in self.files:
if f["mime_type"] == "application/x-x509-user-cert":
for cert in bl_certs: # Check for blacklisted certificate.
if f["sha1"] == cert[0]:
host = self.resolve(f["ip_src"])
self.alerts.append({"title": self.template["IOC-07"]["title"].format(cert[1].upper(), host),
"description": self.template["IOC-07"]["description"].format(f["sha1"], host),
"host": host,
"level": "High",
"id": "IOC-07"})
def http_check(self, dir):
"""
Check on the http.log:
* Blacklisted domain/tld from the Host header field.
Can be used when no DNS query has been made during the session (the domain is already cached by the device).
:return: nothing - all stuff appended to self.alerts
"""
if os.path.isfile(os.path.join(dir, "http.log")):
for record in ParseZeekLogs(os.path.join(dir, "http.log"), output_format="json", safe_headers=False):
if record is not None:
c = {"host": record['host']}
if c not in self.http:
self.http.append(c)
if self.iocs_analysis:
for c in self.http:
# If we already know the host from DNS, let's continue.
if c["host"] in [r["domain"] for r in self.dns]:
continue
# Check for blacklisted domain.
for h in self.bl_domains:
if h[1] != "tracker":
if c["host"].endswith(h[0]):
self.alerts.append({"title": self.template["IOC-08"]["title"].format(c["host"], h[1].upper()),
"description": self.template["IOC-08"]["description"].format(c["host"]),
"host": c["host"],
"level": "High",
"id": "IOC-08"})
# Check for freedns.
for h in self.bl_freedns:
if c["host"].endswith(h[0]):
self.alerts.append({"title": self.template["IOC-09"]["title"].format(c["host"]),
"description": self.template["IOC-09"]["description"].format(c["host"]),
"host": c["host"],
"level": "Moderate",
"id": "IOC-09"})
# Check for fancy TLD.
for h in self.bl_tlds:
if c["host"].endswith(h[0]):
self.alerts.append({"title": self.template["IOC-10"]["title"].format(c["host"]),
"description": self.template["IOC-10"]["description"].format(c["host"], h[0]),
"host": c["host"],
"level": "Low",
"id": "IOC-10"})
def ssl_check(self, dir):
"""
Check on the ssl.log:
* SSL connections which don't use port 443.
* "Free" certificate issuer (taken from the config).
* Self-signed certificates.
* Blacklisted domain in the CN
:return: nothing - all stuff appended to self.alerts
"""
ssl_default_ports = get_config(("analysis", "ssl_default_ports"))
free_issuers = get_config(("analysis", "free_issuers"))
if os.path.isfile(os.path.join(dir, "ssl.log")):
for record in ParseZeekLogs(os.path.join(dir, "ssl.log"), output_format="json", safe_headers=False):
if record is not None:
c = {"host": record['id.resp_h'],
"port": record['id.resp_p'],
"issuer": record["issuer"] if "issuer" in record else "",
"validation_status": record["validation_status"],
"cn": record["server_name"] if "server_name" in record else ""}
if c not in self.ssl:
self.ssl.append(c)
if self.heuristics_analysis:
for cert in self.ssl:
host = self.resolve(cert["host"])
# If the associated host has not been whitelisted, check the cert.
for c in self.conns:
if host in c["resolution"]:
# Check for non generic SSL port.
if cert["port"] not in ssl_default_ports:
c["alert_tiggered"] = True
self.alerts.append({"title": self.template["SSL-01"]["title"].format(cert["port"], host),
"description": self.template["SSL-01"]["description"].format(host),
"host": host,
"level": "Moderate",
"id": "SSL-01"})
# Check Free SSL certificates.
if cert["issuer"] in free_issuers:
c["alert_tiggered"] = True
self.alerts.append({"title": self.template["SSL-02"]["title"].format(host),
"description": self.template["SSL-02"]["description"],
"host": host,
"level": "Moderate",
"id": "SSL-02"})
# Check for self-signed certificates.
if cert["validation_status"] == "self signed certificate in certificate chain":
c["alert_tiggered"] = True
self.alerts.append({"title": self.template["SSL-03"]["title"].format(host),
"description": self.template["SSL-03"]["description"].format(host),
"host": host,
"level": "Moderate",
"id": "SSL-03"})
if self.iocs_analysis:
for cert in self.ssl:
# Check that the domain in the certificate hasn't been blacklisted.
# This check is useful if the domain has already been cached by
# the device, so it won't appear in self.dns.
if any(cert["cn"].endswith(r["domain"]) for r in self.dns):
continue
for domain in self.bl_domains:
if domain[1] != "tracker":
if cert["cn"].endswith(domain[0]):
self.alerts.append({"title": self.template["SSL-04"]["title"].format(domain[0], domain[1].upper()),
"description": self.template["SSL-04"]["description"].format(domain[0]),
"host": domain[0],
"level": "High",
"id": "SSL-04"})
def alerts_check(self):
"""
Raise an advisory for the user based on the hosts that triggered alerts.
:return: nothing - all generated alerts appended to self.alerts
"""
hosts = {}
for alert in [dict(t) for t in {tuple(d.items()) for d in self.alerts}]:
if alert["host"] not in hosts:
hosts[alert["host"]] = 1
else:
hosts[alert["host"]] += 1
for host, nb in hosts.items():
if nb >= get_config(("analysis", "max_alerts")):
self.alerts.append({"title": self.template["ADV-01"]["title"].format(host),
"description": self.template["ADV-01"]["description"].format(host, nb),
"host": host,
"level": "Moderate",
"id": "ADV-01"})
def resolve(self, ip_addr):
"""
A simple method to retrieve DNS names from IP addresses
in order to replace them in alerts.
:return: String - DNS record or IP Address.
"""
for record in self.dns:
if ip_addr in record["answers"]:
return record["domain"]
return ip_addr
def start_zeek(self):
"""
Start zeek and check the logs.
"""
safe_command.run(sp.Popen, "cd {} && /opt/zeek/bin/zeek -Cr capture.pcap protocols/ssl/validate-certs".format(
self.working_dir), shell=False).wait()
safe_command.run(sp.Popen, "cd {} && mv *.log assets/".format(self.working_dir),
shell=False).wait()
self.fill_dns(self.working_dir + "/assets/")
self.netflow_check(self.working_dir + "/assets/")
self.ssl_check(self.working_dir + "/assets/")
self.http_check(self.working_dir + "/assets/")
self.files_check(self.working_dir + "/assets/")
self.alerts_check()
def retrieve_alerts(self):
"""
Retrieve alerts.
:return: list - a list of alerts without duplicates.
"""
return [dict(t) for t in {tuple(d.items()) for d in self.alerts}]
def retrieve_whitelist(self):
"""
Retrieve whitelisted elements.
:return: list - a list of whitelisted elements without duplicates.
"""
return [dict(t) for t in {tuple(d.items()) for d in self.whitelist}]
def retrieve_conns(self):
"""
Retrieve non-whitelisted elements.
:return: list - a list of non-whitelisted elements without duplicates.
"""
return self.conns
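The revised start_zeek above still passes the original shell strings, now routed through safe_command.run with shell=False. For comparison, a rough sketch of how the same two steps could be expressed without invoking a shell at all, using argument lists plus cwd and resolving the *.log glob in Python (an illustration under those assumptions, not the committed code):

import glob
import os
import shutil
import subprocess as sp

def run_zeek_and_collect_logs(working_dir):
    # Equivalent of: cd <working_dir> && /opt/zeek/bin/zeek -Cr capture.pcap protocols/ssl/validate-certs
    sp.run(["/opt/zeek/bin/zeek", "-Cr", "capture.pcap", "protocols/ssl/validate-certs"],
           cwd=working_dir, check=True)
    # Equivalent of: mv *.log assets/ -- the glob is expanded here instead of by a shell.
    for log in glob.glob(os.path.join(working_dir, "*.log")):
        shutil.move(log, os.path.join(working_dir, "assets"))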

requirements.txt (the old unpinned list is followed by the new pinned list)

@@ -1,19 +1,20 @@
ipwhois
M2Crypto
pyOpenSSL
pydig
pymisp
netaddr
pyyaml
flask
flask_httpauth
pyjwt
sqlalchemy
psutil
pyudev
wifi
qrcode
netifaces
weasyprint
python-whois
six
pymisp==2.4.165.1
sqlalchemy==1.4.48
ipwhois==1.2.0
netaddr==0.8.0
flask==1.1.2
flask_httpauth==4.8.0
pyjwt==1.7.1
psutil==5.8.0
pydig==0.4.0
pyudev==0.24.0
pyyaml==5.3.1
wifi==0.3.8
qrcode==7.3.1
netifaces==0.11.0
weasyprint==59.0
python-whois==0.8.0
six==1.16.0
security==1.2.1 \
--hash=sha256:4ca5f8cfc6b836e2192a84bb5a28b72c17f3cd1abbfe3281f917394c6e6c9238 \
--hash=sha256:0a9dc7b457330e6d0f92bdae3603fecb85394beefad0fd3b5058758a58781ded

scheme.sql

@@ -1,30 +1,30 @@
CREATE TABLE "iocs" (
"id" INTEGER UNIQUE,
"value" TEXT NOT NULL,
"type" TEXT NOT NULL,
"tlp" TEXT NOT NULL,
"tag" TEXT NOT NULL,
"source" TEXT NOT NULL,
"added_on" NUMERIC NOT NULL,
"value" TEXT NOT NULL,
"type" TEXT NOT NULL,
"tlp" TEXT NOT NULL,
"tag" TEXT NOT NULL,
"source" TEXT NOT NULL,
"added_on" TEXT NOT NULL,
PRIMARY KEY("id" AUTOINCREMENT)
);
CREATE TABLE "whitelist" (
"id" INTEGER UNIQUE,
"element" TEXT NOT NULL UNIQUE,
"type" TEXT NOT NULL,
"source" TEXT NOT NULL,
"added_on" INTEGER NOT NULL,
"element" TEXT NOT NULL UNIQUE,
"type" TEXT NOT NULL,
"source" TEXT NOT NULL,
"added_on" TEXT NOT NULL,
PRIMARY KEY("id" AUTOINCREMENT)
);
CREATE TABLE "misp" (
"id" INTEGER UNIQUE,
"name" TEXT,
"url" TEXT NOT NULL,
"apikey" TEXT NOT NULL,
"id" INTEGER UNIQUE,
"name" TEXT,
"url" TEXT NOT NULL,
"apikey" TEXT NOT NULL,
"verifycert" INTEGER NOT NULL DEFAULT 0,
"added_on" NUMERIC NOT NULL,
"last_sync" NUMERIC NOT NULL DEFAULT 0,
"added_on" TEXT NOT NULL,
"last_sync" TEXT NOT NULL DEFAULT 0,
PRIMARY KEY("id" AUTOINCREMENT)
);
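The date columns above move from NUMERIC to TEXT (the "pymisp warning fix" commit). How TinyCheck itself formats those values is not shown in this diff; purely as an illustration of writing an ISO-8601 string into the new TEXT column, with hypothetical row values:

import sqlite3
from datetime import datetime

db = sqlite3.connect("/usr/share/tinycheck/tinycheck.sqlite3")  # path used by the diagnostics script below
db.execute(
    "INSERT INTO iocs (value, type, tlp, tag, source, added_on) VALUES (?, ?, ?, ?, ?, ?)",
    ("tracker.example.com", "domain", "white", "tracker", "manual", datetime.now().isoformat()),
)
db.commit()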

install.sh

@@ -69,7 +69,7 @@ set_credentials() {
read -s password2
echo ""
if [ $password1 = $password2 ]; then
if [ "$password1" == "$password2" ]; then
password=$(echo -n "$password1" | sha256sum | cut -d" " -f1)
sed -i "s/userlogin/$login/g" /usr/share/tinycheck/config.yaml
sed -i "s/userpassword/$password/g" /usr/share/tinycheck/config.yaml

Diagnostics script (new file)

@@ -0,0 +1,191 @@
#!/usr/bin/python
import os
import subprocess
import platform
import socket
import pkg_resources
import psutil
__author__ = 'Eugeny N Ablesov'
__version__ = '1.0.17'
def collect_accounts_info():
""" This call collects generic information about
user accounts present on the system running TinyCheck.
No personal information is collected or provided by this call.
"""
accs = { }
users = psutil.users()
for user in users:
accs[user.name + '@' + user.host] = {
'started': user.started,
'term': user.terminal
}
alt_user = os.getenv('SUDO_USER', os.getenv('USER'))
usr = 'root' if os.path.expanduser('~') == '/root' else alt_user
pid = psutil.Process().pid
term = psutil.Process().terminal() if 'Linux' in platform.system() else 'win'
accs[usr + '@' + term] = { 'pid': pid }
return accs
def collect_os_info():
""" This call collects generic information about
the operating system running TinyCheck.
No personal information is collected or provided by this call.
"""
os_info = { }
os_info['system'] = platform.system()
os_info['release'] = platform.release()
os_info['version'] = platform.version()
os_info['platform'] = platform.platform(aliased=True)
if 'Windows' in os_info['system']:
os_info['dist'] = platform.win32_ver()
if 'Linux' in os_info['system']:
os_info['dist'] = platform.libc_ver()
return os_info
def collect_hardware_info():
""" This call collects information about hardware running TinyCheck.
No personal information collected or provided by this call.
"""
hw_info = { }
hw_info['arch'] = platform.architecture()
hw_info['machine'] = platform.machine()
hw_info['cpus'] = psutil.cpu_count(logical=False)
hw_info['cores'] = psutil.cpu_count()
hw_info['load'] = psutil.getloadavg()
disk_info = psutil.disk_usage('/')
hw_info['disk'] = {
'total': disk_info.total,
'used': disk_info.used,
'free': disk_info.free
}
return hw_info
def collect_network_info():
""" This call collects information about
the network configuration and state of the system running TinyCheck.
No personal information is collected or provided by this call.
"""
net_info = { }
net_info['namei'] = socket.if_nameindex()
addrs = psutil.net_if_addrs()
state = psutil.net_io_counters(pernic=True)
for interface in addrs.keys():
net_info[interface] = { }
int_info = state[interface]
props = [p for p in dir(int_info)
if not p.startswith("_")
and not p == "index"
and not p == "count"]
for prop in props:
net_info[interface][prop] = getattr(int_info, prop)
return net_info
def collect_dependency_info(package_list):
""" This call collects information about
the Python packages required to run TinyCheck.
No personal information is collected or provided by this call.
"""
dependencies = { }
installed_packages = list(pkg_resources.working_set)
installed_packages_list = sorted(["%s==%s"
% (installed.key, installed.version)
for installed in installed_packages])
for pkg in installed_packages_list:
[package_name, package_version] = pkg.split('==')
if package_name in package_list:
dependencies[package_name] = package_version
return dependencies
def collect_db_tables_records_count(db_path, tables):
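""" This call counts the records in each of the given tables
by invoking the sqlite3 command line client.
No personal information is collected or provided by this call.
"""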
result = { }
for table in tables:
query = 'SELECT COUNT(*) FROM %s' % (table)
sqlite_call = subprocess.Popen(['sqlite3', db_path, query], stdout = subprocess.PIPE)
stout, sterr = sqlite_call.communicate()
val = stout.decode("utf-8")
recs = int(val) if val else 0
result[table] = recs
return result
def collect_internal_state(db_path, tables, to_check):
""" This call collects information about
the installed TinyCheck instance and its internal state.
No personal information is collected or provided by this call.
"""
state_ = { }
available = os.path.isfile(db_path)
dbsize = 0
state_['db'] = {
'available': available,
'size': dbsize
}
state_['db']['records'] = { }
if available:
state_['db']['size'] = os.stat(db_path).st_size
state_['db']['records'] = collect_db_tables_records_count(db_path, tables)
services_ = { }
for alias in to_check:
status = subprocess.call(['systemctl', 'is-active', '--quiet', '%s' % (to_check[alias])])
state = ''
if status != 0:
sysctl_call = subprocess.Popen(
["systemctl", "status", "%s" % (to_check[alias]),
r"|",
"grep",
r"''"],
stdout = subprocess.PIPE,
stderr = subprocess.PIPE)
stout, sterr = sysctl_call.communicate()
state = stout.decode("utf-8")
errs = sterr.decode("utf-8")
if "could not be found" in errs:
state = 'Service not found'
services_[alias] = {
'running': status == 0,
'status': status,
'state': state
}
state_['svc'] = services_
return state_
def main():
print("TinyCheck diagnostics script.\nVersion: %s" % (__version__))
print("")
db_path = '/usr/share/tinycheck/tinycheck.sqlite3'
tables = ['iocs', 'whitelist', 'misp']
services = { }
services['frontend'] = 'tinycheck-frontend.service'
services['backend'] = 'tinycheck-backend.service'
services['kiosk'] = 'tinycheck-kiosk.service'
services['watchers'] = 'tinycheck-watchers.service'
deps = [
'pymisp', 'sqlalchemy', 'ipwhois',
'netaddr', 'flask', 'flask_httpauth',
'pyjwt', 'psutil', 'pydig', 'pyudev',
'pyyaml', 'wifi', 'qrcode', 'netifaces',
'weasyprint', 'python-whois', 'six' ]
diagnostics = { }
diagnostics['acc'] = collect_accounts_info()
diagnostics['os'] = collect_os_info()
diagnostics['hw'] = collect_hardware_info()
diagnostics['net'] = collect_network_info()
diagnostics['deps'] = collect_dependency_info(deps)
diagnostics['state'] = collect_internal_state(db_path, tables, services)
report = { 'diagnostics': diagnostics }
print(report)
print("")
if __name__ == '__main__':
main()
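main() prints the report as a plain Python dict. If machine-readable output were preferred, one possible adjustment (an assumption, not part of this commit) would be to serialise the report instead:

import json

def print_report_as_json(report):
    # Drop-in replacement for the final print(report) in main().
    print(json.dumps(report, indent=2, default=str))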

main.py

@@ -10,7 +10,6 @@ from app.blueprints.misp import misp_bp
import datetime
import secrets
import jwt
from OpenSSL import SSL
from app.utils import read_config
from sys import path
@@ -64,7 +63,6 @@ if __name__ == '__main__':
ssl_key = "{}/{}".format(path[0], 'key.pem')
if read_config(("backend", "remote_access")):
app.run(host="0.0.0.0", port=443,
ssl_context=(ssl_cert, ssl_key))
app.run(host="0.0.0.0", port=443, ssl_context=(ssl_cert, ssl_key))
else:
app.run(port=443, ssl_context=(ssl_cert, ssl_key))
app.run(port=443)