Compare commits
1 Commits
pixeebot/d
...
pixeebot/d
Author | SHA1 | Date | |
---|---|---|---|
0a636e4a76 |
@ -1,178 +1,178 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from json import loads, dumps
|
||||
from collections import OrderedDict
|
||||
from datetime import datetime
|
||||
from traceback import print_exc
|
||||
|
||||
# Taken from https://github.com/dgunter/ParseZeekLogs <3
|
||||
|
||||
|
||||
class ParseZeekLogs(object):
    """
    Iterator that parses a Zeek (Bro) TSV log file and yields one record
    per data line, rendered as a dict ("json"), a CSV string ("csv"), or
    None for lines that don't produce a record.

    Attributes: filepath: Path of Zeek log file to read
    """

    def __init__(self, filepath, batchsize=500, fields=None, output_format=None,
                 ignore_keys=None, meta=None, safe_headers=False):
        """Open *filepath* and parse its ``#``-prefixed header options.

        :param filepath: path of the Zeek log file to read.
        :param batchsize: kept on the instance for callers; not used here.
        :param fields: optional whitelist of field names to emit.
        :param output_format: "json", "csv" or None.
        :param ignore_keys: reserved; currently not applied by
            convert_values (kept for backward compatibility).
        :param meta: extra key/value pairs merged into every json record.
        :param safe_headers: replace "." with "_" in field names.
        """
        # Use None sentinels instead of mutable default arguments, which
        # would be shared across instances.
        self.ignore_keys = [] if ignore_keys is None else ignore_keys
        self.meta = {} if meta is None else meta
        # (The original re-encoded `meta` through json dumps/loads into a
        # local that was never read again; that dead statement is removed.)

        self.fd = open(filepath, "r")
        self.options = OrderedDict()
        self.firstRun = True
        self.filtered_fields = fields
        self.batchsize = batchsize
        self.output_format = output_format
        self.safe_headers = safe_headers

        # Read the header option lines.  readline() is bounded so one
        # huge line in a malformed/hostile file cannot exhaust memory.
        l = self.fd.readline(5_000_000).strip()
        while l.strip().startswith("#"):
            if l.startswith("#separator"):
                # e.g. "#separator \x09" -> decode the escape to "\t".
                key = str(l[1:].split(" ")[0])
                value = str.encode(
                    l[1:].split(" ")[1].strip()).decode('unicode_escape')
                self.options[key] = value
            elif l.startswith("#"):
                # Every other option line is split on the separator just
                # learned from the "#separator" line.
                key = str(l[1:].split(self.options.get('separator'))[0])
                value = l[1:].split(self.options.get('separator'))[1:]
                self.options[key] = value

            # Read the next header line.
            l = self.fd.readline(5_000_000).strip()

        # The first non-"#" line is already the first data record.
        self.firstLine = l

        # Save mapping of field names to their Zeek types.
        self.fields = self.options.get('fields')
        self.types = self.options.get('types')

        self.data_types = {}
        for i, val in enumerate(self.fields):
            # Convert field names if safe_headers is enabled.
            if self.safe_headers is True:
                self.fields[i] = self.fields[i].replace(".", "_")
            # Match each field with its declared type.
            self.data_types[self.fields[i]] = self.types[i]

    def __del__(self):
        # Guard: __init__ may have raised before self.fd was assigned.
        fd = getattr(self, "fd", None)
        if fd is not None:
            fd.close()

    def __iter__(self):
        return self

    def __next__(self):
        if self.firstRun is True:
            retVal = self.firstLine
            self.firstRun = False
        else:
            retVal = self.fd.readline().strip()

        # An empty string means readline() reached end of file.
        if not retVal:
            raise StopIteration

        # Split out the data we are going to return.
        retVal = retVal.split(self.options.get('separator'))

        record = None
        # Skip comment lines and lines with an unexpected column count.
        # (was: `len(retVal) is len(...)` -- identity comparison on ints,
        # which is only accidentally correct for small integers)
        if retVal and not str(retVal[0]).strip().startswith("#") \
                and len(retVal) == len(self.options.get("fields")):
            record = OrderedDict()
            # Prepare fields for conversion.
            for x in range(len(retVal)):
                if self.safe_headers is True:
                    converted_field_name = self.options.get(
                        "fields")[x].replace(".", "_")
                else:
                    converted_field_name = self.options.get("fields")[x]
                if self.filtered_fields is None or converted_field_name in self.filtered_fields:
                    # Zeek writes "-" for unset values; normalise to "".
                    if retVal[x] == "-":
                        retVal[x] = ""
                    # Save the record field if it isn't filtered out.
                    record[converted_field_name] = retVal[x]

            # Convert values to the appropriate record type.
            record = self.convert_values(
                record, self.ignore_keys, self.data_types)

        if record is not None and self.output_format == "json":
            # Merge the caller-supplied metadata into the record.
            for k, v in self.meta.items():
                record[k] = v
            retVal = record
        elif record is not None and self.output_format == "csv":
            # Quote string values and comma-join (no trailing comma).
            parts = []
            for v in record.values():
                if isinstance(v, str):
                    parts.append('"' + v.strip() + '"')
                else:
                    parts.append(str(v).strip())
            retVal = ",".join(parts)
        else:
            # Comment lines, short lines, or an unknown output_format
            # yield None (iteration continues; StopIteration only on EOF).
            retVal = None

        return retVal

    def convert_values(self, data, ignore_keys=None, data_types=None):
        """Cast record values in *data* according to their Zeek types.

        port/count -> int, double/interval -> float, bool -> True/False.
        Empty numeric values are removed from the record entirely.

        NOTE(review): *ignore_keys* is accepted for backward compatibility
        but, exactly as in the original implementation, it is never applied.
        """
        if data_types is None:
            data_types = {}
        keys_to_delete = []
        for k, v in data.items():
            if isinstance(v, dict):
                # Recurse into nested records.
                data[k] = self.convert_values(v)
            else:
                dtype = data_types.get(k)
                if dtype is not None:
                    if dtype in ("port", "count"):
                        if v != "":
                            data[k] = int(v)
                        else:
                            keys_to_delete.append(k)
                    elif dtype in ("double", "interval"):
                        if v != "":
                            data[k] = float(v)
                        else:
                            keys_to_delete.append(k)
                    elif dtype == "bool":
                        # Zeek renders booleans as "T"/"F"; bool(v) would
                        # have been True for any non-empty string ("F" too).
                        data[k] = v == "T"
                    else:
                        data[k] = v

        for k in keys_to_delete:
            del data[k]

        return data

    def get_fields(self):
        """Return all fields present in the log file.

        Returns:
            A comma-joined string when output_format is "csv", otherwise a
            list of field names; both honour the *fields* filter.
        """
        wanted = [v for v in self.fields
                  if self.filtered_fields is None or v in self.filtered_fields]
        if self.output_format == "csv":
            return ",".join(wanted).strip()
        return wanted
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from json import loads, dumps
|
||||
from collections import OrderedDict
|
||||
from datetime import datetime
|
||||
from traceback import print_exc
|
||||
|
||||
# Taken from https://github.com/dgunter/ParseZeekLogs <3
|
||||
|
||||
|
||||
class ParseZeekLogs(object):
    """
    Iterator that parses a Zeek (Bro) TSV log file and yields one record
    per data line, rendered as a dict ("json"), a CSV string ("csv"), or
    None for lines that don't produce a record.

    Attributes: filepath: Path of Zeek log file to read
    """

    def __init__(self, filepath, batchsize=500, fields=None, output_format=None,
                 ignore_keys=None, meta=None, safe_headers=False):
        """Open *filepath* and parse its ``#``-prefixed header options.

        :param filepath: path of the Zeek log file to read.
        :param batchsize: kept on the instance for callers; not used here.
        :param fields: optional whitelist of field names to emit.
        :param output_format: "json", "csv" or None.
        :param ignore_keys: reserved; currently not applied by
            convert_values (kept for backward compatibility).
        :param meta: extra key/value pairs merged into every json record.
        :param safe_headers: replace "." with "_" in field names.
        """
        # Use None sentinels instead of mutable default arguments, which
        # would be shared across instances.
        self.ignore_keys = [] if ignore_keys is None else ignore_keys
        self.meta = {} if meta is None else meta
        # (The original re-encoded `meta` through json dumps/loads into a
        # local that was never read again; that dead statement is removed.)

        self.fd = open(filepath, "r")
        self.options = OrderedDict()
        self.firstRun = True
        self.filtered_fields = fields
        self.batchsize = batchsize
        self.output_format = output_format
        self.safe_headers = safe_headers

        # Read the header option lines.  Bound readline() so one huge
        # line in a malformed/hostile file cannot exhaust memory (this
        # matches the hardening already applied in the other revision).
        l = self.fd.readline(5_000_000).strip()
        while l.strip().startswith("#"):
            if l.startswith("#separator"):
                # e.g. "#separator \x09" -> decode the escape to "\t".
                key = str(l[1:].split(" ")[0])
                value = str.encode(
                    l[1:].split(" ")[1].strip()).decode('unicode_escape')
                self.options[key] = value
            elif l.startswith("#"):
                # Every other option line is split on the separator just
                # learned from the "#separator" line.
                key = str(l[1:].split(self.options.get('separator'))[0])
                value = l[1:].split(self.options.get('separator'))[1:]
                self.options[key] = value

            # Read the next header line.
            l = self.fd.readline(5_000_000).strip()

        # The first non-"#" line is already the first data record.
        self.firstLine = l

        # Save mapping of field names to their Zeek types.
        self.fields = self.options.get('fields')
        self.types = self.options.get('types')

        self.data_types = {}
        for i, val in enumerate(self.fields):
            # Convert field names if safe_headers is enabled.
            if self.safe_headers is True:
                self.fields[i] = self.fields[i].replace(".", "_")
            # Match each field with its declared type.
            self.data_types[self.fields[i]] = self.types[i]

    def __del__(self):
        # Guard: __init__ may have raised before self.fd was assigned.
        fd = getattr(self, "fd", None)
        if fd is not None:
            fd.close()

    def __iter__(self):
        return self

    def __next__(self):
        if self.firstRun is True:
            retVal = self.firstLine
            self.firstRun = False
        else:
            retVal = self.fd.readline().strip()

        # An empty string means readline() reached end of file.
        if not retVal:
            raise StopIteration

        # Split out the data we are going to return.
        retVal = retVal.split(self.options.get('separator'))

        record = None
        # Skip comment lines and lines with an unexpected column count.
        # (was: `len(retVal) is len(...)` -- identity comparison on ints,
        # which is only accidentally correct for small integers)
        if retVal and not str(retVal[0]).strip().startswith("#") \
                and len(retVal) == len(self.options.get("fields")):
            record = OrderedDict()
            # Prepare fields for conversion.
            for x in range(len(retVal)):
                if self.safe_headers is True:
                    converted_field_name = self.options.get(
                        "fields")[x].replace(".", "_")
                else:
                    converted_field_name = self.options.get("fields")[x]
                if self.filtered_fields is None or converted_field_name in self.filtered_fields:
                    # Zeek writes "-" for unset values; normalise to "".
                    if retVal[x] == "-":
                        retVal[x] = ""
                    # Save the record field if it isn't filtered out.
                    record[converted_field_name] = retVal[x]

            # Convert values to the appropriate record type.
            record = self.convert_values(
                record, self.ignore_keys, self.data_types)

        if record is not None and self.output_format == "json":
            # Merge the caller-supplied metadata into the record.
            for k, v in self.meta.items():
                record[k] = v
            retVal = record
        elif record is not None and self.output_format == "csv":
            # Quote string values and comma-join (no trailing comma).
            parts = []
            for v in record.values():
                if isinstance(v, str):
                    parts.append('"' + v.strip() + '"')
                else:
                    parts.append(str(v).strip())
            retVal = ",".join(parts)
        else:
            # Comment lines, short lines, or an unknown output_format
            # yield None (iteration continues; StopIteration only on EOF).
            retVal = None

        return retVal

    def convert_values(self, data, ignore_keys=None, data_types=None):
        """Cast record values in *data* according to their Zeek types.

        port/count -> int, double/interval -> float, bool -> True/False.
        Empty numeric values are removed from the record entirely.

        NOTE(review): *ignore_keys* is accepted for backward compatibility
        but, exactly as in the original implementation, it is never applied.
        """
        if data_types is None:
            data_types = {}
        keys_to_delete = []
        for k, v in data.items():
            if isinstance(v, dict):
                # Recurse into nested records.
                data[k] = self.convert_values(v)
            else:
                dtype = data_types.get(k)
                if dtype is not None:
                    if dtype in ("port", "count"):
                        if v != "":
                            data[k] = int(v)
                        else:
                            keys_to_delete.append(k)
                    elif dtype in ("double", "interval"):
                        if v != "":
                            data[k] = float(v)
                        else:
                            keys_to_delete.append(k)
                    elif dtype == "bool":
                        # Zeek renders booleans as "T"/"F"; bool(v) would
                        # have been True for any non-empty string ("F" too).
                        data[k] = v == "T"
                    else:
                        data[k] = v

        for k in keys_to_delete:
            del data[k]

        return data

    def get_fields(self):
        """Return all fields present in the log file.

        Returns:
            A comma-joined string when output_format is "csv", otherwise a
            list of field names; both honour the *fields* filter.
        """
        wanted = [v for v in self.fields
                  if self.filtered_fields is None or v in self.filtered_fields]
        if self.output_format == "csv":
            return ",".join(wanted).strip()
        return wanted
