Secure Source of Randomness

This commit is contained in:
pixeebot[bot] 2024-06-15 10:28:21 +00:00 committed by GitHub
parent 3ff9520114
commit 6c2da10eaa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 133 additions and 133 deletions

View File

@ -1,128 +1,128 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import subprocess as sp import subprocess as sp
from app.utils import terminate_process, read_config from app.utils import terminate_process, read_config
from os import mkdir, path from os import mkdir, path
from flask import send_file, jsonify from flask import send_file, jsonify
import datetime import datetime
import shutil import shutil
import json import json
import random import sys
import sys import re
import re import secrets
class Capture(object): class Capture(object):
def __init__(self): def __init__(self):
self.random_choice_alphabet = "ABCDEF1234567890" self.random_choice_alphabet = "ABCDEF1234567890"
def start_capture(self): def start_capture(self):
""" """
Start a tshark capture on the created AP interface and save Start a tshark capture on the created AP interface and save
it in a temporary directory under /tmp/. it in a temporary directory under /tmp/.
:return: dict containing capture token and status. :return: dict containing capture token and status.
""" """
# Kill potential tshark zombies instances, if any. # Kill potential tshark zombies instances, if any.
terminate_process("tshark") terminate_process("tshark")
# Few context variable assignment # Few context variable assignment
self.capture_token = "".join( self.capture_token = "".join(
[random.choice(self.random_choice_alphabet) for i in range(8)]) [secrets.choice(self.random_choice_alphabet) for i in range(8)])
self.capture_dir = "/tmp/{}/".format(self.capture_token) self.capture_dir = "/tmp/{}/".format(self.capture_token)
self.assets_dir = "/tmp/{}/assets/".format(self.capture_token) self.assets_dir = "/tmp/{}/assets/".format(self.capture_token)
self.pcap = self.capture_dir + "capture.pcap" self.pcap = self.capture_dir + "capture.pcap"
self.iface = read_config(("network", "in")) self.iface = read_config(("network", "in"))
# For packets monitoring # For packets monitoring
self.list_pkts = [] self.list_pkts = []
self.last_pkts = 0 self.last_pkts = 0
# Make the capture and the assets directory # Make the capture and the assets directory
mkdir(self.capture_dir) mkdir(self.capture_dir)
mkdir(self.assets_dir) mkdir(self.assets_dir)
try: try:
sp.Popen(["tshark", "-i", self.iface, "-w", sp.Popen(["tshark", "-i", self.iface, "-w",
self.pcap, "-f", "tcp or udp"]) self.pcap, "-f", "tcp or udp"])
return {"status": True, return {"status": True,
"message": "Capture started", "message": "Capture started",
"capture_token": self.capture_token} "capture_token": self.capture_token}
except: except:
return {"status": False, return {"status": False,
"message": "Unexpected error: %s" % sys.exc_info()[0]} "message": "Unexpected error: %s" % sys.exc_info()[0]}
def get_capture_stats(self): def get_capture_stats(self):
""" """
Get some dirty capture statistics in order to have a sparkline Get some dirty capture statistics in order to have a sparkline
in the background of capture view. in the background of capture view.
:return: dict containing stats associated to the capture :return: dict containing stats associated to the capture
""" """
with open("/sys/class/net/{}/statistics/tx_packets".format(self.iface)) as f: with open("/sys/class/net/{}/statistics/tx_packets".format(self.iface)) as f:
tx_pkts = int(f.read()) tx_pkts = int(f.read())
with open("/sys/class/net/{}/statistics/rx_packets".format(self.iface)) as f: with open("/sys/class/net/{}/statistics/rx_packets".format(self.iface)) as f:
rx_pkts = int(f.read()) rx_pkts = int(f.read())
if self.last_pkts == 0: if self.last_pkts == 0:
self.last_pkts = tx_pkts + rx_pkts self.last_pkts = tx_pkts + rx_pkts
return {"status": True, return {"status": True,
"packets": [0*400]} "packets": [0*400]}
else: else:
curr_pkts = (tx_pkts + rx_pkts) - self.last_pkts curr_pkts = (tx_pkts + rx_pkts) - self.last_pkts
self.last_pkts = tx_pkts + rx_pkts self.last_pkts = tx_pkts + rx_pkts
self.list_pkts.append(curr_pkts) self.list_pkts.append(curr_pkts)
return {"status": True, return {"status": True,
"packets": self.beautify_stats(self.list_pkts)} "packets": self.beautify_stats(self.list_pkts)}
@staticmethod @staticmethod
def beautify_stats(data): def beautify_stats(data):
""" """
Add 0 at the end of the array if the len of the array is less Add 0 at the end of the array if the len of the array is less
than max_len. Else, get the last 100 stats. This allows to than max_len. Else, get the last 100 stats. This allows to
show a kind of "progressive chart" in the background for show a kind of "progressive chart" in the background for
the first packets. the first packets.
:return: a list of integers. :return: a list of integers.
""" """
max_len = 400 max_len = 400
if len(data) >= max_len: if len(data) >= max_len:
return data[-max_len:] return data[-max_len:]
else: else:
return data + [1] * (max_len - len(data)) return data + [1] * (max_len - len(data))
def stop_capture(self): def stop_capture(self):
""" """
Stop tshark if any instance present & ask create_capinfos. Stop tshark if any instance present & ask create_capinfos.
:return: dict as a small confirmation. :return: dict as a small confirmation.
""" """
if terminate_process("tshark"): if terminate_process("tshark"):
self.create_capinfos() self.create_capinfos()
return {"status": True, return {"status": True,
"message": "Capture stopped"} "message": "Capture stopped"}
else: else:
return {"status": False, return {"status": False,
"message": "No active capture"} "message": "No active capture"}
def create_capinfos(self): def create_capinfos(self):
""" """
Creates a capinfo json file. Creates a capinfo json file.
:return: dict as a small confirmation. :return: dict as a small confirmation.
""" """
infos = sp.Popen(["capinfos", self.pcap], infos = sp.Popen(["capinfos", self.pcap],
stdout=sp.PIPE, stderr=sp.PIPE) stdout=sp.PIPE, stderr=sp.PIPE)
infos = infos.communicate()[0] infos = infos.communicate()[0]
data = {} data = {}
for l in infos.decode().splitlines(): for l in infos.decode().splitlines():
try: try:
l = l.split(": ") if ": " in l else l.split("= ") l = l.split(": ") if ": " in l else l.split("= ")
if len(l[0]) and len(l[1]): if len(l[0]) and len(l[1]):
data[l[0].strip()] = l[1].strip() data[l[0].strip()] = l[1].strip()
except: except:
continue continue
with open("{}capinfos.json".format(self.assets_dir), 'w') as f: with open("{}capinfos.json".format(self.assets_dir), 'w') as f:
json.dump(data, f) json.dump(data, f)
return True return True

