KasperskyLab-TinyCheck/server/backend/app/classes/iocs.py

154 lines
5.4 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from app import db
from app.db.models import Ioc
from sqlalchemy.sql import exists
from app.definitions import definitions
from flask import escape
import re
import time
class IOCs(object):
    """Helpers to add, delete, search and list IOCs stored in the database.

    Every public method is a static method working on the shared
    ``db.session`` and on the ``definitions`` mapping imported by this
    module.
    """

    # TLP levels accepted by add().
    _TLP_LEVELS = ("white", "green", "amber", "red")

    def __init__(self):
        pass

    @staticmethod
    def _resolve_type(ioc_type, ioc_value, ioc_types):
        """
            Validate an IOC value against the known IOC type definitions.

            :param ioc_type: requested type, or "unknown" to auto-detect it.
            :param ioc_value: IOC value to validate.
            :param ioc_types: iterable of type definitions (dicts with the
                              "type", "regex" and "auto" keys).
            :return: None if ioc_type is neither "unknown" nor a defined
                     type, else a (resolved_type, is_valid) tuple.
        """
        if ioc_type == "unknown":
            resolved, valid = ioc_type, False
            for t in ioc_types:
                if (t["regex"] and t["auto"]
                        and re.match(t["regex"], ioc_value)):
                    # No early exit on purpose: when several definitions
                    # match, the last one wins (historical behaviour).
                    resolved, valid = t["type"], True
            return resolved, valid

        candidates = [t for t in ioc_types if t["type"] == ioc_type]
        if not candidates:
            return None

        for t in candidates:
            if t["regex"]:
                if re.match(t["regex"], ioc_value):
                    return ioc_type, True
            elif ioc_type == "snort" and ioc_value.startswith("alert "):
                # Only snort IOCs are validated by their "alert " prefix;
                # the prefix must never validate a value of another type.
                return ioc_type, True
        return ioc_type, False

    @staticmethod
    def _serialize(ioc):
        """
            Convert an Ioc record to the dict exposed by search()/get_all().

            :param ioc: Ioc record.
            :return: dict with the id, type, tag, tlp and value keys.
        """
        return {"id": ioc.id,
                "type": ioc.type,
                "tag": ioc.tag,
                "tlp": ioc.tlp,
                "value": ioc.value}

    @staticmethod
    def add(ioc_type, ioc_tag, ioc_tlp, ioc_value, source):
        """
            Parse and add an IOC to the database.

            :param ioc_type: IOC type, or "unknown" to auto-detect it.
            :param ioc_tag: tag stored with the IOC.
            :param ioc_tlp: TLP level ("white", "green", "amber" or "red").
            :param ioc_value: IOC value; lowercased unless the type is
                              "snort".
            :param source: source stored with the IOC.
            :return: status of the operation in JSON
        """
        # Snort rules keep their original case, everything else is
        # normalised to lowercase before the duplicate check.
        ioc_value = ioc_value.lower() if ioc_type != "snort" else ioc_value

        if db.session.query(exists().where(Ioc.value == ioc_value)).scalar():
            return {"status": False,
                    "message": "IOC already exists",
                    "ioc": escape(ioc_value)}

        if ioc_tlp not in IOCs._TLP_LEVELS:
            return {"status": False,
                    "message": "Wrong IOC TLP",
                    "ioc": escape(ioc_value),
                    "type": escape(ioc_tlp)}

        resolved = IOCs._resolve_type(ioc_type, ioc_value,
                                      definitions["iocs_types"])
        if resolved is None:
            # A rejected IOC is a failure, like every other error path.
            return {"status": False,
                    "message": "Wrong IOC type",
                    "ioc": escape(ioc_value),
                    "type": escape(ioc_type)}

        ioc_type, ioc_valid = resolved
        if not ioc_valid:
            return {"status": False,
                    "message": "Wrong IOC format",
                    "ioc": escape(ioc_value)}

        added_on = int(time.time())
        db.session.add(Ioc(ioc_value, ioc_type, ioc_tlp,
                           ioc_tag, source, added_on))
        db.session.commit()
        return {"status": True,
                "message": "IOC added",
                "ioc": escape(ioc_value),
                "type": escape(ioc_type),
                "tlp": escape(ioc_tlp),
                "tag": escape(ioc_tag),
                "source": escape(source),
                "added_on": escape(added_on)}

    @staticmethod
    def delete(ioc_id):
        """
            Delete an IOC by its id in the database.

            :param ioc_id: id of the IOC to delete.
            :return: status of the operation in JSON
        """
        if not db.session.query(exists().where(Ioc.id == ioc_id)).scalar():
            return {"status": False,
                    "message": "IOC not found"}

        db.session.query(Ioc).filter_by(id=ioc_id).delete()
        db.session.commit()
        return {"status": True,
                "message": "IOC deleted"}

    @staticmethod
    def delete_by_value(ioc_value):
        """
            Delete an IOC by its value in the database.

            :param ioc_value: value of the IOC to delete.
            :return: status of the operation in JSON
        """
        found = db.session.query(
            exists().where(Ioc.value == ioc_value)).scalar()
        if not found:
            return {"status": False,
                    "message": "IOC not found"}

        db.session.query(Ioc).filter_by(value=ioc_value).delete()
        db.session.commit()
        return {"status": True,
                "message": "IOC deleted"}

    @staticmethod
    def search(term):
        """
            Search IOCs in the database.

            :param term: search pattern; each "*" is turned into the SQL
                         LIKE wildcard "%".
            :return: generator of results.
        """
        pattern = term.replace("*", "%")
        for ioc in db.session.query(Ioc).filter(
                Ioc.value.like(pattern)).all():
            yield IOCs._serialize(ioc)

    @staticmethod
    def get_types():
        """
            Retrieve a list of IOCs types.

            :return: generator of iocs types.
        """
        for t in definitions["iocs_types"]:
            yield {"type": t["type"],
                   "name": t["name"]}

    @staticmethod
    def get_tags():
        """
            Retrieve a list of IOCs tags.

            :return: generator of iocs tags.
        """
        for t in definitions["iocs_tags"]:
            yield {"tag": t["tag"],
                   "name": t["name"]}

    @staticmethod
    def get_all():
        """
            Get all IOCs from the database

            :return: generator of the records.
        """
        for ioc in db.session.query(Ioc).all():
            yield IOCs._serialize(ioc)