diff --git a/analysis/analysis.py b/analysis/analysis.py index c153293..345e82e 100644 --- a/analysis/analysis.py +++ b/analysis/analysis.py @@ -3,6 +3,7 @@ from classes.zeekengine import ZeekEngine from classes.suricataengine import SuricataEngine +from classes.report import Report from multiprocessing import Process, Manager import sys import re @@ -20,14 +21,23 @@ if __name__ == "__main__": capture_directory = sys.argv[1] if os.path.isdir(capture_directory): - # Alerts bucket. manager = Manager() alerts = manager.dict() def zeekengine(alerts): zeek = ZeekEngine(capture_directory) zeek.start_zeek() - alerts["zeek"] = zeek.get_alerts() + alerts["zeek"] = zeek.retrieve_alerts() + + # whitelist.json writing. + with open(os.path.join(capture_directory, "assets/whitelist.json"), "w") as f: + f.write(json.dumps(zeek.retrieve_whitelist(), + indent=4, separators=(',', ': '))) + + # conns.json writing. + with open(os.path.join(capture_directory, "assets/conns.json"), "w") as f: + f.write(json.dumps(zeek.retrieve_conns(), + indent=4, separators=(',', ': '))) def snortengine(alerts): suricata = SuricataEngine(capture_directory) @@ -45,7 +55,7 @@ if __name__ == "__main__": p2.join() # Some formating and alerts.json writing. - with open(os.path.join(capture_directory, "alerts.json"), "w") as f: + with open(os.path.join(capture_directory, "assets/alerts.json"), "w") as f: report = {"high": [], "moderate": [], "low": []} for alert in (alerts["zeek"] + alerts["suricata"]): if alert["level"] == "High": @@ -54,7 +64,11 @@ if __name__ == "__main__": report["moderate"].append(alert) if alert["level"] == "Low": report["low"].append(alert) - f.write(json.dumps(report)) + f.write(json.dumps(report, indent=4, separators=(',', ': '))) + + # Generate the report + report = Report(capture_directory) + report.generate_report() else: print("The directory doesn't exist.") else: diff --git a/analysis/classes/report.py b/analysis/classes/report.py new file mode 100644 index 0000000..49a9b78 --- /dev/null +++ b/analysis/classes/report.py @@ -0,0 +1,431 @@ +import weasyprint +import os +import json +import hashlib + +from weasyprint import HTML +from pathlib import Path +from datetime import datetime + + +class Report(object): + + def __init__(self, capture_directory): + self.capture_directory = capture_directory + self.alerts = self.read_json(os.path.join( + capture_directory, "assets/alerts.json")) + self.whitelist = self.read_json(os.path.join( + capture_directory, "assets/whitelist.json")) + self.conns = self.read_json(os.path.join( + capture_directory, "assets/conns.json")) + self.device = self.read_json(os.path.join( + capture_directory, "assets/device.json")) + + try: + with open(os.path.join(self.capture_directory, "capture.pcap"), "rb") as f: + self.capture_sha1 = hashlib.sha1(f.read()).hexdigest() + except: + self.capture_sha1 = "N/A" + + def read_json(self, json_path): + """ + Read and convert a JSON file. + :return: array or dict. + """ + with open(json_path, "r") as json_file: + return json.load(json_file) + + def generate_report(self): + """ + Generate the full report in PDF + :return: nothing + """ + content = self.generate_page_header() + content += self.generate_header() + content += self.generate_warning() + content += self.generate_alerts() + content += self.generate_suspect_conns_block() + content += self.generate_uncat_conns_block() + content += self.generate_whitelist_block() + + htmldoc = HTML(string=content, base_url="").write_pdf() + Path(os.path.join(self.capture_directory, + "report.pdf")).write_bytes(htmldoc) + + def generate_warning(self): + """ + Generate the warning message. + :return: str + """ + if len(self.alerts["high"]): + return "
Your device seems to be compromised as you have {} high alerts.
".format(self.nb_translate(len(self.alerts["high"]))) + elif len(self.alerts["moderate"]): + return "
You have {} moderate alerts, your device might be compromised. Please look at them carefully.
".format(self.nb_translate(len(self.alerts["moderate"]))) + elif len(self.alerts["low"]): + return "
You have only {} low alerts, don't hesitate to check them.
".format(self.nb_translate(len(self.alerts["low"]))) + else: + return "
Everything looks fine, zero alerts. Don't hesitate to check the uncategorized communications, if any.
" + + def nb_translate(self, nb): + """ + Translate a number in a string. + :return: str + """ + a = ["one", "two", "three", "four", "five", + "six", "seven", "height", "nine"] + return a[nb-1] if nb <= 9 else str(nb) + + def generate_suspect_conns_block(self): + """ + Generate the table of the network non-whitelisted communications. + :return: string + """ + + if not len([c for c in self.conns if c["alert_tiggered"] == True]): + return "" + + title = "

Suspect communications

" + table = """ + + + + + + + + + """ + + for rec in self.conns: + if rec["alert_tiggered"] == True: + table += "" + table += "".format(rec["proto"].upper()) + table += "".format(rec["resolution"] + if rec["resolution"] != rec["ip_dst"] else "--") + table += "".format(rec["ip_dst"]) + table += "".format(rec["port_dst"]) + table += "" + table += "
ProtocolDomainDst IP AddressDst port
{}{}{}{}
" + return title + table + + def generate_uncat_conns_block(self): + """ + Generate the table of the network non-whitelisted communications. + :return: string + """ + if not len([c for c in self.conns if c["alert_tiggered"] == False]): + return "" + + title = "

Uncategorized communications

" + table = """ + + + + + + + + + """ + + for rec in self.conns: + if rec["alert_tiggered"] == False: + table += "" + table += "".format(rec["proto"].upper()) + table += "".format(rec["resolution"] + if rec["resolution"] != rec["ip_dst"] else "--") + table += "".format(rec["ip_dst"]) + table += "".format(rec["port_dst"]) + table += "" + table += "
ProtocolDomainDst IP AddressDst port
{}{}{}{}
" + return title + table + + def generate_whitelist_block(self): + """ + Generate the table of the whitelisted communications. + :return: string + """ + if not len(self.whitelist): + return "" + + title = "

Whitelisted communications

" + table = """ + + + + + + + + + """ + + for rec in sorted(self.whitelist, key=lambda k: k['resolution']): + table += "" + table += "".format(rec["proto"].upper()) + table += "".format(rec["resolution"] + if rec["resolution"] != rec["ip_dst"] else "--") + table += "".format(rec["ip_dst"]) + table += "".format(rec["port_dst"]) + table += "" + table += "
ProtocolDomainDst IP AddressDst port
{}{}{}{}
" + return title + table + + def generate_header(self): + """ + Generate the report header with context data. + :return: string + """ + header = "
" + header += "
" + header += "


Device name: {}
".format( + self.device["name"]) + header += "Device MAC address: {}
".format( + self.device["mac_address"]) + header += "Report generated on {}
".format( + datetime.now().strftime("%d/%m/%Y at %H:%M:%S")) + + header += "Capture SHA1: {}
".format(self.capture_sha1) + header += "

" + header += "
" + return header + + def generate_alerts(self): + """ + Generate the alerts. + :return: string + """ + alerts = "" + return alerts + + def generate_page_footer(self): + """ + Generate the html footer. + :return: string + """ + return "" + + def generate_page_header(self): + """ + Generate the html header. + :return: string + """ + return """ + + + """.replace("REPORT_HEADER", "Report for the capture {}".format(self.capture_sha1)) diff --git a/analysis/classes/zeekengine.py b/analysis/classes/zeekengine.py index 13fec83..58837cc 100644 --- a/analysis/classes/zeekengine.py +++ b/analysis/classes/zeekengine.py @@ -22,11 +22,12 @@ class ZeekEngine(object): self.http = [] self.dns = [] self.files = [] + self.whitelist = [] # Get analysis configuration self.heuristics_analysis = get_config(("analysis", "heuristics")) self.iocs_analysis = get_config(("analysis", "iocs")) - self.white_analysis = get_config(("analysis", "whitelist")) + self.whitelist_analysis = get_config(("analysis", "whitelist")) def fill_dns(self, dir): """ @@ -57,7 +58,8 @@ class ZeekEngine(object): c = {"ip_dst": record["id.resp_h"], "proto": record["proto"], "port_dst": record["id.resp_p"], - "service": record["service"]} + "service": record["service"], + "alert_tiggered": False} if c not in self.conns: self.conns.append(c) @@ -69,7 +71,7 @@ class ZeekEngine(object): self.conns = sorted(self.conns, key=lambda c: c["resolution"]) # Check for whitelisted assets, if any, delete the record. - if self.white_analysis: + if self.whitelist_analysis: wl_cidrs = [IPNetwork(cidr) for cidr in get_whitelist("cidr")] wl_hosts = get_whitelist("ip4addr") + get_whitelist("ip6addr") @@ -77,12 +79,16 @@ class ZeekEngine(object): for i, c in enumerate(self.conns): if c["ip_dst"] in [ip for ip in wl_hosts]: + self.whitelist.append(self.conns[i]) self.conns[i] = False elif c["resolution"] in wl_domains: + self.whitelist.append(self.conns[i]) self.conns[i] = False elif True in [c["resolution"].endswith("." + dom) for dom in wl_domains]: + self.whitelist.append(self.conns[i]) self.conns[i] = False elif True in [IPAddress(c["ip_dst"]) in cidr for cidr in wl_cidrs]: + self.whitelist.append(self.conns[i]) self.conns[i] = False # Let's delete whitelisted connections. @@ -92,6 +98,7 @@ class ZeekEngine(object): for c in self.conns: # Check for UDP / ICMP (strange from a smartphone.) if c["proto"] in ["UDP", "ICMP"]: + c["alert_tiggered"] = True self.alerts.append({"title": "{} communication going outside the local network to {}.".format(c["proto"].upper(), c["resolution"]), "description": "The {} protocol is commonly used in internal networks. Please, verify if the host {} leveraged other alerts which may ".format(c["proto"].upper(), c["resolution"]) + "indicates a possible malicious behavior.", @@ -100,6 +107,7 @@ class ZeekEngine(object): "id": "PROTO-01"}) # Check for use of ports over 1024. if c["port_dst"] >= max_ports: + c["alert_tiggered"] = True self.alerts.append({"title": "{} connection to {} to a port over or equal to {}.".format(c["proto"].upper(), c["resolution"], max_ports), "description": "{} connections have been seen to {} by using the port {}. The use of non-standard port can be sometimes associated to malicious activities. ".format(c["proto"].upper(), c["resolution"], c["port_dst"]) + "We recommend to check if this host has a good reputation by looking on other alerts and search it on the internet.", @@ -108,6 +116,7 @@ class ZeekEngine(object): "id": "PROTO-02"}) # Check for use of HTTP. if c["service"] == "http" and c["port_dst"] == http_default_port: + c["alert_tiggered"] = True self.alerts.append({"title": "HTTP communications been done to the host {}".format(c["resolution"]), "description": "Your device exchanged with the host {} by using HTTP, an unencrypted protocol. ".format(c["resolution"]) + "Even if this behavior is not malicious by itself, it is unusual to see HTTP communications issued from smartphone applications " @@ -118,6 +127,7 @@ class ZeekEngine(object): # Check for use of HTTP on a non standard port. if c["service"] == "http" and c["port_dst"] != http_default_port: + c["alert_tiggered"] = True self.alerts.append({"title": "HTTP communications have been seen to the host {} on a non standard port ({}).".format(c["resolution"], c["port_dst"]), "description": "Your device exchanged with the host {} by using HTTP, an unencrypted protocol on the port {}. ".format(c["resolution"], c["port_dst"]) + "This behavior is quite unusual. Please check the host reputation by searching it on the internet.", @@ -126,6 +136,7 @@ class ZeekEngine(object): "id": "PROTO-04"}) # Check for non-resolved IP address. if c["service"] == c["resolution"]: + c["alert_tiggered"] = True self.alerts.append({"title": "The server {} hasn't been resolved by any DNS query during the session".format(c["ip_dst"]), "description": "It means that the server {} is likely not resolved by any domain name or the resolution has already been cached by ".format(c["ip_dst"]) + "the device. If the host appears in other alerts, please check it.", @@ -147,6 +158,7 @@ class ZeekEngine(object): # Check for blacklisted IP address. for host in bl_hosts: if c["ip_dst"] == host[0]: + c["alert_tiggered"] = True self.alerts.append({"title": "A connection has been made to {} ({}) which is tagged as {}.".format(c["resolution"], c["ip_dst"], host[1].upper()), "description": "The host {} has been explicitly blacklisted for malicious activities. Your device is likely compromised ".format(c["ip_dst"]) + "and needs to be investigated more deeply by IT security professionals.", @@ -158,6 +170,7 @@ class ZeekEngine(object): # Check for blacklisted CIDR. for cidr in bl_cidrs: if IPAddress(c["ip_dst"]) in cidr[0]: + c["alert_tiggered"] = True self.alerts.append({"title": "Communication to {} under the CIDR {} which is tagged as {}.".format(c["resolution"], cidr[0], cidr[1].upper()), "description": "The server {} is hosted under a network which is known to host malicious activities. Even if this behavior is not malicious by itself, ".format(c["resolution"]) + "you need to check if other alerts are also mentioning this host. If you have some doubts, please " @@ -169,6 +182,7 @@ class ZeekEngine(object): for domain in bl_domains: if c["resolution"].endswith(domain[0]): if domain[1] != "tracker": + c["alert_tiggered"] = True self.alerts.append({"title": "A DNS request have been done to {} which is tagged as {}.".format(c["resolution"], domain[1].upper()), "description": "The domain name {} seen in the capture has been explicitly tagged as malicious. This indicates that ".format(c["resolution"]) + "your device is likely compromised and needs to be investigated deeply.", @@ -176,6 +190,7 @@ class ZeekEngine(object): "level": "High", "id": "IOC-03"}) else: + c["alert_tiggered"] = True self.alerts.append({"title": "A DNS request have been done to {} which is tagged as {}.".format(c["resolution"], domain[1].upper()), "description": "The domain name {} seen in the capture has been explicitly tagged as a Tracker. This ".format(c["resolution"]) + "indicates that one of the active apps is geo-tracking your moves.", @@ -185,6 +200,7 @@ class ZeekEngine(object): # Check for blacklisted FreeDNS. for domain in bl_freedns: if c["resolution"].endswith("." + domain[0]): + c["alert_tiggered"] = True self.alerts.append({"title": "A DNS request have been done to the domain {} which is a Free DNS.".format(c["resolution"]), "description": "The domain name {} is using a Free DNS service. This kind of service is commonly used by cybercriminals ".format(c["resolution"]) + "or state-sponsored threat actors during their operations.", @@ -195,6 +211,7 @@ class ZeekEngine(object): # Check for suspect tlds. for tld in bl_tlds: if c["resolution"].endswith(tld[0]): + c["alert_tiggered"] = True self.alerts.append({"title": "A DNS request have been done to the domain {} which contains a suspect TLD.".format(c["resolution"]), "description": "The domain name {} is using a suspect Top Level Domain ({}). Even not malicious, this non-generic TLD is used regularly by cybercrime ".format(c["resolution"], tld[0]) + "or state-sponsored operations. Please check this domain by searching it on an internet search engine. If other alerts are related to this " @@ -212,6 +229,7 @@ class ZeekEngine(object): if len(name_servers): for ns in bl_nameservers: if name_servers[0].endswith(".{}.".format(ns[0])): + c["alert_tiggered"] = True self.alerts.append({"title": "The domain {} is using a suspect nameserver ({}).".format(c["resolution"], name_servers[0]), "description": "The domain name {} is using a nameserver that has been explicitly tagged to be associated to malicious activities. ".format(c["resolution"]) + "Many cybercriminals and state-sponsored threat actors are using this kind of registrars because they allow cryptocurrencies and anonymous payments.", @@ -248,6 +266,7 @@ class ZeekEngine(object): for cert in bl_certs: # Check for blacklisted certificate. if f["sha1"] == cert[0]: host = self.resolve(f["ip_dst"]) + c["alert_tiggered"] = True self.alerts.append({"title": "A certificate associated to {} activities have been found in the communication to {}.".format(cert[1].upper(), host), "description": "The certificate ({}) associated to {} has been explicitly tagged as malicious. This indicates that ".format(f["sha1"], host) + "your device is likely compromised and need a forensic analysis.", @@ -281,35 +300,38 @@ class ZeekEngine(object): host = self.resolve(cert["host"]) # If the associated host has not whitelisted, check the cert. - if host in [c["resolution"] for c in self.conns]: - - # Check for non generic SSL port. - if cert["port"] not in ssl_default_ports: - self.alerts.append({"title": "SSL connection done on an non standart port ({}) to {}".format(cert["port"], host), - "description": "It is not common to see SSL connections issued from smartphones using non-standard ports. Even this can be totally legit," - + " we recommend to check the reputation of {}, by looking at its WHOIS record, the associated autonomus system, its creation date, and ".format(host) - + " by searching it the internet.", - "host": host, - "level": "Moderate", - "id": "SSL-01"}) - # Check Free SSL certificates. - if cert["issuer"] in free_issuers: - self.alerts.append({"title": "An SSL connection to {} is using a free certificate.".format(host), - "description": "Free certificates — such as Let's Encrypt — are wildly used by command and control servers associated to " - + "malicious implants or phishing web pages. We recommend to check the host associated to this certificate, " - + "by looking at the domain name, its creation date, or by checking its reputation on the internet.", - "host": host, - "level": "Moderate", - "id": "SSL-02"}) - # Check for self-signed certificates. - if cert["validation_status"] == "self signed certificate in certificate chain": - self.alerts.append({"title": "The certificate associated to {} is self-signed.".format(host), - "description": "The use of self-signed certificates is a common thing for attacker infrastructure. We recommend to check the host {} ".format(host) - + "which is associated to this certificate, by looking at the domain name (if any), its WHOIS record, its creation date, and " - + " by checking its reputation on the internet.", - "host": host, - "level": "Moderate", - "id": "SSL-03"}) + for c in self.conns: + if host in c["resolution"]: + # Check for non generic SSL port. + if cert["port"] not in ssl_default_ports: + c["alert_tiggered"] = True + self.alerts.append({"title": "SSL connection done on an non standart port ({}) to {}".format(cert["port"], host), + "description": "It is not common to see SSL connections issued from smartphones using non-standard ports. Even this can be totally legit," + + " we recommend to check the reputation of {}, by looking at its WHOIS record, the associated autonomus system, its creation date, and ".format(host) + + " by searching it the internet.", + "host": host, + "level": "Moderate", + "id": "SSL-01"}) + # Check Free SSL certificates. + if cert["issuer"] in free_issuers: + c["alert_tiggered"] = True + self.alerts.append({"title": "An SSL connection to {} is using a free certificate.".format(host), + "description": "Free certificates — such as Let's Encrypt — are wildly used by command and control servers associated to " + + "malicious implants or phishing web pages. We recommend to check the host associated to this certificate, " + + "by looking at the domain name, its creation date, or by checking its reputation on the internet.", + "host": host, + "level": "Moderate", + "id": "SSL-02"}) + # Check for self-signed certificates. + if cert["validation_status"] == "self signed certificate in certificate chain": + c["alert_tiggered"] = True + self.alerts.append({"title": "The certificate associated to {} is self-signed.".format(host), + "description": "The use of self-signed certificates is a common thing for attacker infrastructure. We recommend to check the host {} ".format(host) + + "which is associated to this certificate, by looking at the domain name (if any), its WHOIS record, its creation date, and " + + " by checking its reputation on the internet.", + "host": host, + "level": "Moderate", + "id": "SSL-03"}) def alerts_check(self): """ @@ -350,8 +372,6 @@ class ZeekEngine(object): """ sp.Popen("cd {} && /opt/zeek/bin/zeek -Cr capture.pcap protocols/ssl/validate-certs".format( self.working_dir), shell=True).wait() - sp.Popen("cd {} && mkdir assets".format( - self.working_dir), shell=True).wait() sp.Popen("cd {} && mv *.log assets/".format(self.working_dir), shell=True).wait() @@ -361,9 +381,23 @@ class ZeekEngine(object): self.files_check(self.working_dir + "/assets/") self.alerts_check() - def get_alerts(self): + def retrieve_alerts(self): """ - Retreive alerts. + Retrieve alerts. :return: list - a list of alerts wihout duplicates. """ return [dict(t) for t in {tuple(d.items()) for d in self.alerts}] + + def retrieve_whitelist(self): + """ + Retrieve whitelisted elements. + :return: list - a list of whitelisted elements wihout duplicates. + """ + return [dict(t) for t in {tuple(d.items()) for d in self.whitelist}] + + def retrieve_conns(self): + """ + Retrieve not whitelisted elements. + :return: list - a list of non-whitelisted elements wihout duplicates. + """ + return self.conns diff --git a/assets/requirements.txt b/assets/requirements.txt index 68cdf5f..0aa7606 100644 --- a/assets/requirements.txt +++ b/assets/requirements.txt @@ -12,4 +12,5 @@ psutil pyudev wifi qrcode -netifaces \ No newline at end of file +netifaces +weasyprint \ No newline at end of file diff --git a/server/frontend/app/classes/analysis.py b/server/frontend/app/classes/analysis.py index 8a20d75..0b865ef 100644 --- a/server/frontend/app/classes/analysis.py +++ b/server/frontend/app/classes/analysis.py @@ -44,13 +44,13 @@ class Analysis(object): device, alerts = {}, {} # Getting device configuration. - if os.path.isfile("/tmp/{}/device.json".format(self.token)): - with open("/tmp/{}/device.json".format(self.token), "r") as f: + if os.path.isfile("/tmp/{}/assets/device.json".format(self.token)): + with open("/tmp/{}/assets/device.json".format(self.token), "r") as f: device = json.load(f) # Getting alerts configuration. - if os.path.isfile("/tmp/{}/alerts.json".format(self.token)): - with open("/tmp/{}/alerts.json".format(self.token), "r") as f: + if os.path.isfile("/tmp/{}/assets/alerts.json".format(self.token)): + with open("/tmp/{}/assets/alerts.json".format(self.token), "r") as f: alerts = json.load(f) if device != {} and alerts != {}: diff --git a/server/frontend/app/classes/capture.py b/server/frontend/app/classes/capture.py index 944a415..7e0bc12 100644 --- a/server/frontend/app/classes/capture.py +++ b/server/frontend/app/classes/capture.py @@ -15,7 +15,8 @@ import re class Capture(object): def __init__(self): - self.working_dir = False + self.capture_dir = False + self.assets_dir = False self.capture_token = False self.random_choice_alphabet = "ABCDEF1234567890" @@ -33,16 +34,18 @@ class Capture(object): # Few context variable assignment self.capture_token = "".join( [random.choice(self.random_choice_alphabet) for i in range(8)]) - self.working_dir = "/tmp/{}/".format(self.capture_token) - self.pcap = self.working_dir + "capture.pcap" + self.capture_dir = "/tmp/{}/".format(self.capture_token) + self.assets_dir = "/tmp/{}/assets/".format(self.capture_token) + self.pcap = self.capture_dir + "capture.pcap" self.iface = read_config(("network", "in")) # For packets monitoring self.list_pkts = [] self.last_pkts = 0 - # Make the capture directory - mkdir(self.working_dir) + # Make the capture and the assets directory + mkdir(self.capture_dir) + mkdir(self.assets_dir) try: sp.Popen(["tshark", "-i", self.iface, "-w", diff --git a/server/frontend/app/classes/device.py b/server/frontend/app/classes/device.py index a58a365..2d589ac 100644 --- a/server/frontend/app/classes/device.py +++ b/server/frontend/app/classes/device.py @@ -20,13 +20,13 @@ class Device(object): :return: dict containing device properties. """ - if not os.path.isfile("/tmp/{}/device.json".format(self.token)): + if not os.path.isfile("/tmp/{}/assets/device.json".format(self.token)): device = self.read_leases() if device["status"] != False: - with open("/tmp/{}/device.json".format(self.token), "w") as f: + with open("/tmp/{}/assets/device.json".format(self.token), "w") as f: f.write(json.dumps(device)) else: - with open("/tmp/{}/device.json".format(self.token)) as f: + with open("/tmp/{}/assets/device.json".format(self.token)) as f: device = json.load(f) return device