change hardcoded paths and introduce -f flag for calling analysis.py from frontende to skip device.json in report generation

This commit is contained in:
Janik Besendorf 2021-10-12 11:07:24 +02:00
parent 48539a13df
commit c170ae2fb2
5 changed files with 90 additions and 62 deletions

View File

@ -16,60 +16,82 @@ import os
containing a capture.pcap file. containing a capture.pcap file.
""" """
if __name__ == "__main__":
if len(sys.argv) == 2:
capture_directory = sys.argv[1]
if os.path.isdir(capture_directory):
manager = Manager() def analyze(capture_directory,frontend=False):
alerts = manager.dict() if os.path.isdir(capture_directory):
def zeekengine(alerts): manager = Manager()
zeek = ZeekEngine(capture_directory) alerts = manager.dict()
zeek.start_zeek()
alerts["zeek"] = zeek.retrieve_alerts()
# whitelist.json writing. def zeekengine(alerts):
with open(os.path.join(capture_directory, "assets/whitelist.json"), "w") as f: zeek = ZeekEngine(capture_directory)
f.write(json.dumps(zeek.retrieve_whitelist(), zeek.start_zeek()
indent=4, separators=(',', ': '))) alerts["zeek"] = zeek.retrieve_alerts()
# conns.json writing. if not os.path.isdir(os.path.join(capture_directory, "assets")):
with open(os.path.join(capture_directory, "assets/conns.json"), "w") as f: os.mkdir(os.path.join(capture_directory, "assets"))
f.write(json.dumps(zeek.retrieve_conns(), # whitelist.json writing.
indent=4, separators=(',', ': '))) with open(os.path.join(capture_directory, "assets/whitelist.json"), "w") as f:
f.write(json.dumps(zeek.retrieve_whitelist(),
indent=4, separators=(',', ': ')))
def snortengine(alerts): # conns.json writing.
suricata = SuricataEngine(capture_directory) with open(os.path.join(capture_directory, "assets/conns.json"), "w") as f:
suricata.start_suricata() f.write(json.dumps(zeek.retrieve_conns(),
alerts["suricata"] = suricata.get_alerts() indent=4, separators=(',', ': ')))
# Start the engines. def snortengine(alerts):
p1 = Process(target=zeekengine, args=(alerts,)) suricata = SuricataEngine(capture_directory)
p2 = Process(target=snortengine, args=(alerts,)) suricata.start_suricata()
p1.start() alerts["suricata"] = suricata.get_alerts()
p2.start()
# Wait to their end. # Start the engines.
p1.join() p1 = Process(target=zeekengine, args=(alerts,))
p2.join() p2 = Process(target=snortengine, args=(alerts,))
p1.start()
p2.start()
# Some formating and alerts.json writing. # Wait to their end.
with open(os.path.join(capture_directory, "assets/alerts.json"), "w") as f: p1.join()
report = {"high": [], "moderate": [], "low": []} p2.join()
for alert in (alerts["zeek"] + alerts["suricata"]):
if alert["level"] == "High": # Some formating and alerts.json writing.
report["high"].append(alert) with open(os.path.join(capture_directory, "assets/alerts.json"), "w") as f:
if alert["level"] == "Moderate": report = {"high": [], "moderate": [], "low": []}
report["moderate"].append(alert) for alert in (alerts["zeek"] + alerts["suricata"]):
if alert["level"] == "Low": if alert["level"] == "High":
report["low"].append(alert) report["high"].append(alert)
f.write(json.dumps(report, indent=4, separators=(',', ': '))) if alert["level"] == "Moderate":
report["moderate"].append(alert)
if alert["level"] == "Low":
report["low"].append(alert)
f.write(json.dumps(report, indent=4, separators=(',', ': ')))
# Generate the report
report = Report(capture_directory,frontend)
report.generate_report()
# Generate the report
report = Report(capture_directory)
report.generate_report()
else:
print("The directory doesn't exist.")
else: else:
print("Please specify a capture directory in argument.") print("The directory doesn't exist.")
def usage():
print("""Usage: python analysis.py [capture_directory]
where [capture_directory] is a directory containing a capture.pcap file
analysis.py -f starts the analysis in frontend mode intended to be called by the TinyCheck frontend.""")
if __name__ == "__main__":
if len(sys.argv) == 2: #called manually without frontend
analyze(sys.argv[1], False)
elif len(sys.argv) == 3:
if(sys.argv[1]) == "-f": #frontend mode
analyze(sys.argv[2], True)
else:
usage()
else:
usage()

View File