|
||||
|
@ -1,149 +1,149 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from app.utils import read_config
|
||||
from app.classes.iocs import IOCs
|
||||
from app.classes.whitelist import WhiteList
|
||||
from app.classes.misp import MISP
|
||||
|
||||
import requests
|
||||
import json
|
||||
import urllib3
|
||||
import time
|
||||
from multiprocessing import Process
|
||||
|
||||
"""
|
||||
This file is parsing the watchers present
|
||||
in the configuration file. This in order to get
|
||||
automatically new iocs / elements from remote
|
||||
sources without user interaction.
|
||||
"""
|
||||
|
||||
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
||||
|
||||
|
||||
def watch_iocs():
    """
    Retrieve IOCs from the remote URLs defined in config/watchers.
    For each IOC, add it to the DB; URLs that fail are retried every
    minute until every watcher has succeeded.
    """
    # Retrieve the URLs from the configuration.
    urls = read_config(("watchers", "iocs"))
    watchers = [{"url": url, "status": False} for url in urls]

    while True:
        for w in watchers:
            if w["status"]:
                continue
            iocs = IOCs()
            iocs_list = []
            to_delete = []
            try:
                # timeout prevents a dead remote from hanging the
                # watcher forever (matches the other revision's fix).
                res = requests.get(w["url"], verify=False, timeout=60)
                if res.status_code == 200:
                    content = json.loads(res.content)
                    iocs_list = content.get("iocs", [])
                    to_delete = content.get("to_delete", [])
                else:
                    w["status"] = False
            except Exception:
                # Narrowed from a bare `except:` (which also swallowed
                # KeyboardInterrupt/SystemExit); network or JSON errors
                # leave the watcher marked failed for the next retry.
                w["status"] = False

            for ioc in iocs_list:
                try:
                    iocs.add(ioc["type"], ioc["tag"],
                             ioc["tlp"], ioc["value"], "watcher")
                    w["status"] = True
                except Exception:
                    # A malformed or duplicate IOC must not stop the batch.
                    continue

            for ioc in to_delete:
                try:
                    iocs.delete_by_value(ioc["value"])
                    w["status"] = True
                except Exception:
                    continue

        # If at least one URL hasn't been parsed, retry in 1 min.
        if False in [w["status"] for w in watchers]:
            time.sleep(60)
        else:
            break
