import weasyprint import os import json import hashlib import re import sys from weasyprint import HTML from pathlib import Path from datetime import datetime from utils import get_config class Report(object): def __init__(self, capture_directory, analysis_duration): self.capture_directory = capture_directory self.alerts = self.read_json(os.path.join(capture_directory, "assets/alerts.json")) self.records = self.read_json(os.path.join(capture_directory, "assets/records.json")) self.methods = self.read_json(os.path.join(capture_directory, "assets/detection_methods.json")) self.device = self.read_json(os.path.join(capture_directory, "assets/device.json")) self.capinfos = self.read_json(os.path.join(capture_directory, "assets/capinfos.json")) self.instance = self.read_json(os.path.join(capture_directory, "assets/instance.json")) self.analysis_duration = analysis_duration with open(os.path.join(self.capture_directory, "capture.pcap"), "rb") as f: self.capture_sha1 = hashlib.sha1(f.read()).hexdigest() self.userlang = get_config(("frontend", "user_lang")) # Load template language if not re.match("^[a-z]{2,3}$", self.userlang): self.userlang = "en" with open(os.path.join(os.path.dirname(os.path.realpath(sys.argv[0])), "locales/{}.json".format(self.userlang))) as f: self.template = json.load(f)["report"] def read_json(self, json_path): """Read a JSON Args: json_path (_type_): _description_ Returns: _type_: _description_ """ with open(json_path, "r") as json_file: return json.load(json_file) def generate_report(self): """Generate the full report and save it as report.pdf """ content = self.generate_page_header() content += self.generate_header() content += self.generate_warning() content += self.generate_alerts() content += self.generate_suspect_conns_block() content += self.generate_uncat_conns_block() content += self.generate_whitelist_block() htmldoc = HTML(string=content, base_url="").write_pdf() Path(os.path.join(self.capture_directory, "report.pdf")).write_bytes(htmldoc) def generate_warning(self): """Generate the main warning message on the report Returns: string: HTML code of the main warning message. """ if len(self.alerts["high"]): msg = "
" msg += self.template["high_msg"].format( self.nb_translate(len(self.alerts["high"]))) msg += "
" return msg elif len(self.alerts["moderate"]): msg = "
" msg += self.template["moderate_msg"].format( self.nb_translate(len(self.alerts["moderate"]))) msg += "
" return msg elif len(self.alerts["low"]): msg = "
" msg += self.template["low_msg"].format( self.nb_translate(len(self.alerts["low"]))) msg += "
" return msg else: msg = "
" msg += self.template["none_msg"] msg += "
" return msg def nb_translate(self, nb): """ Translate a number in a string. Args: nb (int): integer Returns: string: A translated string related to the integer """ a = self.template["numbers"] return a[nb-1] if nb <= 9 else str(nb) def generate_suspect_conns_block(self): """Generate the block and the table of communications categorized as suspect. Returns: str: HTML block of the suspect communications """ tbody = "" for record in self.records: if record["suspicious"] == True: tbody += "" tbody += f"{', '.join([p['name'] for p in record['protocols']])}" tbody += f"{', '.join(record['domains'])}" tbody += f"{record['ip_dst']}" tbody += f"{', '.join([str(p['port']) if p['port'] != -1 else '--' for p in record['protocols']])}" tbody += "" if len(tbody): title = f"

{self.template['suspect_title']}

" table = "" table += " " table += " " table += f" " table += f" " table += f" " table += f" " table += " " table += " " table += "" table += tbody table += "
{self.template['protocol']}{self.template['domain']}{self.template['dst_ip']}{self.template['dst_port']}
" return title + table else: return "" def generate_uncat_conns_block(self): """Generate the block and the table of the uncategorized communications Returns: str: HTML block of the uncategorized communications """ tbody = "" for record in self.records: if record["suspicious"] == False and record["whitelisted"] == False: tbody += "" tbody += f"{', '.join([p['name'] for p in record['protocols']])}" tbody += f"{', '.join(record['domains'])}" tbody += f"{record['ip_dst']}" tbody += f"{', '.join([str(p['port']) if p['port'] != -1 else '--' for p in record['protocols']])}" tbody += "" if len(tbody): title = "

{}

".format(self.template["uncat_title"]) table = "" table += " " table += " " table += f" " table += f" " table += f" " table += f" " table += " " table += " " table += "" table += tbody table += "
{self.template['protocol']}{self.template['domain']}{self.template['dst_ip']}{self.template['dst_port']}
" return title + table else: return "" def generate_whitelist_block(self): """Generate the block and the table of the whitelisted communications Returns: str: HTML block of the whitelisted communications """ tbody = "" for record in self.records: if record["whitelisted"] == True: tbody += "" tbody += f"{', '.join([p['name'] for p in record['protocols']])}" tbody += f"{', '.join(record['domains'])}" tbody += f"{record['ip_dst']}" tbody += f"{', '.join([str(p['port']) if p['port'] != -1 else '--' for p in record['protocols']])}" tbody += "" if len(tbody): title = "

{}

".format(self.template["whitelist_title"]) table = "" table += " " table += " " table += f" " table += f" " table += f" " table += f" " table += " " table += " " table += "" table += tbody table += "
{self.template['protocol']}{self.template['domain']}{self.template['dst_ip']}{self.template['dst_port']}
" return title + table else: return "" def generate_header(self): """Generate the report headers with the capture's metadata. Returns: str: HTML block containing the data. """ header = "
" header += "
" header += f"


{self.template['device_mac']}: {self.device['mac_address']}
" header += f"{self.template['detection_methods']}: {'☑' if self.methods['iocs'] else '☐'} IOCs {'☑' if self.methods['heuristics'] else '☐'} Heuristics {'☑' if self.methods['active'] else '☐'} Active analysis
" header += f"{self.template['capture_sha1']}: {self.capture_sha1}
" header += f"{self.template['instance_uuid']}: {self.instance['instance_uuid']}
" header += f"{self.template['report_generated_on']} {datetime.now().strftime('%d/%m/%Y - %H:%M:%S')}
" if self.capinfos is not None: header += f"{self.template['capture_duration']}: {self.capinfos['Capture duration'].split(' ')[0]} {self.template['seconds']}
" header += f"{self.template['analysis_duration']}: {self.analysis_duration} {self.template['seconds']}
" header += f"{self.template['packets_number']}: {self.capinfos['Number of packets']}
" header += "

" header += "
" return header def generate_alerts(self): """Generate a block embedding the alerts triggered during the analysis. Returns: str: HTML block containing the data. """ alerts = "" return alerts def generate_page_footer(self): """Generate the page footer Returns: str: HTML block closing the page """ return "" def generate_page_header(self): """Generate the page header Returns: str: HTML block containing the page header with the CSS. """ return """ """.replace("REPORT_HEADER", "{} {}".format(self.template["report_for_the_capture"], self.capture_sha1)).replace("REPORT_FOOTER", self.template["report_footer"])