First commit

This commit is contained in:
Félix Aime
2020-11-24 19:45:03 +01:00
parent c042b71634
commit 513f6b1b02
130 changed files with 76873 additions and 0 deletions

0
analysis/__init__.py Normal file
View File

61
analysis/analysis.py Normal file
View File

@ -0,0 +1,61 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from classes.zeekengine import ZeekEngine
from classes.suricataengine import SuricataEngine
from multiprocessing import Process, Manager
import sys
import re
import json
import os
"""
This file is called by the frontend but the analysis
can be done in standalone by just submitting the directory
containing a capture.pcap file.
"""
if __name__ == "__main__":
if len(sys.argv) == 2:
capture_directory = sys.argv[1]
if os.path.isdir(capture_directory):
# Alerts bucket.
manager = Manager()
alerts = manager.dict()
def zeekengine(alerts):
zeek = ZeekEngine(capture_directory)
zeek.start_zeek()
alerts["zeek"] = zeek.get_alerts()
def snortengine(alerts):
suricata = SuricataEngine(capture_directory)
suricata.start_suricata()
alerts["suricata"] = suricata.get_alerts()
# Start the engines.
p1 = Process(target=zeekengine, args=(alerts,))
p2 = Process(target=snortengine, args=(alerts,))
p1.start()
p2.start()
# Wait to their end.
p1.join()
p2.join()
# Some formating and alerts.json writing.
with open(os.path.join(capture_directory, "alerts.json"), "w") as f:
report = {"high": [], "moderate": [], "low": []}
for alert in (alerts["zeek"] + alerts["suricata"]):
if alert["level"] == "High":
report["high"].append(alert)
if alert["level"] == "Moderate":
report["moderate"].append(alert)
if alert["level"] == "Low":
report["low"].append(alert)
f.write(json.dumps(report))
else:
print("The directory doesn't exist.")
else:
print("Please specify a capture directory in argument.")

View File