|
||||
|
||||
|
||||
def watch_whitelists():
    """
    Retrieve whitelist elements from the remote URLs defined in
    config/watchers and add each one to the DB; URLs that fail are
    retried every minute until every watcher has succeeded.
    """
    urls = read_config(("watchers", "whitelists"))
    watchers = [{"url": url, "status": False} for url in urls]

    while True:
        for w in watchers:
            if w["status"]:
                continue
            whitelist = WhiteList()
            elements = []
            to_delete = []
            try:
                # timeout prevents a dead remote from hanging the
                # watcher forever (matches the other revision's fix).
                res = requests.get(w["url"], verify=False, timeout=60)
                if res.status_code == 200:
                    content = json.loads(res.content)
                    elements = content.get("elements", [])
                    to_delete = content.get("to_delete", [])
                else:
                    w["status"] = False
            except Exception:
                # Narrowed from a bare `except:`; failures leave the
                # watcher marked failed for the next retry pass.
                w["status"] = False

            for elem in elements:
                try:
                    whitelist.add(elem["type"], elem["element"], "watcher")
                    w["status"] = True
                except Exception:
                    # One bad element must not stop the batch.
                    continue

            for elem in to_delete:
                try:
                    whitelist.delete_by_value(elem["element"])
                    w["status"] = True
                except Exception:
                    continue

        # If at least one URL hasn't been parsed, retry in 1 min.
        if False in [w["status"] for w in watchers]:
            time.sleep(60)
        else:
            break
