Adding the export of PDF report

This commit is contained in:
Félix Aime 2021-01-06 21:19:03 +01:00
parent d59c1c9cfa
commit 8f56909e4f
7 changed files with 536 additions and 53 deletions

View File

@ -3,6 +3,7 @@
from classes.zeekengine import ZeekEngine from classes.zeekengine import ZeekEngine
from classes.suricataengine import SuricataEngine from classes.suricataengine import SuricataEngine
from classes.report import Report
from multiprocessing import Process, Manager from multiprocessing import Process, Manager
import sys import sys
import re import re
@ -20,14 +21,23 @@ if __name__ == "__main__":
capture_directory = sys.argv[1] capture_directory = sys.argv[1]
if os.path.isdir(capture_directory): if os.path.isdir(capture_directory):
# Alerts bucket.
manager = Manager() manager = Manager()
alerts = manager.dict() alerts = manager.dict()
def zeekengine(alerts): def zeekengine(alerts):
zeek = ZeekEngine(capture_directory) zeek = ZeekEngine(capture_directory)
zeek.start_zeek() zeek.start_zeek()
alerts["zeek"] = zeek.get_alerts() alerts["zeek"] = zeek.retrieve_alerts()
# whitelist.json writing.
with open(os.path.join(capture_directory, "assets/whitelist.json"), "w") as f:
f.write(json.dumps(zeek.retrieve_whitelist(),
indent=4, separators=(',', ': ')))
# conns.json writing.
with open(os.path.join(capture_directory, "assets/conns.json"), "w") as f:
f.write(json.dumps(zeek.retrieve_conns(),
indent=4, separators=(',', ': ')))
def snortengine(alerts): def snortengine(alerts):
suricata = SuricataEngine(capture_directory) suricata = SuricataEngine(capture_directory)
@ -45,7 +55,7 @@ if __name__ == "__main__":
p2.join() p2.join()
# Some formating and alerts.json writing. # Some formating and alerts.json writing.
with open(os.path.join(capture_directory, "alerts.json"), "w") as f: with open(os.path.join(capture_directory, "assets/alerts.json"), "w") as f:
report = {"high": [], "moderate": [], "low": []} report = {"high": [], "moderate": [], "low": []}
for alert in (alerts["zeek"] + alerts["suricata"]): for alert in (alerts["zeek"] + alerts["suricata"]):
if alert["level"] == "High": if alert["level"] == "High":
@ -54,7 +64,11 @@ if __name__ == "__main__":
report["moderate"].append(alert) report["moderate"].append(alert)
if alert["level"] == "Low": if alert["level"] == "Low":
report["low"].append(alert) report["low"].append(alert)
f.write(json.dumps(report)) f.write(json.dumps(report, indent=4, separators=(',', ': ')))
# Generate the report
report = Report(capture_directory)
report.generate_report()
else: else:
print("The directory doesn't exist.") print("The directory doesn't exist.")
else: else:

431
analysis/classes/report.py Normal file

File diff suppressed because one or more lines are too long

View File

