2020-11-24 19:45:03 +01:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
2021-02-04 11:33:33 +01:00
from utils import get_iocs , get_apname , get_device , get_config
2020-11-24 19:45:03 +01:00
import time
import os
import subprocess as sp
import re
import json
2021-02-04 16:04:59 +01:00
import sys
2020-11-24 19:45:03 +01:00
class SuricataEngine ( ) :
def __init__ ( self , capture_directory ) :
self . wdir = capture_directory
self . alerts = [ ]
self . rules_file = " /tmp/rules.rules "
self . pcap_path = os . path . join ( self . wdir , " capture.pcap " )
self . rules = [ r [ 0 ] for r in get_iocs (
" snort " ) ] + self . generate_contextual_alerts ( )
2021-02-04 11:33:33 +01:00
self . userlang = get_config ( ( " frontend " , " user_lang " ) )
# Load template language
if not re . match ( " ^[a-z] { 2,3}$ " , self . userlang ) :
self . userlang = " en "
2021-02-08 17:22:44 +01:00
with open ( os . path . join ( os . path . dirname ( os . path . realpath ( sys . argv [ 0 ] ) ) , " locales/ {} .json " . format ( self . userlang ) ) ) as f :
2021-02-04 11:33:33 +01:00
self . template = json . load ( f ) [ " alerts " ]
2020-11-24 19:45:03 +01:00
def start_suricata ( self ) :
"""
Launch suricata against the capture . pcap file .
: return : nothing .
"""
# Generate the rule file an launch suricata.
if self . generate_rule_file ( ) :
2020-12-18 14:57:52 +01:00
sp . Popen ( [ " suricata " , " -S " , self . rules_file , " -r " ,
self . pcap_path , " -l " , " /tmp/ " ] ) . wait ( )
2020-11-24 19:45:03 +01:00
# Let's parse the log file.
for line in open ( " /tmp/fast.log " , " r " ) . readlines ( ) :
if " [**] " in line :
s = line . split ( " [**] " ) [ 1 ] . strip ( )
m = re . search (
r " \ [ \ d+ \ :(?P<sid> \ d+) \ :(?P<rev> \ d+) \ ] (?P<title>[ -~]+) " , s )
2021-02-04 11:33:33 +01:00
self . alerts . append ( { " title " : self . template [ " SNORT-01 " ] [ " title " ] . format ( m . group ( ' title ' ) ) ,
" description " : self . template [ " SNORT-01 " ] [ " description " ] ,
2020-11-24 19:45:03 +01:00
" level " : " High " ,
" id " : " SNORT-01 " } )
# Remove fast.log
os . remove ( " /tmp/fast.log " )
def generate_rule_file ( self ) :
"""
Generate the rules file passed to suricata .
2021-02-04 11:33:33 +01:00
: return : bool if operation succeed .
2020-11-24 19:45:03 +01:00
"""
try :
with open ( self . rules_file , " w+ " ) as f :
f . write ( " \n " . join ( self . rules ) )
return True
except :
return False
def generate_contextual_alerts ( self ) :
"""
Generate contextual alerts related to the current
ssid or the device itself .
"""
apname = get_apname ( )
device = get_device ( self . wdir . split ( " / " ) [ - 1 ] )
rules = [ ]
# Devices names to be whitelisted (can appear in UA of HTTP requests. So FP high alerts)
device_names = [ " iphone " , " ipad " , " android " , " samsung " , " galaxy " ,
" huawei " , " oneplus " , " oppo " , " pixel " , " xiaomi " , " realme " , " chrome " ,
" safari " ]
if apname and device :
# See if the AP name is sent in clear text over the internet.
if len ( apname ) > = 5 :
rules . append (
' alert tcp {} any -> $EXTERNAL_NET any (content: " {} " ; msg: " WiFi name sent in clear text " ; sid:10000101; rev:001;) ' . format ( device [ " ip_address " ] , apname ) )
rules . append (
' alert udp {} any -> $EXTERNAL_NET any (content: " {} " ; msg: " WiFi name sent in clear text " ; sid:10000102; rev:001;) ' . format ( device [ " ip_address " ] , apname ) )
# See if the device name is sent in clear text over the internet.
if len ( device [ " name " ] ) > = 5 and device [ " name " ] . lower ( ) not in device_names :
rules . append ( ' alert tcp {} any -> $EXTERNAL_NET any (content: " {} " ; msg: " Device name sent in clear text " ; sid:10000103; rev:001;) ' . format (
device [ " ip_address " ] , device [ " name " ] ) )
rules . append ( ' alert udp {} any -> $EXTERNAL_NET any (content: " {} " ; msg: " Device name sent in clear text " ; sid:10000104; rev:001;) ' . format (
device [ " ip_address " ] , device [ " name " ] ) )
return rules
def get_alerts ( self ) :
return [ dict ( t ) for t in { tuple ( d . items ( ) ) for d in self . alerts } ]