|
||||
|
||||
|
||||
def watch_misp():
    """
    Retrieve IOCs from MISP instances. Each reachable instance is synced
    and removed from the work list; unreachable instances are retried
    every minute until none remain.
    """
    iocs, misp = IOCs(), MISP()
    instances = list(misp.get_instances())

    while instances:
        # Build the list of still-pending instances instead of calling
        # instances.pop(i) while iterating with enumerate(), which
        # skipped the element following every removed one.
        remaining = []
        for ist in instances:
            status = misp.test_instance(ist["url"],
                                        ist["apikey"],
                                        ist["verifycert"])
            if status:
                for ioc in misp.get_iocs(ist["id"]):
                    iocs.add(ioc["type"], ioc["tag"], ioc["tlp"],
                             ioc["value"], "misp-{}".format(ist["id"]))
                misp.update_sync(ist["id"])
            else:
                # Keep unreachable instances for the next retry pass.
                remaining.append(ist)
        instances = remaining
        if instances:
            time.sleep(60)
|
||||
|
||||
|
||||
# Run the three watchers concurrently.  The __main__ guard is required
# by multiprocessing on spawn-based platforms and prevents the workers
# from being launched as a side effect of importing this module.
if __name__ == "__main__":
    p1 = Process(target=watch_iocs)
    p2 = Process(target=watch_whitelists)
    p3 = Process(target=watch_misp)

    p1.start()
    p2.start()
    p3.start()
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from app.utils import read_config
|
||||
from app.classes.iocs import IOCs
|
||||
from app.classes.whitelist import WhiteList
|
||||
from app.classes.misp import MISP
|
||||
|
||||
import requests
|
||||
import json
|
||||
import urllib3
|
||||
import time
|
||||
from multiprocessing import Process
|
||||
|
||||
"""
|
||||
This file is parsing the watchers present
|
||||
in the configuration file. This in order to get
|
||||
automatically new iocs / elements from remote
|
||||
sources without user interaction.
|
||||
"""
|
||||
|
||||
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
||||
|
||||
|
||||
def watch_iocs():
    """
    Retrieve IOCs from the remote URLs defined in config/watchers.
    For each IOC, add it to the DB; URLs that fail are retried every
    minute until every watcher has succeeded.
    """
    # Retrieve the URLs from the configuration.
    urls = read_config(("watchers", "iocs"))
    watchers = [{"url": url, "status": False} for url in urls]

    while True:
        for w in watchers:
            if w["status"]:
                continue
            iocs = IOCs()
            iocs_list = []
            to_delete = []
            try:
                # timeout prevents a dead remote from hanging forever.
                res = requests.get(w["url"], verify=False, timeout=60)
                if res.status_code == 200:
                    content = json.loads(res.content)
                    iocs_list = content.get("iocs", [])
                    to_delete = content.get("to_delete", [])
                else:
                    w["status"] = False
            except Exception:
                # Narrowed from a bare `except:` (which also swallowed
                # KeyboardInterrupt/SystemExit); network or JSON errors
                # leave the watcher marked failed for the next retry.
                w["status"] = False

            for ioc in iocs_list:
                try:
                    iocs.add(ioc["type"], ioc["tag"],
                             ioc["tlp"], ioc["value"], "watcher")
                    w["status"] = True
                except Exception:
                    # A malformed or duplicate IOC must not stop the batch.
                    continue

            for ioc in to_delete:
                try:
                    iocs.delete_by_value(ioc["value"])
                    w["status"] = True
                except Exception:
                    continue

        # If at least one URL hasn't been parsed, retry in 1 min.
        if False in [w["status"] for w in watchers]:
            time.sleep(60)
        else:
            break
