370 lines
13 KiB
Python
370 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import subprocess as sp
|
|
import netifaces as ni
|
|
import requests as rq
|
|
import re
|
|
import sys
|
|
import time
|
|
import qrcode
|
|
import base64
|
|
import random
|
|
import requests
|
|
|
|
from wifi import Cell
|
|
from os import path, remove
|
|
from io import BytesIO
|
|
from app.utils import terminate_process, read_config
|
|
|
|
|
|
class Network(object):
|
|
|
|
def __init__(self):
|
|
self.AP_SSID = False
|
|
self.AP_PASS = False
|
|
self.iface_in = read_config(("network", "in"))
|
|
self.iface_out = read_config(("network", "out"))
|
|
self.enable_interface(self.iface_in)
|
|
self.enable_interface(self.iface_out)
|
|
self.enable_forwarding()
|
|
self.reset_dnsmasq_leases()
|
|
self.random_choice_alphabet = "abcdef1234567890"
|
|
|
|
def check_status(self):
|
|
"""
|
|
The method check_status check the IP addressing of each interface
|
|
and return their associated IP.
|
|
|
|
:return: dict containing each interface status.
|
|
"""
|
|
|
|
ctx = {"interfaces": {
|
|
self.iface_in: False,
|
|
self.iface_out: False,
|
|
"eth0": False},
|
|
"internet": self.check_internet()}
|
|
|
|
for iface in ctx["interfaces"].keys():
|
|
try:
|
|
ip = ni.ifaddresses(iface)[ni.AF_INET][0]["addr"]
|
|
if not ip.startswith("127") or not ip.startswith("169.254"):
|
|
ctx["interfaces"][iface] = ip
|
|
except:
|
|
ctx["interfaces"][iface] = "Interface not connected or present."
|
|
return ctx
|
|
|
|
def wifi_list_networks(self):
|
|
"""
|
|
The method wifi_list_networks list the available WiFi networks
|
|
by using wifi python package.
|
|
:return: dict - containing the list of Wi-Fi networks.
|
|
"""
|
|
networks = []
|
|
try:
|
|
for n in Cell.all(self.iface_out):
|
|
if n.ssid not in [n["ssid"] for n in networks] and n.ssid and n.encrypted:
|
|
networks.append(
|
|
{"ssid": n.ssid, "type": n.encryption_type})
|
|
return {"networks": networks}
|
|
except:
|
|
return {"networks": []}
|
|
|
|
@staticmethod
|
|
def wifi_setup(ssid, password):
|
|
"""
|
|
Edit the wpa_supplicant file with provided credentials.
|
|
If the ssid already exists, just update the password. Otherwise
|
|
create a new entry in the file.
|
|
|
|
:return: dict containing the status of the operation
|
|
"""
|
|
if len(password) >= 8 and len(ssid):
|
|
found = False
|
|
networks = []
|
|
header, content = "", ""
|
|
|
|
with open("/etc/wpa_supplicant/wpa_supplicant.conf") as f:
|
|
content = f.read()
|
|
blocks = content.split("network={")
|
|
header = blocks[0]
|
|
|
|
for block in blocks[1:]:
|
|
net = {}
|
|
for line in block.splitlines():
|
|
if line and line != "}":
|
|
if "priority=10" not in line.strip():
|
|
key, val = line.strip().split("=")
|
|
if key != "disabled":
|
|
net[key] = val.replace("\"", "")
|
|
networks.append(net)
|
|
|
|
for net in networks:
|
|
if net["ssid"] == ssid:
|
|
net["psk"] = password.replace('"', '\\"')
|
|
net["priority"] = "10"
|
|
found = True
|
|
|
|
if not found:
|
|
networks.append({
|
|
"ssid": ssid,
|
|
"psk": password.replace('"', '\\"'),
|
|
"key_mgmt": "WPA-PSK",
|
|
"priority": "10"
|
|
})
|
|
|
|
with open("/etc/wpa_supplicant/wpa_supplicant.conf", "w+") as f:
|
|
content = header
|
|
for network in networks:
|
|
net = "network={\n"
|
|
for k, v in network.items():
|
|
if k in ["ssid", "psk"]:
|
|
net += " {}=\"{}\"\n".format(k, v)
|
|
else:
|
|
net += " {}={}\n".format(k, v)
|
|
net += "}\n\n"
|
|
content += net
|
|
if f.write(content):
|
|
return {"status": True,
|
|
"message": "Configuration saved"}
|
|
else:
|
|
return {"status": False,
|
|
"message": "Error while writing wpa_supplicant configuration file."}
|
|
else:
|
|
return {"status": False,
|
|
"message": "Empty SSID or/and password length less than 8 chars."}
|
|
|
|
def wifi_connect(self):
|
|
"""
|
|
Connect to one of the WiFi networks present in the wpa_supplicant.conf.
|
|
:return: dict containing the TinyCheck <-> AP status.
|
|
"""
|
|
|
|
# Kill wpa_supplicant instances, if any.
|
|
terminate_process("wpa_supplicant")
|
|
# Launch a new instance of wpa_supplicant.
|
|
sp.Popen(["wpa_supplicant", "-B", "-i", self.iface_out, "-c",
|
|
"/etc/wpa_supplicant/wpa_supplicant.conf"]).wait()
|
|
# Check internet status
|
|
for _ in range(1, 40):
|
|
if self.check_internet():
|
|
return {"status": True,
|
|
"message": "Wifi connected"}
|
|
time.sleep(1)
|
|
|
|
return {"status": False,
|
|
"message": "Wifi not connected"}
|
|
|
|
def start_ap(self):
|
|
"""
|
|
The start_ap method generates an Access Point by using HostApd
|
|
and provide to the GUI the associated ssid, password and qrcode.
|
|
|
|
:return: dict containing the status of the AP
|
|
"""
|
|
|
|
# Re-ask to enable interface, sometimes it just go away.
|
|
if not self.enable_interface(self.iface_out):
|
|
return {"status": False,
|
|
"message": "Interface not present."}
|
|
|
|
# Generate the hostapd configuration
|
|
if read_config(("network", "tokenized_ssids")):
|
|
token = "".join([random.choice(self.random_choice_alphabet)
|
|
for i in range(4)])
|
|
self.AP_SSID = random.choice(read_config(
|
|
("network", "ssids"))) + "-" + token
|
|
else:
|
|
self.AP_SSID = random.choice(read_config(("network", "ssids")))
|
|
self.AP_PASS = "".join(
|
|
[random.choice(self.random_choice_alphabet) for i in range(8)])
|
|
|
|
# Launch hostapd
|
|
if self.write_hostapd_config():
|
|
if self.lauch_hostapd() and self.reset_dnsmasq_leases():
|
|
return {"status": True,
|
|
"message": "AP started",
|
|
"ssid": self.AP_SSID,
|
|
"password": self.AP_PASS,
|
|
"qrcode": self.generate_qr_code()}
|
|
else:
|
|
return {"status": False,
|
|
"message": "Error while creating AP."}
|
|
else:
|
|
return {"status": False,
|
|
"message": "Error while writing hostapd configuration file."}
|
|
|
|
def generate_qr_code(self):
|
|
"""
|
|
The method generate_qr_code returns a QRCode based on
|
|
the SSID and the password.
|
|
|
|
:return: - string containing the PNG of the QRCode.
|
|
"""
|
|
qrc = qrcode.make("WIFI:S:{};T:WPA;P:{};;".format(
|
|
self.AP_SSID, self.AP_PASS))
|
|
buffered = BytesIO()
|
|
qrc.save(buffered, format="PNG")
|
|
return "data:image/png;base64,{}".format(base64.b64encode(buffered.getvalue()).decode("utf8"))
|
|
|
|
def write_hostapd_config(self):
|
|
"""
|
|
The method write_hostapd_config write the hostapd configuration
|
|
under a temporary location defined in the config file.
|
|
|
|
:return: bool - if hostapd configuration file created
|
|
"""
|
|
try:
|
|
chan = self.set_ap_channel()
|
|
with open("{}/app/assets/hostapd.conf".format(sys.path[0]), "r") as f:
|
|
conf = f.read()
|
|
conf = conf.replace("{IFACE}", self.iface_in)
|
|
conf = conf.replace("{SSID}", self.AP_SSID)
|
|
conf = conf.replace("{PASS}", self.AP_PASS)
|
|
conf = conf.replace("{CHAN}", chan)
|
|
with open("/tmp/hostapd.conf", "w") as c:
|
|
c.write(conf)
|
|
return True
|
|
except:
|
|
return False
|
|
|
|
def lauch_hostapd(self):
|
|
"""
|
|
The method lauch_hostapd kill old instance of hostapd and launch a
|
|
new one as a background process.
|
|
|
|
:return: bool - if hostapd sucessfully launched.
|
|
"""
|
|
|
|
# Kill potential zombies of hostapd
|
|
terminate_process("hostapd")
|
|
|
|
sp.Popen(["ip","link","set", self.iface_in, "up"]).wait()
|
|
sp.Popen(
|
|
"/usr/sbin/hostapd /tmp/hostapd.conf > /tmp/hostapd.log", shell=True)
|
|
|
|
while True:
|
|
if path.isfile("/tmp/hostapd.log"):
|
|
with open("/tmp/hostapd.log", "r") as f:
|
|
log = f.read()
|
|
err = ["Could not configure driver mode",
|
|
"driver initialization failed"]
|
|
if not any(e in log for e in err):
|
|
if "AP-ENABLED" in log:
|
|
return True
|
|
else:
|
|
return False
|
|
time.sleep(1)
|
|
|
|
def stop_hostapd(self):
|
|
"""
|
|
Stop hostapd instance.
|
|
|
|
:return: dict - a little message for debug.
|
|
"""
|
|
if terminate_process("hostapd"):
|
|
return {"status": True,
|
|
"message": "AP stopped"}
|
|
else:
|
|
return {"status": False,
|
|
"message": "No AP running"}
|
|
|
|
def reset_dnsmasq_leases(self):
|
|
"""
|
|
This method reset the DNSMasq leases and logs to get the new
|
|
connected device name & new DNS entries.
|
|
|
|
:return: bool if everything goes well
|
|
"""
|
|
try:
|
|
sp.Popen("service dnsmasq stop", shell=True).wait()
|
|
sp.Popen("cp /dev/null /var/lib/misc/dnsmasq.leases",
|
|
shell=True).wait()
|
|
sp.Popen("cp /dev/null /var/log/messages.log", shell=True).wait()
|
|
sp.Popen("service dnsmasq start", shell=True).wait()
|
|
return True
|
|
except:
|
|
return False
|
|
|
|
def enable_forwarding(self):
|
|
"""
|
|
This enable forwarding to get internet working on the connected device.
|
|
Method tiggered during the Network class intialization.
|
|
|
|
:return: bool if everything goes well
|
|
"""
|
|
try:
|
|
sp.Popen("echo 1 > /proc/sys/net/ipv4/ip_forward",
|
|
shell=True).wait()
|
|
|
|
# Enable forwarding.
|
|
|
|
sp.Popen("nft add table nat",shell=True).wait()
|
|
sp.Popen("nft 'add chain nat prerouting { type nat hook prerouting priority 100; }'",shell=True).wait()
|
|
sp.Popen("nft 'add chain nat postrouting { type nat hook postrouting priority 100; }'",shell=True).wait()
|
|
sp.Popen("nft add table ip filter",shell=True).wait()
|
|
sp.Popen("nft 'add chain ip filter INPUT { type filter hook input priority 0; }'",shell=True).wait()
|
|
|
|
|
|
sp.Popen(["nft","add","rule","ip","nat","postrouting","oifname",
|
|
self.iface_out,"counter","masquerade"]).wait()
|
|
|
|
# Prevent the device to reach the 80 and 443 of TinyCheck.
|
|
sp.Popen(["nft","add","rule","ip","filter","INPUT","iifname",self.iface_in,"ip",
|
|
"protocol","tcp","ip","daddr","192.168.100.1","tcp","dport","{ 80,443}","counter","drop"]).wait()
|
|
|
|
|
|
return True
|
|
except:
|
|
return False
|
|
|
|
def enable_interface(self, iface):
|
|
"""
|
|
This enable interfaces, with a simple check.
|
|
:return: bool if everything goes well
|
|
"""
|
|
sh = sp.Popen(["ip" ,"a","s", iface],
|
|
stdout=sp.PIPE, stderr=sp.PIPE)
|
|
sh = sh.communicate()
|
|
if b",UP" in sh[0]:
|
|
return True # The interface is up.
|
|
elif sh[1]:
|
|
return False # The interface doesn't exists (most of the cases).
|
|
else:
|
|
sp.Popen(["ip","link","set", iface, "up"]).wait()
|
|
return True
|
|
|
|
def check_internet(self):
|
|
"""
|
|
Check the internet link just with a small http request
|
|
to an URL present in the configuration
|
|
|
|
:return: bool - if the request succeed or not.
|
|
"""
|
|
try:
|
|
url = read_config(("network", "internet_check"))
|
|
requests.get(url, timeout=10)
|
|
return True
|
|
except:
|
|
return False
|
|
|
|
def set_ap_channel(self):
|
|
"""
|
|
Deduce the channel to have for the AP in order to prevent
|
|
kind of jamming between the two wifi interfaces.
|
|
"""
|
|
try:
|
|
if self.iface_out[0] == "w":
|
|
# Get the channel of the connected interface
|
|
sh = sp.Popen(["iw", self.iface_out, "info"],
|
|
stdout=sp.PIPE, stderr=sp.PIPE).communicate()
|
|
res = re.search("channel ([0-9]{1,2})", sh[0].decode('utf8'))
|
|
chn = res.group(1)
|
|
|
|
# Return a good candidate.
|
|
return "11" if int(chn) < 7 else "1"
|
|
else:
|
|
return "1"
|
|
except:
|
|
return "1"
|