Create a custom MISP warninglist¶
Introduction¶
- UUID: 1c946ff3-0798-4c59-a19e-fc0b622e75e3
- Started from issue 7
- State: Published : demo version with output
- Purpose: This playbook creates a custom MISP warninglist with a set of entries provided by the analyst as input. A check is done if the warninglist already exists. If the warninglist already exists then the entries are added to the existing warninglist. When the warninglist is created the MISP events are queried for matches ('retro-search').
- The playbook also queries Shodan and VirusTotal for matches with entries in the warninglist. The result of the creation of the warninglist as well as the matches is summarised at the end of the playbook and sent to Mattermost or Slack or added as an alert in TheHive.
- Tags: [ "warninglist", "hunting" ]
- External resources: VirusTotal, Shodan, Mattermost, TheHive
- Target audience: SOC, CSIRT, CTI
- Graphical workflow
Playbook¶
- Create a custom MISP warninglist
- Introduction
- Preparation
- PR:1 Initialise environment
- PR:2 Verify MISP modules
- PR:3 Set helper variables
- Investigate
- IN:1 MISP warninglists
- IN:2 Set the MISP warninglist name (or ID)
- IN:3 MISP warninglist details
- IN:4 MISP warninglist values
- IN:5 Create or update the MISP warninglist
- IN:6 Enable the MISP warninglist
- IN:7 View the warninglist in MISP
- Correlation
- ER:1 Query MISP
- ER:2 Query VirusTotal
- ER:3 Query Shodan
- Closure
- EN:1 Create the summary of the playbook
- EN:2 Send a summary to Mattermost
- EN:3 Send an alert to TheHive
- EN:4 End of the playbook
- External references
- Technical details
Preparation¶
PR:1 Initialise environment¶
This section initialises the playbook environment and loads the required Python libraries.
The credentials for MISP (API key) and other services are loaded from the file keys.py
in the directory vault. A PyMISP object is created to interact with MISP and the active MISP server is displayed. By printing out the server name you know that it's possible to connect to MISP. In case of a problem PyMISP will indicate the error with PyMISPError: Unable to connect to MISP
.
The contents of the keys.py
file should contain at least :
misp_url="<MISP URL>" # The URL to our MISP server
misp_key="<MISP API KEY>" # The MISP API key
misp_verifycert=<True or False> # Indicate if PyMISP should attempt to verify the certificate or ignore errors
mattermost_playbook_user="<MATTERMOST USER>"
mattermost_hook="<MATTERMOST WEBHOOK>"
thehive_url="<THEHIVE URL>"
thehive_key="<THEHIVE API KEY>"
virustotal_apikey="<VIRUSTOTAL_APIKEY>"
shodan_apikey="<SHODAN_APIKEY>"
# Initialise Python environment
import urllib3
import sys
import json
from pyfaup.faup import Faup
from prettytable import PrettyTable, MARKDOWN
from IPython.display import Image, display, display_markdown, HTML
from datetime import date
import requests
import uuid
from uuid import uuid4
from pymisp import *
from pymisp.tools import GenericObjectGenerator
import re
import time
# Load the credentials
sys.path.insert(0, "../vault/")
from keys import *
if misp_verifycert is False:
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
print("The \033[92mPython libraries\033[90m are loaded and the \033[92mcredentials\033[90m are read from the keys file.")
# Create the PyMISP object
misp = PyMISP(misp_url, misp_key, misp_verifycert)
print("I will use the MISP server \033[92m{}\033[90m for this playbook.\n\n".format(misp_url))
The Python libraries are loaded and the credentials are read from the keys file. I will use the MISP server https://misp.demo.cudeso.be/ for this playbook.
PR:2 Verify MISP modules¶
This playbook uses the MISP VirusTotal and Shodan modules to obtain additional correlation information. MISP modules are autonomous modules that can be used to extend MISP for new services such as expansion, import and export. The modules are written in Python 3 following a simple API interface. The objective is to ease the extensions of MISP functionalities without modifying core components. The API is available via a simple REST API which is independent from MISP installation or configuration.
Note that the core of the playbook (creating the warninglists) does function without these modules. The playbook does not re-implement the modules, it merely queries the local MISP module server, similarly as how MISP queries this module server. This also means that the playbook needs to have access to this MISP module server.
In the next cell we check if we have access to the MISP module server and if the VirusTotal and Shodan modules are enabled.
# Where can we find the local MISP Module server? You can leave this to the default setting in most cases.
misp_modules_url = "http://127.0.0.1:6666"
# How long do we wait between queries when using the MISP modules (API rate limiting of external service such as VirusTotal)
misp_modules_wait = 3
# Initiliasation
misp_modules = {}
misp_modules_headers = {
"Content-Type": "application/json",
"Accept": "application/json"
}
misp_modules_in_use = ["virustotal_public", "shodan"]
# Code block to query the MISP module server and check if our modules are enabled
res = requests.get("{}/modules".format(misp_modules_url), headers=misp_modules_headers)
for module in res.json():
for module_requested in misp_modules_in_use:
if module.get("name", False) == module_requested:
misp_modules[module_requested] = {"enabled": True, "input": module.get("mispattributes").get("input")}
print("Found the \033[92m{}\033[90m MISP module (Accepted input: {}).".format(module_requested, misp_modules[module_requested]["input"]))
print("\n\n")
Found the shodan MISP module (Accepted input: ['ip-src', 'ip-dst']). Found the virustotal_public MISP module (Accepted input: ['hostname', 'domain', 'ip-src', 'ip-dst', 'md5', 'sha1', 'sha256', 'url']).
PR:3 Set helper variables¶
This cell contains helper variables that are used in this playbook.
# Dictionary to hold correlation results
correlation_results = {}
correlation_results["misp"] = {}
correlation_results["virustotal"] = {}
correlation_results["shodan"] = {}
# A set of regular expressions that we use to determine the attribute type in a warninglist. Needed to decide if we submit them to MISP modules
regular_expressions = {"sha256": "^[a-fA-F0-9]{64}$",
"md5": "^[a-fA-F0-9]{32}$",
"hostname": "^[a-zA-Z0-9.\-_]+\.[a-zA-Z]{2,}$",
"sha1": "^[a-fA-F0-9]{40}$",
"url": "^(http|https):\/\/[-a-zA-Z0-9-]{2,256}\.[-a-zA-Z0-9-]{2,256}",
"ip-src": "[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}",
"ip-dst": "[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}"
}
# Warninglist values, in text block and Python list format
warninglist_values_blob = ""
warninglist_values_list = []
Investigate¶
IN:1 MISP warninglists¶
The misp-warninglists are lists of well-known indicators that can be associated to potential false positives, errors or mistakes. The warning lists are integrated in MISP to display an info/warning box at the event and attribute level if such indicators are available in one of the list. The lists are also used to filter potential false-positive at API level. MISP warninglists also serve a dual use. You cannot only use them to identify false positives, you can also use the lists to track if specific attributes (for example IPs or domains) are in the threat events you receive from your community.
IN:2 Set the MISP warninglist name (or ID)¶
In the next cell you can use warninglist_name
to indicate if you want to create a new MISP warninglist or add (or replace) data to an existing MISP warninglist. Contrary to some of the other MISP elements, warninglists do not have a UUID. The playbook uses the warninglist name as the reference information. The playbook checks if the name that you provide already exists as a warninglist. If it doesn't exist it will create it for you. If you know the ID of the warninglist, you can specify it directly with warninglist_id
. This overrides warninglist_name
.
In summary:
- A value in
warninglist_id
: add or replace the content of the existing warninglist with IDwarninglist_id
warninglist_name
with the name of a non-existing warninglist: the warninglist is createdwarninglist_name
with the name of an existing warninglist: add or replace the content of the warninglist
# Provide the name of a warninglist. The playbook first checks if the warninglist exists before doing other actions.
warninglist_name = "Campaign-ZN03"
# Optionally provide a warninglist ID directly
warninglist_id = 0
# Code block to check if a warninglist exists.
warninglist_existing = False
try:
warninglist_id = int(warninglist_id)
except ValueError:
warninglist_id = 0
if warninglist_id <= 0:
all_warninglists = misp.warninglists(pythonify=True)
if len(all_warninglists) > 0:
for warninglist in all_warninglists:
if warninglist.name == warninglist_name:
warninglist_id = int(warninglist.id)
warninglist_name = warninglist.name
print("Found a matching warninglist with ID \033[92m{}\033[90m.\n".format(warninglist_id))
break
if warninglist_id > 0:
existing_warninglist = misp.get_warninglist(warninglist_id, pythonify=True)
if "errors" in existing_warninglist:
print("Received an error from MISP when looking up warninglist ID \033[92m{}\033[90m.".format(warninglist_id))
else:
print("I will use the warninglist \033[92m{} - {}\033[90m.".format(warninglist_id, warninglist_name))
print("- Warninglist description (warninglist_description): \033[1m{}\033[0m".format(existing_warninglist.description))
print("- Warninglist type (warninglist_type): \033[1m{}\033[0m".format(existing_warninglist.type))
print("- Warninglist category (warninglist_category): \033[1m{}\033[0m".format(existing_warninglist.category))
print("- Warninglist matching attributes(warninglist_matching_attributes) : \033[1m{}\033[0m".format(existing_warninglist.WarninglistType))
#print(vars(existing_warninglist))
warninglist_request_url = "{}/{}/{}".format(misp_url, "warninglists/edit", warninglist_id)
warninglist_existing = True
else:
print("I will create a \033[92mnew warninglist\033[90m with the name \033[92m{}\033[90m.".format(warninglist_name))
warninglist_request_url = "{}/{}".format(misp_url, "warninglists/add")
print("\n\n")
I will create a new warninglist with the name Campaign-ZN03.
IN:3 MISP warninglist details¶
In the next cell you can provide your input and specify the details of the warninglists. If you use an existing warninglist and leave warninglist_description
, warninglist_type
, warninglist_category
and warninglist_matching_attributes
empty then the warninglist details remains unchanged. If you provide a value the playbook will override their details in MISP. Make sure that if you change the type the warninglist values are still valid according to the newly selected warninglist type.
The description of the warninglist is defined in warninglist_description
.
The warninglist type is defined in warninglist_type
and you can choose between
- string (default): perfect match of a string in the warning list against matching attributes
- substring: substring matching of a string in the warning list against matching attributes
- hostname: hostname matching (e.g. domain matching from URL)
- cidr: IP or CDIR block matching
- regex: regex matching of a string matching attributes
The warninglist category is defined with warninglist_category
and is a list of false positives (value false_positive
) or a list of known identifiers (value known
, the default value).
And finally you can specify the list of matching MISP attributes where this warninglist is applied against with warninglist_matching_attributes
. The default is an empty list. This means that all attribute types are taken into consideration. The list needs to be supplied in a Python list format.
# Leave these settings unchanged if you want to keep the warninglist details already defined in MISP
# Warninglist description
warninglist_description = "Tracker for indicators of Campaign-ZN03"
# Warninglist type: string (default), substring, hostname, cidr or regex
warninglist_type = ""
# Warninglist category: false_positive or known (default)
warninglist_category = ""
# Warninglist matching MISP attributes, default is an empty list, meaning all attribute types
warninglist_matching_attributes = []
IN:4 MISP warninglist values¶
Add or replace values¶
If you're using an existing warninglist then use warninglist_values_action
to indicate if you want to append (add) data to an existing warninglist or replace the values that already exist. The default behaviour is to append values to a warninglist. This is not used when you create a new warninglist.
Warninglist values¶
Then finally with warninglist_values
you can specify the list of values for a warninglist. This needs to be a Python list. You can include comments by separating them the value from the comment with "#" (for example " mydomain.com # Demo domain "
). Note that the playbook prevents you from adding doubles to the value list.
# Append the values from warninglist_values to the list or replace them
warninglist_values_action = "append"
# The values for the warninglist
warninglist_values = [
"028878c4b6ab475ed0be97eca6f92af9 # TinyTurla / w64time.dll",
"johnshopkin.net",
"158.255.208.168 # SDBbot C2"
]
# Verify that we only have append or replace. This is not used for new warninglists
if not warninglist_values_action in ["append", "replace"]:
warninglist_values_action = "append"
# Set details of the warninglist, either use those of the existing warninglist, supplied as input or the default values
if warninglist_existing:
post_description = warninglist_description if len(warninglist_description.strip()) > 0 else existing_warninglist.description
post_type = warninglist_type.lower() if len(warninglist_type.strip()) > 0 else existing_warninglist.type
post_category = warninglist_category.lower() if len(warninglist_category.strip()) > 0 else existing_warninglist.category
post_matching_attributes = warninglist_matching_attributes if type(warninglist_matching_attributes) == list and len(warninglist_matching_attributes) > 0 else existing_warninglist.WarninglistType
post_text = "update"
else:
post_description = warninglist_description.strip()
post_type = warninglist_type.lower() if len(warninglist_type.strip()) > 0 else "string"
post_category = warninglist_category.lower() if len(warninglist_category.strip()) > 0 else "known"
post_matching_attributes = warninglist_matching_attributes if type(warninglist_matching_attributes) == list and len(warninglist_matching_attributes) > 0 else []
post_text = "create"
# Process the values in the warninglist and put them in a text blob and Python list. Only consider the current values if there is an append action
if warninglist_id > 0 and warninglist_values_action == "append":
for value in existing_warninglist.WarninglistEntry:
comment = " # {}".format(value["comment"]) if value["comment"] is not None and len(value["comment"]) > 0 else ""
warninglist_values_blob = "{}{}{} \n".format(warninglist_values_blob, value["value"], comment)
warninglist_values_list.append(value["value"])
for value in warninglist_values:
if value not in warninglist_values_list and len(value) > 0:
warninglist_values_blob = "{}{} \n".format(warninglist_values_blob, value)
warninglist_values_list.append(value.split("#")[0].strip())
print("I will \033[92m{}\033[90m the warninglist {} (type: {}, category: {}, matching_attributes: {}). Continue in the next cell with the actual submit.\n\n".format(post_text, warninglist_name, post_type, post_category, post_matching_attributes))
I will create the warninglist Campaign-ZN03 (type: string, category: known, matching_attributes: []). Continue in the next cell with the actual submit.
IN:5 Create or update the MISP warninglist¶
The next cell does the actual connection with MISP and will submit the warninglist values.
This is done with a POST request via the _prepare_request function of PyMISP. There are different PyMISP functions available to manipulate MISP warninglists but unfortunately there is no function that allows you to add a new warninglist, hence the use of _prepare_request
. The function is a wrapper around the Python requests library and takes care of setting the necessary HTTP headers for you.
# Build the JSON block that we will submit
custom_warninglist = {
"name": f"{warninglist_name}",
"description": f"{post_description}",
"type": f"{post_type}",
"category": f"{post_category}",
"entries": f"{warninglist_values_blob}",
"matching_attributes": post_matching_attributes
}
# Send the POST request
warninglist = {"Warninglist": custom_warninglist}
warninglist_post = misp._prepare_request("POST", warninglist_request_url, data=warninglist)
# Process the response
if not warninglist_post.status_code == 200:
if "errors" in warninglist_post.json():
print("There were \033[91merrors when updating the warninglist.\033[90m Fix these errors before proceeding.\n\n")
print(warninglist_post.json()["errors"])
else:
if "Warninglist" in warninglist_post.json():
warninglist_id = int(warninglist_post.json()["Warninglist"].get("id", 0))
print("There was a \033[92msuccessfull\033[90m {} for the warninglist \033[92m{}\033[90m.\n\n".format(post_text, warninglist_id))
else:
print("There were \033[91merrors when updating the warninglist.\033[90mFix these errors before proceeding.\n\n")
print(warninglist_post.json()["errors"])
There was a successfull create for the warninglist 91.
IN:6 Enable the MISP warninglist¶
If you create a new MISP warninglist you still need to enable the list before it becomes active in the MISP interface. Leave warninglist_enable
to True to enable the warninglist. Note that for the playbook it does not matter if you enabled the warninglist or not.
warninglist_enable = True
if warninglist_enable and warninglist_id > 0:
result = misp.enable_warninglist(warninglist_id)
if "errors" in result:
print("There was an \033[91merror when enabling the warninglist.\033[90m Fix these errors before proceeding.\n\n")
print(result)
else:
print("\033[92mEnabled\033[90m the warninglist. Now continue with querying MISP.\n\n")
Enabled the warninglist. Now continue with querying MISP.
IN:7 View the warninglist in MISP¶
You can also view the result of the previous steps of the playbook directly in MISP.
display(HTML("View the <a href=\"{}/warninglists/view/{}\" target=_blank>warninglist {} in MISP</a>\n".format(misp_url, warninglist_id, warninglist_name)))
Correlation¶
ER:1 Query MISP¶
Now that the warninglist is created (or updated), you can query the existing MISP events for matches with the entries on the warninglist. This playbook will search in all your events and returns a list of matches (attributes) corresponding with the values on the warninglist. You can consider this as some form of retro-search in your existing MISP database.
You can limit this search to events that are published (with match_published
) or events with specific tags (with match_tags
).
match_published = True
match_tags = []
#match_tags = [ "workflow:state=\"complete\"" ]
# Code block to query MISP
if len(warninglist_values_list) > 0:
search_match = misp.search("attributes", value=warninglist_values_list, tags=match_tags, published=match_published, pythonify=True)
if len(search_match) > 0:
for attribute in search_match:
category_type = "{}/{}".format(attribute.category, attribute.type)
event_info = "{} ({})".format(attribute.Event.info, attribute.Event.id)
if attribute.value in correlation_results["misp"]:
correlation_results["misp"][attribute.value]["category_type"].append(category_type) if category_type not in correlation_results["misp"][attribute.value]["category_type"] else None
correlation_results["misp"][attribute.value]["event_info"].append(event_info) if event_info not in correlation_results["misp"][attribute.value]["event_info"] else None
else:
correlation_results["misp"][attribute.value] = {"category_type": [category_type], "event_info": [event_info]}
misp_matches_count = len(correlation_results["misp"])
print("Got \033[92m{}\033[90m correlation result(s).\n\n".format(misp_matches_count))
else:
print("\033[93mNo correlating MISP events found")
Got 3 correlation result(s).
MISP events correlation table¶
The correlation results are now stored in correlation_results["misp"]
. Execute the next cell to display them in a table format. The table is also included in the summary sent to Mattermost and TheHive.
# Put the correlations in a pretty table. We can use this table later also for the summary
misp_table = PrettyTable()
if misp_matches_count > 0:
misp_table.field_names = ["Source", "Value", "Type", "Event"]
misp_table.align["Value"] = "l"
misp_table.align["Type"] = "l"
misp_table.align["Event"] = "l"
for correlation in correlation_results["misp"]:
for category_type in correlation_results["misp"][correlation]["category_type"]:
events = ""
for event in correlation_results["misp"][correlation]["event_info"]:
events = "{}{}\n".format(events, event)
misp_table.add_row(["MISP", correlation, category_type, events])
print(misp_table)
else:
print("\033[93mNo correlating MISP events found")
+--------+----------------------------------+-------------------------+--------------------------------------------------------------------------------------------------------------------------+ | Source | Value | Type | Event | +--------+----------------------------------+-------------------------+--------------------------------------------------------------------------------------------------------------------------+ | MISP | 158.255.208.168 | Network activity/ip-dst | [CERT-FR] Infrastructure d'attaque du groupe cybercriminel TA505 (2397) | | | | | | | MISP | johnshopkin.net | Network activity/domain | OSINT - Protecting customers from a private-sector offensive actor using 0-day exploits and DevilsTongue malware (2411) | | | | | | | MISP | 028878c4b6ab475ed0be97eca6f92af9 | Payload delivery/md5 | TinyTurla - Turla deploys new malware to keep a secret backdoor on victim machines (2416) | | | | | | +--------+----------------------------------+-------------------------+--------------------------------------------------------------------------------------------------------------------------+
ER:2 Query VirusTotal¶
If the VirusTotal module is enabled in MISP modules you can now query VirusTotal for every entry on the warninglist. The playbook will return the corresponding matches. The playbook checks if the attribute in the warninglist is an attribute type that is accepted by the VirusTotal MISP module (previously stored in misp_modules[module_name]["input"]
). In order to deal with the API rate limits the playbook will also wait a short time between each query (misp_modules_wait
).
# Code block to query VirusTotal
module_name = "virustotal_public"
if misp_modules[module_name]["enabled"] and len(warninglist_values_list) > 0:
for value in warninglist_values_list:
attribute_type = False
for expr in regular_expressions:
if re.match(r"{}".format(regular_expressions[expr]), value):
attribute_type = expr
break
if attribute_type in misp_modules[module_name]["input"]:
data = {
"attribute": {
"type": f"{attribute_type}",
"uuid": str(uuid.uuid4()),
"value": f"{value}",
},
"module": module_name,
"config": {"apikey": virustotal_apikey}
}
print("Query \033[92m{}\033[90m as \033[92m{}\033[90m".format(value, attribute_type))
result = requests.post("{}/query".format(misp_modules_url), headers=misp_modules_headers, json=data)
#pprint(result.json())
if "results" in result.json() and len(result.json()["results"]) > 0:
result_json = result.json()["results"]
for misp_object in result_json["Object"]:
for misp_attribute in misp_object["Attribute"]:
category_type = "{}/{}".format(misp_attribute["category"], misp_attribute["type"])
if category_type not in ["Other/text"]:
correlation_results["virustotal"][misp_attribute["value"]] = {"category_type": [category_type]}
print(" Got {} {}".format(category_type, misp_attribute["value"]))
print("Sleeping for {} seconds".format(misp_modules_wait))
time.sleep(misp_modules_wait)
else:
print("Skipping \033[91m{}\033[90m. Not a valid query type ({}).".format(value, misp_modules[module_name]["input"]))
print("\n\nGot \033[92m{}\033[90m correlation result(s) from VirusTotal.\n\n".format(len(correlation_results["virustotal"])))
Query 028878c4b6ab475ed0be97eca6f92af9 as md5 Got Payload delivery/md5 028878c4b6ab475ed0be97eca6f92af9 Got Payload delivery/sha1 02c37ccdfccfe03560a4bf069f46e8ae3a5d2348 Got Payload delivery/sha256 030cbd1a51f8583ccfc3fa38a28a5550dc1c84c05d6c0f5eb887d13dedf1da01 Got External analysis/link https://www.virustotal.com/gui/file/030cbd1a51f8583ccfc3fa38a28a5550dc1c84c05d6c0f5eb887d13dedf1da01/detection/f-030cbd1a51f8583ccfc3fa38a28a5550dc1c84c05d6c0f5eb887d13dedf1da01-1676466051 Got External analysis/text 55/70 Sleeping for 3 seconds Query johnshopkin.net as hostname Got Network activity/domain johnshopkin.net Got Network activity/ip-dst 152.89.247.66 Got Network activity/ip-dst 172.105.27.36 Got Network activity/ip-dst 44.227.65.245 Got Network activity/ip-dst 44.227.76.166 Got Network activity/ip-dst 45.61.137.20 Got Network activity/ip-dst 54.197.173.238 Sleeping for 3 seconds Query 158.255.208.168 as ip-src Got Network activity/AS 9009 Got Network activity/ip-dst 158.255.208.168 Got Network activity/domain hk-ed.metercdn.net Sleeping for 3 seconds Got 15 correlation result(s) from VirusTotal.
VirusTotal correlation table¶
The correlation results are now stored in correlation_results["virustotal"]
. Execute the next cell to display them in a table format. The table is also included in the summary sent to Mattermost and TheHive.
# Put the correlations in a pretty table. We can use this table later also for the summary
vt_table = PrettyTable()
if len(correlation_results["virustotal"]) > 0:
vt_table.field_names = ["Source", "Value", "Category and Type"]
vt_table.align["Value"] = "l"
vt_table.align["Category and Type"] = "l"
vt_table.border = True
for el in correlation_results["virustotal"]:
vt_table.add_row(["VirusTotal", el, correlation_results["virustotal"][el]["category_type"]])
print(vt_table)
else:
print("\033[93mNo correlating data found from VirusTotal")
+------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------+ | Source | Value | Category and Type | +------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------+ | VirusTotal | 028878c4b6ab475ed0be97eca6f92af9 | ['Payload delivery/md5'] | | VirusTotal | 02c37ccdfccfe03560a4bf069f46e8ae3a5d2348 | ['Payload delivery/sha1'] | | VirusTotal | 030cbd1a51f8583ccfc3fa38a28a5550dc1c84c05d6c0f5eb887d13dedf1da01 | ['Payload delivery/sha256'] | | VirusTotal | https://www.virustotal.com/gui/file/030cbd1a51f8583ccfc3fa38a28a5550dc1c84c05d6c0f5eb887d13dedf1da01/detection/f-030cbd1a51f8583ccfc3fa38a28a5550dc1c84c05d6c0f5eb887d13dedf1da01-1676466051 | ['External analysis/link'] | | VirusTotal | 55/70 | ['External analysis/text'] | | VirusTotal | johnshopkin.net | ['Network activity/domain'] | | VirusTotal | 152.89.247.66 | ['Network activity/ip-dst'] | | VirusTotal | 172.105.27.36 | ['Network activity/ip-dst'] | | VirusTotal | 44.227.65.245 | ['Network activity/ip-dst'] | | VirusTotal | 44.227.76.166 | ['Network activity/ip-dst'] | | VirusTotal | 45.61.137.20 | ['Network activity/ip-dst'] | | VirusTotal | 54.197.173.238 | ['Network activity/ip-dst'] | | VirusTotal | 9009 | ['Network activity/AS'] | | VirusTotal | 158.255.208.168 | ['Network activity/ip-dst'] | | VirusTotal | hk-ed.metercdn.net | ['Network activity/domain'] | +------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------+
ER:3 Query Shodan¶
If the Shodan module is enabled in MISP modules you can now query Shodan for every entry on the warninglist. The playbook will return the corresponding matches. The Shodan MISP module only accepts IP addresses as input, so only those attributes corresponding with the IP regular expression will be used. In order to deal with the API rate limits the playbook will also wait a short time between each query (misp_modules_wait
).
# Code block to query Shodan
module_name = "shodan"
if misp_modules[module_name]["enabled"] and len(warninglist_values_list) > 0:
for value in warninglist_values_list:
attribute_type = False
for expr in regular_expressions:
if re.match(r"{}".format(regular_expressions[expr]), value):
attribute_type = expr
break
if attribute_type in misp_modules[module_name]["input"]:
data = {
"attribute": {
"type": f"{attribute_type}",
"uuid": str(uuid.uuid4()),
"value": f"{value}",
},
"module": module_name,
"config": {"apikey": shodan_apikey}
}
print("Query \033[92m{}\033[90m as \033[92m{}\033[90m".format(value, attribute_type))
result = requests.post("{}/query".format(misp_modules_url), headers=misp_modules_headers, json=data)
#pprint(result.json())
if "results" in result.json() and len(result.json()["results"]) > 0:
result_json = result.json()["results"]
for misp_object in result_json["Object"]:
for misp_attribute in misp_object["Attribute"]:
category_type = "{}/{}".format(misp_attribute["category"], misp_attribute["type"])
if category_type not in ["Other/text", "Other/float", "Other/datetime"]:
correlation_results["shodan"][misp_attribute["value"]] = {"category_type": [category_type]}
print(" Got {} {}".format(category_type, misp_attribute["value"]))
print("Sleeping for {} seconds".format(misp_modules_wait))
time.sleep(misp_modules_wait)
else:
print("Skipping \033[91m{}\033[90m. Not a valid query type ({}).".format(value, misp_modules[module_name]["input"]))
print("\n\nGot \033[92m{}\033[90m correlation result(s) from Shodan.\n\n".format(len(correlation_results["shodan"])))
Skipping 028878c4b6ab475ed0be97eca6f92af9. Not a valid query type (['ip-src', 'ip-dst']). Skipping johnshopkin.net. Not a valid query type (['ip-src', 'ip-dst']). Query 158.255.208.168 as ip-src Got Network activity/AS AS9009 Got Network activity/ip-src 158.255.208.168 Got Network activity/ip-src 158.255.208.168 Got Network activity/port 80 Got Network activity/port 443 Got Network activity/port 8443 Got Network activity/domain 158.in-addr.arpa Got Network activity/domain metercdn.net Got Network activity/hostname 168.208.255.158.in-addr.arpa Got Network activity/hostname metercdn.net Got Network activity/x509-fingerprint-sha256 2ae166ec2af00fa9bce0d9f992608f9075109d6b9a126274213a8a2d1c83ab70 Got Network activity/x509-fingerprint-sha1 0e8ff37c435bccb4082ffa95aa3ed5cbd04f6e87 Got Network activity/x509-fingerprint-sha256 2ae166ec2af00fa9bce0d9f992608f9075109d6b9a126274213a8a2d1c83ab70 Got Network activity/x509-fingerprint-sha1 0e8ff37c435bccb4082ffa95aa3ed5cbd04f6e87 Sleeping for 3 seconds Got 10 correlation result(s) from Shodan.
Shodan correlation table¶
The correlation results are now stored in correlation_results["shodan"]
. Execute the next cell to display them in a table format. The table is also included in the summary sent to Mattermost and TheHive.
# Put the correlations in a pretty table. We can use this table later also for the summary
shodan_table = PrettyTable()
if len(correlation_results["shodan"]) > 0:
shodan_table.field_names = [ "Source", "Value", "Category and Type"]
shodan_table.align["Value"] = "l"
shodan_table.align["Category and Type"] = "l"
shodan_table.border = True
for el in correlation_results["shodan"]:
shodan_table.add_row(["Shodan", el, correlation_results["shodan"][el]["category_type"]] )
print(shodan_table)
else:
print("\033[93mNo correlating data found from Shodan")
+--------+------------------------------------------------------------------+----------------------------------------------+ | Source | Value | Category and Type | +--------+------------------------------------------------------------------+----------------------------------------------+ | Shodan | AS9009 | ['Network activity/AS'] | | Shodan | 158.255.208.168 | ['Network activity/ip-src'] | | Shodan | 80 | ['Network activity/port'] | | Shodan | 443 | ['Network activity/port'] | | Shodan | 8443 | ['Network activity/port'] | | Shodan | 158.in-addr.arpa | ['Network activity/domain'] | | Shodan | metercdn.net | ['Network activity/hostname'] | | Shodan | 168.208.255.158.in-addr.arpa | ['Network activity/hostname'] | | Shodan | 2ae166ec2af00fa9bce0d9f992608f9075109d6b9a126274213a8a2d1c83ab70 | ['Network activity/x509-fingerprint-sha256'] | | Shodan | 0e8ff37c435bccb4082ffa95aa3ed5cbd04f6e87 | ['Network activity/x509-fingerprint-sha1'] | +--------+------------------------------------------------------------------+----------------------------------------------+
Closure¶
In this closure or end step we create a summary of the actions that were performed by the playbook. The summary is printed and can also be send to a chat channel.
EN:1 Create the summary of the playbook¶
The next section creates a summary and stores the output in the variable summary
in Markdown format. It also stores an intro text in the variable intro
. These variables can later be used when sending information to Mattermost or TheHive.
summary = "## MISP Playbook summary\nCreate a custom MISP warninglist. \n\n"
if post_text == "update":
summary += "There was an **update** to a MISP warning list {} ({}).\n\n".format(warninglist_name, warninglist_id)
else:
summary += "A **new** MISP warning list {} ({}) was added.\n\n".format(warninglist_name, warninglist_id)
intro = summary
summary += "### Warninglist details\n"
summary += "\n - Name: {}".format(warninglist_name)
summary += "\n - ID: {}".format(warninglist_id)
summary += "\n - Description: {}".format(warninglist_description)
summary += "\n - Type: {}".format(warninglist_type)
summary += "\n - Category: {}".format(warninglist_category)
summary += "\n - Matching attributes: {}".format(warninglist_matching_attributes)
summary += "\n\n"
summary += "### Warninglist values\n"
for value in warninglist_values_list:
summary += "{}\n".format(value)
summary += "\n"
if misp_matches_count > 0:
summary += "### MISP event matches\nThere are {} matches for the warninglist values found in the MISP events. \n\n".format(misp_matches_count)
misp_table.set_style(MARKDOWN)
summary += misp_table.get_string()
else:
summary += "### MISP event matches\nThere are no matches for the warninglist values found in the MISP events."
summary += "\n"
if len(correlation_results["virustotal"]) > 0:
summary += "### VirusTotal matches\nThere are {} matches for the warninglist values found in VirusTotal. \n\n".format(len(correlation_results["virustotal"]))
vt_table.set_style(MARKDOWN)
summary += vt_table.get_string()
else:
summary += "### VirusTotal matches\nThere are no matches for the warninglist values found in VirusTotal."
summary += "\n"
if len(correlation_results["shodan"]) > 0:
summary += "### Shodan matches\nThere are {} matches for the warninglist values found in Shodan. \n\n".format(len(correlation_results["shodan"]))
shodan_table.set_style(MARKDOWN)
summary += shodan_table.get_string()
else:
summary += "### Shodan matches\nThere are no matches for the warninglist values found in Shodan."
summary += "\n\n"
print("The \033[92msummary\033[90m of the playbook is available.\n")
The summary of the playbook is available.
EN:2 Send a summary to Mattermost¶
Now you can send the summary to Mattermost. You can send the summary in two ways by selecting one of the options for the variable send_to_mattermost_option
in the next cell.
- The default option where the entire summary is in the chat, or
- a short intro and the summary in a card
For this playbook we rely on a webhook in Mattermost. You can add a webhook by choosing the gear icon in Mattermost, then choose Integrations and then Incoming Webhooks. Set a channel for the webhook and lock the webhook to this channel with "Lock to this channel".
send_to_mattermost_option = "via a chat message"
#send_to_mattermost_option = "via a chat message with card"
message = False
if send_to_mattermost_option == "via a chat message":
message = {"username": mattermost_playbook_user, "text": summary}
elif send_to_mattermost_option == "via a chat message with card":
message = {"username": mattermost_playbook_user, "text": intro, "props": {"card": summary}}
if message:
r = requests.post(mattermost_hook, data=json.dumps(message))
r.raise_for_status()
if message and r.status_code == 200:
print("Summary is \033[92msent to Mattermost.\n")
else:
print("\033[91mFailed to sent summary\033[90m to Mattermost.\n")
Summary is sent to Mattermost.
EN:3 Send an alert to TheHive¶
Next to informing your colleagues via Mattermost you can also send an alert to TheHive. The alert contains the summary, and a list of MISP warninglist values as 'observables'.
You can change the alert title with thehive_alert_title
and provide a reference type with thehive_alert_reference
. Note that this reference needs to be unique in TheHive. If you want to create multiple alerts for the same MISP event then add some random value at the end.
# The title of the TheHive alert
thehive_alert_title = "MISP Playbook Summary"
# A unique reference for the TheHive
thehive_alert_reference = "MISP warninglist ID - {} - {} - {}".format(warninglist_name, warninglist_id, str(uuid.uuid4()))
# Alert type in TheHive
thehive_alert_type = "MISP Playbook alert"
# TLP:Amber for TheHive
thehive_tlp = 2
# PAP:GREEN for TheHive
thehive_pap = 1
# Code block to send an alert to TheHive
# We use the Python requests library
thehive_headers = {'Content-Type': 'application/json', 'Authorization': f'Bearer {thehive_key}'}
thehive_url_create = "{}/api/v1/alert".format(thehive_url)
thehive_observables = []
for value in warninglist_values_list:
attribute_type = False
for expr in regular_expressions:
if re.match(r"{}".format(regular_expressions[expr]), value):
attribute_type = expr
break
if attribute_type:
if attribute_type == "ip-src" or attribute_type == "ip-dst":
dataType = "ip"
elif attribute_type == "md5" or attribute_type == "sha256" or attribute_type == "sha1":
dataType = "hash"
else:
dataType = attribute_type
if dataType:
thehive_observables.append({"dataType": dataType, "data": value, "pap": thehive_pap, "tlp": thehive_tlp})
thehive_alert = {"title": thehive_alert_title,
"description": intro,
"summary": summary[0:1048576],
"type": thehive_alert_type,
"source": "playbook",
"sourceRef": thehive_alert_reference,
"tlp": thehive_tlp, "pap": thehive_pap,
"observables": thehive_observables}
result = requests.post(thehive_url_create, headers=thehive_headers, data=json.dumps(thehive_alert))
if result.status_code == 201 and result.json()['status'] == 'New':
thehive_alert_id = result.json()['_id']
print('The TheHive \033[92malert {} is added'.format(thehive_alert_id))
else:
print('\033[91mFailed\033[90m to add TheHive alert')
print(result.text)
The TheHive alert ~32912 is added
EN:4 End of the playbook¶
print("\033[92m End of the playbook")
External references ¶
Technical details¶
Documentation¶
This playbook requires these Python libraries to exist in the environment where the playbook is executed. You can install them with pip install <library>
.
pyfaup
chardet
PrettyTable
mattermostdriver
You need to have network access to
- your MISP server (HTTP or HTTPS)
These MISP modules need to be enabled
- virustotal_public
- shodan
You need
- an API key with MISP
- Under Global Actions, My Profile. Add an extra authentication key.
- Add the API key (
misp_key
) and the MISP URL (misp_url
) tokeys.py
- Add the API key (
- If you use a self-signed certificate set
misp_verifycert
to False
- If you use a self-signed certificate set
- an API key with VirusTotal
- Click on your username (upper right corner), select API key.
- Add the API key (
virustotal_apikey
) tokeys.py
.
- Add the API key (
- an API key with Shodan
- Click on your Shodan (upper left corner), click on Account.
- Add the API key (
shodan_apikey
) tokeys.py
.
- Add the API key (
- an incoming webhook in your Mattermost server
- Set this up under Integrations, Incoming Webhooks. Set as default channel your SOC/CSIRT team channel. For additional protection, lock the webhook so that the incoming webhook can post only to the selected channel.
- Add the webhook to
mattermost_hook
. It is displayed under 'integrations/incoming_webhooks' and set a username undermattermost_playbook_user
- Add the webhook to
- an API key with your TheHive server
- Click on your username (upper right corner), Settings and then API key
- Make sure that your user has 'manageAlert/create' privileges
- Add the API key (
thehive_key
) tokeys.py
and add the URL to TheHive (thehive_url
)
- Add the API key (
Colour codes¶
The output from Python displays some text in different colours. These are the colour codes
Red = '\033[91m'
Green = '\033[92m'
Blue = '\033[94m'
Cyan = '\033[96m'
White = '\033[97m'
Yellow = '\033[93m'
Magenta = '\033[95m'
Grey = '\033[90m'
Black = '\033[90m'
Default = '\033[99m'