|
||||
|
||||
|
||||
def watch_whitelists():
    """
    Retrieve whitelist elements from the remote URLs defined in
    config/watchers and add each one to the DB; URLs that fail are
    retried every minute until every watcher has succeeded.
    """
    urls = read_config(("watchers", "whitelists"))
    watchers = [{"url": url, "status": False} for url in urls]

    while True:
        for w in watchers:
            if w["status"]:
                continue
            whitelist = WhiteList()
            elements = []
            to_delete = []
            try:
                # timeout prevents a dead remote from hanging forever.
                res = requests.get(w["url"], verify=False, timeout=60)
                if res.status_code == 200:
                    content = json.loads(res.content)
                    elements = content.get("elements", [])
                    to_delete = content.get("to_delete", [])
                else:
                    w["status"] = False
            except Exception:
                # Narrowed from a bare `except:`; failures leave the
                # watcher marked failed for the next retry pass.
                w["status"] = False

            for elem in elements:
                try:
                    whitelist.add(elem["type"], elem["element"], "watcher")
                    w["status"] = True
                except Exception:
                    # One bad element must not stop the batch.
                    continue

            for elem in to_delete:
                try:
                    whitelist.delete_by_value(elem["element"])
                    w["status"] = True
                except Exception:
                    continue

        # If at least one URL hasn't been parsed, retry in 1 min.
        if False in [w["status"] for w in watchers]:
            time.sleep(60)
        else:
            break