@ -0,0 +1,178 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from json import loads, dumps
from collections import OrderedDict
from datetime import datetime
from traceback import print_exc
# Taken from https://github.com/dgunter/ParseZeekLogs <3
class ParseZeekLogs(object):
"""
Class that parses Zeek logs and allows log data to be output in CSV or json format.
Attributes: filepath: Path of Zeek log file to read
"""
def __init__(self, filepath, batchsize=500, fields=None, output_format=None, ignore_keys=[], meta={}, safe_headers=False):
self.fd = open(filepath, "r")
self.options = OrderedDict()
self.firstRun = True
self.filtered_fields = fields
self.batchsize = batchsize
self.output_format = output_format
self.ignore_keys = ignore_keys
self.meta = meta
self.safe_headers = safe_headers
# Convert ' to " in meta string
meta = loads(dumps(meta).replace("'", '"'))
# Read the header option lines
l = self.fd.readline().strip()
while l.strip().startswith("#"):
# Parse the options out
if l.startswith("#separator"):
key = str(l[1:].split(" ")[0])
value = str.encode(l[1:].split(
" ")[1].strip()).decode('unicode_escape')
self.options[key] = value
elif l.startswith("#"):
key = str(l[1:].split(self.options.get('separator'))[0])
value = l[1:].split(self.options.get('separator'))[1:]
self.options[key] = value
# Read the next line
l = self.fd.readline().strip()
self.firstLine = l
# Save mapping of fields to values:
self.fields = self.options.get('fields')
self.types = self.options.get('types')
self.data_types = {}
for i, val in enumerate(self.fields):
# Convert field names if safe_headers is enabled
if self.safe_headers is True:
self.fields[i] = self.fields[i].replace(".", "_")
# Match types with each other
self.data_types[self.fields[i]] = self.types[i]
def __del__(self):
self.fd.close()
def __iter__(self):
return self
def __next__(self):
retVal = ""
if self.firstRun is True:
retVal = self.firstLine
self.firstRun = False
else:
retVal = self.fd.readline().strip()
# If an empty string is returned, readline is done reading
if retVal == "" or retVal is None:
raise StopIteration
# Split out the data we are going to return
retVal = retVal.split(self.options.get('separator'))
record = None
# Make sure we aren't dealing with a comment line
if len(retVal) > 0 and not str(retVal[0]).strip().startswith("#") \
and len(retVal) is len(self.options.get("fields")):
record = OrderedDict()
# Prepare fields for conversion
for x in range(0, len(retVal)):
if self.safe_headers is True:
converted_field_name = self.options.get(
"fields")[x].replace(".", "_")
else:
converted_field_name = self.options.get("fields")[x]
if self.filtered_fields is None or converted_field_name in self.filtered_fields:
# Translate - to "" to fix a conversation error
if retVal[x] == "-":
retVal[x] = ""
# Save the record field if the field isn't filtered out
record[converted_field_name] = retVal[x]
# Convert values to the appropriate record type
record = self.convert_values(
record, self.ignore_keys, self.data_types)
if record is not None and self.output_format == "json":
# Output will be json
# Add metadata to json
for k, v in self.meta.items():
record[k] = v
retVal = record
elif record is not None and self.output_format == "csv":
retVal = ""
# Add escaping to csv format
for k, v in record.items():
# Add escaping to string values
if isinstance(v, str):
retVal += str("\"" + str(v).strip() + "\"" + ",")
else:
retVal += str(str(v).strip() + ",")
# Remove the trailing comma
retVal = retVal[:-1]
else:
retVal = None
return retVal
def convert_values(self, data, ignore_keys=[], data_types={}):
keys_to_delete = []
for k, v in data.items():
# print("evaluating k: " + str(k) + " v: " + str(v))
if isinstance(v, dict):
data[k] = self.convert_values(v)
else:
if data_types.get(k) is not None:
if (data_types.get(k) == "port" or data_types.get(k) == "count"):
if v != "":
data[k] = int(v)
else:
keys_to_delete.append(k)
elif (data_types.get(k) == "double" or data_types.get(k) == "interval"):
if v != "":
data[k] = float(v)
else:
keys_to_delete.append(k)
elif data_types.get(k) == "bool":
data[k] = bool(v)
else:
data[k] = v
for k in keys_to_delete:
del data[k]
return data
def get_fields(self):
"""Returns all fields present in the log file
Returns:
A python list containing all field names in the log file
"""
field_names = ""
if self.output_format == "csv":
for i, v in enumerate(self.fields):
if self.filtered_fields is None or v in self.filtered_fields:
field_names += str(v) + ","
# Remove the trailing comma
field_names = field_names[:-1].strip()
else:
field_names = []
for i, v in enumerate(self.fields):
if self.filtered_fields is None or v in self.filtered_fields:
field_names.append(v)
return field_names

View File