@ -13,7 +13,7 @@ from utils import get_config
class Report(object): class Report(object):
def __init__(self, capture_directory): def __init__(self, capture_directory, frontend):
self.capture_directory = capture_directory self.capture_directory = capture_directory
self.alerts = self.read_json(os.path.join( self.alerts = self.read_json(os.path.join(
capture_directory, "assets/alerts.json")) capture_directory, "assets/alerts.json"))
@ -21,10 +21,13 @@ class Report(object):
capture_directory, "assets/whitelist.json")) capture_directory, "assets/whitelist.json"))
self.conns = self.read_json(os.path.join( self.conns = self.read_json(os.path.join(
capture_directory, "assets/conns.json")) capture_directory, "assets/conns.json"))
self.device = self.read_json(os.path.join( self.device = None
capture_directory, "assets/device.json")) self.capinfos = None
self.capinfos = self.read_json(os.path.join( if frontend:
capture_directory, "assets/capinfos.json")) self.device = self.read_json(os.path.join(
capture_directory, "assets/device.json"))
self.capinfos = self.read_json(os.path.join(
capture_directory, "assets/capinfos.json"))
try: try:
with open(os.path.join(self.capture_directory, "capture.pcap"), "rb") as f: with open(os.path.join(self.capture_directory, "capture.pcap"), "rb") as f:
self.capture_sha1 = hashlib.sha1(f.read()).hexdigest() self.capture_sha1 = hashlib.sha1(f.read()).hexdigest()
@ -204,16 +207,18 @@ class Report(object):
""" """
header = "<div class=\"header\">" header = "<div class=\"header\">"
header += "<div class=\"logo\"></div>" header += "<div class=\"logo\"></div>"
header += "<p><br /><strong>{}: {}</strong><br />".format(self.template["device_name"], if self.device is not None:
header += "<p><br /><strong>{}: {}</strong><br />".format(self.template["device_name"],
self.device["name"]) self.device["name"])
header += "{}: {}<br />".format(self.template["device_mac"], header += "{}: {}<br />".format(self.template["device_mac"],
self.device["mac_address"]) self.device["mac_address"])
header += "{} {}<br />".format(self.template["report_generated_on"], header += "{} {}<br />".format(self.template["report_generated_on"],
datetime.now().strftime("%d/%m/%Y - %H:%M:%S")) datetime.now().strftime("%d/%m/%Y - %H:%M:%S"))
header += "{}: {}s<br />".format(self.template["capture_duration"], if self.capinfos is not None:
self.capinfos["Capture duration"]) header += "{}: {}s<br />".format(self.template["capture_duration"],
header += "{}: {}<br />".format(self.template["packets_number"], self.capinfos["Capture duration"])
self.capinfos["Number of packets"]) header += "{}: {}<br />".format(self.template["packets_number"],
self.capinfos["Number of packets"])
header += "{}: {}<br />".format( header += "{}: {}<br />".format(
self.template["capture_sha1"], self.capture_sha1) self.template["capture_sha1"], self.capture_sha1)
header += "</p>" header += "</p>"

View File

@ -236,6 +236,7 @@ class ZeekEngine(object):
pass pass
try: # Domain history check. try: # Domain history check.
whois_record = whois.whois(c["resolution"]) whois_record = whois.whois(c["resolution"])
creation_date = whois_record.creation_date if type( creation_date = whois_record.creation_date if type(
whois_record.creation_date) is not list else whois_record.creation_date[0] whois_record.creation_date) is not list else whois_record.creation_date[0]
@ -247,6 +248,7 @@ class ZeekEngine(object):
"host": c["resolution"], "host": c["resolution"],
"level": "Moderate", "level": "Moderate",
"id": "ACT-02"}) "id": "ACT-02"})
except: except:
pass pass
@ -443,11 +445,10 @@ class ZeekEngine(object):
""" """
Start zeek and check the logs. Start zeek and check the logs.
""" """
sp.Popen("cd {} && /opt/zeek/bin/zeek -Cr capture.pcap protocols/ssl/validate-certs".format( sp.Popen("cd {} && zeek -Cr capture.pcap protocols/ssl/validate-certs".format(
self.working_dir), shell=True).wait() self.working_dir), shell=True).wait()
sp.Popen("cd {} && mv *.log assets/".format(self.working_dir), sp.Popen("cd {} && mv *.log assets/".format(self.working_dir),
shell=True).wait() shell=True).wait()
self.fill_dns(self.working_dir + "/assets/") self.fill_dns(self.working_dir + "/assets/")
self.netflow_check(self.working_dir + "/assets/") self.netflow_check(self.working_dir + "/assets/")
self.ssl_check(self.working_dir + "/assets/") self.ssl_check(self.working_dir + "/assets/")

View File

@ -10,7 +10,7 @@ import os
from functools import reduce from functools import reduce
# I'm not going to use an ORM for that. # I'm not going to use an ORM for that.
parent = "/".join(sys.path[0].split("/")[:-1]) parent = os.path.split(os.path.dirname(os.path.abspath(sys.argv[0])))[0]
conn = sqlite3.connect(os.path.join(parent, "tinycheck.sqlite3")) conn = sqlite3.connect(os.path.join(parent, "tinycheck.sqlite3"))
cursor = conn.cursor() cursor = conn.cursor()

View File

@ -24,7 +24,7 @@ class Analysis(object):
if self.token is not None: if self.token is not None:
parent = "/".join(sys.path[0].split("/")[:-2]) parent = "/".join(sys.path[0].split("/")[:-2])
sp.Popen( sp.Popen(
[sys.executable, "{}/analysis/analysis.py".format(parent), "/tmp/{}".format(self.token)]) [sys.executable, "{}/analysis/analysis.py".format(parent), "-f", "/tmp/{}".format(self.token)])
return {"status": True, return {"status": True,
"message": "Analysis started", "message": "Analysis started",
"token": self.token} "token": self.token}