Compare commits

...

1 Commits

Author SHA1 Message Date
pixeebot[bot]
0a636e4a76
Add timeout to requests calls 2024-05-30 21:28:43 +00:00
2 changed files with 231 additions and 231 deletions

View File

@ -1,149 +1,149 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from app.utils import read_config from app.utils import read_config
from app.classes.iocs import IOCs from app.classes.iocs import IOCs
from app.classes.whitelist import WhiteList from app.classes.whitelist import WhiteList
from app.classes.misp import MISP from app.classes.misp import MISP
import requests import requests
import json import json
import urllib3 import urllib3
import time import time
from multiprocessing import Process from multiprocessing import Process
""" """
This file is parsing the watchers present This file is parsing the watchers present
in the configuration file. This in order to get in the configuration file. This in order to get
automatically new iocs / elements from remote automatically new iocs / elements from remote
sources without user interaction. sources without user interaction.
""" """
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def watch_iocs(): def watch_iocs():
""" """
Retrieve IOCs from the remote URLs defined in config/watchers. Retrieve IOCs from the remote URLs defined in config/watchers.
For each IOC, add it to the DB. For each IOC, add it to the DB.
""" """
# Retrieve the URLs from the configuration # Retrieve the URLs from the configuration
urls = read_config(("watchers", "iocs")) urls = read_config(("watchers", "iocs"))
watchers = [{"url": url, "status": False} for url in urls] watchers = [{"url": url, "status": False} for url in urls]
while True: while True:
for w in watchers: for w in watchers:
if w["status"] == False: if w["status"] == False:
iocs = IOCs() iocs = IOCs()
iocs_list = [] iocs_list = []
to_delete = [] to_delete = []
try: try:
res = requests.get(w["url"], verify=False) res = requests.get(w["url"], verify=False, timeout=60)
if res.status_code == 200: if res.status_code == 200:
content = json.loads(res.content) content = json.loads(res.content)
iocs_list = content["iocs"] if "iocs" in content else [] iocs_list = content["iocs"] if "iocs" in content else []
to_delete = content["to_delete"] if "to_delete" in content else [] to_delete = content["to_delete"] if "to_delete" in content else []
else: else:
w["status"] = False w["status"] = False
except: except:
w["status"] = False w["status"] = False
for ioc in iocs_list: for ioc in iocs_list:
try: try:
iocs.add(ioc["type"], ioc["tag"], iocs.add(ioc["type"], ioc["tag"],
ioc["tlp"], ioc["value"], "watcher") ioc["tlp"], ioc["value"], "watcher")
w["status"] = True w["status"] = True
except: except:
continue continue
for ioc in to_delete: for ioc in to_delete:
try: try:
iocs.delete_by_value(ioc["value"]) iocs.delete_by_value(ioc["value"])
w["status"] = True w["status"] = True
except: except:
continue continue
# If at least one URL haven't be parsed, let's retry in 1min. # If at least one URL haven't be parsed, let's retry in 1min.
if False in [w["status"] for w in watchers]: if False in [w["status"] for w in watchers]:
time.sleep(60) time.sleep(60)
else: else:
break break
def watch_whitelists(): def watch_whitelists():
""" """
Retrieve whitelist elements from the remote URLs Retrieve whitelist elements from the remote URLs
defined in config/watchers. For each (new ?) element, defined in config/watchers. For each (new ?) element,
add it to the DB. add it to the DB.
""" """
urls = read_config(("watchers", "whitelists")) urls = read_config(("watchers", "whitelists"))
watchers = [{"url": url, "status": False} for url in urls] watchers = [{"url": url, "status": False} for url in urls]
while True: while True:
for w in watchers: for w in watchers:
if w["status"] == False: if w["status"] == False:
whitelist = WhiteList() whitelist = WhiteList()
elements = [] elements = []
to_delete = [] to_delete = []
try: try:
res = requests.get(w["url"], verify=False) res = requests.get(w["url"], verify=False, timeout=60)
if res.status_code == 200: if res.status_code == 200:
content = json.loads(res.content) content = json.loads(res.content)
elements = content["elements"] if "elements" in content else [] elements = content["elements"] if "elements" in content else []
to_delete = content["to_delete"] if "to_delete" in content else [] to_delete = content["to_delete"] if "to_delete" in content else []
else: else:
w["status"] = False w["status"] = False
except: except:
w["status"] = False w["status"] = False
for elem in elements: for elem in elements:
try: try:
whitelist.add(elem["type"], elem["element"], "watcher") whitelist.add(elem["type"], elem["element"], "watcher")
w["status"] = True w["status"] = True
except: except:
continue continue
for elem in to_delete: for elem in to_delete:
try: try:
whitelist.delete_by_value(elem["element"]) whitelist.delete_by_value(elem["element"])
w["status"] = True w["status"] = True
except: except:
continue continue
if False in [w["status"] for w in watchers]: if False in [w["status"] for w in watchers]:
time.sleep(60) time.sleep(60)
else: else:
break break
def watch_misp(): def watch_misp():
""" """
Retrieve IOCs from misp instances. Each new element is Retrieve IOCs from misp instances. Each new element is
tested and then added to the database. tested and then added to the database.
""" """
iocs, misp = IOCs(), MISP() iocs, misp = IOCs(), MISP()
instances = [i for i in misp.get_instances()] instances = [i for i in misp.get_instances()]
while instances: while instances:
for i, ist in enumerate(instances): for i, ist in enumerate(instances):
status = misp.test_instance(ist["url"], status = misp.test_instance(ist["url"],
ist["apikey"], ist["apikey"],
ist["verifycert"]) ist["verifycert"])
if status: if status:
for ioc in misp.get_iocs(ist["id"]): for ioc in misp.get_iocs(ist["id"]):
iocs.add(ioc["type"], ioc["tag"], ioc["tlp"], iocs.add(ioc["type"], ioc["tag"], ioc["tlp"],
ioc["value"], "misp-{}".format(ist["id"])) ioc["value"], "misp-{}".format(ist["id"]))
misp.update_sync(ist["id"]) misp.update_sync(ist["id"])
instances.pop(i) instances.pop(i)
if instances: time.sleep(60) if instances: time.sleep(60)
p1 = Process(target=watch_iocs) p1 = Process(target=watch_iocs)
p2 = Process(target=watch_whitelists) p2 = Process(target=watch_whitelists)
p3 = Process(target=watch_misp) p3 = Process(target=watch_misp)
p1.start() p1.start()
p2.start() p2.start()
p3.start() p3.start()