@ -0,0 +1,92 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from utils import get_iocs, get_apname, get_device
import time
import os
import subprocess as sp
import re
import json
class SuricataEngine():
def __init__(self, capture_directory):
self.wdir = capture_directory
self.alerts = []
self.rules_file = "/tmp/rules.rules"
self.pcap_path = os.path.join(self.wdir, "capture.pcap")
self.rules = [r[0] for r in get_iocs(
"snort")] + self.generate_contextual_alerts()
def start_suricata(self):
"""
Launch suricata against the capture.pcap file.
:return: nothing.
"""
# Generate the rule file an launch suricata.
if self.generate_rule_file():
sp.Popen("suricata -S {} -r {} -l /tmp/".format(self.rules_file,
self.pcap_path), shell=True).wait()
# Let's parse the log file.
for line in open("/tmp/fast.log", "r").readlines():
if "[**]" in line:
s = line.split("[**]")[1].strip()
m = re.search(
r"\[\d+\:(?P<sid>\d+)\:(?P<rev>\d+)\] (?P<title>[ -~]+)", s)
self.alerts.append({"title": "Suricata rule tiggered: {}".format(m.group('title')),
"description": """A network detection rule has been tiggered. It's likely that your device has been compromised
or contains a malicious application.""",
"level": "High",
"id": "SNORT-01"})
# Remove fast.log
os.remove("/tmp/fast.log")
def generate_rule_file(self):
"""
Generate the rules file passed to suricata.
:return: bool if operation succeed.
"""
try:
with open(self.rules_file, "w+") as f:
f.write("\n".join(self.rules))
return True
except:
return False
def generate_contextual_alerts(self):
"""
Generate contextual alerts related to the current
ssid or the device itself.
"""
apname = get_apname()
device = get_device(self.wdir.split("/")[-1])
rules = []
# Devices names to be whitelisted (can appear in UA of HTTP requests. So FP high alerts)
device_names = ["iphone", "ipad", "android", "samsung", "galaxy",
"huawei", "oneplus", "oppo", "pixel", "xiaomi", "realme", "chrome",
"safari"]
if apname and device:
# See if the AP name is sent in clear text over the internet.
if len(apname) >= 5:
rules.append(
'alert tcp {} any -> $EXTERNAL_NET any (content:"{}"; msg:"WiFi name sent in clear text"; sid:10000101; rev:001;)'.format(device["ip_address"], apname))
rules.append(
'alert udp {} any -> $EXTERNAL_NET any (content:"{}"; msg:"WiFi name sent in clear text"; sid:10000102; rev:001;)'.format(device["ip_address"], apname))
# See if the device name is sent in clear text over the internet.
if len(device["name"]) >= 5 and device["name"].lower() not in device_names:
rules.append('alert tcp {} any -> $EXTERNAL_NET any (content:"{}"; msg:"Device name sent in clear text"; sid:10000103; rev:001;)'.format(
device["ip_address"], device["name"]))
rules.append('alert udp {} any -> $EXTERNAL_NET any (content:"{}"; msg:"Device name sent in clear text"; sid:10000104; rev:001;)'.format(
device["ip_address"], device["name"]))
return rules
def get_alerts(self):
return [dict(t) for t in {tuple(d.items()) for d in self.alerts}]

View File

