Adding the channel check to prevent interferences

This commit is contained in:
Félix Aime 2021-06-08 11:49:53 +02:00
parent 9f75d339da
commit 61de73d989

View File

@ -1,337 +1,355 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import subprocess as sp import subprocess as sp
import netifaces as ni import netifaces as ni
import requests as rq import requests as rq
import sys import re
import time import sys
import qrcode import time
import base64 import qrcode
import random import base64
import requests import random
import requests
from wifi import Cell
from os import path, remove from wifi import Cell
from io import BytesIO from os import path, remove
from app.utils import terminate_process, read_config from io import BytesIO
from app.utils import terminate_process, read_config
class Network(object):
class Network(object):
def __init__(self):
self.AP_SSID = False def __init__(self):
self.AP_PASS = False self.AP_SSID = False
self.iface_in = read_config(("network", "in")) self.AP_PASS = False
self.iface_out = read_config(("network", "out")) self.iface_in = read_config(("network", "in"))
self.enable_interface(self.iface_in) self.iface_out = read_config(("network", "out"))
self.enable_interface(self.iface_out) self.enable_interface(self.iface_in)
self.enable_forwarding() self.enable_interface(self.iface_out)
self.reset_dnsmasq_leases() self.enable_forwarding()
self.random_choice_alphabet = "abcdef1234567890" self.reset_dnsmasq_leases()
self.random_choice_alphabet = "abcdef1234567890"
def check_status(self):
""" def check_status(self):
The method check_status check the IP addressing of each interface """
and return their associated IP. The method check_status check the IP addressing of each interface
and return their associated IP.
:return: dict containing each interface status.
""" :return: dict containing each interface status.
"""
ctx = {"interfaces": {
self.iface_in: False, ctx = {"interfaces": {
self.iface_out: False, self.iface_in: False,
"eth0": False}, self.iface_out: False,
"internet": self.check_internet()} "eth0": False},
"internet": self.check_internet()}
for iface in ctx["interfaces"].keys():
try: for iface in ctx["interfaces"].keys():
ip = ni.ifaddresses(iface)[ni.AF_INET][0]["addr"] try:
if not ip.startswith("127") or not ip.startswith("169.254"): ip = ni.ifaddresses(iface)[ni.AF_INET][0]["addr"]
ctx["interfaces"][iface] = ip if not ip.startswith("127") or not ip.startswith("169.254"):
except: ctx["interfaces"][iface] = ip
ctx["interfaces"][iface] = "Interface not connected or present." except:
return ctx ctx["interfaces"][iface] = "Interface not connected or present."
return ctx
def wifi_list_networks(self):
""" def wifi_list_networks(self):
The method wifi_list_networks list the available WiFi networks """
by using wifi python package. The method wifi_list_networks list the available WiFi networks
:return: dict - containing the list of Wi-Fi networks. by using wifi python package.
""" :return: dict - containing the list of Wi-Fi networks.
networks = [] """
try: networks = []
for n in Cell.all(self.iface_out): try:
if n.ssid not in [n["ssid"] for n in networks] and n.ssid and n.encrypted: for n in Cell.all(self.iface_out):
networks.append( if n.ssid not in [n["ssid"] for n in networks] and n.ssid and n.encrypted:
{"ssid": n.ssid, "type": n.encryption_type}) networks.append(
return {"networks": networks} {"ssid": n.ssid, "type": n.encryption_type})
except: return {"networks": networks}
return {"networks": []} except:
return {"networks": []}
@staticmethod
def wifi_setup(ssid, password): @staticmethod
""" def wifi_setup(ssid, password):
Edit the wpa_supplicant file with provided credentials. """
If the ssid already exists, just update the password. Otherwise Edit the wpa_supplicant file with provided credentials.
create a new entry in the file. If the ssid already exists, just update the password. Otherwise
create a new entry in the file.
:return: dict containing the status of the operation
""" :return: dict containing the status of the operation
if len(password) >= 8 and len(ssid): """
found = False if len(password) >= 8 and len(ssid):
networks = [] found = False
header, content = "", "" networks = []
header, content = "", ""
with open("/etc/wpa_supplicant/wpa_supplicant.conf") as f:
content = f.read() with open("/etc/wpa_supplicant/wpa_supplicant.conf") as f:
blocks = content.split("network={") content = f.read()
header = blocks[0] blocks = content.split("network={")
header = blocks[0]
for block in blocks[1:]:
net = {} for block in blocks[1:]:
for line in block.splitlines(): net = {}
if line and line != "}": for line in block.splitlines():
if "priority=10" not in line.strip(): if line and line != "}":
key, val = line.strip().split("=") if "priority=10" not in line.strip():
if key != "disabled": key, val = line.strip().split("=")
net[key] = val.replace("\"", "") if key != "disabled":
networks.append(net) net[key] = val.replace("\"", "")
networks.append(net)
for net in networks:
if net["ssid"] == ssid: for net in networks:
net["psk"] = password.replace('"', '\\"') if net["ssid"] == ssid:
net["priority"] = "10" net["psk"] = password.replace('"', '\\"')
found = True net["priority"] = "10"
found = True
if not found:
networks.append({ if not found:
"ssid": ssid, networks.append({
"psk": password.replace('"', '\\"'), "ssid": ssid,
"key_mgmt": "WPA-PSK", "psk": password.replace('"', '\\"'),
"priority": "10" "key_mgmt": "WPA-PSK",
}) "priority": "10"
})
with open("/etc/wpa_supplicant/wpa_supplicant.conf", "w+") as f:
content = header with open("/etc/wpa_supplicant/wpa_supplicant.conf", "w+") as f:
for network in networks: content = header
net = "network={\n" for network in networks:
for k, v in network.items(): net = "network={\n"
if k in ["ssid", "psk"]: for k, v in network.items():
net += " {}=\"{}\"\n".format(k, v) if k in ["ssid", "psk"]:
else: net += " {}=\"{}\"\n".format(k, v)
net += " {}={}\n".format(k, v) else:
net += "}\n\n" net += " {}={}\n".format(k, v)
content += net net += "}\n\n"
if f.write(content): content += net
return {"status": True, if f.write(content):
"message": "Configuration saved"} return {"status": True,
else: "message": "Configuration saved"}
return {"status": False, else:
"message": "Error while writing wpa_supplicant configuration file."} return {"status": False,
else: "message": "Error while writing wpa_supplicant configuration file."}
return {"status": False, else:
"message": "Empty SSID or/and password length less than 8 chars."} return {"status": False,
"message": "Empty SSID or/and password length less than 8 chars."}
def wifi_connect(self):
""" def wifi_connect(self):
Connect to one of the WiFi networks present in the wpa_supplicant.conf. """
:return: dict containing the TinyCheck <-> AP status. Connect to one of the WiFi networks present in the wpa_supplicant.conf.
""" :return: dict containing the TinyCheck <-> AP status.
"""
# Kill wpa_supplicant instances, if any.
terminate_process("wpa_supplicant") # Kill wpa_supplicant instances, if any.
# Launch a new instance of wpa_supplicant. terminate_process("wpa_supplicant")
sp.Popen(["wpa_supplicant", "-B", "-i", self.iface_out, "-c", # Launch a new instance of wpa_supplicant.
"/etc/wpa_supplicant/wpa_supplicant.conf"]).wait() sp.Popen(["wpa_supplicant", "-B", "-i", self.iface_out, "-c",
# Check internet status "/etc/wpa_supplicant/wpa_supplicant.conf"]).wait()
for _ in range(1, 40): # Check internet status
if self.check_internet(): for _ in range(1, 40):
return {"status": True, if self.check_internet():
"message": "Wifi connected"} return {"status": True,
time.sleep(1) "message": "Wifi connected"}
time.sleep(1)
return {"status": False,
"message": "Wifi not connected"} return {"status": False,
"message": "Wifi not connected"}
def start_ap(self):
""" def start_ap(self):
The start_ap method generates an Access Point by using HostApd """
and provide to the GUI the associated ssid, password and qrcode. The start_ap method generates an Access Point by using HostApd
and provide to the GUI the associated ssid, password and qrcode.
:return: dict containing the status of the AP
""" :return: dict containing the status of the AP
"""
# Re-ask to enable interface, sometimes it just go away.
if not self.enable_interface(self.iface_out): # Re-ask to enable interface, sometimes it just go away.
return {"status": False, if not self.enable_interface(self.iface_out):
"message": "Interface not present."} return {"status": False,
"message": "Interface not present."}
# Generate the hostapd configuration
if read_config(("network", "tokenized_ssids")): # Generate the hostapd configuration
token = "".join([random.choice(self.random_choice_alphabet) if read_config(("network", "tokenized_ssids")):
for i in range(4)]) token = "".join([random.choice(self.random_choice_alphabet)
self.AP_SSID = random.choice(read_config( for i in range(4)])
("network", "ssids"))) + "-" + token self.AP_SSID = random.choice(read_config(
else: ("network", "ssids"))) + "-" + token
self.AP_SSID = random.choice(read_config(("network", "ssids"))) else:
self.AP_PASS = "".join( self.AP_SSID = random.choice(read_config(("network", "ssids")))
[random.choice(self.random_choice_alphabet) for i in range(8)]) self.AP_PASS = "".join(
[random.choice(self.random_choice_alphabet) for i in range(8)])
# Launch hostapd
if self.write_hostapd_config(): # Launch hostapd
if self.lauch_hostapd() and self.reset_dnsmasq_leases(): if self.write_hostapd_config():
return {"status": True, if self.lauch_hostapd() and self.reset_dnsmasq_leases():
"message": "AP started", return {"status": True,
"ssid": self.AP_SSID, "message": "AP started",
"password": self.AP_PASS, "ssid": self.AP_SSID,
"qrcode": self.generate_qr_code()} "password": self.AP_PASS,
else: "qrcode": self.generate_qr_code()}
return {"status": False, else:
"message": "Error while creating AP."} return {"status": False,
else: "message": "Error while creating AP."}
return {"status": False, else:
"message": "Error while writing hostapd configuration file."} return {"status": False,
"message": "Error while writing hostapd configuration file."}
def generate_qr_code(self):
""" def generate_qr_code(self):
The method generate_qr_code returns a QRCode based on """
the SSID and the password. The method generate_qr_code returns a QRCode based on
the SSID and the password.
:return: - string containing the PNG of the QRCode.
""" :return: - string containing the PNG of the QRCode.
qrc = qrcode.make("WIFI:S:{};T:WPA;P:{};;".format( """
self.AP_SSID, self.AP_PASS)) qrc = qrcode.make("WIFI:S:{};T:WPA;P:{};;".format(
buffered = BytesIO() self.AP_SSID, self.AP_PASS))
qrc.save(buffered, format="PNG") buffered = BytesIO()
return "data:image/png;base64,{}".format(base64.b64encode(buffered.getvalue()).decode("utf8")) qrc.save(buffered, format="PNG")
return "data:image/png;base64,{}".format(base64.b64encode(buffered.getvalue()).decode("utf8"))
def write_hostapd_config(self):
""" def write_hostapd_config(self):
The method write_hostapd_config write the hostapd configuration """
under a temporary location defined in the config file. The method write_hostapd_config write the hostapd configuration
under a temporary location defined in the config file.
:return: bool - if hostapd configuration file created
""" :return: bool - if hostapd configuration file created
try: """
with open("{}/app/assets/hostapd.conf".format(sys.path[0]), "r") as f: try:
conf = f.read() chan = self.set_ap_channel()
conf = conf.replace("{IFACE}", self.iface_in) with open("{}/app/assets/hostapd.conf".format(sys.path[0]), "r") as f:
conf = conf.replace("{SSID}", self.AP_SSID) conf = f.read()
conf = conf.replace("{PASS}", self.AP_PASS) conf = conf.replace("{IFACE}", self.iface_in)
with open("/tmp/hostapd.conf", "w") as c: conf = conf.replace("{SSID}", self.AP_SSID)
c.write(conf) conf = conf.replace("{PASS}", self.AP_PASS)
return True conf = conf.replace("{CHAN}", chan)
except: with open("/tmp/hostapd.conf", "w") as c:
return False c.write(conf)
return True
def lauch_hostapd(self): except:
""" return False
The method lauch_hostapd kill old instance of hostapd and launch a
new one as a background process. def lauch_hostapd(self):
"""
:return: bool - if hostapd sucessfully launched. The method lauch_hostapd kill old instance of hostapd and launch a
""" new one as a background process.
# Kill potential zombies of hostapd :return: bool - if hostapd sucessfully launched.
terminate_process("hostapd") """
sp.Popen(["ifconfig", self.iface_in, "up"]).wait() # Kill potential zombies of hostapd
sp.Popen( terminate_process("hostapd")
"/usr/sbin/hostapd /tmp/hostapd.conf > /tmp/hostapd.log", shell=True)
sp.Popen(["ifconfig", self.iface_in, "up"]).wait()
while True: sp.Popen(
if path.isfile("/tmp/hostapd.log"): "/usr/sbin/hostapd /tmp/hostapd.conf > /tmp/hostapd.log", shell=True)
with open("/tmp/hostapd.log", "r") as f:
log = f.read() while True:
err = ["Could not configure driver mode", if path.isfile("/tmp/hostapd.log"):
"driver initialization failed"] with open("/tmp/hostapd.log", "r") as f:
if not any(e in log for e in err): log = f.read()
if "AP-ENABLED" in log: err = ["Could not configure driver mode",
return True "driver initialization failed"]
else: if not any(e in log for e in err):
return False if "AP-ENABLED" in log:
time.sleep(1) return True
else:
def stop_hostapd(self): return False
""" time.sleep(1)
Stop hostapd instance.
def stop_hostapd(self):
:return: dict - a little message for debug. """
""" Stop hostapd instance.
if terminate_process("hostapd"):
return {"status": True, :return: dict - a little message for debug.
"message": "AP stopped"} """
else: if terminate_process("hostapd"):
return {"status": False, return {"status": True,
"message": "No AP running"} "message": "AP stopped"}
else:
def reset_dnsmasq_leases(self): return {"status": False,
""" "message": "No AP running"}
This method reset the DNSMasq leases and logs to get the new
connected device name & new DNS entries. def reset_dnsmasq_leases(self):
"""
:return: bool if everything goes well This method reset the DNSMasq leases and logs to get the new
""" connected device name & new DNS entries.
try:
sp.Popen("service dnsmasq stop", shell=True).wait() :return: bool if everything goes well
sp.Popen("cp /dev/null /var/lib/misc/dnsmasq.leases", """
shell=True).wait() try:
sp.Popen("cp /dev/null /var/log/messages.log", shell=True).wait() sp.Popen("service dnsmasq stop", shell=True).wait()
sp.Popen("service dnsmasq start", shell=True).wait() sp.Popen("cp /dev/null /var/lib/misc/dnsmasq.leases",
return True shell=True).wait()
except: sp.Popen("cp /dev/null /var/log/messages.log", shell=True).wait()
return False sp.Popen("service dnsmasq start", shell=True).wait()
return True
def enable_forwarding(self): except:
""" return False
This enable forwarding to get internet working on the connected device.
Method tiggered during the Network class intialization. def enable_forwarding(self):
"""
:return: bool if everything goes well This enable forwarding to get internet working on the connected device.
""" Method tiggered during the Network class intialization.
try:
sp.Popen("echo 1 > /proc/sys/net/ipv4/ip_forward", :return: bool if everything goes well
shell=True).wait() """
try:
# Enable forwarding. sp.Popen("echo 1 > /proc/sys/net/ipv4/ip_forward",
sp.Popen(["iptables", "-A", "POSTROUTING", "-t", "nat", "-o", shell=True).wait()
self.iface_out, "-j", "MASQUERADE"]).wait()
# Enable forwarding.
# Prevent the device to reach the 80 and 443 of TinyCheck. sp.Popen(["iptables", "-A", "POSTROUTING", "-t", "nat", "-o",
sp.Popen(["iptables", "-A", "INPUT", "-i", self.iface_in, "-d", self.iface_out, "-j", "MASQUERADE"]).wait()
"192.168.100.1", "-p", "tcp", "--match", "multiport", "--dports", "80,443", "-j" "DROP"]).wait()
# Prevent the device to reach the 80 and 443 of TinyCheck.
return True sp.Popen(["iptables", "-A", "INPUT", "-i", self.iface_in, "-d",
except: "192.168.100.1", "-p", "tcp", "--match", "multiport", "--dports", "80,443", "-j" "DROP"]).wait()
return False
return True
def enable_interface(self, iface): except:
""" return False
This enable interfaces, with a simple check.
:return: bool if everything goes well def enable_interface(self, iface):
""" """
sh = sp.Popen(["ifconfig", iface], This enable interfaces, with a simple check.
stdout=sp.PIPE, stderr=sp.PIPE) :return: bool if everything goes well
sh = sh.communicate() """
if b"<UP," in sh[0]: sh = sp.Popen(["ifconfig", iface],
return True # The interface is up. stdout=sp.PIPE, stderr=sp.PIPE)
elif sh[1]: sh = sh.communicate()
return False # The interface doesn't exists (most of the cases). if b"<UP," in sh[0]:
else: return True # The interface is up.
sp.Popen(["ifconfig", iface, "up"]).wait() elif sh[1]:
return True return False # The interface doesn't exists (most of the cases).
else:
def check_internet(self): sp.Popen(["ifconfig", iface, "up"]).wait()
""" return True
Check the internet link just with a small http request
to an URL present in the configuration def check_internet(self):
"""
:return: bool - if the request succeed or not. Check the internet link just with a small http request
""" to an URL present in the configuration
try:
url = read_config(("network", "internet_check")) :return: bool - if the request succeed or not.
requests.get(url, timeout=10) """
return True try:
except: url = read_config(("network", "internet_check"))
return False requests.get(url, timeout=10)
return True
except:
return False
def set_ap_channel(self):
"""
Deduce the channel to have for the AP in order to prevent
kind of jamming between the two wifi interfaces.
"""
# Get the channel of the connected interface
sh = sp.Popen(["iw", self.iface_out, "info"],
stdout=sp.PIPE, stderr=sp.PIPE).communicate()
res = re.search("channel ([0-9]{1,2})", sh[0].decode('utf8'))
chn = res.group(1)
# Return a good candidate.
return "11" if int(chn) < 7 else "1"