SpyGuard/server/backend/app/classes/misp.py

158 lines
6.0 KiB
Python
Raw Permalink Normal View History

2022-11-06 15:51:33 +01:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from app import db
from app.db.models import MISPInst
from app.definitions import definitions as defs
from sqlalchemy.sql import exists
2023-09-02 10:08:18 +02:00
from markupsafe import escape
2022-11-06 15:51:33 +01:00
from pymisp import PyMISP
import re
import time
class MISP(object):
    """
    Manage the MISP instances registered in the database and
    retrieve network related IOCs from them.
    """

    # Look-back window (in seconds, 365 days) applied to the first
    # synchronization of a freshly added instance.
    _INITIAL_SYNC_WINDOW = 31536000

    # MISP attribute types which are considered when importing IOCs.
    _SUPPORTED_ATTRIBUTE_TYPES = ("ip-dst", "domain", "snort",
                                  "x509-fingerprint-sha1")

    # IOC type names, listed in the same order as the entries of
    # defs["iocs_types"] whose regex they are matched against.
    _IOC_TYPES = ("ip4addr", "ip6addr", "cidr", "domain", "sha1cert")

    # Extracts the TLP color from a lower-cased tag name.
    _TLP_PATTERN = re.compile(r"^(?:tlp:)(red|green|amber|white)")

    def __init__(self):
        """Nothing to initialise: the class keeps no instance state."""

    def add_instance(self, instance) -> dict:
        """
        Parse and add a MISP instance to the database.

        :param instance: dict holding the "url", "name", "key"
                         and "ssl" entries of the instance.
        :return: status of the operation in JSON
        """
        url = instance["url"]
        name = instance["name"]
        apikey = instance["key"]
        verify = instance["ssl"]

        # The URL is stored escaped, so the duplicate lookup has to be
        # done on the escaped form too; comparing the raw URL would miss
        # existing records whose URL contains characters such as "&".
        stored_url = str(escape(url))

        duplicates = db.session.query(MISPInst).filter(
            MISPInst.url == stored_url, MISPInst.apikey == apikey)
        if duplicates.count():
            return {"status": False,
                    "message": "This MISP instance already exists"}

        if not name:
            return {"status": False,
                    "message": "Please provide a name for your instance"}

        # The connection test needs the raw (unescaped) URL.
        if not self.test_instance(url, apikey, verify):
            return {"status": False,
                    "message": "Please verify the connection to the MISP instance"}

        added_on = int(time.time())
        last_sync = int(time.time() - self._INITIAL_SYNC_WINDOW)
        db.session.add(MISPInst(name, stored_url, apikey, verify,
                                added_on, last_sync))
        db.session.commit()
        return {"status": True,
                "message": "MISP instance added"}

    @staticmethod
    def delete_instance(misp_id) -> dict:
        """
        Delete a MISP instance by its id in the database.

        :param misp_id: id of the MISP instance to delete.
        :return: status of the operation in JSON
        """
        if not db.session.query(exists().where(MISPInst.id == misp_id)).scalar():
            return {"status": False,
                    "message": "MISP instance not found"}

        db.session.query(MISPInst).filter_by(id=misp_id).delete()
        db.session.commit()
        return {"status": True,
                "message": "MISP instance deleted"}

    def get_instances(self) -> list:
        """
        Get MISP instances from the database.

        Note that, despite the annotation, this is a generator and
        that the connection to each instance is tested while iterating.

        :return: generator of the records.
        """
        for instance in db.session.query(MISPInst).all():
            # Attribute access (rather than instance.__dict__) keeps
            # working if the ORM expired the object in the meantime.
            yield {"id": instance.id,
                   "name": instance.name,
                   "url": instance.url,
                   "apikey": instance.apikey,
                   "verifycert": bool(instance.verifycert),
                   "connected": self.test_instance(instance.url,
                                                   instance.apikey,
                                                   instance.verifycert),
                   "lastsync": instance.last_sync}

    @staticmethod
    def test_instance(url, apikey, verify) -> bool:
        """
        Test the connection of the MISP instance.

        :param url: URL of the MISP instance.
        :param apikey: API key used to authenticate.
        :param verify: whether the TLS certificate has to be verified.
        :return: bool, True if a PyMISP client could be created.
        """
        try:
            PyMISP(url, apikey, verify)
        except Exception:
            # Deliberately broad: any failure means "not connected".
            return False
        return True

    @staticmethod
    def update_sync(misp_id) -> bool:
        """
        Update the last synchronization date by the actual date.

        :param misp_id: id of the MISP instance to update.
        :return: bool, True if updated.
        """
        try:
            misp = MISPInst.query.get(int(misp_id))
        except Exception:
            # Invalid id or database issue: best effort, report failure.
            return False

        if misp is None:
            return False

        misp.last_sync = int(time.time())
        try:
            db.session.commit()
        except Exception:
            # Leave the session usable for the next operations.
            db.session.rollback()
            return False
        return True

    @staticmethod
    def _attribute_to_ioc(attr, type_defs, known_tags):
        """
        Convert a MISP attribute to an IOC.

        :param attr: MISP attribute as a dict ("type", "value" and
                     optionally "Tag" entries).
        :param type_defs: sequence of dicts holding a "regex" entry,
                          ordered like MISP._IOC_TYPES.
        :param known_tags: container of the tags an IOC can take.
        :return: the IOC as a dict, None if the attribute is skipped.
        """
        if attr["type"] not in MISP._SUPPORTED_ATTRIBUTE_TYPES:
            return None

        value = attr["value"]

        # Deduce the IOC type, first matching regex wins.
        ioc_type = None
        for index, candidate in enumerate(MISP._IOC_TYPES):
            if re.match(type_defs[index]["regex"], value):
                ioc_type = candidate
                break
        else:
            if value.startswith("alert "):
                ioc_type = "snort"
        if ioc_type is None:
            return None

        ioc = {"value": value,
               "type": ioc_type,
               "tag": "suspect",
               "tlp": "white"}

        for tag in attr.get("Tag") or []:
            tag_name = tag["name"].lower()

            # Add a TLP to the IOC if defined in tags.
            tlp = MISP._TLP_PATTERN.search(tag_name)
            if tlp:
                ioc["tlp"] = tlp.group(1)

            # Add possible tag (need to match SpyGuard tags)
            if tag_name in known_tags:
                ioc["tag"] = tag_name
        return ioc

    @staticmethod
    def get_iocs(misp_id) -> list:
        """
        Get all IOCs from specific MISP instance.

        Only the "Network activity" attributes published since the
        last synchronization are requested. Note that, despite the
        annotation, this is a generator.

        :param misp_id: id of the MISP instance to query.
        :return: generator containing the IOCs.
        """
        misp = MISPInst.query.get(int(misp_id))
        if misp is None or not (misp.url and misp.apikey):
            return

        try:
            # Connect to MISP instance and get network activity attributes.
            client = PyMISP(misp.url, misp.apikey, misp.verifycert)
            response = client.search("attributes",
                                     category="Network activity",
                                     date_from=int(misp.last_sync))
        except Exception:
            # The API key is a secret: never write it to the output.
            print("Unable to connect to the MISP instance ({}).".format(misp.url))
            return

        # An error payload does not carry any "Attribute" entry.
        attributes = response.get("Attribute", []) if isinstance(response, dict) else []

        # Built once instead of once per tag of each attribute.
        known_tags = {t["tag"] for t in defs["iocs_tags"]}

        for attr in attributes:
            ioc = MISP._attribute_to_ioc(attr, defs["iocs_types"], known_tags)
            if ioc is not None:
                yield ioc