|
||||
|
||||
|
||||
def watch_misp():
    """
    Retrieve IOCs from MISP instances. Each reachable instance is synced
    and removed from the work list; unreachable instances are retried
    every minute until none remain.
    """
    iocs, misp = IOCs(), MISP()
    instances = list(misp.get_instances())

    while instances:
        # Build the list of still-pending instances instead of calling
        # instances.pop(i) while iterating with enumerate(), which
        # skipped the element following every removed one.
        remaining = []
        for ist in instances:
            status = misp.test_instance(ist["url"],
                                        ist["apikey"],
                                        ist["verifycert"])
            if status:
                for ioc in misp.get_iocs(ist["id"]):
                    iocs.add(ioc["type"], ioc["tag"], ioc["tlp"],
                             ioc["value"], "misp-{}".format(ist["id"]))
                misp.update_sync(ist["id"])
            else:
                # Keep unreachable instances for the next retry pass.
                remaining.append(ist)
        instances = remaining
        if instances:
            time.sleep(60)
|
||||
|
||||
|
||||
# Run the three watchers concurrently.  The __main__ guard is required
# by multiprocessing on spawn-based platforms and prevents the workers
# from being launched as a side effect of importing this module.
if __name__ == "__main__":
    p1 = Process(target=watch_iocs)
    p2 = Process(target=watch_whitelists)
    p3 = Process(target=watch_misp)

    p1.start()
    p2.start()
    p3.start()
