KasperskyLab-TinyCheck/server/backend/app/classes/octi.py
2021-06-15 13:24:01 +02:00

165 lines
6.3 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from app import db
from app.db.models import OCTIInst
from app.definitions import definitions as defs
from sqlalchemy.sql import exists
from urllib.parse import unquote
from flask import escape
from pycti import OpenCTIApiClient, Infrastructure
import re
import time
import sys
class OCTI(object):
def __init__(self):
return None
def add_instance(self, instance):
"""
Parse and add a OpenCTI instance to the database.
:return: status of the operation in JSON
"""
url = instance["url"]
name = instance["name"]
apikey = instance["key"]
verify = instance["ssl"]
last_sync = int(time.time()-31536000) # One year
sameinstances = db.session.query(OCTIInst).filter(
OCTIInst.url == url, OCTIInst.apikey == apikey)
if sameinstances.count():
return {"status": False,
"message": "This OpenCTI instance already exists"}
if name:
if self.test_instance(url, apikey, verify):
added_on = int(time.time())
db.session.add(OCTIInst(name, escape(
url), apikey, verify, added_on, last_sync))
db.session.commit()
return {"status": True,
"message": "OpenCTI instance added"}
else:
return {"status": False,
"message": "Please verify the connection to the OpenCTI instance"}
else:
return {"status": False,
"message": "Please provide a name for your instance"}
@staticmethod
def delete_instance(opencti_id):
"""
Delete a OpenCTI instance by its id in the database.
:return: status of the operation in JSON
"""
if db.session.query(exists().where(OCTIInst.id == opencti_id)).scalar():
db.session.query(OCTIInst).filter_by(id=opencti_id).delete()
db.session.commit()
return {"status": True,
"message": "OpenCTI instance deleted"}
else:
return {"status": False,
"message": "OpenCTI instance not found"}
def get_instances(self):
"""
Get OpenCTI instances from the database
:return: generator of the records.
"""
for opencti in db.session.query(OCTIInst).all():
opencti = opencti.__dict__
yield {"id": opencti["id"],
"name": opencti["name"],
"url": opencti["url"],
"apikey": opencti["apikey"],
"verifycert": True if opencti["verifycert"] else False,
"connected": self.test_instance(opencti["url"], opencti["apikey"], opencti["verifycert"]),
"lastsync": opencti["last_sync"]}
@staticmethod
def test_instance(url, apikey, verify):
"""
Test the connection of the OpenCTI instance.
:return: generator of the records.
"""
try:
OpenCTIApiClient(url, token=apikey, ssl_verify=verify)
return True
except:
return False
@staticmethod
def update_sync(opencti_id):
"""
Update the last synchronization date by the actual date.
:return: bool, True if updated.
"""
try:
misp = OCTIInst.query.get(int(opencti_id))
misp.last_sync = int(time.time())
db.session.commit()
return True
except:
return False
@staticmethod
def get_iocs(opencti_id):
"""
Get all IOCs from specific OpenCTI instance
:return: generator containing the IOCs.
"""
opencti = OCTIInst.query.get(int(opencti_id))
if opencti is not None:
if opencti.url and opencti.apikey:
try:
# Connect to OpenCTI instance and get network activity attributes.
i = OpenCTIApiClient(
opencti.url, opencti.apikey, opencti.verifycert)
r = Infrastructure(i).list(getAll=True)
except:
print(
"Unable to connect to the OpenCTI instance ({}/{}).".format(opencti.url, opencti.apikey))
return []
"""
for attr in r["Attribute"]:
if attr["type"] in ["ip-dst", "domain", "snort", "x509-fingerprint-sha1"]:
ioc = {"value": attr["value"],
"type": None,
"tag": "suspect",
"tlp": "white"}
# Deduce the IOC type.
if re.match(defs["iocs_types"][0]["regex"], attr["value"]):
ioc["type"] = "ip4addr"
elif re.match(defs["iocs_types"][1]["regex"], attr["value"]):
ioc["type"] = "ip6addr"
elif re.match(defs["iocs_types"][2]["regex"], attr["value"]):
ioc["type"] = "cidr"
elif re.match(defs["iocs_types"][3]["regex"], attr["value"]):
ioc["type"] = "domain"
elif re.match(defs["iocs_types"][4]["regex"], attr["value"]):
ioc["type"] = "sha1cert"
elif "alert " in attr["value"][0:6]:
ioc["type"] = "snort"
else:
continue
if "Tag" in attr:
for tag in attr["Tag"]:
# Add a TLP to the IOC if defined in tags.
tlp = re.search(
r"^(?:tlp:)(red|green|amber|white)", tag['name'].lower())
if tlp:
ioc["tlp"] = tlp.group(1)
# Add possible tag (need to match TinyCheck tags)
if tag["name"].lower() in [t["tag"] for t in defs["iocs_tags"]]:
ioc["tag"] = tag["name"].lower()
yield ioc
"""