View File

@ -9,13 +9,13 @@ import sys
import time import time
import qrcode import qrcode
import base64 import base64
import random
import requests import requests
from wifi import Cell from wifi import Cell
from os import path, remove from os import path, remove
from io import BytesIO from io import BytesIO
from app.utils import terminate_process, read_config from app.utils import terminate_process, read_config
import secrets
class Network(object): class Network(object):
@ -170,14 +170,14 @@ class Network(object):
# Generate the hostapd configuration # Generate the hostapd configuration
if read_config(("network", "tokenized_ssids")): if read_config(("network", "tokenized_ssids")):
token = "".join([random.choice(self.random_choice_alphabet) token = "".join([secrets.choice(self.random_choice_alphabet)
for i in range(4)]) for i in range(4)])
self.AP_SSID = random.choice(read_config( self.AP_SSID = secrets.choice(read_config(
("network", "ssids"))) + "-" + token ("network", "ssids"))) + "-" + token
else: else:
self.AP_SSID = random.choice(read_config(("network", "ssids"))) self.AP_SSID = secrets.choice(read_config(("network", "ssids")))
self.AP_PASS = "".join( self.AP_PASS = "".join(
[random.choice(self.random_choice_alphabet) for i in range(8)]) [secrets.choice(self.random_choice_alphabet) for i in range(8)])
# Launch hostapd # Launch hostapd
if self.write_hostapd_config(): if self.write_hostapd_config():