|
||||
|
@ -1,82 +1,82 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from app.utils import read_config
|
||||
import subprocess as sp
|
||||
import requests
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
|
||||
|
||||
class Update(object):
    """Check for, report, and launch TinyCheck software updates."""

    def __init__(self):
        # Github tags endpoint used to discover the latest release.
        self.project_url = "https://api.github.com/repos/KasperskyLab/TinyCheck/tags"
        self.app_path = "/usr/share/tinycheck"

    def check_version(self):
        """
        Check if a new version of TinyCheck is available
        by quering the Github api and comparing the last
        tag inside the VERSION file.
        :return: dict containing the available versions.
        """
        # Guard clause: updating must be enabled in the configuration.
        if not read_config(("frontend", "update")):
            return {"status": False,
                    "message": "You don't have rights to do this operation."}
        try:
            # timeout prevents an unreachable API from hanging the call
            # (matches the other revision's fix).
            res = requests.get(self.project_url, timeout=60)
            res = json.loads(res.content.decode("utf8"))

            with open(os.path.join(self.app_path, "VERSION")) as f:
                cv = f.read()
                if cv != res[0]["name"]:
                    return {"status": True,
                            "message": "A new version is available",
                            "current_version": cv,
                            "next_version": res[0]["name"]}
                else:
                    return {"status": True,
                            "message": "This is the latest version",
                            "current_version": cv}
        except Exception:
            # Narrowed from a bare `except:`; covers network, JSON and
            # missing-file errors alike.
            return {"status": False,
                    "message": "Something went wrong (no API access nor version file)"}

    def get_current_version(self):
        """
        Get the current version of the TinyCheck instance
        :return: dict containing the current version or error.
        """
        if not read_config(("frontend", "update")):
            return {"status": False,
                    "message": "You don't have rights to do this operation."}
        try:
            with open(os.path.join(self.app_path, "VERSION")) as f:
                return {"status": True,
                        "current_version": f.read()}
        except Exception:
            return {"status": False,
                    "message": "Something went wrong - no version file ?"}

    def update_instance(self):
        """
        Update the instance by executing the update script.
        :return: dict containing the update status.
        """
        if not read_config(("frontend", "update")):
            return {"status": False,
                    "message": "You don't have rights to do this operation."}
        try:
            os.chdir(self.app_path)
            # Fire-and-forget: the script is not awaited on purpose,
            # since it restarts the application itself.
            sp.Popen(["bash", os.path.join(self.app_path, "update.sh")])
            return {"status": True,
                    "message": "Update successfully launched"}
        except Exception:
            return {"status": False,
                    "message": "Issue during the update"}
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from app.utils import read_config
|
||||
import subprocess as sp
|
||||
import requests
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
|
||||
|
||||
class Update(object):
    """Check for, report, and launch TinyCheck software updates."""

    def __init__(self):
        # Github tags endpoint used to discover the latest release.
        self.project_url = "https://api.github.com/repos/KasperskyLab/TinyCheck/tags"
        self.app_path = "/usr/share/tinycheck"

    def check_version(self):
        """
        Check if a new version of TinyCheck is available
        by quering the Github api and comparing the last
        tag inside the VERSION file.
        :return: dict containing the available versions.
        """
        # Guard clause: updating must be enabled in the configuration.
        if not read_config(("frontend", "update")):
            return {"status": False,
                    "message": "You don't have rights to do this operation."}
        try:
            # timeout prevents an unreachable API from hanging the call.
            res = requests.get(self.project_url, timeout=60)
            res = json.loads(res.content.decode("utf8"))

            with open(os.path.join(self.app_path, "VERSION")) as f:
                cv = f.read()
                if cv != res[0]["name"]:
                    return {"status": True,
                            "message": "A new version is available",
                            "current_version": cv,
                            "next_version": res[0]["name"]}
                else:
                    return {"status": True,
                            "message": "This is the latest version",
                            "current_version": cv}
        except Exception:
            # Narrowed from a bare `except:`; covers network, JSON and
            # missing-file errors alike.
            return {"status": False,
                    "message": "Something went wrong (no API access nor version file)"}

    def get_current_version(self):
        """
        Get the current version of the TinyCheck instance
        :return: dict containing the current version or error.
        """
        if not read_config(("frontend", "update")):
            return {"status": False,
                    "message": "You don't have rights to do this operation."}
        try:
            with open(os.path.join(self.app_path, "VERSION")) as f:
                return {"status": True,
                        "current_version": f.read()}
        except Exception:
            return {"status": False,
                    "message": "Something went wrong - no version file ?"}

    def update_instance(self):
        """
        Update the instance by executing the update script.
        :return: dict containing the update status.
        """
        if not read_config(("frontend", "update")):
            return {"status": False,
                    "message": "You don't have rights to do this operation."}
        try:
            os.chdir(self.app_path)
            # Fire-and-forget: the script is not awaited on purpose,
            # since it restarts the application itself.
            sp.Popen(["bash", os.path.join(self.app_path, "update.sh")])
            return {"status": True,
                    "message": "Update successfully launched"}
        except Exception:
            return {"status": False,
                    "message": "Issue during the update"}
|
||||
|
Reference in New Issue
Block a user