@ -0,0 +1,369 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from classes.parsezeeklogs import ParseZeekLogs
from netaddr import IPNetwork, IPAddress
from utils import get_iocs, get_config, get_whitelist
from ipwhois import IPWhois
import subprocess as sp
import json
import pydig
import os
class ZeekEngine(object):
def __init__(self, capture_directory):
self.working_dir = capture_directory
self.alerts = []
self.conns = []
self.ssl = []
self.http = []
self.dns = []
self.files = []
# Get analysis configuration
self.heuristics_analysis = get_config(("analysis", "heuristics"))
self.iocs_analysis = get_config(("analysis", "iocs"))
self.white_analysis = get_config(("analysis", "whitelist"))
def fill_dns(self, dir):
"""
Fill the DNS resolutions thanks to the dns.log.
:return: nothing - all resolutions appended to self.dns.
"""
if os.path.isfile(os.path.join(dir, "dns.log")):
for record in ParseZeekLogs(os.path.join(dir, "dns.log"), output_format="json", safe_headers=False):
if record is not None:
if record["qtype_name"] in ["A", "AAAA"]:
d = {"domain": record["query"],
"answers": record["answers"].split(",")}
if d not in self.dns:
self.dns.append(d)
def netflow_check(self, dir):
"""
Enrich and check the netflow from the conn.log against whitelist and IOCs.
:return: nothing - all stuff appended to self.alerts
"""
max_ports = get_config(("analysis", "max_ports"))
http_default_port = get_config(("analysis", "http_default_port"))
# Get the netflow from conn.log.
if os.path.isfile(os.path.join(dir, "conn.log")):
for record in ParseZeekLogs(os.path.join(dir, "conn.log"), output_format="json", safe_headers=False):
if record is not None:
c = {"ip_dst": record["id.resp_h"],
"proto": record["proto"],
"port_dst": record["id.resp_p"],
"service": record["service"]}
if c not in self.conns:
self.conns.append(c)
# Let's add some dns resolutions.
for c in self.conns:
c["resolution"] = self.resolve(c["ip_dst"])
# Order the conns list by the resolution field.
self.conns = sorted(self.conns, key=lambda c: c["resolution"])
# Check for whitelisted assets, if any, delete the record.
if self.white_analysis:
wl_cidrs = [IPNetwork(cidr) for cidr in get_whitelist("cidr")]
wl_hosts = get_whitelist("ip4addr") + get_whitelist("ip6addr")
wl_domains = get_whitelist("domain")
for i, c in enumerate(self.conns):
if c["ip_dst"] in [ip for ip in wl_hosts]:
self.conns[i] = False
elif c["resolution"] in wl_domains:
self.conns[i] = False
elif True in [c["resolution"].endswith("." + dom) for dom in wl_domains]:
self.conns[i] = False
elif True in [IPAddress(c["ip_dst"]) in cidr for cidr in wl_cidrs]:
self.conns[i] = False
# Let's delete whitelisted connections.
self.conns = list(filter(lambda c: c != False, self.conns))
if self.heuristics_analysis:
for c in self.conns:
# Check for UDP / ICMP (strange from a smartphone.)
if c["proto"] in ["UDP", "ICMP"]:
self.alerts.append({"title": "{} communication going outside the local network to {}.".format(c["proto"].upper(), c["resolution"]),
"description": "The {} protocol is commonly used in internal networks. Please, verify if the host {} leveraged other alerts which may ".format(c["proto"].upper(), c["resolution"])
+ "indicates a possible malicious behavior.",
"host": c["resolution"],
"level": "Moderate",
"id": "PROTO-01"})
# Check for use of ports over 1024.
if c["port_dst"] >= max_ports:
self.alerts.append({"title": "{} connection to {} to a port over or equal to {}.".format(c["proto"].upper(), c["resolution"], max_ports),
"description": "{} connections have been seen to {} by using the port {}. The use of non-standard port can be sometimes associated to malicious activities. ".format(c["proto"].upper(), c["resolution"], c["port_dst"])
+ "We recommend to check if this host has a good reputation by looking on other alerts and search it on the internet.",
"host": c["resolution"],
"level": "Low",
"id": "PROTO-02"})
# Check for use of HTTP.
if c["service"] == "http" and c["port_dst"] == http_default_port:
self.alerts.append({"title": "HTTP communications been done to the host {}".format(c["resolution"]),
"description": "Your device exchanged with the host {} by using HTTP, an unencrypted protocol. ".format(c["resolution"])
+ "Even if this behavior is not malicious by itself, it is unusual to see HTTP communications issued from smartphone applications "
+ "running in the background. Please check the host reputation by searching it on the internet.",
"host": c["resolution"],
"level": "Low",
"id": "PROTO-03"})
# Check for use of HTTP on a non standard port.
if c["service"] == "http" and c["port_dst"] != http_default_port:
self.alerts.append({"title": "HTTP communications have been seen to the host {} on a non standard port ({}).".format(c["resolution"], c["port_dst"]),
"description": "Your device exchanged with the host {} by using HTTP, an unencrypted protocol on the port {}. ".format(c["resolution"], c["port_dst"])
+ "This behavior is quite unusual. Please check the host reputation by searching it on the internet.",
"host": c["resolution"],
"level": "Moderate",
"id": "PROTO-04"})
# Check for non-resolved IP address.
if c["service"] == c["resolution"]:
self.alerts.append({"title": "The server {} hasn't been resolved by any DNS query during the session".format(c["ip_dst"]),
"description": "It means that the server {} is likely not resolved by any domain name or the resolution has already been cached by ".format(c["ip_dst"])
+ "the device. If the host appears in other alerts, please check it.",
"host": c["ip_dst"],
"level": "Low",
"id": "PROTO-05"})
if self.iocs_analysis:
bl_cidrs = [[IPNetwork(cidr[0]), cidr[1]]
for cidr in get_iocs("cidr")]
bl_hosts = get_iocs("ip4addr") + get_iocs("ip6addr")
bl_domains = get_iocs("domain")
bl_freedns = get_iocs("freedns")
bl_nameservers = get_iocs("ns")
bl_tlds = get_iocs("tld")
for c in self.conns:
# Check for blacklisted IP address.
for host in bl_hosts:
if c["ip_dst"] == host[0]:
self.alerts.append({"title": "A connection has been made to {} ({}) which is tagged as {}.".format(c["resolution"], c["ip_dst"], host[1].upper()),
"description": "The host {} has been explicitly blacklisted for malicious activities. Your device is likely compromised ".format(c["ip_dst"])
+ "and needs to be investigated more deeply by IT security professionals.",
"host": c["resolution"],
"level": "High",
"id": "IOC-01"})
break
# Check for blacklisted CIDR.
for cidr in bl_cidrs:
if IPAddress(c["ip_dst"]) in cidr[0]:
self.alerts.append({"title": "Communication to {} under the CIDR {} which is tagged as {}.".format(c["resolution"], cidr[0], cidr[1].upper()),
"description": "The server {} is hosted under a network which is known to host malicious activities. Even if this behavior is not malicious by itself, ".format(c["resolution"])
+ "you need to check if other alerts are also mentioning this host. If you have some doubts, please "
+ "search this host on the internet to see if its legit or not.",
"host": c["resolution"],
"level": "Moderate",
"id": "IOC-02"})
# Check for blacklisted domain.
for domain in bl_domains:
if c["resolution"].endswith(domain[0]):
if domain[1] != "tracker":
self.alerts.append({"title": "A DNS request have been done to {} which is tagged as {}.".format(c["resolution"], domain[1].upper()),
"description": "The domain name {} seen in the capture has been explicitly tagged as malicious. This indicates that ".format(c["resolution"])
+ "your device is likely compromised and needs to be investigated deeply.",
"host": c["resolution"],
"level": "High",
"id": "IOC-03"})
else:
self.alerts.append({"title": "A DNS request have been done to {} which is tagged as {}.".format(c["resolution"], domain[1].upper()),
"description": "The domain name {} seen in the capture has been explicitly tagged as a Tracker. This ".format(c["resolution"])
+ "indicates that one of the active apps is geo-tracking your moves.",
"host": c["resolution"],
"level": "Moderate",
"id": "IOC-03"})
# Check for blacklisted FreeDNS.
for domain in bl_freedns:
if c["resolution"].endswith("." + domain[0]):
self.alerts.append({"title": "A DNS request have been done to the domain {} which is a Free DNS.".format(c["resolution"]),
"description": "The domain name {} is using a Free DNS service. This kind of service is commonly used by cybercriminals ".format(c["resolution"])
+ "or state-sponsored threat actors during their operations.",
"host": c["resolution"],
"level": "Moderate",
"id": "IOC-04"})
# Check for suspect tlds.
for tld in bl_tlds:
if c["resolution"].endswith(tld[0]):
self.alerts.append({"title": "A DNS request have been done to the domain {} which contains a suspect TLD.".format(c["resolution"]),
"description": "The domain name {} is using a suspect Top Level Domain ({}). Even not malicious, this non-generic TLD is used regularly by cybercrime ".format(c["resolution"], tld[0])
+ "or state-sponsored operations. Please check this domain by searching it on an internet search engine. If other alerts are related to this "
+ "host, please consider it as very suspicious.",
"host": c["resolution"],
"level": "Low",
"id": "IOC-05"})
# Check for use of suspect nameservers.
try:
name_servers = pydig.query(c["resolution"], "NS")
except:
name_servers = []
if len(name_servers):
for ns in bl_nameservers:
if name_servers[0].endswith(".{}.".format(ns[0])):
self.alerts.append({"title": "The domain {} is using a suspect nameserver ({}).".format(c["resolution"], name_servers[0]),
"description": "The domain name {} is using a nameserver that has been explicitly tagged to be associated to malicious activities. ".format(c["resolution"])
+ "Many cybercriminals and state-sponsored threat actors are using this kind of registrars because they allow cryptocurrencies and anonymous payments.",
"host": c["resolution"],
"level": "Moderate",
"id": "IOC-06"})
def files_check(self, dir):
"""
Check on the files.log:
* Check certificates sha1
* [todo] Check possible binary data or APKs?
:return: nothing - all stuff appended to self.alerts
"""
if not self.iocs_analysis:
return
bl_certs = get_iocs("sha1cert")
if os.path.isfile(os.path.join(dir, "files.log")):
for record in ParseZeekLogs(os.path.join(dir, "files.log"), output_format="json", safe_headers=False):
if record is not None:
f = {"filename": record["filename"],
"ip_src": record["tx_hosts"],
"ip_dst": record["rx_hosts"],
"mime_type": record["mime_type"],
"sha1": record["sha1"]}
if f not in self.files:
self.files.append(f)
for f in self.files:
if f["mime_type"] == "application/x-x509-ca-cert":
for cert in bl_certs: # Check for blacklisted certificate.
if f["sha1"] == cert[0]:
host = self.resolve(f["ip_dst"])
self.alerts.append({"title": "A certificate associated to {} activities have been found in the communication to {}.".format(cert[1].upper(), host),
"description": "The certificate ({}) associated to {} has been explicitly tagged as malicious. This indicates that ".format(f["sha1"], host)
+ "your device is likely compromised and need a forensic analysis.",
"host": host,
"level": "High",
"id": "IOC-07"})
def ssl_check(self, dir):
"""
Check on the ssl.log:
* SSL connections which doesn't use the 443.
* "Free" certificate issuer (taken from the config).
* Self-signed certificates.
:return: nothing - all stuff appended to self.alerts
"""
ssl_default_ports = get_config(("analysis", "ssl_default_ports"))
free_issuers = get_config(("analysis", "free_issuers"))
if os.path.isfile(os.path.join(dir, "ssl.log")):
for record in ParseZeekLogs(os.path.join(dir, "ssl.log"), output_format="json", safe_headers=False):
if record is not None:
c = {"host": record['id.resp_h'],
"port": record['id.resp_p'],
"issuer": record["issuer"],
"validation_status": record["validation_status"]}
if c not in self.ssl:
self.ssl.append(c)
if self.heuristics_analysis:
for cert in self.ssl:
host = self.resolve(cert["host"])
# If the associated host has not whitelisted, check the cert.
if host in [c["resolution"] for c in self.conns]:
# Check for non generic SSL port.
if cert["port"] not in ssl_default_ports:
self.alerts.append({"title": "SSL connection done on an non standart port ({}) to {}".format(cert["port"], host),
"description": "It is not common to see SSL connections issued from smartphones using non-standard ports. Even this can be totally legit,"
+ " we recommend to check the reputation of {}, by looking at its WHOIS record, the associated autonomus system, its creation date, and ".format(host)
+ " by searching it the internet.",
"host": host,
"level": "Moderate",
"id": "SSL-01"})
# Check Free SSL certificates.
if cert["issuer"] in free_issuers:
self.alerts.append({"title": "An SSL connection to {} is using a free certificate.".format(host),
"description": "Free certificates — such as Let's Encrypt — are wildly used by command and control servers associated to "
+ "malicious implants or phishing web pages. We recommend to check the host associated to this certificate, "
+ "by looking at the domain name, its creation date, or by checking its reputation on the internet.",
"host": host,
"level": "Moderate",
"id": "SSL-02"})
# Check for self-signed certificates.
if cert["validation_status"] == "self signed certificate in certificate chain":
self.alerts.append({"title": "The certificate associated to {} is self-signed.".format(host),
"description": "The use of self-signed certificates is a common thing for attacker infrastructure. We recommend to check the host {} ".format(host)
+ "which is associated to this certificate, by looking at the domain name (if any), its WHOIS record, its creation date, and "
+ " by checking its reputation on the internet.",
"host": host,
"level": "Moderate",
"id": "SSL-03"})
def alerts_check(self):
"""
Leverage an advice to the user based on the trigered hosts
:return: nothing - all generated alerts appended to self.alerts
"""
hosts = {}
for alert in [dict(t) for t in {tuple(d.items()) for d in self.alerts}]:
if alert["host"] not in hosts:
hosts[alert["host"]] = 1
else:
hosts[alert["host"]] += 1
for host, nb in hosts.items():
if nb >= get_config(("analysis", "max_alerts")):
self.alerts.append({"title": "Check alerts for {}".format(host),
"description": "Please, check the reputation of the host {}, this one seems to be malicious as it leveraged {} alerts during the session.".format(host, nb),
"host": host,
"level": "High",
"id": "ADV-01"})
def resolve(self, ip_addr):
"""
A simple method to retreive DNS names from IP addresses
in order to replace them in alerts.
:return: String - DNS record or IP Address.
"""
for record in self.dns:
if ip_addr in record["answers"]:
return record["domain"]
return ip_addr
def start_zeek(self):
"""
Start zeek and check the logs.
"""
sp.Popen("cd {} && /opt/zeek/bin/zeek -Cr capture.pcap protocols/ssl/validate-certs".format(
self.working_dir), shell=True).wait()
sp.Popen("cd {} && mkdir assets".format(
self.working_dir), shell=True).wait()
sp.Popen("cd {} && mv *.log assets/".format(self.working_dir),
shell=True).wait()
self.fill_dns(self.working_dir + "/assets/")
self.netflow_check(self.working_dir + "/assets/")
self.ssl_check(self.working_dir + "/assets/")
self.files_check(self.working_dir + "/assets/")
self.alerts_check()
def get_alerts(self):
"""
Retreive alerts.
:return: list - a list of alerts wihout duplicates.
"""
return [dict(t) for t in {tuple(d.items()) for d in self.alerts}]