View File

@ -1,82 +1,82 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from app.utils import read_config from app.utils import read_config
import subprocess as sp import subprocess as sp
import requests import requests
import json import json
import os import os
import re import re
class Update(object): class Update(object):
def __init__(self): def __init__(self):
self.project_url = "https://api.github.com/repos/KasperskyLab/TinyCheck/tags" self.project_url = "https://api.github.com/repos/KasperskyLab/TinyCheck/tags"
self.app_path = "/usr/share/tinycheck" self.app_path = "/usr/share/tinycheck"
return None return None
def check_version(self): def check_version(self):
""" """
Check if a new version of TinyCheck is available Check if a new version of TinyCheck is available
by quering the Github api and comparing the last by quering the Github api and comparing the last
tag inside the VERSION file. tag inside the VERSION file.
:return: dict containing the available versions. :return: dict containing the available versions.
""" """
if read_config(("frontend", "update")): if read_config(("frontend", "update")):
try: try:
res = requests.get(self.project_url) res = requests.get(self.project_url, timeout=60)
res = json.loads(res.content.decode("utf8")) res = json.loads(res.content.decode("utf8"))
with open(os.path.join(self.app_path, "VERSION")) as f: with open(os.path.join(self.app_path, "VERSION")) as f:
cv = f.read() cv = f.read()
if cv != res[0]["name"]: if cv != res[0]["name"]:
return {"status": True, return {"status": True,
"message": "A new version is available", "message": "A new version is available",
"current_version": cv, "current_version": cv,
"next_version": res[0]["name"]} "next_version": res[0]["name"]}
else: else:
return {"status": True, return {"status": True,
"message": "This is the latest version", "message": "This is the latest version",
"current_version": cv} "current_version": cv}
except: except:
return {"status": False, return {"status": False,
"message": "Something went wrong (no API access nor version file)"} "message": "Something went wrong (no API access nor version file)"}
else: else:
return {"status": False, return {"status": False,
"message": "You don't have rights to do this operation."} "message": "You don't have rights to do this operation."}
def get_current_version(self): def get_current_version(self):
""" """
Get the current version of the TinyCheck instance Get the current version of the TinyCheck instance
:return: dict containing the current version or error. :return: dict containing the current version or error.
""" """
if read_config(("frontend", "update")): if read_config(("frontend", "update")):
try: try:
with open(os.path.join(self.app_path, "VERSION")) as f: with open(os.path.join(self.app_path, "VERSION")) as f:
return {"status": True, return {"status": True,
"current_version": f.read()} "current_version": f.read()}
except: except:
return {"status": False, return {"status": False,
"message": "Something went wrong - no version file ?"} "message": "Something went wrong - no version file ?"}
else: else:
return {"status": False, return {"status": False,
"message": "You don't have rights to do this operation."} "message": "You don't have rights to do this operation."}
def update_instance(self): def update_instance(self):
""" """
Update the instance by executing the update script. Update the instance by executing the update script.
:return: dict containing the update status. :return: dict containing the update status.
""" """
if read_config(("frontend", "update")): if read_config(("frontend", "update")):
try: try:
os.chdir(self.app_path) os.chdir(self.app_path)
sp.Popen(["bash", os.path.join(self.app_path, "update.sh")]) sp.Popen(["bash", os.path.join(self.app_path, "update.sh")])
return {"status": True, return {"status": True,
"message": "Update successfully launched"} "message": "Update successfully launched"}
except: except:
return {"status": False, return {"status": False,
"message": "Issue during the update"} "message": "Issue during the update"}
else: else:
return {"status": False, return {"status": False,
"message": "You don't have rights to do this operation."} "message": "You don't have rights to do this operation."}