KasperskyLab-TinyCheck/analysis/classes/suricataengine.py
2021-02-08 17:22:44 +01:00

101 lines
4.1 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from utils import get_iocs, get_apname, get_device, get_config
import time
import os
import subprocess as sp
import re
import json
import sys
class SuricataEngine():
    """
        Run suricata against a capture and collect the alerts it raises.

        The rule set is made of the "snort" IOCs returned by get_iocs()
        plus a few contextual rules built from the access point name and
        the captured device (see generate_contextual_alerts).
    """

    # Matches the "[gid:sid:rev] message" part of a fast.log alert line.
    _ALERT_RE = re.compile(
        r"\[\d+\:(?P<sid>\d+)\:(?P<rev>\d+)\] (?P<title>[ -~]+)")

    # Printable ASCII characters which are still not allowed verbatim inside
    # a suricata content:"..." option and must be written in hex notation.
    _CONTENT_SPECIALS = frozenset(b'";:|\\')

    def __init__(self, capture_directory):
        """
            Prepare the engine for a given capture.
            :param capture_directory: directory containing capture.pcap.
        """
        self.wdir = capture_directory
        self.alerts = []
        self.rules_file = "/tmp/rules.rules"
        self.pcap_path = os.path.join(self.wdir, "capture.pcap")
        # Each IOC record is indexed with [0] to get the rule text.
        self.rules = [r[0] for r in get_iocs(
            "snort")] + self.generate_contextual_alerts()
        self.userlang = get_config(("frontend", "user_lang"))

        # Load template language. The value ends up in a file name, so it is
        # validated strictly (fullmatch rejects a trailing newline, and a
        # non-string value falls back to English instead of raising).
        if not isinstance(self.userlang, str) or not re.fullmatch("[a-z]{2,3}", self.userlang):
            self.userlang = "en"
        locale_path = os.path.join(os.path.dirname(os.path.realpath(sys.argv[0])),
                                   "locales/{}.json".format(self.userlang))
        # Explicit encoding so translated strings load whatever the
        # system locale is.
        with open(locale_path, encoding="utf-8") as f:
            self.template = json.load(f)["alerts"]

    def start_suricata(self):
        """
            Launch suricata against the capture.pcap file and store one
            alert in self.alerts per matching line of /tmp/fast.log.
            Nothing is run or parsed if the rule file can't be written.
            :return: nothing.
        """
        log_path = "/tmp/fast.log"

        # Generate the rule file and launch suricata.
        if not self.generate_rule_file():
            return
        sp.Popen(["suricata", "-S", self.rules_file, "-r",
                  self.pcap_path, "-l", "/tmp/"]).wait()

        # Read the whole log, then remove it right away so that a parsing
        # problem can't leave it behind for the next run.
        try:
            with open(log_path, "r") as f:
                lines = f.readlines()
        except FileNotFoundError:
            # No log file to parse, so there are no alerts to report.
            return
        os.remove(log_path)

        template = self.template["SNORT-01"]
        for line in lines:
            if "[**]" not in line:
                continue
            m = self._ALERT_RE.search(line.split("[**]")[1].strip())
            if m is None:
                # Line that doesn't look like "[gid:sid:rev] message".
                continue
            self.alerts.append({"title": template["title"].format(m.group('title')),
                                "description": template["description"],
                                "level": "High",
                                "id": "SNORT-01"})

    def generate_rule_file(self):
        """
            Generate the rules file passed to suricata
            (one rule per line, written to self.rules_file).
            :return: bool if operation succeed.
        """
        try:
            with open(self.rules_file, "w+", encoding="utf-8") as f:
                f.write("\n".join(self.rules))
            return True
        except (OSError, TypeError, UnicodeError):
            # Unwritable path, non-string rule or unencodable text.
            return False

    @staticmethod
    def _escape_content(value):
        """
            Escape a string so it can be put inside a suricata
            content:"..." option. Printable ASCII is kept as is, except
            for the characters " ; : | and backslash; those, and any other
            byte, are written in hex notation (eg. |22 3B|).
            :param value: text to match in the payload.
            :return: str, the escaped content.
        """
        parts = []
        hex_run = []
        for byte in str(value).encode("utf-8", "surrogateescape"):
            if 0x20 <= byte <= 0x7E and byte not in SuricataEngine._CONTENT_SPECIALS:
                if hex_run:
                    parts.append("|{}|".format(" ".join(hex_run)))
                    hex_run = []
                parts.append(chr(byte))
            else:
                hex_run.append("{:02X}".format(byte))
        if hex_run:
            parts.append("|{}|".format(" ".join(hex_run)))
        return "".join(parts)

    def generate_contextual_alerts(self):
        """
            Generate contextual alerts related to the current
            ssid or the device itself.
            :return: list of suricata rules (str), empty if the AP name
                     or the device is not available.
        """
        apname = get_apname()
        # The last component of the capture directory is what identifies
        # the device; trailing slashes are dropped so it is never empty.
        device = get_device(self.wdir.rstrip("/").split("/")[-1])
        rules = []

        # Devices names to be whitelisted (can appear in UA of HTTP requests. So FP high alerts)
        device_names = ["iphone", "ipad", "android", "samsung", "galaxy",
                        "huawei", "oneplus", "oppo", "pixel", "xiaomi", "realme", "chrome",
                        "safari"]

        if not (apname and device):
            return rules

        # See if the AP name is sent in clear text over the internet.
        # Names are escaped: they are free text and may contain characters
        # which would otherwise break (or alter) the rule.
        if len(apname) >= 5:
            content = self._escape_content(apname)
            rules.append(
                'alert tcp {} any -> $EXTERNAL_NET any (content:"{}"; msg:"WiFi name sent in clear text"; sid:10000101; rev:001;)'.format(device["ip_address"], content))
            rules.append(
                'alert udp {} any -> $EXTERNAL_NET any (content:"{}"; msg:"WiFi name sent in clear text"; sid:10000102; rev:001;)'.format(device["ip_address"], content))

        # See if the device name is sent in clear text over the internet.
        if len(device["name"]) >= 5 and device["name"].lower() not in device_names:
            content = self._escape_content(device["name"])
            rules.append('alert tcp {} any -> $EXTERNAL_NET any (content:"{}"; msg:"Device name sent in clear text"; sid:10000103; rev:001;)'.format(
                device["ip_address"], content))
            rules.append('alert udp {} any -> $EXTERNAL_NET any (content:"{}"; msg:"Device name sent in clear text"; sid:10000104; rev:001;)'.format(
                device["ip_address"], content))
        return rules

    def get_alerts(self):
        """
            Return the alerts without duplicates, in the order they
            were first raised.
            :return: list of alert dicts (new dict objects).
        """
        unique = dict.fromkeys(tuple(d.items()) for d in self.alerts)
        return [dict(t) for t in unique]