74
analysis/utils.py Normal file
View File

@ -0,0 +1,74 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sqlite3
import datetime
import yaml
import sys
import json
import os
from functools import reduce
# I'm not going to use an ORM for that.
parent = "/".join(sys.path[0].split("/")[:-1])
conn = sqlite3.connect(os.path.join(parent, "tinycheck.sqlite3"))
cursor = conn.cursor()
def get_iocs(ioc_type):
"""
Get a list of IOCs specified by their type.
:return: list of IOCs
"""
cursor.execute(
"SELECT value, tag FROM iocs WHERE type = ? ORDER BY value", (ioc_type,))
res = cursor.fetchall()
return [[r[0], r[1]] for r in res] if res is not None else []
def get_whitelist(elem_type):
"""
Get a list of whitelisted elements specified by their type.
:return: list of elements
"""
cursor.execute(
"SELECT element FROM whitelist WHERE type = ? ORDER BY element", (elem_type,))
res = cursor.fetchall()
return [r[0] for r in res] if res is not None else []
def get_config(path):
"""
Read a value from the configuration
:return: value (it can be any type)
"""
config = yaml.load(open(os.path.join(parent, "config.yaml"),
"r"), Loader=yaml.SafeLoader)
return reduce(dict.get, path, config)
def get_device(token):
"""
Read the device configuration from device.json file.
:return: dict - the device configuration
"""
try:
with open("/tmp/{}/device.json".format(token), "r") as f:
return json.load(f)
except:
pass
def get_apname():
"""
Read the current name of the Access Point from
the hostapd configuration file
:return: str - the AP name
"""
try:
with open("/tmp/hostapd.conf", "r") as f:
for l in f.readlines():
if "ssid=" in l:
return l.replace("ssid=", "").strip()
except:
pass