Compare commits
6 Commits
ablesov/ad
...
ablesov/fi
Author | SHA1 | Date | |
---|---|---|---|
13ddb3b0bc | |||
daca707ad8 | |||
216db5d387 | |||
887e90cd06 | |||
4e4c6172cd | |||
9939dc3bf4 |
@ -1,478 +1,479 @@
|
||||
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from classes.parsezeeklogs import ParseZeekLogs
|
||||
from netaddr import IPNetwork, IPAddress
|
||||
from utils import get_iocs, get_config, get_whitelist
|
||||
from datetime import datetime
|
||||
import subprocess as sp
|
||||
import json
|
||||
import pydig
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import whois
|
||||
|
||||
|
||||
class ZeekEngine(object):
|
||||
|
||||
def __init__(self, capture_directory):
|
||||
self.working_dir = capture_directory
|
||||
self.alerts = []
|
||||
self.conns = []
|
||||
self.ssl = []
|
||||
self.http = []
|
||||
self.dns = []
|
||||
self.files = []
|
||||
self.whitelist = []
|
||||
|
||||
# Get analysis and userlang configuration
|
||||
self.heuristics_analysis = get_config(("analysis", "heuristics"))
|
||||
self.iocs_analysis = get_config(("analysis", "iocs"))
|
||||
self.whitelist_analysis = get_config(("analysis", "whitelist"))
|
||||
self.active_analysis = get_config(("analysis", "active"))
|
||||
self.userlang = get_config(("frontend", "user_lang"))
|
||||
|
||||
# Retreive IOCs.
|
||||
if self.iocs_analysis:
|
||||
self.bl_cidrs = [[IPNetwork(cidr[0]), cidr[1]]
|
||||
for cidr in get_iocs("cidr")]
|
||||
self.bl_hosts = get_iocs("ip4addr") + get_iocs("ip6addr")
|
||||
self.bl_domains = get_iocs("domain")
|
||||
self.bl_freedns = get_iocs("freedns")
|
||||
self.bl_nameservers = get_iocs("ns")
|
||||
self.bl_tlds = get_iocs("tld")
|
||||
|
||||
# Retreive whitelisted items.
|
||||
if self.whitelist_analysis:
|
||||
self.wl_cidrs = [IPNetwork(cidr) for cidr in get_whitelist("cidr")]
|
||||
self.wl_hosts = get_whitelist("ip4addr") + get_whitelist("ip6addr")
|
||||
self.wl_domains = get_whitelist("domain")
|
||||
|
||||
# Load template language
|
||||
if not re.match("^[a-z]{2,3}$", self.userlang):
|
||||
self.userlang = "en"
|
||||
with open(os.path.join(os.path.dirname(os.path.realpath(sys.argv[0])), "locales/{}.json".format(self.userlang))) as f:
|
||||
self.template = json.load(f)["alerts"]
|
||||
|
||||
def fill_dns(self, dir):
|
||||
"""
|
||||
Fill the DNS resolutions thanks to the dns.log.
|
||||
:return: nothing - all resolutions appended to self.dns.
|
||||
"""
|
||||
if os.path.isfile(os.path.join(dir, "dns.log")):
|
||||
for record in ParseZeekLogs(os.path.join(dir, "dns.log"), output_format="json", safe_headers=False):
|
||||
if record is not None:
|
||||
if record["qtype_name"] in ["A", "AAAA"]:
|
||||
d = {"domain": record["query"],
|
||||
"answers": record["answers"].split(",")}
|
||||
if d not in self.dns:
|
||||
self.dns.append(d)
|
||||
|
||||
def netflow_check(self, dir):
|
||||
"""
|
||||
Enrich and check the netflow from the conn.log against whitelist and IOCs.
|
||||
:return: nothing - all stuff appended to self.alerts
|
||||
"""
|
||||
max_ports = get_config(("analysis", "max_ports"))
|
||||
http_default_port = get_config(("analysis", "http_default_port"))
|
||||
|
||||
# Get the netflow from conn.log.
|
||||
if os.path.isfile(os.path.join(dir, "conn.log")):
|
||||
for record in ParseZeekLogs(os.path.join(dir, "conn.log"), output_format="json", safe_headers=False):
|
||||
if record is not None:
|
||||
c = {"ip_dst": record["id.resp_h"],
|
||||
"proto": record["proto"],
|
||||
"port_dst": record["id.resp_p"],
|
||||
"service": record["service"],
|
||||
"alert_tiggered": False}
|
||||
if c not in self.conns:
|
||||
self.conns.append(c)
|
||||
|
||||
# Let's add some dns resolutions.
|
||||
for c in self.conns:
|
||||
c["resolution"] = self.resolve(c["ip_dst"])
|
||||
|
||||
# Order the conns list by the resolution field.
|
||||
self.conns = sorted(self.conns, key=lambda c: c["resolution"])
|
||||
|
||||
# Check for whitelisted assets, if any, delete the record.
|
||||
if self.whitelist_analysis:
|
||||
|
||||
for i, c in enumerate(self.conns):
|
||||
if c["ip_dst"] in [ip for ip in self.wl_hosts]:
|
||||
self.whitelist.append(self.conns[i])
|
||||
self.conns[i] = False
|
||||
elif c["resolution"] in self.wl_domains:
|
||||
self.whitelist.append(self.conns[i])
|
||||
self.conns[i] = False
|
||||
elif True in [c["resolution"].endswith("." + dom) for dom in self.wl_domains]:
|
||||
self.whitelist.append(self.conns[i])
|
||||
self.conns[i] = False
|
||||
elif True in [IPAddress(c["ip_dst"]) in cidr for cidr in self.wl_cidrs]:
|
||||
self.whitelist.append(self.conns[i])
|
||||
self.conns[i] = False
|
||||
|
||||
# Let's delete whitelisted connections.
|
||||
self.conns = list(filter(lambda c: c != False, self.conns))
|
||||
|
||||
if self.heuristics_analysis:
|
||||
for c in self.conns:
|
||||
# Check for UDP / ICMP (strange from a smartphone.)
|
||||
if c["proto"] in ["UDP", "ICMP"]:
|
||||
c["alert_tiggered"] = True
|
||||
self.alerts.append({"title": self.template["PROTO-01"]["title"].format(c["proto"].upper(), c["resolution"]),
|
||||
"description": self.template["PROTO-01"]["description"].format(c["proto"].upper(), c["resolution"]),
|
||||
"host": c["resolution"],
|
||||
"level": "Moderate",
|
||||
"id": "PROTO-01"})
|
||||
# Check for use of ports over 1024.
|
||||
if c["port_dst"] >= max_ports:
|
||||
c["alert_tiggered"] = True
|
||||
self.alerts.append({"title": self.template["PROTO-02"]["title"].format(c["proto"].upper(), c["resolution"], max_ports),
|
||||
"description": self.template["PROTO-02"]["description"].format(c["proto"].upper(), c["resolution"], c["port_dst"]),
|
||||
"host": c["resolution"],
|
||||
"level": "Low",
|
||||
"id": "PROTO-02"})
|
||||
# Check for use of HTTP.
|
||||
if c["service"] == "http" and c["port_dst"] == http_default_port:
|
||||
c["alert_tiggered"] = True
|
||||
self.alerts.append({"title": self.template["PROTO-03"]["title"].format(c["resolution"]),
|
||||
"description": self.template["PROTO-03"]["description"].format(c["resolution"]),
|
||||
"host": c["resolution"],
|
||||
"level": "Low",
|
||||
"id": "PROTO-03"})
|
||||
|
||||
# Check for use of HTTP on a non standard port.
|
||||
if c["service"] == "http" and c["port_dst"] != http_default_port:
|
||||
c["alert_tiggered"] = True
|
||||
self.alerts.append({"title": self.template["PROTO-04"]["title"].format(c["resolution"], c["port_dst"]),
|
||||
"description": self.template["PROTO-04"]["description"].format(c["resolution"], c["port_dst"]),
|
||||
"host": c["resolution"],
|
||||
"level": "Moderate",
|
||||
"id": "PROTO-04"})
|
||||
# Check for non-resolved IP address.
|
||||
if c["ip_dst"] == c["resolution"]:
|
||||
c["alert_tiggered"] = True
|
||||
self.alerts.append({"title": self.template["PROTO-05"]["title"].format(c["ip_dst"]),
|
||||
"description": self.template["PROTO-05"]["description"].format(c["ip_dst"]),
|
||||
"host": c["ip_dst"],
|
||||
"level": "Low",
|
||||
"id": "PROTO-05"})
|
||||
|
||||
if self.iocs_analysis:
|
||||
|
||||
for c in self.conns:
|
||||
# Check for blacklisted IP address.
|
||||
for host in self.bl_hosts:
|
||||
if c["ip_dst"] == host[0]:
|
||||
c["alert_tiggered"] = True
|
||||
self.alerts.append({"title": self.template["IOC-01"]["title"].format(c["resolution"], c["ip_dst"], host[1].upper()),
|
||||
"description": self.template["IOC-01"]["description"].format(c["ip_dst"]),
|
||||
"host": c["resolution"],
|
||||
"level": "High",
|
||||
"id": "IOC-01"})
|
||||
break
|
||||
# Check for blacklisted CIDR.
|
||||
for cidr in self.bl_cidrs:
|
||||
if IPAddress(c["ip_dst"]) in cidr[0]:
|
||||
c["alert_tiggered"] = True
|
||||
self.alerts.append({"title": self.template["IOC-02"]["title"].format(c["resolution"], cidr[0], cidr[1].upper()),
|
||||
"description": self.template["IOC-02"]["description"].format(c["resolution"]),
|
||||
"host": c["resolution"],
|
||||
"level": "Moderate",
|
||||
"id": "IOC-02"})
|
||||
# Check for blacklisted domain.
|
||||
for domain in self.bl_domains:
|
||||
if c["resolution"].endswith(domain[0]):
|
||||
if domain[1] != "tracker":
|
||||
c["alert_tiggered"] = True
|
||||
self.alerts.append({"title": self.template["IOC-03"]["title"].format(c["resolution"], domain[1].upper()),
|
||||
"description": self.template["IOC-03"]["description"].format(c["resolution"]),
|
||||
"host": c["resolution"],
|
||||
"level": "High",
|
||||
"id": "IOC-03"})
|
||||
else:
|
||||
c["alert_tiggered"] = True
|
||||
self.alerts.append({"title": self.template["IOC-04"]["title"].format(c["resolution"], domain[1].upper()),
|
||||
"description": self.template["IOC-04"]["description"].format(c["resolution"]),
|
||||
"host": c["resolution"],
|
||||
"level": "Moderate",
|
||||
"id": "IOC-04"})
|
||||
# Check for blacklisted FreeDNS.
|
||||
for domain in self.bl_freedns:
|
||||
if c["resolution"].endswith("." + domain[0]):
|
||||
c["alert_tiggered"] = True
|
||||
self.alerts.append({"title": self.template["IOC-05"]["title"].format(c["resolution"]),
|
||||
"description": self.template["IOC-05"]["description"].format(c["resolution"]),
|
||||
"host": c["resolution"],
|
||||
"level": "Moderate",
|
||||
"id": "IOC-05"})
|
||||
|
||||
# Check for suspect tlds.
|
||||
for tld in self.bl_tlds:
|
||||
if c["resolution"].endswith(tld[0]):
|
||||
c["alert_tiggered"] = True
|
||||
self.alerts.append({"title": self.template["IOC-06"]["title"].format(c["resolution"]),
|
||||
"description": self.template["IOC-06"]["description"].format(c["resolution"], tld[0]),
|
||||
"host": c["resolution"],
|
||||
"level": "Low",
|
||||
"id": "IOC-06"})
|
||||
if self.active_analysis:
|
||||
for c in self.conns:
|
||||
try: # Domain nameservers check.
|
||||
name_servers = pydig.query(c["resolution"], "NS")
|
||||
if len(name_servers):
|
||||
for ns in self.bl_nameservers:
|
||||
if name_servers[0].endswith(".{}.".format(ns[0])):
|
||||
c["alert_tiggered"] = True
|
||||
self.alerts.append({"title": self.template["ACT-01"]["title"].format(c["resolution"], name_servers[0]),
|
||||
"description": self.template["ACT-01"]["description"].format(c["resolution"]),
|
||||
"host": c["resolution"],
|
||||
"level": "Moderate",
|
||||
"id": "ACT-01"})
|
||||
except:
|
||||
pass
|
||||
|
||||
try: # Domain history check.
|
||||
|
||||
whois_record = whois.whois(c["resolution"])
|
||||
creation_date = whois_record.creation_date if type(
|
||||
whois_record.creation_date) is not list else whois_record.creation_date[0]
|
||||
creation_days = abs((datetime.now() - creation_date).days)
|
||||
if creation_days < 365:
|
||||
c["alert_tiggered"] = True
|
||||
self.alerts.append({"title": self.template["ACT-02"]["title"].format(c["resolution"], creation_days),
|
||||
"description": self.template["ACT-02"]["description"].format(c["resolution"]),
|
||||
"host": c["resolution"],
|
||||
"level": "Moderate",
|
||||
"id": "ACT-02"})
|
||||
|
||||
except:
|
||||
pass
|
||||
|
||||
def files_check(self, dir):
|
||||
"""
|
||||
Check on the files.log:
|
||||
* Check certificates sha1
|
||||
* [todo] Check possible binary data or APKs?
|
||||
:return: nothing - all stuff appended to self.alerts
|
||||
"""
|
||||
|
||||
if not self.iocs_analysis:
|
||||
return
|
||||
|
||||
bl_certs = get_iocs("sha1cert")
|
||||
|
||||
if os.path.isfile(os.path.join(dir, "files.log")):
|
||||
for record in ParseZeekLogs(os.path.join(dir, "files.log"), output_format="json", safe_headers=False):
|
||||
if record is not None:
|
||||
f = {"filename": record["filename"],
|
||||
"ip_src": record["id.orig_h"],
|
||||
"ip_dst": record["id.orig_p"],
|
||||
"mime_type": record["mime_type"],
|
||||
"sha1": record["sha1"]}
|
||||
if f not in self.files:
|
||||
self.files.append(f)
|
||||
|
||||
for f in self.files:
|
||||
if f["mime_type"] == "application/x-x509-user-cert":
|
||||
for cert in bl_certs: # Check for blacklisted certificate.
|
||||
if f["sha1"] == cert[0]:
|
||||
host = self.resolve(f["ip_src"])
|
||||
self.alerts.append({"title": self.template["IOC-07"]["title"].format(cert[1].upper(), host),
|
||||
"description": self.template["IOC-07"]["description"].format(f["sha1"], host),
|
||||
"host": host,
|
||||
"level": "High",
|
||||
"id": "IOC-07"})
|
||||
|
||||
def http_check(self, dir):
|
||||
"""
|
||||
Check on the http.log:
|
||||
* Blacklisted domain/tld from the Host header field.
|
||||
Can be used when no DNS query have been done during the session (already cached by the device.)
|
||||
:return: nothing - all stuff appended to self.alerts
|
||||
"""
|
||||
|
||||
if os.path.isfile(os.path.join(dir, "http.log")):
|
||||
for record in ParseZeekLogs(os.path.join(dir, "http.log"), output_format="json", safe_headers=False):
|
||||
if record is not None:
|
||||
c = {"host": record['host']}
|
||||
if c not in self.http:
|
||||
self.http.append(c)
|
||||
|
||||
if self.iocs_analysis:
|
||||
for c in self.http:
|
||||
|
||||
# If we already know the host form DNS, let's continue.
|
||||
if c["host"] in [r["domain"] for r in self.dns]:
|
||||
continue
|
||||
|
||||
# Check for blacklisted domain.
|
||||
for h in self.bl_domains:
|
||||
if h[1] != "tracker":
|
||||
if c["host"].endswith(h[0]):
|
||||
self.alerts.append({"title": self.template["IOC-08"]["title"].format(c["host"], h[1].upper()),
|
||||
"description": self.template["IOC-08"]["description"].format(c["host"]),
|
||||
"host": c["host"],
|
||||
"level": "High",
|
||||
"id": "IOC-08"})
|
||||
# Check for freedns.
|
||||
for h in self.bl_freedns:
|
||||
if c["host"].endswith(h[0]):
|
||||
self.alerts.append({"title": self.template["IOC-09"]["title"].format(c["host"]),
|
||||
"description": self.template["IOC-09"]["description"].format(c["host"]),
|
||||
"host": c["host"],
|
||||
"level": "Moderate",
|
||||
"id": "IOC-09"})
|
||||
# Check for fancy TLD.
|
||||
for h in self.bl_tlds:
|
||||
if c["host"].endswith(h[0]):
|
||||
self.alerts.append({"title": self.template["IOC-10"]["title"].format(c["host"]),
|
||||
"description": self.template["IOC-10"]["description"].format(c["host"], h[0]),
|
||||
"host": c["host"],
|
||||
"level": "Low",
|
||||
"id": "IOC-10"})
|
||||
|
||||
def ssl_check(self, dir):
|
||||
"""
|
||||
Check on the ssl.log:
|
||||
* SSL connections which doesn't use the 443.
|
||||
* "Free" certificate issuer (taken from the config).
|
||||
* Self-signed certificates.
|
||||
* Blacklisted domain in the CN
|
||||
:return: nothing - all stuff appended to self.alerts
|
||||
"""
|
||||
ssl_default_ports = get_config(("analysis", "ssl_default_ports"))
|
||||
free_issuers = get_config(("analysis", "free_issuers"))
|
||||
|
||||
if os.path.isfile(os.path.join(dir, "ssl.log")):
|
||||
for record in ParseZeekLogs(os.path.join(dir, "ssl.log"), output_format="json", safe_headers=False):
|
||||
if record is not None:
|
||||
c = {"host": record['id.resp_h'],
|
||||
"port": record['id.resp_p'],
|
||||
"issuer": record["issuer"] if "issuer" in record else "",
|
||||
"validation_status": record["validation_status"],
|
||||
"cn": record["server_name"] if "server_name" in record else ""}
|
||||
if c not in self.ssl:
|
||||
self.ssl.append(c)
|
||||
|
||||
if self.heuristics_analysis:
|
||||
for cert in self.ssl:
|
||||
host = self.resolve(cert["host"])
|
||||
|
||||
# If the associated host has not whitelisted, check the cert.
|
||||
for c in self.conns:
|
||||
if host in c["resolution"]:
|
||||
# Check for non generic SSL port.
|
||||
if cert["port"] not in ssl_default_ports:
|
||||
c["alert_tiggered"] = True
|
||||
self.alerts.append({"title": self.template["SSL-01"]["title"].format(cert["port"], host),
|
||||
"description": self.template["SSL-01"]["description"].format(host),
|
||||
"host": host,
|
||||
"level": "Moderate",
|
||||
"id": "SSL-01"})
|
||||
# Check Free SSL certificates.
|
||||
if cert["issuer"] in free_issuers:
|
||||
c["alert_tiggered"] = True
|
||||
self.alerts.append({"title": self.template["SSL-02"]["title"].format(host),
|
||||
"description": self.template["SSL-02"]["description"],
|
||||
"host": host,
|
||||
"level": "Moderate",
|
||||
"id": "SSL-02"})
|
||||
# Check for self-signed certificates.
|
||||
if cert["validation_status"] == "self signed certificate in certificate chain":
|
||||
c["alert_tiggered"] = True
|
||||
self.alerts.append({"title": self.template["SSL-03"]["title"].format(host),
|
||||
"description": self.template["SSL-03"]["description"].format(host),
|
||||
"host": host,
|
||||
"level": "Moderate",
|
||||
"id": "SSL-03"})
|
||||
|
||||
if self.iocs_analysis:
|
||||
for cert in self.ssl:
|
||||
# Check if the domain in the certificate haven't been blacklisted
|
||||
# This check can be good if the domain has already been cached by
|
||||
# the device so it wont appear in self.dns.
|
||||
|
||||
if any([cert["cn"].endswith(r["domain"]) for r in self.dns]):
|
||||
continue
|
||||
|
||||
for domain in self.bl_domains:
|
||||
if domain[1] != "tracker":
|
||||
if cert["cn"].endswith(domain[0]):
|
||||
self.alerts.append({"title": self.template["SSL-04"]["title"].format(domain[0], domain[1].upper()),
|
||||
"description": self.template["SSL-04"]["description"].format(domain[0]),
|
||||
"host": domain[0],
|
||||
"level": "High",
|
||||
"id": "SSL-04"})
|
||||
|
||||
def alerts_check(self):
|
||||
"""
|
||||
Leverage an advice to the user based on the trigered hosts
|
||||
:return: nothing - all generated alerts appended to self.alerts
|
||||
"""
|
||||
hosts = {}
|
||||
|
||||
for alert in [dict(t) for t in {tuple(d.items()) for d in self.alerts}]:
|
||||
if alert["host"] not in hosts:
|
||||
hosts[alert["host"]] = 1
|
||||
else:
|
||||
hosts[alert["host"]] += 1
|
||||
|
||||
for host, nb in hosts.items():
|
||||
if nb >= get_config(("analysis", "max_alerts")):
|
||||
self.alerts.append({"title": self.template["ADV-01"]["title"].format(host),
|
||||
"description": self.template["ADV-01"]["description"].format(host, nb),
|
||||
"host": host,
|
||||
"level": "Moderate",
|
||||
"id": "ADV-01"})
|
||||
|
||||
def resolve(self, ip_addr):
|
||||
"""
|
||||
A simple method to retreive DNS names from IP addresses
|
||||
in order to replace them in alerts.
|
||||
|
||||
:return: String - DNS record or IP Address.
|
||||
"""
|
||||
for record in self.dns:
|
||||
if ip_addr in record["answers"]:
|
||||
return record["domain"]
|
||||
return ip_addr
|
||||
|
||||
def start_zeek(self):
|
||||
"""
|
||||
Start zeek and check the logs.
|
||||
"""
|
||||
sp.Popen("cd {} && /opt/zeek/bin/zeek -Cr capture.pcap protocols/ssl/validate-certs".format(
|
||||
self.working_dir), shell=True).wait()
|
||||
sp.Popen("cd {} && mv *.log assets/".format(self.working_dir),
|
||||
shell=True).wait()
|
||||
self.fill_dns(self.working_dir + "/assets/")
|
||||
self.netflow_check(self.working_dir + "/assets/")
|
||||
self.ssl_check(self.working_dir + "/assets/")
|
||||
self.http_check(self.working_dir + "/assets/")
|
||||
self.files_check(self.working_dir + "/assets/")
|
||||
self.alerts_check()
|
||||
|
||||
def retrieve_alerts(self):
|
||||
"""
|
||||
Retrieve alerts.
|
||||
:return: list - a list of alerts wihout duplicates.
|
||||
"""
|
||||
return [dict(t) for t in {tuple(d.items()) for d in self.alerts}]
|
||||
|
||||
def retrieve_whitelist(self):
|
||||
"""
|
||||
Retrieve whitelisted elements.
|
||||
:return: list - a list of whitelisted elements wihout duplicates.
|
||||
"""
|
||||
return [dict(t) for t in {tuple(d.items()) for d in self.whitelist}]
|
||||
|
||||
def retrieve_conns(self):
|
||||
"""
|
||||
Retrieve not whitelisted elements.
|
||||
:return: list - a list of non-whitelisted elements wihout duplicates.
|
||||
"""
|
||||
return self.conns
|
||||
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from classes.parsezeeklogs import ParseZeekLogs
|
||||
from netaddr import IPNetwork, IPAddress
|
||||
from utils import get_iocs, get_config, get_whitelist
|
||||
from datetime import datetime
|
||||
import subprocess as sp
|
||||
import json
|
||||
import pydig
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import whois
|
||||
from security import safe_command
|
||||
|
||||
|
||||
class ZeekEngine(object):
|
||||
|
||||
def __init__(self, capture_directory):
|
||||
self.working_dir = capture_directory
|
||||
self.alerts = []
|
||||
self.conns = []
|
||||
self.ssl = []
|
||||
self.http = []
|
||||
self.dns = []
|
||||
self.files = []
|
||||
self.whitelist = []
|
||||
|
||||
# Get analysis and userlang configuration
|
||||
self.heuristics_analysis = get_config(("analysis", "heuristics"))
|
||||
self.iocs_analysis = get_config(("analysis", "iocs"))
|
||||
self.whitelist_analysis = get_config(("analysis", "whitelist"))
|
||||
self.active_analysis = get_config(("analysis", "active"))
|
||||
self.userlang = get_config(("frontend", "user_lang"))
|
||||
|
||||
# Retreive IOCs.
|
||||
if self.iocs_analysis:
|
||||
self.bl_cidrs = [[IPNetwork(cidr[0]), cidr[1]]
|
||||
for cidr in get_iocs("cidr")]
|
||||
self.bl_hosts = get_iocs("ip4addr") + get_iocs("ip6addr")
|
||||
self.bl_domains = get_iocs("domain")
|
||||
self.bl_freedns = get_iocs("freedns")
|
||||
self.bl_nameservers = get_iocs("ns")
|
||||
self.bl_tlds = get_iocs("tld")
|
||||
|
||||
# Retreive whitelisted items.
|
||||
if self.whitelist_analysis:
|
||||
self.wl_cidrs = [IPNetwork(cidr) for cidr in get_whitelist("cidr")]
|
||||
self.wl_hosts = get_whitelist("ip4addr") + get_whitelist("ip6addr")
|
||||
self.wl_domains = get_whitelist("domain")
|
||||
|
||||
# Load template language
|
||||
if not re.match("^[a-z]{2,3}$", self.userlang):
|
||||
self.userlang = "en"
|
||||
with open(os.path.join(os.path.dirname(os.path.realpath(sys.argv[0])), "locales/{}.json".format(self.userlang))) as f:
|
||||
self.template = json.load(f)["alerts"]
|
||||
|
||||
def fill_dns(self, dir):
|
||||
"""
|
||||
Fill the DNS resolutions thanks to the dns.log.
|
||||
:return: nothing - all resolutions appended to self.dns.
|
||||
"""
|
||||
if os.path.isfile(os.path.join(dir, "dns.log")):
|
||||
for record in ParseZeekLogs(os.path.join(dir, "dns.log"), output_format="json", safe_headers=False):
|
||||
if record is not None:
|
||||
if record["qtype_name"] in ["A", "AAAA"]:
|
||||
d = {"domain": record["query"],
|
||||
"answers": record["answers"].split(",")}
|
||||
if d not in self.dns:
|
||||
self.dns.append(d)
|
||||
|
||||
def netflow_check(self, dir):
|
||||
"""
|
||||
Enrich and check the netflow from the conn.log against whitelist and IOCs.
|
||||
:return: nothing - all stuff appended to self.alerts
|
||||
"""
|
||||
max_ports = get_config(("analysis", "max_ports"))
|
||||
http_default_port = get_config(("analysis", "http_default_port"))
|
||||
|
||||
# Get the netflow from conn.log.
|
||||
if os.path.isfile(os.path.join(dir, "conn.log")):
|
||||
for record in ParseZeekLogs(os.path.join(dir, "conn.log"), output_format="json", safe_headers=False):
|
||||
if record is not None:
|
||||
c = {"ip_dst": record["id.resp_h"],
|
||||
"proto": record["proto"],
|
||||
"port_dst": record["id.resp_p"],
|
||||
"service": record["service"],
|
||||
"alert_tiggered": False}
|
||||
if c not in self.conns:
|
||||
self.conns.append(c)
|
||||
|
||||
# Let's add some dns resolutions.
|
||||
for c in self.conns:
|
||||
c["resolution"] = self.resolve(c["ip_dst"])
|
||||
|
||||
# Order the conns list by the resolution field.
|
||||
self.conns = sorted(self.conns, key=lambda c: c["resolution"])
|
||||
|
||||
# Check for whitelisted assets, if any, delete the record.
|
||||
if self.whitelist_analysis:
|
||||
|
||||
for i, c in enumerate(self.conns):
|
||||
if c["ip_dst"] in [ip for ip in self.wl_hosts]:
|
||||
self.whitelist.append(self.conns[i])
|
||||
self.conns[i] = False
|
||||
elif c["resolution"] in self.wl_domains:
|
||||
self.whitelist.append(self.conns[i])
|
||||
self.conns[i] = False
|
||||
elif True in [c["resolution"].endswith("." + dom) for dom in self.wl_domains]:
|
||||
self.whitelist.append(self.conns[i])
|
||||
self.conns[i] = False
|
||||
elif True in [IPAddress(c["ip_dst"]) in cidr for cidr in self.wl_cidrs]:
|
||||
self.whitelist.append(self.conns[i])
|
||||
self.conns[i] = False
|
||||
|
||||
# Let's delete whitelisted connections.
|
||||
self.conns = list(filter(lambda c: c != False, self.conns))
|
||||
|
||||
if self.heuristics_analysis:
|
||||
for c in self.conns:
|
||||
# Check for UDP / ICMP (strange from a smartphone.)
|
||||
if c["proto"] in ["UDP", "ICMP"]:
|
||||
c["alert_tiggered"] = True
|
||||
self.alerts.append({"title": self.template["PROTO-01"]["title"].format(c["proto"].upper(), c["resolution"]),
|
||||
"description": self.template["PROTO-01"]["description"].format(c["proto"].upper(), c["resolution"]),
|
||||
"host": c["resolution"],
|
||||
"level": "Moderate",
|
||||
"id": "PROTO-01"})
|
||||
# Check for use of ports over 1024.
|
||||
if c["port_dst"] >= max_ports:
|
||||
c["alert_tiggered"] = True
|
||||
self.alerts.append({"title": self.template["PROTO-02"]["title"].format(c["proto"].upper(), c["resolution"], max_ports),
|
||||
"description": self.template["PROTO-02"]["description"].format(c["proto"].upper(), c["resolution"], c["port_dst"]),
|
||||
"host": c["resolution"],
|
||||
"level": "Low",
|
||||
"id": "PROTO-02"})
|
||||
# Check for use of HTTP.
|
||||
if c["service"] == "http" and c["port_dst"] == http_default_port:
|
||||
c["alert_tiggered"] = True
|
||||
self.alerts.append({"title": self.template["PROTO-03"]["title"].format(c["resolution"]),
|
||||
"description": self.template["PROTO-03"]["description"].format(c["resolution"]),
|
||||
"host": c["resolution"],
|
||||
"level": "Low",
|
||||
"id": "PROTO-03"})
|
||||
|
||||
# Check for use of HTTP on a non standard port.
|
||||
if c["service"] == "http" and c["port_dst"] != http_default_port:
|
||||
c["alert_tiggered"] = True
|
||||
self.alerts.append({"title": self.template["PROTO-04"]["title"].format(c["resolution"], c["port_dst"]),
|
||||
"description": self.template["PROTO-04"]["description"].format(c["resolution"], c["port_dst"]),
|
||||
"host": c["resolution"],
|
||||
"level": "Moderate",
|
||||
"id": "PROTO-04"})
|
||||
# Check for non-resolved IP address.
|
||||
if c["ip_dst"] == c["resolution"]:
|
||||
c["alert_tiggered"] = True
|
||||
self.alerts.append({"title": self.template["PROTO-05"]["title"].format(c["ip_dst"]),
|
||||
"description": self.template["PROTO-05"]["description"].format(c["ip_dst"]),
|
||||
"host": c["ip_dst"],
|
||||
"level": "Low",
|
||||
"id": "PROTO-05"})
|
||||
|
||||
if self.iocs_analysis:
|
||||
|
||||
for c in self.conns:
|
||||
# Check for blacklisted IP address.
|
||||
for host in self.bl_hosts:
|
||||
if c["ip_dst"] == host[0]:
|
||||
c["alert_tiggered"] = True
|
||||
self.alerts.append({"title": self.template["IOC-01"]["title"].format(c["resolution"], c["ip_dst"], host[1].upper()),
|
||||
"description": self.template["IOC-01"]["description"].format(c["ip_dst"]),
|
||||
"host": c["resolution"],
|
||||
"level": "High",
|
||||
"id": "IOC-01"})
|
||||
break
|
||||
# Check for blacklisted CIDR.
|
||||
for cidr in self.bl_cidrs:
|
||||
if IPAddress(c["ip_dst"]) in cidr[0]:
|
||||
c["alert_tiggered"] = True
|
||||
self.alerts.append({"title": self.template["IOC-02"]["title"].format(c["resolution"], cidr[0], cidr[1].upper()),
|
||||
"description": self.template["IOC-02"]["description"].format(c["resolution"]),
|
||||
"host": c["resolution"],
|
||||
"level": "Moderate",
|
||||
"id": "IOC-02"})
|
||||
# Check for blacklisted domain.
|
||||
for domain in self.bl_domains:
|
||||
if c["resolution"].endswith(domain[0]):
|
||||
if domain[1] != "tracker":
|
||||
c["alert_tiggered"] = True
|
||||
self.alerts.append({"title": self.template["IOC-03"]["title"].format(c["resolution"], domain[1].upper()),
|
||||
"description": self.template["IOC-03"]["description"].format(c["resolution"]),
|
||||
"host": c["resolution"],
|
||||
"level": "High",
|
||||
"id": "IOC-03"})
|
||||
else:
|
||||
c["alert_tiggered"] = True
|
||||
self.alerts.append({"title": self.template["IOC-04"]["title"].format(c["resolution"], domain[1].upper()),
|
||||
"description": self.template["IOC-04"]["description"].format(c["resolution"]),
|
||||
"host": c["resolution"],
|
||||
"level": "Moderate",
|
||||
"id": "IOC-04"})
|
||||
# Check for blacklisted FreeDNS.
|
||||
for domain in self.bl_freedns:
|
||||
if c["resolution"].endswith("." + domain[0]):
|
||||
c["alert_tiggered"] = True
|
||||
self.alerts.append({"title": self.template["IOC-05"]["title"].format(c["resolution"]),
|
||||
"description": self.template["IOC-05"]["description"].format(c["resolution"]),
|
||||
"host": c["resolution"],
|
||||
"level": "Moderate",
|
||||
"id": "IOC-05"})
|
||||
|
||||
# Check for suspect tlds.
|
||||
for tld in self.bl_tlds:
|
||||
if c["resolution"].endswith(tld[0]):
|
||||
c["alert_tiggered"] = True
|
||||
self.alerts.append({"title": self.template["IOC-06"]["title"].format(c["resolution"]),
|
||||
"description": self.template["IOC-06"]["description"].format(c["resolution"], tld[0]),
|
||||
"host": c["resolution"],
|
||||
"level": "Low",
|
||||
"id": "IOC-06"})
|
||||
if self.active_analysis:
|
||||
for c in self.conns:
|
||||
try: # Domain nameservers check.
|
||||
name_servers = pydig.query(c["resolution"], "NS")
|
||||
if len(name_servers):
|
||||
for ns in self.bl_nameservers:
|
||||
if name_servers[0].endswith(".{}.".format(ns[0])):
|
||||
c["alert_tiggered"] = True
|
||||
self.alerts.append({"title": self.template["ACT-01"]["title"].format(c["resolution"], name_servers[0]),
|
||||
"description": self.template["ACT-01"]["description"].format(c["resolution"]),
|
||||
"host": c["resolution"],
|
||||
"level": "Moderate",
|
||||
"id": "ACT-01"})
|
||||
except:
|
||||
pass
|
||||
|
||||
try: # Domain history check.
|
||||
|
||||
whois_record = whois.whois(c["resolution"])
|
||||
creation_date = whois_record.creation_date if type(
|
||||
whois_record.creation_date) is not list else whois_record.creation_date[0]
|
||||
creation_days = abs((datetime.now() - creation_date).days)
|
||||
if creation_days < 365:
|
||||
c["alert_tiggered"] = True
|
||||
self.alerts.append({"title": self.template["ACT-02"]["title"].format(c["resolution"], creation_days),
|
||||
"description": self.template["ACT-02"]["description"].format(c["resolution"]),
|
||||
"host": c["resolution"],
|
||||
"level": "Moderate",
|
||||
"id": "ACT-02"})
|
||||
|
||||
except:
|
||||
pass
|
||||
|
||||
def files_check(self, dir):
|
||||
"""
|
||||
Check on the files.log:
|
||||
* Check certificates sha1
|
||||
* [todo] Check possible binary data or APKs?
|
||||
:return: nothing - all stuff appended to self.alerts
|
||||
"""
|
||||
|
||||
if not self.iocs_analysis:
|
||||
return
|
||||
|
||||
bl_certs = get_iocs("sha1cert")
|
||||
|
||||
if os.path.isfile(os.path.join(dir, "files.log")):
|
||||
for record in ParseZeekLogs(os.path.join(dir, "files.log"), output_format="json", safe_headers=False):
|
||||
if record is not None:
|
||||
f = {"filename": record["filename"],
|
||||
"ip_src": record["id.orig_h"],
|
||||
"ip_dst": record["id.resp_h"],
|
||||
"mime_type": record["mime_type"],
|
||||
"sha1": record["sha1"]}
|
||||
if f not in self.files:
|
||||
self.files.append(f)
|
||||
|
||||
for f in self.files:
|
||||
if f["mime_type"] == "application/x-x509-user-cert":
|
||||
for cert in bl_certs: # Check for blacklisted certificate.
|
||||
if f["sha1"] == cert[0]:
|
||||
host = self.resolve(f["ip_src"])
|
||||
self.alerts.append({"title": self.template["IOC-07"]["title"].format(cert[1].upper(), host),
|
||||
"description": self.template["IOC-07"]["description"].format(f["sha1"], host),
|
||||
"host": host,
|
||||
"level": "High",
|
||||
"id": "IOC-07"})
|
||||
|
||||
def http_check(self, dir):
|
||||
"""
|
||||
Check on the http.log:
|
||||
* Blacklisted domain/tld from the Host header field.
|
||||
Can be used when no DNS query have been done during the session (already cached by the device.)
|
||||
:return: nothing - all stuff appended to self.alerts
|
||||
"""
|
||||
|
||||
if os.path.isfile(os.path.join(dir, "http.log")):
|
||||
for record in ParseZeekLogs(os.path.join(dir, "http.log"), output_format="json", safe_headers=False):
|
||||
if record is not None:
|
||||
c = {"host": record['host']}
|
||||
if c not in self.http:
|
||||
self.http.append(c)
|
||||
|
||||
if self.iocs_analysis:
|
||||
for c in self.http:
|
||||
|
||||
# If we already know the host form DNS, let's continue.
|
||||
if c["host"] in [r["domain"] for r in self.dns]:
|
||||
continue
|
||||
|
||||
# Check for blacklisted domain.
|
||||
for h in self.bl_domains:
|
||||
if h[1] != "tracker":
|
||||
if c["host"].endswith(h[0]):
|
||||
self.alerts.append({"title": self.template["IOC-08"]["title"].format(c["host"], h[1].upper()),
|
||||
"description": self.template["IOC-08"]["description"].format(c["host"]),
|
||||
"host": c["host"],
|
||||
"level": "High",
|
||||
"id": "IOC-08"})
|
||||
# Check for freedns.
|
||||
for h in self.bl_freedns:
|
||||
if c["host"].endswith(h[0]):
|
||||
self.alerts.append({"title": self.template["IOC-09"]["title"].format(c["host"]),
|
||||
"description": self.template["IOC-09"]["description"].format(c["host"]),
|
||||
"host": c["host"],
|
||||
"level": "Moderate",
|
||||
"id": "IOC-09"})
|
||||
# Check for fancy TLD.
|
||||
for h in self.bl_tlds:
|
||||
if c["host"].endswith(h[0]):
|
||||
self.alerts.append({"title": self.template["IOC-10"]["title"].format(c["host"]),
|
||||
"description": self.template["IOC-10"]["description"].format(c["host"], h[0]),
|
||||
"host": c["host"],
|
||||
"level": "Low",
|
||||
"id": "IOC-10"})
|
||||
|
||||
def ssl_check(self, dir):
|
||||
"""
|
||||
Check on the ssl.log:
|
||||
* SSL connections which doesn't use the 443.
|
||||
* "Free" certificate issuer (taken from the config).
|
||||
* Self-signed certificates.
|
||||
* Blacklisted domain in the CN
|
||||
:return: nothing - all stuff appended to self.alerts
|
||||
"""
|
||||
ssl_default_ports = get_config(("analysis", "ssl_default_ports"))
|
||||
free_issuers = get_config(("analysis", "free_issuers"))
|
||||
|
||||
if os.path.isfile(os.path.join(dir, "ssl.log")):
|
||||
for record in ParseZeekLogs(os.path.join(dir, "ssl.log"), output_format="json", safe_headers=False):
|
||||
if record is not None:
|
||||
c = {"host": record['id.resp_h'],
|
||||
"port": record['id.resp_p'],
|
||||
"issuer": record["issuer"] if "issuer" in record else "",
|
||||
"validation_status": record["validation_status"],
|
||||
"cn": record["server_name"] if "server_name" in record else ""}
|
||||
if c not in self.ssl:
|
||||
self.ssl.append(c)
|
||||
|
||||
if self.heuristics_analysis:
|
||||
for cert in self.ssl:
|
||||
host = self.resolve(cert["host"])
|
||||
|
||||
# If the associated host has not whitelisted, check the cert.
|
||||
for c in self.conns:
|
||||
if host in c["resolution"]:
|
||||
# Check for non generic SSL port.
|
||||
if cert["port"] not in ssl_default_ports:
|
||||
c["alert_tiggered"] = True
|
||||
self.alerts.append({"title": self.template["SSL-01"]["title"].format(cert["port"], host),
|
||||
"description": self.template["SSL-01"]["description"].format(host),
|
||||
"host": host,
|
||||
"level": "Moderate",
|
||||
"id": "SSL-01"})
|
||||
# Check Free SSL certificates.
|
||||
if cert["issuer"] in free_issuers:
|
||||
c["alert_tiggered"] = True
|
||||
self.alerts.append({"title": self.template["SSL-02"]["title"].format(host),
|
||||
"description": self.template["SSL-02"]["description"],
|
||||
"host": host,
|
||||
"level": "Moderate",
|
||||
"id": "SSL-02"})
|
||||
# Check for self-signed certificates.
|
||||
if cert["validation_status"] == "self signed certificate in certificate chain":
|
||||
c["alert_tiggered"] = True
|
||||
self.alerts.append({"title": self.template["SSL-03"]["title"].format(host),
|
||||
"description": self.template["SSL-03"]["description"].format(host),
|
||||
"host": host,
|
||||
"level": "Moderate",
|
||||
"id": "SSL-03"})
|
||||
|
||||
if self.iocs_analysis:
|
||||
for cert in self.ssl:
|
||||
# Check if the domain in the certificate haven't been blacklisted
|
||||
# This check can be good if the domain has already been cached by
|
||||
# the device so it wont appear in self.dns.
|
||||
|
||||
if any(cert["cn"].endswith(r["domain"]) for r in self.dns):
|
||||
continue
|
||||
|
||||
for domain in self.bl_domains:
|
||||
if domain[1] != "tracker":
|
||||
if cert["cn"].endswith(domain[0]):
|
||||
self.alerts.append({"title": self.template["SSL-04"]["title"].format(domain[0], domain[1].upper()),
|
||||
"description": self.template["SSL-04"]["description"].format(domain[0]),
|
||||
"host": domain[0],
|
||||
"level": "High",
|
||||
"id": "SSL-04"})
|
||||
|
||||
def alerts_check(self):
|
||||
"""
|
||||
Leverage an advice to the user based on the trigered hosts
|
||||
:return: nothing - all generated alerts appended to self.alerts
|
||||
"""
|
||||
hosts = {}
|
||||
|
||||
for alert in [dict(t) for t in {tuple(d.items()) for d in self.alerts}]:
|
||||
if alert["host"] not in hosts:
|
||||
hosts[alert["host"]] = 1
|
||||
else:
|
||||
hosts[alert["host"]] += 1
|
||||
|
||||
for host, nb in hosts.items():
|
||||
if nb >= get_config(("analysis", "max_alerts")):
|
||||
self.alerts.append({"title": self.template["ADV-01"]["title"].format(host),
|
||||
"description": self.template["ADV-01"]["description"].format(host, nb),
|
||||
"host": host,
|
||||
"level": "Moderate",
|
||||
"id": "ADV-01"})
|
||||
|
||||
def resolve(self, ip_addr):
|
||||
"""
|
||||
A simple method to retreive DNS names from IP addresses
|
||||
in order to replace them in alerts.
|
||||
|
||||
:return: String - DNS record or IP Address.
|
||||
"""
|
||||
for record in self.dns:
|
||||
if ip_addr in record["answers"]:
|
||||
return record["domain"]
|
||||
return ip_addr
|
||||
|
||||
def start_zeek(self):
|
||||
"""
|
||||
Start zeek and check the logs.
|
||||
"""
|
||||
safe_command.run(sp.Popen, "cd {} && /opt/zeek/bin/zeek -Cr capture.pcap protocols/ssl/validate-certs".format(
|
||||
self.working_dir), shell=False).wait()
|
||||
safe_command.run(sp.Popen, "cd {} && mv *.log assets/".format(self.working_dir),
|
||||
shell=False).wait()
|
||||
self.fill_dns(self.working_dir + "/assets/")
|
||||
self.netflow_check(self.working_dir + "/assets/")
|
||||
self.ssl_check(self.working_dir + "/assets/")
|
||||
self.http_check(self.working_dir + "/assets/")
|
||||
self.files_check(self.working_dir + "/assets/")
|
||||
self.alerts_check()
|
||||
|
||||
def retrieve_alerts(self):
|
||||
"""
|
||||
Retrieve alerts.
|
||||
:return: list - a list of alerts wihout duplicates.
|
||||
"""
|
||||
return [dict(t) for t in {tuple(d.items()) for d in self.alerts}]
|
||||
|
||||
def retrieve_whitelist(self):
|
||||
"""
|
||||
Retrieve whitelisted elements.
|
||||
:return: list - a list of whitelisted elements wihout duplicates.
|
||||
"""
|
||||
return [dict(t) for t in {tuple(d.items()) for d in self.whitelist}]
|
||||
|
||||
def retrieve_conns(self):
|
||||
"""
|
||||
Retrieve not whitelisted elements.
|
||||
:return: list - a list of non-whitelisted elements wihout duplicates.
|
||||
"""
|
||||
return self.conns
|
||||
|
@ -1,17 +1,20 @@
|
||||
pymisp==2.4.165.1
|
||||
sqlalchemy==1.4.48
|
||||
ipwhois==1.2.0
|
||||
netaddr==0.8.0
|
||||
flask==1.1.2
|
||||
flask_httpauth==4.8.0
|
||||
pyjwt==1.7.1
|
||||
psutil==5.8.0
|
||||
pydig==0.4.0
|
||||
pyudev==0.24.0
|
||||
pyyaml==5.3.1
|
||||
wifi==0.3.8
|
||||
qrcode==7.3.1
|
||||
netifaces==0.11.0
|
||||
weasyprint==59.0
|
||||
python-whois==0.8.0
|
||||
six==1.16.0
|
||||
pymisp==2.4.165.1
|
||||
sqlalchemy==1.4.48
|
||||
ipwhois==1.2.0
|
||||
netaddr==0.8.0
|
||||
flask==1.1.2
|
||||
flask_httpauth==4.8.0
|
||||
pyjwt==1.7.1
|
||||
psutil==5.8.0
|
||||
pydig==0.4.0
|
||||
pyudev==0.24.0
|
||||
pyyaml==5.3.1
|
||||
wifi==0.3.8
|
||||
qrcode==7.3.1
|
||||
netifaces==0.11.0
|
||||
weasyprint==59.0
|
||||
python-whois==0.8.0
|
||||
six==1.16.0
|
||||
security==1.2.1 \
|
||||
--hash=sha256:4ca5f8cfc6b836e2192a84bb5a28b72c17f3cd1abbfe3281f917394c6e6c9238
|
||||
--hash=sha256:0a9dc7b457330e6d0f92bdae3603fecb85394beefad0fd3b5058758a58781ded
|
||||
|
191
server/backend/diagnostics.py
Normal file
191
server/backend/diagnostics.py
Normal file
@ -0,0 +1,191 @@
|
||||
#!/usr/bin/python
|
||||
import os
|
||||
import subprocess
|
||||
import platform
|
||||
import socket
|
||||
import pkg_resources
|
||||
import psutil
|
||||
|
||||
__author__ = 'Eugeny N Ablesov'
|
||||
__version__ = '1.0.17'
|
||||
|
||||
def collect_accounts_info():
|
||||
""" This call collects generic information about
|
||||
user accounts presented on system running TinyCheck.
|
||||
|
||||
No personal information collected or provided by this call.
|
||||
"""
|
||||
accs = { }
|
||||
users = psutil.users()
|
||||
for user in users:
|
||||
accs[user.name + '@' + user.host] = {
|
||||
'started': user.started,
|
||||
'term': user.terminal
|
||||
}
|
||||
alt_user = os.getenv('SUDO_USER', os.getenv('USER'))
|
||||
usr = 'root' if os.path.expanduser('~') == '/root' else alt_user
|
||||
pid = psutil.Process().pid
|
||||
term = psutil.Process().terminal() if 'Linux' in platform.system() else 'win'
|
||||
accs[usr + '@' + term] = { 'pid': pid }
|
||||
return accs
|
||||
|
||||
def collect_os_info():
|
||||
""" This call collects generic information about
|
||||
operating system running TinyCheck.
|
||||
|
||||
No personal information collected or provided by this call.
|
||||
"""
|
||||
os_info = { }
|
||||
os_info['system'] = platform.system()
|
||||
os_info['release'] = platform.release()
|
||||
os_info['version'] = platform.version()
|
||||
os_info['platform'] = platform.platform(aliased=True)
|
||||
if 'Windows' in os_info['system']:
|
||||
os_info['dist'] = platform.win32_ver()
|
||||
if 'Linux' in os_info['system']:
|
||||
os_info['dist'] = platform.libc_ver()
|
||||
return os_info
|
||||
|
||||
def collect_hardware_info():
|
||||
""" This call collects information about hardware running TinyCheck.
|
||||
|
||||
No personal information collected or provided by this call.
|
||||
"""
|
||||
hw_info = { }
|
||||
hw_info['arch'] = platform.architecture()
|
||||
hw_info['machine'] = platform.machine()
|
||||
hw_info['cpus'] = psutil.cpu_count(logical=False)
|
||||
hw_info['cores'] = psutil.cpu_count()
|
||||
hw_info['load'] = psutil.getloadavg()
|
||||
disk_info = psutil.disk_usage('/')
|
||||
hw_info['disk'] = {
|
||||
'total': disk_info.total,
|
||||
'used': disk_info.used,
|
||||
'free': disk_info.free
|
||||
}
|
||||
return hw_info
|
||||
|
||||
def collect_network_info():
|
||||
""" This call collects information about
|
||||
network configuration and state running TinyCheck.
|
||||
|
||||
No personal information collected or provided by this call.
|
||||
"""
|
||||
net_info = { }
|
||||
net_info['namei'] = socket.if_nameindex()
|
||||
addrs = psutil.net_if_addrs()
|
||||
state = psutil.net_io_counters(pernic=True)
|
||||
for interface in addrs.keys():
|
||||
net_info[interface] = { }
|
||||
int_info = state[interface]
|
||||
props = [p for p in dir(int_info)
|
||||
if not p.startswith("_")
|
||||
and not p == "index"
|
||||
and not p == "count"]
|
||||
for prop in props:
|
||||
net_info[interface][prop] = getattr(int_info, prop)
|
||||
return net_info
|
||||
|
||||
def collect_dependency_info(package_list):
|
||||
""" This call collects information about
|
||||
python packages required to run TinyCheck.
|
||||
|
||||
No personal information collected or provided by this call.
|
||||
"""
|
||||
dependencies = { }
|
||||
installed_packages = list(pkg_resources.working_set)
|
||||
installed_packages_list = sorted(["%s==%s"
|
||||
% (installed.key, installed.version)
|
||||
for installed in installed_packages])
|
||||
for pkg in installed_packages_list:
|
||||
[package_name, package_version] = pkg.split('==')
|
||||
if package_name in package_list:
|
||||
dependencies[package_name] = package_version
|
||||
return dependencies
|
||||
|
||||
def collect_db_tables_records_count(db_path, tables):
|
||||
result = { }
|
||||
for table in tables:
|
||||
query = 'SELECT COUNT(*) FROM %s' % (table)
|
||||
sqlite_call = subprocess.Popen(['sqlite3', db_path, query], stdout = subprocess.PIPE)
|
||||
stout, sterr = sqlite_call.communicate()
|
||||
val = stout.decode("utf-8")
|
||||
recs = int(val) if val else 0
|
||||
result[table] = recs
|
||||
return result
|
||||
|
||||
def collect_internal_state(db_path, tables, to_check):
|
||||
""" This call collects information about
|
||||
installed TinyCheck instance and its internal state.
|
||||
|
||||
No personal information collected or provided by this call.
|
||||
"""
|
||||
state_ = { }
|
||||
available = os.path.isfile(db_path)
|
||||
dbsize = 0
|
||||
state_['db'] = {
|
||||
'available': available,
|
||||
'size': dbsize
|
||||
}
|
||||
state_['db']['records'] = { }
|
||||
if available:
|
||||
state_['db']['size'] = os.stat(db_path).st_size
|
||||
state_['db']['records'] = collect_db_tables_records_count(db_path, tables)
|
||||
|
||||
services_ = { }
|
||||
for alias in to_check:
|
||||
status = subprocess.call(['systemctl', 'is-active', '--quiet', '%s' % (to_check[alias])])
|
||||
state = ''
|
||||
if status != 0:
|
||||
sysctl_call = subprocess.Popen(
|
||||
["systemctl", "status", "%s" % (to_check[alias]),
|
||||
r"|",
|
||||
"grep",
|
||||
r"''"],
|
||||
stdout = subprocess.PIPE,
|
||||
stderr = subprocess.PIPE)
|
||||
stout, sterr = sysctl_call.communicate()
|
||||
state = stout.decode("utf-8")
|
||||
errs = sterr.decode("utf-8")
|
||||
if "could not be found" in errs:
|
||||
state = 'Service not found'
|
||||
services_[alias] = {
|
||||
'running': status == 0,
|
||||
'status': status,
|
||||
'state': state
|
||||
}
|
||||
state_['svc'] = services_
|
||||
return state_
|
||||
|
||||
def main():
|
||||
print("TinyCheck diagnostics script.\nVersion: %s" % (__version__))
|
||||
print("")
|
||||
|
||||
db_path = '/usr/share/tinycheck/tinycheck.sqlite3'
|
||||
tables = ['iocs', 'whitelist', 'misp']
|
||||
services = { }
|
||||
services['frontend'] = 'tinycheck-frontend.service'
|
||||
services['backend'] = 'tinycheck-backend.service'
|
||||
services['kiosk'] = 'tinycheck-kiosk.service'
|
||||
services['watchers'] = 'tinycheck-watchers.service'
|
||||
|
||||
deps = [
|
||||
'pymisp', 'sqlalchemy', 'ipwhois',
|
||||
'netaddr', 'flask', 'flask_httpauth',
|
||||
'pyjwt', 'psutil', 'pydig', 'pyudev',
|
||||
'pyyaml', 'wifi', 'qrcode', 'netifaces',
|
||||
'weasyprint', 'python-whois', 'six' ]
|
||||
|
||||
diagnostics = { }
|
||||
diagnostics['acc'] = collect_accounts_info()
|
||||
diagnostics['os'] = collect_os_info()
|
||||
diagnostics['hw'] = collect_hardware_info()
|
||||
diagnostics['net'] = collect_network_info()
|
||||
diagnostics['deps'] = collect_dependency_info(deps)
|
||||
diagnostics['state'] = collect_internal_state(db_path, tables, services)
|
||||
report = { 'diagnostics': diagnostics }
|
||||
print(report)
|
||||
print("")
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
Reference in New Issue
Block a user