@ -22,11 +22,12 @@ class ZeekEngine(object):
self.http = [] self.http = []
self.dns = [] self.dns = []
self.files = [] self.files = []
self.whitelist = []
# Get analysis configuration # Get analysis configuration
self.heuristics_analysis = get_config(("analysis", "heuristics")) self.heuristics_analysis = get_config(("analysis", "heuristics"))
self.iocs_analysis = get_config(("analysis", "iocs")) self.iocs_analysis = get_config(("analysis", "iocs"))
self.white_analysis = get_config(("analysis", "whitelist")) self.whitelist_analysis = get_config(("analysis", "whitelist"))
def fill_dns(self, dir): def fill_dns(self, dir):
""" """
@ -57,7 +58,8 @@ class ZeekEngine(object):
c = {"ip_dst": record["id.resp_h"], c = {"ip_dst": record["id.resp_h"],
"proto": record["proto"], "proto": record["proto"],
"port_dst": record["id.resp_p"], "port_dst": record["id.resp_p"],
"service": record["service"]} "service": record["service"],
"alert_tiggered": False}
if c not in self.conns: if c not in self.conns:
self.conns.append(c) self.conns.append(c)
@ -69,7 +71,7 @@ class ZeekEngine(object):
self.conns = sorted(self.conns, key=lambda c: c["resolution"]) self.conns = sorted(self.conns, key=lambda c: c["resolution"])
# Check for whitelisted assets, if any, delete the record. # Check for whitelisted assets, if any, delete the record.
if self.white_analysis: if self.whitelist_analysis:
wl_cidrs = [IPNetwork(cidr) for cidr in get_whitelist("cidr")] wl_cidrs = [IPNetwork(cidr) for cidr in get_whitelist("cidr")]
wl_hosts = get_whitelist("ip4addr") + get_whitelist("ip6addr") wl_hosts = get_whitelist("ip4addr") + get_whitelist("ip6addr")
@ -77,12 +79,16 @@ class ZeekEngine(object):
for i, c in enumerate(self.conns): for i, c in enumerate(self.conns):
if c["ip_dst"] in [ip for ip in wl_hosts]: if c["ip_dst"] in [ip for ip in wl_hosts]:
self.whitelist.append(self.conns[i])
self.conns[i] = False self.conns[i] = False
elif c["resolution"] in wl_domains: elif c["resolution"] in wl_domains:
self.whitelist.append(self.conns[i])
self.conns[i] = False self.conns[i] = False
elif True in [c["resolution"].endswith("." + dom) for dom in wl_domains]: elif True in [c["resolution"].endswith("." + dom) for dom in wl_domains]:
self.whitelist.append(self.conns[i])
self.conns[i] = False self.conns[i] = False
elif True in [IPAddress(c["ip_dst"]) in cidr for cidr in wl_cidrs]: elif True in [IPAddress(c["ip_dst"]) in cidr for cidr in wl_cidrs]:
self.whitelist.append(self.conns[i])
self.conns[i] = False self.conns[i] = False
# Let's delete whitelisted connections. # Let's delete whitelisted connections.
@ -92,6 +98,7 @@ class ZeekEngine(object):
for c in self.conns: for c in self.conns:
# Check for UDP / ICMP (strange from a smartphone.) # Check for UDP / ICMP (strange from a smartphone.)
if c["proto"] in ["UDP", "ICMP"]: if c["proto"] in ["UDP", "ICMP"]:
c["alert_tiggered"] = True
self.alerts.append({"title": "{} communication going outside the local network to {}.".format(c["proto"].upper(), c["resolution"]), self.alerts.append({"title": "{} communication going outside the local network to {}.".format(c["proto"].upper(), c["resolution"]),
"description": "The {} protocol is commonly used in internal networks. Please, verify if the host {} leveraged other alerts which may ".format(c["proto"].upper(), c["resolution"]) "description": "The {} protocol is commonly used in internal networks. Please, verify if the host {} leveraged other alerts which may ".format(c["proto"].upper(), c["resolution"])
+ "indicates a possible malicious behavior.", + "indicates a possible malicious behavior.",
@ -100,6 +107,7 @@ class ZeekEngine(object):
"id": "PROTO-01"}) "id": "PROTO-01"})
# Check for use of ports over 1024. # Check for use of ports over 1024.
if c["port_dst"] >= max_ports: if c["port_dst"] >= max_ports:
c["alert_tiggered"] = True
self.alerts.append({"title": "{} connection to {} to a port over or equal to {}.".format(c["proto"].upper(), c["resolution"], max_ports), self.alerts.append({"title": "{} connection to {} to a port over or equal to {}.".format(c["proto"].upper(), c["resolution"], max_ports),
"description": "{} connections have been seen to {} by using the port {}. The use of non-standard port can be sometimes associated to malicious activities. ".format(c["proto"].upper(), c["resolution"], c["port_dst"]) "description": "{} connections have been seen to {} by using the port {}. The use of non-standard port can be sometimes associated to malicious activities. ".format(c["proto"].upper(), c["resolution"], c["port_dst"])
+ "We recommend to check if this host has a good reputation by looking on other alerts and search it on the internet.", + "We recommend to check if this host has a good reputation by looking on other alerts and search it on the internet.",
@ -108,6 +116,7 @@ class ZeekEngine(object):
"id": "PROTO-02"}) "id": "PROTO-02"})
# Check for use of HTTP. # Check for use of HTTP.
if c["service"] == "http" and c["port_dst"] == http_default_port: if c["service"] == "http" and c["port_dst"] == http_default_port:
c["alert_tiggered"] = True
self.alerts.append({"title": "HTTP communications been done to the host {}".format(c["resolution"]), self.alerts.append({"title": "HTTP communications been done to the host {}".format(c["resolution"]),
"description": "Your device exchanged with the host {} by using HTTP, an unencrypted protocol. ".format(c["resolution"]) "description": "Your device exchanged with the host {} by using HTTP, an unencrypted protocol. ".format(c["resolution"])
+ "Even if this behavior is not malicious by itself, it is unusual to see HTTP communications issued from smartphone applications " + "Even if this behavior is not malicious by itself, it is unusual to see HTTP communications issued from smartphone applications "
@ -118,6 +127,7 @@ class ZeekEngine(object):
# Check for use of HTTP on a non standard port. # Check for use of HTTP on a non standard port.
if c["service"] == "http" and c["port_dst"] != http_default_port: if c["service"] == "http" and c["port_dst"] != http_default_port:
c["alert_tiggered"] = True
self.alerts.append({"title": "HTTP communications have been seen to the host {} on a non standard port ({}).".format(c["resolution"], c["port_dst"]), self.alerts.append({"title": "HTTP communications have been seen to the host {} on a non standard port ({}).".format(c["resolution"], c["port_dst"]),
"description": "Your device exchanged with the host {} by using HTTP, an unencrypted protocol on the port {}. ".format(c["resolution"], c["port_dst"]) "description": "Your device exchanged with the host {} by using HTTP, an unencrypted protocol on the port {}. ".format(c["resolution"], c["port_dst"])
+ "This behavior is quite unusual. Please check the host reputation by searching it on the internet.", + "This behavior is quite unusual. Please check the host reputation by searching it on the internet.",
@ -126,6 +136,7 @@ class ZeekEngine(object):
"id": "PROTO-04"}) "id": "PROTO-04"})
# Check for non-resolved IP address. # Check for non-resolved IP address.
if c["service"] == c["resolution"]: if c["service"] == c["resolution"]:
c["alert_tiggered"] = True
self.alerts.append({"title": "The server {} hasn't been resolved by any DNS query during the session".format(c["ip_dst"]), self.alerts.append({"title": "The server {} hasn't been resolved by any DNS query during the session".format(c["ip_dst"]),
"description": "It means that the server {} is likely not resolved by any domain name or the resolution has already been cached by ".format(c["ip_dst"]) "description": "It means that the server {} is likely not resolved by any domain name or the resolution has already been cached by ".format(c["ip_dst"])
+ "the device. If the host appears in other alerts, please check it.", + "the device. If the host appears in other alerts, please check it.",
@ -147,6 +158,7 @@ class ZeekEngine(object):
# Check for blacklisted IP address. # Check for blacklisted IP address.
for host in bl_hosts: for host in bl_hosts:
if c["ip_dst"] == host[0]: if c["ip_dst"] == host[0]:
c["alert_tiggered"] = True
self.alerts.append({"title": "A connection has been made to {} ({}) which is tagged as {}.".format(c["resolution"], c["ip_dst"], host[1].upper()), self.alerts.append({"title": "A connection has been made to {} ({}) which is tagged as {}.".format(c["resolution"], c["ip_dst"], host[1].upper()),
"description": "The host {} has been explicitly blacklisted for malicious activities. Your device is likely compromised ".format(c["ip_dst"]) "description": "The host {} has been explicitly blacklisted for malicious activities. Your device is likely compromised ".format(c["ip_dst"])
+ "and needs to be investigated more deeply by IT security professionals.", + "and needs to be investigated more deeply by IT security professionals.",
@ -158,6 +170,7 @@ class ZeekEngine(object):
# Check for blacklisted CIDR. # Check for blacklisted CIDR.
for cidr in bl_cidrs: for cidr in bl_cidrs:
if IPAddress(c["ip_dst"]) in cidr[0]: if IPAddress(c["ip_dst"]) in cidr[0]:
c["alert_tiggered"] = True
self.alerts.append({"title": "Communication to {} under the CIDR {} which is tagged as {}.".format(c["resolution"], cidr[0], cidr[1].upper()), self.alerts.append({"title": "Communication to {} under the CIDR {} which is tagged as {}.".format(c["resolution"], cidr[0], cidr[1].upper()),
"description": "The server {} is hosted under a network which is known to host malicious activities. Even if this behavior is not malicious by itself, ".format(c["resolution"]) "description": "The server {} is hosted under a network which is known to host malicious activities. Even if this behavior is not malicious by itself, ".format(c["resolution"])
+ "you need to check if other alerts are also mentioning this host. If you have some doubts, please " + "you need to check if other alerts are also mentioning this host. If you have some doubts, please "
@ -169,6 +182,7 @@ class ZeekEngine(object):
for domain in bl_domains: for domain in bl_domains:
if c["resolution"].endswith(domain[0]): if c["resolution"].endswith(domain[0]):
if domain[1] != "tracker": if domain[1] != "tracker":
c["alert_tiggered"] = True
self.alerts.append({"title": "A DNS request have been done to {} which is tagged as {}.".format(c["resolution"], domain[1].upper()), self.alerts.append({"title": "A DNS request have been done to {} which is tagged as {}.".format(c["resolution"], domain[1].upper()),
"description": "The domain name {} seen in the capture has been explicitly tagged as malicious. This indicates that ".format(c["resolution"]) "description": "The domain name {} seen in the capture has been explicitly tagged as malicious. This indicates that ".format(c["resolution"])
+ "your device is likely compromised and needs to be investigated deeply.", + "your device is likely compromised and needs to be investigated deeply.",
@ -176,6 +190,7 @@ class ZeekEngine(object):
"level": "High", "level": "High",
"id": "IOC-03"}) "id": "IOC-03"})
else: else:
c["alert_tiggered"] = True
self.alerts.append({"title": "A DNS request have been done to {} which is tagged as {}.".format(c["resolution"], domain[1].upper()), self.alerts.append({"title": "A DNS request have been done to {} which is tagged as {}.".format(c["resolution"], domain[1].upper()),
"description": "The domain name {} seen in the capture has been explicitly tagged as a Tracker. This ".format(c["resolution"]) "description": "The domain name {} seen in the capture has been explicitly tagged as a Tracker. This ".format(c["resolution"])
+ "indicates that one of the active apps is geo-tracking your moves.", + "indicates that one of the active apps is geo-tracking your moves.",
@ -185,6 +200,7 @@ class ZeekEngine(object):
# Check for blacklisted FreeDNS. # Check for blacklisted FreeDNS.
for domain in bl_freedns: for domain in bl_freedns:
if c["resolution"].endswith("." + domain[0]): if c["resolution"].endswith("." + domain[0]):
c["alert_tiggered"] = True
self.alerts.append({"title": "A DNS request have been done to the domain {} which is a Free DNS.".format(c["resolution"]), self.alerts.append({"title": "A DNS request have been done to the domain {} which is a Free DNS.".format(c["resolution"]),
"description": "The domain name {} is using a Free DNS service. This kind of service is commonly used by cybercriminals ".format(c["resolution"]) "description": "The domain name {} is using a Free DNS service. This kind of service is commonly used by cybercriminals ".format(c["resolution"])
+ "or state-sponsored threat actors during their operations.", + "or state-sponsored threat actors during their operations.",
@ -195,6 +211,7 @@ class ZeekEngine(object):
# Check for suspect tlds. # Check for suspect tlds.
for tld in bl_tlds: for tld in bl_tlds:
if c["resolution"].endswith(tld[0]): if c["resolution"].endswith(tld[0]):
c["alert_tiggered"] = True
self.alerts.append({"title": "A DNS request have been done to the domain {} which contains a suspect TLD.".format(c["resolution"]), self.alerts.append({"title": "A DNS request have been done to the domain {} which contains a suspect TLD.".format(c["resolution"]),
"description": "The domain name {} is using a suspect Top Level Domain ({}). Even not malicious, this non-generic TLD is used regularly by cybercrime ".format(c["resolution"], tld[0]) "description": "The domain name {} is using a suspect Top Level Domain ({}). Even not malicious, this non-generic TLD is used regularly by cybercrime ".format(c["resolution"], tld[0])
+ "or state-sponsored operations. Please check this domain by searching it on an internet search engine. If other alerts are related to this " + "or state-sponsored operations. Please check this domain by searching it on an internet search engine. If other alerts are related to this "
@ -212,6 +229,7 @@ class ZeekEngine(object):
if len(name_servers): if len(name_servers):
for ns in bl_nameservers: for ns in bl_nameservers:
if name_servers[0].endswith(".{}.".format(ns[0])): if name_servers[0].endswith(".{}.".format(ns[0])):
c["alert_tiggered"] = True
self.alerts.append({"title": "The domain {} is using a suspect nameserver ({}).".format(c["resolution"], name_servers[0]), self.alerts.append({"title": "The domain {} is using a suspect nameserver ({}).".format(c["resolution"], name_servers[0]),
"description": "The domain name {} is using a nameserver that has been explicitly tagged to be associated to malicious activities. ".format(c["resolution"]) "description": "The domain name {} is using a nameserver that has been explicitly tagged to be associated to malicious activities. ".format(c["resolution"])
+ "Many cybercriminals and state-sponsored threat actors are using this kind of registrars because they allow cryptocurrencies and anonymous payments.", + "Many cybercriminals and state-sponsored threat actors are using this kind of registrars because they allow cryptocurrencies and anonymous payments.",
@ -248,6 +266,7 @@ class ZeekEngine(object):
for cert in bl_certs: # Check for blacklisted certificate. for cert in bl_certs: # Check for blacklisted certificate.
if f["sha1"] == cert[0]: if f["sha1"] == cert[0]:
host = self.resolve(f["ip_dst"]) host = self.resolve(f["ip_dst"])
c["alert_tiggered"] = True
self.alerts.append({"title": "A certificate associated to {} activities have been found in the communication to {}.".format(cert[1].upper(), host), self.alerts.append({"title": "A certificate associated to {} activities have been found in the communication to {}.".format(cert[1].upper(), host),
"description": "The certificate ({}) associated to {} has been explicitly tagged as malicious. This indicates that ".format(f["sha1"], host) "description": "The certificate ({}) associated to {} has been explicitly tagged as malicious. This indicates that ".format(f["sha1"], host)
+ "your device is likely compromised and need a forensic analysis.", + "your device is likely compromised and need a forensic analysis.",
@ -281,35 +300,38 @@ class ZeekEngine(object):
host = self.resolve(cert["host"]) host = self.resolve(cert["host"])
# If the associated host has not whitelisted, check the cert. # If the associated host has not whitelisted, check the cert.
if host in [c["resolution"] for c in self.conns]: for c in self.conns:
if host in c["resolution"]:
# Check for non generic SSL port. # Check for non generic SSL port.
if cert["port"] not in ssl_default_ports: if cert["port"] not in ssl_default_ports:
self.alerts.append({"title": "SSL connection done on an non standart port ({}) to {}".format(cert["port"], host), c["alert_tiggered"] = True
"description": "It is not common to see SSL connections issued from smartphones using non-standard ports. Even this can be totally legit," self.alerts.append({"title": "SSL connection done on an non standart port ({}) to {}".format(cert["port"], host),
+ " we recommend to check the reputation of {}, by looking at its WHOIS record, the associated autonomus system, its creation date, and ".format(host) "description": "It is not common to see SSL connections issued from smartphones using non-standard ports. Even this can be totally legit,"
+ " by searching it the internet.", + " we recommend to check the reputation of {}, by looking at its WHOIS record, the associated autonomus system, its creation date, and ".format(host)
"host": host, + " by searching it the internet.",
"level": "Moderate", "host": host,
"id": "SSL-01"}) "level": "Moderate",
# Check Free SSL certificates. "id": "SSL-01"})
if cert["issuer"] in free_issuers: # Check Free SSL certificates.
self.alerts.append({"title": "An SSL connection to {} is using a free certificate.".format(host), if cert["issuer"] in free_issuers:
"description": "Free certificates — such as Let's Encrypt — are wildly used by command and control servers associated to " c["alert_tiggered"] = True
+ "malicious implants or phishing web pages. We recommend to check the host associated to this certificate, " self.alerts.append({"title": "An SSL connection to {} is using a free certificate.".format(host),
+ "by looking at the domain name, its creation date, or by checking its reputation on the internet.", "description": "Free certificates — such as Let's Encrypt — are wildly used by command and control servers associated to "
"host": host, + "malicious implants or phishing web pages. We recommend to check the host associated to this certificate, "
"level": "Moderate", + "by looking at the domain name, its creation date, or by checking its reputation on the internet.",
"id": "SSL-02"}) "host": host,
# Check for self-signed certificates. "level": "Moderate",
if cert["validation_status"] == "self signed certificate in certificate chain": "id": "SSL-02"})
self.alerts.append({"title": "The certificate associated to {} is self-signed.".format(host), # Check for self-signed certificates.
"description": "The use of self-signed certificates is a common thing for attacker infrastructure. We recommend to check the host {} ".format(host) if cert["validation_status"] == "self signed certificate in certificate chain":
+ "which is associated to this certificate, by looking at the domain name (if any), its WHOIS record, its creation date, and " c["alert_tiggered"] = True
+ " by checking its reputation on the internet.", self.alerts.append({"title": "The certificate associated to {} is self-signed.".format(host),
"host": host, "description": "The use of self-signed certificates is a common thing for attacker infrastructure. We recommend to check the host {} ".format(host)
"level": "Moderate", + "which is associated to this certificate, by looking at the domain name (if any), its WHOIS record, its creation date, and "
"id": "SSL-03"}) + " by checking its reputation on the internet.",
"host": host,
"level": "Moderate",
"id": "SSL-03"})
def alerts_check(self): def alerts_check(self):
""" """
@ -350,8 +372,6 @@ class ZeekEngine(object):
""" """
sp.Popen("cd {} && /opt/zeek/bin/zeek -Cr capture.pcap protocols/ssl/validate-certs".format( sp.Popen("cd {} && /opt/zeek/bin/zeek -Cr capture.pcap protocols/ssl/validate-certs".format(
self.working_dir), shell=True).wait() self.working_dir), shell=True).wait()
sp.Popen("cd {} && mkdir assets".format(
self.working_dir), shell=True).wait()
sp.Popen("cd {} && mv *.log assets/".format(self.working_dir), sp.Popen("cd {} && mv *.log assets/".format(self.working_dir),
shell=True).wait() shell=True).wait()
@ -361,9 +381,23 @@ class ZeekEngine(object):
self.files_check(self.working_dir + "/assets/") self.files_check(self.working_dir + "/assets/")
self.alerts_check() self.alerts_check()
def get_alerts(self): def retrieve_alerts(self):
""" """
Retreive alerts. Retrieve alerts.
:return: list - a list of alerts wihout duplicates. :return: list - a list of alerts wihout duplicates.
""" """
return [dict(t) for t in {tuple(d.items()) for d in self.alerts}] return [dict(t) for t in {tuple(d.items()) for d in self.alerts}]
def retrieve_whitelist(self):
"""
Retrieve whitelisted elements.
:return: list - a list of whitelisted elements wihout duplicates.
"""
return [dict(t) for t in {tuple(d.items()) for d in self.whitelist}]
def retrieve_conns(self):
"""
Retrieve not whitelisted elements.
:return: list - a list of non-whitelisted elements wihout duplicates.
"""
return self.conns

View File

@ -13,3 +13,4 @@ pyudev
wifi wifi
qrcode qrcode
netifaces netifaces
weasyprint

View File

@ -44,13 +44,13 @@ class Analysis(object):
device, alerts = {}, {} device, alerts = {}, {}
# Getting device configuration. # Getting device configuration.
if os.path.isfile("/tmp/{}/device.json".format(self.token)): if os.path.isfile("/tmp/{}/assets/device.json".format(self.token)):
with open("/tmp/{}/device.json".format(self.token), "r") as f: with open("/tmp/{}/assets/device.json".format(self.token), "r") as f:
device = json.load(f) device = json.load(f)
# Getting alerts configuration. # Getting alerts configuration.
if os.path.isfile("/tmp/{}/alerts.json".format(self.token)): if os.path.isfile("/tmp/{}/assets/alerts.json".format(self.token)):
with open("/tmp/{}/alerts.json".format(self.token), "r") as f: with open("/tmp/{}/assets/alerts.json".format(self.token), "r") as f:
alerts = json.load(f) alerts = json.load(f)
if device != {} and alerts != {}: if device != {} and alerts != {}:

View File

@ -15,7 +15,8 @@ import re
class Capture(object): class Capture(object):
def __init__(self): def __init__(self):
self.working_dir = False self.capture_dir = False
self.assets_dir = False
self.capture_token = False self.capture_token = False
self.random_choice_alphabet = "ABCDEF1234567890" self.random_choice_alphabet = "ABCDEF1234567890"
@ -33,16 +34,18 @@ class Capture(object):
# Few context variable assignment # Few context variable assignment
self.capture_token = "".join( self.capture_token = "".join(
[random.choice(self.random_choice_alphabet) for i in range(8)]) [random.choice(self.random_choice_alphabet) for i in range(8)])
self.working_dir = "/tmp/{}/".format(self.capture_token) self.capture_dir = "/tmp/{}/".format(self.capture_token)
self.pcap = self.working_dir + "capture.pcap" self.assets_dir = "/tmp/{}/assets/".format(self.capture_token)
self.pcap = self.capture_dir + "capture.pcap"
self.iface = read_config(("network", "in")) self.iface = read_config(("network", "in"))
# For packets monitoring # For packets monitoring
self.list_pkts = [] self.list_pkts = []
self.last_pkts = 0 self.last_pkts = 0
# Make the capture directory # Make the capture and the assets directory
mkdir(self.working_dir) mkdir(self.capture_dir)
mkdir(self.assets_dir)
try: try:
sp.Popen(["tshark", "-i", self.iface, "-w", sp.Popen(["tshark", "-i", self.iface, "-w",

View File

@ -20,13 +20,13 @@ class Device(object):
:return: dict containing device properties. :return: dict containing device properties.
""" """
if not os.path.isfile("/tmp/{}/device.json".format(self.token)): if not os.path.isfile("/tmp/{}/assets/device.json".format(self.token)):
device = self.read_leases() device = self.read_leases()
if device["status"] != False: if device["status"] != False:
with open("/tmp/{}/device.json".format(self.token), "w") as f: with open("/tmp/{}/assets/device.json".format(self.token), "w") as f:
f.write(json.dumps(device)) f.write(json.dumps(device))
else: else:
with open("/tmp/{}/device.json".format(self.token)) as f: with open("/tmp/{}/assets/device.json".format(self.token)) as f:
device = json.load(f) device = json.load(f)
return device return device