Curate MISP events¶
Introduction¶
- UUID: 4c4d34f7-8628-410e-b41d-712061020e0c
- Started from issue 21
- State: Published
- Purpose: This playbook queries for MISP events that require curation and addresses the remaining curation tasks. In general you run this playbook after your automatic or manual curation process has highlighted the events that require a review but you can also force the playbook to curate all events. This playbook uses the hashlookup and mmdb_lookup MISP modules.
- This playbook searches for events with
workflow:state=incomplete
. - If you force curation (
curation_always_do_curation_tasks
) then all curation tasks are executed. Otherwise, - The playbook processes only the curation tasks (
workflow:todo
) attached ('tagged') to an event. - After curation the state is changed to
workflow:state=complete
and the event is published. - The curation includes
- Disable to_ids for attributes matching a warninglist, except for URLs or hostnames where a manual review is necessary
- Disable to_ids for attributes matching known software (via hashlookup)
- Add a GalaxyCluster with the location of an IP (via mmdb_lookup)
- Add TTPs based on string matches in the event title
- Tag attributes that are in MISP feeds (for easyr filtering afterwards)
- Use this MISP playbook together with the Query for inconsistencies in MISP events playbook for optimal threat intelligence curation result.
- The results are summarised at the end of the playbook and shared with Mattermost.
- This playbook searches for events with
- Tags: [ "curation", "workflow", "todo", "qa", "quality", "audit"]
- External resources: Mattermost, Hashlookup, mmdb
- Target audience: CTI
Playbook¶
- Curate MISP events
- Introduction
- Preparation
- PR:1 Initialise environment
- PR:2 Verify MISP modules
- PR:3 Load helper functions
- PR:4 Set helper variables
- Curate events and attributes
- RE:1 Limit the search to trusted organisations
- RE:2 Search for events that require curation
- RE:3 Process curation tasks
- RE:4 Summary of findings
- RE:5 Curation details on events
- RE:6 Curation details on attributes
- RE7: Curated attributes that require a manual review
- Closure
- EN:1 Create the summary of the playbook
- EN:2 Print the summary
- EN:3 Send a summary to Mattermost
- EN:4 End of the playbook
- External references
- Technical details
Preparation¶
PR:1 Initialise environment¶
This section initialises the playbook environment and loads the required Python libraries.
The credentials for MISP (API key) and other services are loaded from the file keys.py
in the directory vault. A PyMISP object is created to interact with MISP and the active MISP server is displayed. By printing out the server name you know that it's possible to connect to MISP. In case of a problem PyMISP will indicate the error with PyMISPError: Unable to connect to MISP
.
The contents of the keys.py
file should contain at least :
misp_url="<MISP URL>" # The URL to our MISP server
misp_key="<MISP API KEY>" # The MISP API key
misp_verifycert=<True or False> # Ignore certificate errors
mattermost_playbook_user="<MATTERMOST USER>"
mattermost_hook="<MATTERMOST WEBHOOK>"
# Initialise Python environment
import urllib3
import sys
import json
from pyfaup.faup import Faup
from prettytable import PrettyTable, MARKDOWN
from IPython.display import Image, display, display_markdown, HTML
from datetime import date
import requests
import uuid
from uuid import uuid4
from pymisp import *
from pymisp.tools import GenericObjectGenerator
import re
import time
from datetime import datetime
# Load the credentials
sys.path.insert(0, "../vault/")
from keys import *
if misp_verifycert is False:
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
print("The \033[92mPython libraries\033[90m are loaded and the \033[92mcredentials\033[90m are read from the keys file.")
# Create the PyMISP object
misp = PyMISP(misp_url, misp_key, misp_verifycert)
print("I will use the MISP server \033[92m{}\033[90m for this playbook.\n\n".format(misp_url))
PR:2 Verify MISP modules¶
This playbook uses the MISP modules to obtain additional correlation or enrichment information. MISP modules are autonomous modules that can be used to extend MISP for new services such as expansion, import and export. The modules are written in Python 3 following a simple API interface. The objective is to ease the extensions of MISP functionalities without modifying core components. The API is available via a simple REST API which is independent from MISP installation or configuration.
In the next cell we check if we have access to the MISP module server and if the required modules are enabled.
# Where can we find the local MISP Module server? You can leave this to the default setting in most cases.
misp_modules_url = "http://127.0.0.1:6666"
# How long do we wait between queries when using the MISP modules (API rate limiting of external service such as VirusTotal)
misp_modules_wait = 3
# Initiliasation
misp_modules = {}
misp_modules_headers = {
"Content-Type": "application/json",
"Accept": "application/json"
}
misp_modules_in_use = ["mmdb_lookup", "hashlookup"]
# Code block to query the MISP module server and check if our modules are enabled
res = requests.get("{}/modules".format(misp_modules_url), headers=misp_modules_headers)
for module in res.json():
for module_requested in misp_modules_in_use:
if module.get("name", False) == module_requested:
misp_modules[module_requested] = {"enabled": True, "input": module.get("mispattributes").get("input")}
print("Found the \033[92m{}\033[90m MISP module (Accepted input: {}).".format(module_requested, misp_modules[module_requested]["input"]))
print("\n")
PR:3 Load helper functions¶
The next cell contains helper functions that are used in this playbook.
Instead of distributing helper functions as separate Python files this playbook includes all the required code as one code cell. This makes portability of playbooks between instances easier. The downside is that functions defined in this playbook need to be defined again in other playbooks, which is not optimal for code re-use. For this iteration of playbooks it is chosen to include the code in the playbook (more portability), but you can easily create one "helper" file that contains all the helper code and then import that file in each playbook (for example by adding to the previous cell from helpers import *
).
def __includeFlattenedAttributes(event):
'''
Merge 'single' attributes and attributes part of an object into one list
'''
object_attributes = []
for misp_object in event["Object"]:
for object_attribute in misp_object["Attribute"]:
object_attributes.append(object_attribute)
event_attributes = object_attributes + event["Attribute"]
return event_attributes
def any_tag_present(element_tags, required_tags):
'''
Verify if any of the required_tags are present in the element_tags
'''
for tag in required_tags:
if tag in element_tags:
return True
return False
def todo_process_warninglist(event_uuid, event, attribute, todo):
'''
Process the matches with a MISP warninglist
Disable to_ids and tag the attribute, except for URLs or domains
'''
if "warnings" in attribute:
# URLs can match with warninglists but are not necessary FP
# dropbox.com is on warninglist, but a URL pointing to dropbox.com for malware download is not a FP
if attribute["type"] in ["url", "domain"]:
misp.tag(attribute["uuid"], "workflow:todo=\"review\"", playbook_config["only_use_local_tags"])
playbook_results["attributes_toreview"][attribute["uuid"]] = {"eventid": event["id"], "eventtitle": event["info"], "value": attribute["value"]}
print(" No to_ids change for \033[92m{}\033[90m ({}). Added 'review' tag.".format(attribute["value"], attribute["uuid"]))
else:
tags = playbook_config["curation_tasks"][todo].get("action-taken", []) + playbook_config["curation_tasks"][todo].get("action-taken-custom", []).get("warninglist", [])
misp.update_attribute({"uuid": attribute["uuid"], "to_ids": 0})
playbook_results["attributes_false_positives_warnings"][attribute["uuid"]] = {"eventid": event["id"], "eventtitle": event["info"], "value": attribute["value"]}
for tag in tags:
misp.tag(attribute["uuid"], tag, playbook_config["only_use_local_tags"])
print(" Disabled to_ids for \033[92m{}\033[90m ({})".format(attribute["value"], attribute["uuid"]))
def todo_process_hashlookup(event_uuid, event, attribute, todo):
'''
Process the matches with Hashlookup
Disable to_ids and tag the attribute
'''
if attribute["type"] in ["sha1", "md5", "sha256"]:
module_name = "hashlookup"
attribute_type = attribute["type"]
attribute_value = attribute["value"]
data = {"attribute": {"type": f"{attribute_type}", "uuid": str(uuid.uuid4()), "value": f"{attribute_value}"},
"module": module_name, "config": {"custom_API": False}
}
result = requests.post("{}/query".format(misp_modules_url), headers=misp_modules_headers, json=data)
if "results" in result.json() and len(result.json()["results"]) > 0:
result_json = result.json()["results"]
tags = playbook_config["curation_tasks"][todo].get("action-taken", []) + playbook_config["curation_tasks"][todo].get("action-taken-custom", []).get("hashlookup", [])
misp.update_attribute({"uuid": attribute["uuid"], "to_ids": 0})
playbook_results["attributes_false_positives_hashlookup"][attribute["uuid"]] = {"eventid": event["id"], "eventtitle": event["info"], "value": attribute["value"]}
for tag in tags:
misp.tag(attribute["uuid"], tag, playbook_config["only_use_local_tags"])
print(" Disabled to_ids for \033[92m{}\033[90m ({})".format(attribute["value"], attribute["uuid"]))
def todo_add_galaxycluster_on_ip(event_uuid, event, attributes, todo):
'''
Add a GalaxyCluster based on the location of the IP
'''
print(" Curation: GalaxyCluster IP - \033[92m{}\033[90m".format(todo))
for attribute in attributes:
if attribute["type"] in ["ip-src", "ip-dst", "ip-src|port", "ip-dst|port"]:
module_name = "mmdb_lookup"
attribute_type = attribute["type"]
attribute_value = attribute["value"]
data = {"attribute": {"type": f"{attribute_type}", "uuid": str(uuid.uuid4()), "value": f"{attribute_value}"},
"module": module_name
}
result = requests.post("{}/query".format(misp_modules_url), headers=misp_modules_headers, json=data)
country = False
if "results" in result.json() and len(result.json()["results"]) > 0:
result_json = result.json()["results"]
if "Object" in result_json and result_json["Object"][0].get("name", "") == "geolocation":
module_attributes = result_json["Object"][0]["Attribute"]
for module_attribute in module_attributes:
if module_attribute["object_relation"] == "country":
country = ["misp-galaxy:country=\"{}\"".format(module_attribute["value"].lower().strip())]
break
if country is not False:
tags = playbook_config["curation_tasks"][todo].get("action-taken", []) + country
for tag in tags:
misp.tag(attribute["uuid"], tag, playbook_config["only_use_local_tags"])
temp_attr = MISPAttribute()
temp_attr.uuid = attribute["uuid"]
temp_attr.comment = "Updated by MISP playbook. {}".format(attribute["comment"])
misp.update_attribute(temp_attr)
playbook_results["attributes_galaxycluster"][attribute["uuid"]] = {"eventid": event["id"], "eventtitle": event["info"], "value": attribute["value"], "country": country}
print(" Add GalaxyCluster \033[92m{}\033[90m to {}".format(country, attribute["value"]))
def todo_add_ttp(event_uuid, event, attributes, todo):
'''
Attempt to 'guess' the TTPs based on the event title
'''
print(" Curation: TTP - \033[92m{}\033[90m".format(todo))
for associate_info in playbook_config["curation_tasks"][todo].get("associate_info", ""):
if associate_info.lower().strip() in event["info"].lower().strip():
tags = playbook_config["curation_tasks"][todo]["associate_info"][associate_info]
for tag in tags:
misp.tag(event_uuid, tag, playbook_config["only_use_local_tags"])
playbook_results["events_ttp"][event_uuid] = {"eventid": event["id"], "eventtitle": event["info"], "eventdate": event["date"], "org": event["Orgc"]["name"]}
print(" Add TTPs \033[92m{}\033[90m to {}".format(playbook_config["curation_tasks"][todo]["associate_info"][associate_info], event["info"]))
def todo_check_feed_overlap(event_uuid, event, attributes, todo):
'''
Check for overlap with MISP feeds
'''
for attribute in attributes:
if "Feed" in attribute and len(attribute.get("Feed", [])) > 0:
feedlist = ""
for feed in attribute.get("Feed", []):
feedlist = "{} {}".format(feed["name"], feedlist)
playbook_results["attributes_feedoverlap"][attribute["uuid"]] = {"eventid": event["id"], "eventtitle": event["info"], "value": attribute["value"], "feedlist": feedlist.strip()}
tags = playbook_config["curation_tasks"][todo].get("action-taken", []) + playbook_config["curation_tasks"][todo].get("action-taken-custom", []).get("feedoverlap", [])
for tag in tags:
misp.tag(attribute["uuid"], tag, playbook_config["only_use_local_tags"])
print(" Overlap with feed {} for \033[92m{}\033[90m ({})".format(feedlist.strip(), attribute["value"], attribute["uuid"]))
PR:4 Set helper variables¶
This cell contains helper variables that are used in this playbook. Their usage is explained in the next steps of the playbook.
playbook_config
: the configuration of the playbookplaybook_results
: the results of the playbookresult_limit
: maximum number of results to include in one result page when querying MISP
playbook_config = {
"curation_complete": ["workflow:state=\"complete\""],
"curation_incomplete": ["workflow:state=\"incomplete\""],
"curation_always_do_curation_tasks": False,
"curation_tasks": {
"workflow:todo=\"review-for-false-positive\"": {
"function": "todo_process_fp",
"action-taken": ["misp-workflow:analysis=\"false-positive\"",
#"misp-workflow:action-taken=\"ids-flag-removed\"",
],
"action-taken-custom": {"hashlookup": ["cudeso.be:curated=\"disable_ids_circl_hashlookup\""],
"warninglist": ["cudeso.be:curated=\"disable_ids_warninglist\""]}
},
"workflow:todo=\"add-context\"": {
"function": "todo_add_context",
"action-taken": [#"misp-workflow:action-taken=\"add-context\""
],
"associate_info":{
"phishing": ["misp-galaxy:mitre-attack-pattern=\"Phishing - T1566\"", "misp-galaxy:rsit=\"Fraud:Phishing\""],
"powershell": ["misp-galaxy:mitre-attack-pattern=\"PowerShell - T1059.001\""],
"government": ["misp-galaxy:sector=\"Government, Administration\""],
"telecom": ["misp-galaxy:sector=\"Telecoms\""],
"energy": ["misp-galaxy:sector=\"Energy\""]
}
},
"workflow:todo=\"additional-task\"": {
"function": "todo_additional_task",
"action-taken": [],
"action-taken-custom": {"feedoverlap": ["cudeso.be:curated=\"feed_overlap\""]}
},
},
"publish": True,
"only_use_local_tags": True,
"remove_unknown_todo": True
}
playbook_results = {"events": {},
"events_autopublish": {},
"events_publish_after_tasks": {},
"events_ttp": {},
"attributes_false_positives_warnings": {},
"attributes_false_positives_hashlookup": {},
"attributes_toreview": {},
"attributes_feedoverlap": {},
"attributes_galaxycluster": {}
}
result_limit = 20
Curate events and attributes¶
RE:1 Limit the search to trusted organisations¶
You can limit the curation to one or more organisations with the variable org_list
. Optionally you could extend this code to get the list from a warninglist holding the high confidence organisations. Also see this blog article: Curate events with an organisation confidence level.
# Only consider events created by the below organisations. Set to False to include events from all orgs
#org_list = [1, 2, 14, 16]
org_list = False
RE:2 Search for events that require curation¶
The playbook searches for the events that require curation. The search uses the values from playbook_config
, more specific the "curation_incomplete"
entry. In most cases this is "workflow:state=\"incomplete\""]
.
In general, this playbook assumes you have an automatic or manual curation process that adds curation tasks (workflow:todo=*
) to an event. If you do not have such process, then set curation_always_do_curation_tasks
to force curation to happen via this playbook.
If there are no remaining tasks, meaning the event is not tagged with a workflow:todo=*
tag or you have not forced curation to happen, then the curation (workflow) is marked as complete (with the value from "curation_complete"
) and the event is published (provided the value publish
in playbook_config
is set to True).
print("Searching for events ...")
processed_events = 0
auto_published_events = 0
event_list = misp.search("events", tags=playbook_config["curation_incomplete"], org=org_list, limit=result_limit)
print("Got {} results.".format(len(event_list)))
if len(event_list) > 0:
for event_entry in event_list:
event = event_entry["Event"]
workflow_todo = False
if playbook_config["curation_always_do_curation_tasks"]:
# Force curation to always happen on curation_incomplete
workflow_todo = True
for task in playbook_config["curation_tasks"]:
print(" Force curation \033[91m{}\033[90m ({} - {}) : \033[92m{}\033[90m".format(event["info"], event["id"], event["uuid"], task))
if event["uuid"] not in playbook_results["events"]:
# The search does not include feed and warning matches; manually get the event
event_details = misp.get_event(event["uuid"])["Event"]
_AttributeFlattened = __includeFlattenedAttributes(event_details)
playbook_results["events"][event["uuid"]] = {"todo": [task], "event": event_details, "attributes": _AttributeFlattened}
else:
playbook_results["events"][event["uuid"]]["todo"].append(task)
else:
# Check for "curation tasks"
for tag in event["Tag"]:
if "workflow:todo=" in tag["name"]:
workflow_todo = True
print(" \033[91m{}\033[90m ({} - {}) : \033[92m{}\033[90m".format(event["info"], event["id"], event["uuid"], tag["name"]))
if event["uuid"] not in playbook_results["events"]:
# The search does not include feed and warning matches; manually get the event
event_details = misp.get_event(event["uuid"])["Event"]
_AttributeFlattened = __includeFlattenedAttributes(event_details)
playbook_results["events"][event["uuid"]] = {"todo": [tag["name"]], "event": event_details, "attributes": _AttributeFlattened}
else:
playbook_results["events"][event["uuid"]]["todo"].append(tag["name"])
if not workflow_todo:
# No curation tasks
print(" \033[92m{}\033[90m ({} - {}) : No remaining todo".format(event["info"], event["id"], event["uuid"]))
for tag in playbook_config["curation_incomplete"]:
misp.untag(event["uuid"], tag)
for tag in playbook_config["curation_complete"]:
misp.tag(event["uuid"], tag, playbook_config["only_use_local_tags"])
if playbook_config["publish"]:
result = misp.publish(event["uuid"])
playbook_results["events_autopublish"][event["uuid"]] = {"eventid": event["id"], "eventtitle": event["info"], "eventdate": event["date"], "org": event["Orgc"]["name"]}
print(" Changed curation workflow state and publish event {}".format(result))
auto_published_events += 1
processed_events += 1
print(" Processed \033[92m{}\033[90m events, of which \033[92m{}\033[90m events have their workflow state changed and are published.".format(processed_events, auto_published_events))
if len(playbook_results["events"]) > 0:
print(" There are \033[91m{}\033[90m events that have remaining workflow tasks.".format(len(playbook_results["events"])))
print("Finished processing events.")
else:
print("No events found.")
RE:3 Process curation tasks¶
The events that require further curation are stored in playbook_results["events"]
, along with the curation task. This curation task corresponds with a workflow:todo=*
tag. For each tag, or todo (or curation task), a function is defined in playbook_config["curation_tasks"][todo]["function"]
. In the next call we process all these todo and execute the corresponding function. The progress of the curation tasks is displayed, and results are stored in playbook_results
.
Complimentary curation actions can be found in Query for inconsistencies in MISP events, but also have a look at the MISP Workflow Blueprints.
def todo_process_fp(event_uuid, event, attributes, todo):
print(" Curation: false positives \033[92m{}\033[90m".format(todo))
for attribute in attributes:
if attribute["to_ids"]:
todo_process_warninglist(event_uuid, event, attribute, todo)
todo_process_hashlookup(event_uuid, event, attribute, todo)
def todo_add_context(event_uuid, event, attributes, todo):
todo_add_galaxycluster_on_ip(event_uuid, event, attributes, todo)
todo_add_ttp(event_uuid, event, attributes, todo)
def todo_additional_task(event_uuid, event, attributes, todo):
todo_check_feed_overlap(event_uuid, event, attributes, todo)
print("Start processing the remaining todo")
for event_uuid in playbook_results["events"]:
event_data = playbook_results["events"][event_uuid]
print("Working with \033[92m{}\033[90m ({} - {})".format(event_data["event"]["info"], event_data["event"]["id"], event_data["event"]["uuid"]))
for todo in event_data["todo"]:
if todo in playbook_config["curation_tasks"] and "function" in playbook_config["curation_tasks"][todo]:
globals()[playbook_config["curation_tasks"][todo]["function"]](event_uuid, event_data["event"], event_data["attributes"], todo)
misp.untag(event_uuid, todo)
else:
print(" Unknown todo \033[91m{}\033[90m".format(todo))
if playbook_config["remove_unknown_todo"]:
misp.untag(event_uuid, todo)
if playbook_config["publish"]:
for tag in playbook_config["curation_incomplete"]:
misp.untag(event_uuid, tag)
for tag in playbook_config["curation_complete"]:
misp.tag(event_uuid, tag, playbook_config["only_use_local_tags"])
misp.publish(event_uuid)
playbook_results["events_publish_after_tasks"][event_data["event"]["uuid"]] = {"eventid": event_data["event"]["id"], "eventtitle": event_data["event"]["info"], "eventdate": event_data["event"]["date"], "org": event_data["event"]["Orgc"]["name"]}
print(" Publish event \033[92m{}\033[90m ({} - {})".format(event_data["event"]["info"], event_data["event"]["id"], event_data["event"]["uuid"]))
print("Finished processing the remaining todo")
RE:4 Create the summary of findings¶
The playbook first creates and then prints out a short summary of the findings. In the next cell the results from playbook_results
are summarised and put in a table format. This table is then printed in the summary and included in the Mattermost notification.
count_events_autopublish = 0
count_events_publish_after_curation = 0
count_events_ttp = 0
count_attribute_require_review = 0
count_attribute_disable_warninglist = 0
count_attribute_disable_hashlookup = 0
count_attribute_feedoverlap = 0
count_attribute_galaxycluster = 0
table = PrettyTable()
table.field_names = ["Event ID", "Event", "Date", "Org", "Event UUID"]
table.align["Event"] = "l"
table.align["Event ID"] = "l"
table.align["Org"] = "l"
table._max_width = {"Event": 50, "Org": 30}
for entry in playbook_results["events_autopublish"]:
count_events_autopublish += 1
entry_details = playbook_results["events_autopublish"][entry]
table.add_row([entry_details["eventid"], entry_details["eventtitle"], entry_details["eventdate"], entry_details["org"], entry])
misp_events_autopublish = table
print("Created \033[92m{}\033[90m with {} entries".format("events_autopublish", count_events_autopublish))
table = PrettyTable()
table.field_names = ["Event ID", "Event", "Date", "Org", "Event UUID"]
table.align["Event"] = "l"
table.align["Event ID"] = "l"
table.align["Org"] = "l"
table._max_width = {"Event": 50, "Org": 30}
for entry in playbook_results["events_publish_after_tasks"]:
count_events_publish_after_curation += 1
entry_details = playbook_results["events_publish_after_tasks"][entry]
table.add_row([entry_details["eventid"], entry_details["eventtitle"], entry_details["eventdate"], entry_details["org"], entry])
misp_events_publish_after_tasks = table
print("Created \033[92m{}\033[90m with {} entries".format("events_publish_after_tasks", count_events_publish_after_curation))
table = PrettyTable()
table.field_names = ["Event ID", "Event", "Date", "Org", "Event UUID"]
table.align["Event"] = "l"
table.align["Event ID"] = "l"
table._max_width = {"Event": 50}
for entry in playbook_results["events_ttp"]:
count_events_ttp += 1
entry_details = playbook_results["events_ttp"][entry]
table.add_row([entry_details["eventid"], entry_details["eventtitle"], entry_details["eventdate"], entry_details["org"], entry])
misp_events_ttp = table
print("Created \033[92m{}\033[90m with {} entries".format("events_ttp", count_events_ttp))
table = PrettyTable()
table.field_names = ["Event ID", "Event", "Attribute", "Attribute UUID"]
table.align["Event"] = "l"
table.align["Event ID"] = "l"
table.align["Attribute"] = "l"
table._max_width = {"Event": 40, "Attribute": 40}
for entry in playbook_results["attributes_false_positives_warnings"]:
count_attribute_disable_warninglist += 1
entry_details = playbook_results["attributes_false_positives_warnings"][entry]
table.add_row([entry_details["eventid"], entry_details["eventtitle"], entry_details["value"], entry])
misp_attributes_false_positives_warnings = table
print("Created \033[92m{}\033[90m with {} entries".format("attributes_false_positives_warnings", count_attribute_disable_warninglist))
table = PrettyTable()
table.field_names = ["Event ID", "Event", "Attribute", "Attribute UUID"]
table.align["Event"] = "l"
table.align["Event ID"] = "l"
table.align["Attribute"] = "l"
table._max_width = {"Event": 40, "Attribute": 40}
for entry in playbook_results["attributes_false_positives_hashlookup"]:
count_attribute_disable_hashlookup += 1
entry_details = playbook_results["attributes_false_positives_hashlookup"][entry]
table.add_row([entry_details["eventid"], entry_details["eventtitle"], entry_details["value"], entry])
misp_attributes_false_positives_hashlookup = table
print("Created \033[92m{}\033[90m with {} entries".format("attributes_false_positives_hashlookup", count_attribute_disable_hashlookup))
table = PrettyTable()
table.field_names = ["Event ID", "Event", "Attribute", "Attribute UUID"]
table.align["Event"] = "l"
table.align["Event ID"] = "l"
table.align["Attribute"] = "l"
table._max_width = {"Event": 40, "Attribute": 40}
for entry in playbook_results["attributes_toreview"]:
count_attribute_require_review += 1
entry_details = playbook_results["attributes_toreview"][entry]
table.add_row([entry_details["eventid"], entry_details["eventtitle"], entry_details["value"], entry])
misp_attributes_toreview = table
print("Created \033[92m{}\033[90m with {} entries".format("attributes_toreview", count_attribute_require_review))
table = PrettyTable()
table.field_names = ["Event ID", "Event", "Attribute", "Feedlist", "Attribute UUID"]
table.align["Event"] = "l"
table.align["Event ID"] = "l"
table.align["Attribute"] = "l"
table.align["Feedlist"] = "l"
table._max_width = {"Event": 30, "Attribute": 30, "Feedlist": 30}
for entry in playbook_results["attributes_feedoverlap"]:
count_attribute_feedoverlap += 1
entry_details = playbook_results["attributes_feedoverlap"][entry]
table.add_row([entry_details["eventid"], entry_details["eventtitle"], entry_details["value"], entry_details["feedlist"], entry])
misp_attribute_feedoverlap = table
print("Created \033[92m{}\033[90m with {} entries".format("feedoverlap", count_attribute_feedoverlap))
table = PrettyTable()
table.field_names = ["Event ID", "Event", "Attribute", "Country", "Attribute UUID"]
table.align["Event"] = "l"
table.align["Event ID"] = "l"
table.align["Attribute"] = "l"
table.align["Country"] = "l"
table._max_width = {"Event": 30, "Attribute": 30, "Country": 30}
for entry in playbook_results["attributes_galaxycluster"]:
count_attribute_galaxycluster += 1
entry_details = playbook_results["attributes_galaxycluster"][entry]
table.add_row([entry_details["eventid"], entry_details["eventtitle"], entry_details["value"], entry_details["country"], entry])
misp_attribute_galaxycluster = table
print("Created \033[92m{}\033[90m with {} entries".format("galaxycluster", count_attribute_galaxycluster))
RE:5 Curation details on events¶
Print out the details of the curation changes that happened on event level.
print("Events automatically \033[92mpublished\033[90m.")
print(misp_events_autopublish.get_string(sortby="Event ID"))
print("\n\nEvents automatically \033[92mpublished after curation tasks\033[90m.")
print(misp_events_publish_after_tasks.get_string(sortby="Event ID"))
print("\n\nEvents with additional \033[92mTTPs\033[90m.")
print(misp_events_ttp.get_string(sortby="Event ID"))
RE:6 Curation details on attributes¶
print("Attributes that have \033[92mto_ids disabled\033[90m because of \033[92mwarninglist\033[90m matches.")
print(misp_attributes_false_positives_warnings.get_string(sortby="Attribute"))
print("\n\nAttributes that have \033[92mto_ids disabled\033[90m because of \033[92mhashlookup\033[90m matches.")
print(misp_attributes_false_positives_hashlookup.get_string(sortby="Attribute"))
print("\n\nAttributes that have an overlap with \033[92mMISP feeds\033[90m.")
print(misp_attribute_feedoverlap.get_string(sortby="Attribute"))
print("\n\nAttributes with a GalaxyCluster for the \033[92mlocation\033[90m of an IP.")
print(misp_attribute_galaxycluster.get_string(sortby="Attribute"))
RE:7 Curated attributes that require a manual review¶
As part of the curation this playbook disables the to_ids for attributes that match a MISP warninglist.
The misp-warninglists are lists of well-known indicators that can be associated to potential false positives, errors or mistakes. The warning lists are integrated in MISP to display an info/warning box at the event and attribute level if such indicators are available in one of the list. The lists are also used to filter potential false-positive at API level. MISP warninglists also serve a dual use. You cannot only use them to identify false positives, you can also use the lists to track if specific attributes (for example IPs or domains) are in the threat events you receive from your community.
The warninglists work well for IPs and hostnames, but for URLs or domains you can "overcurate" attributes. For example the warninglists contain entries for dropbox.com
. The URL "https[:]//www.dropbox[.]com/maliciouspayload" will trigger a match with a warninglist. In most of these cases you'd like to keep the to_ids flag set to True. For this reason, the playbook does not change the to_ids flag if the attribute is of type URL or domain. It does tag the attribute for your review and publishes the event so that it becomes available for your security devices.
for entry in playbook_results["attributes_toreview"]:
entry_details = playbook_results["attributes_toreview"][entry]
link = "<a href=\"{}/events/view/{}/focus:{}\">{}</a>".format(misp_url, entry_details["eventid"], entry, entry_details["value"])
display(HTML("""Edit in MISP: {}""".format(link)))
print("\n\n")
print(misp_attributes_toreview.get_string())
Closure¶
In this closure or end step we create a summary of the actions that were performed by the playbook. The summary is printed and can also be send to a chat channel.
EN:1 Create the summary of the playbook¶
The next section creates a summary and stores the output in the variable summary
in Markdown format. It also stores an intro text in the variable intro
. These variables can later be used when sending information to Mattermost or TheHive.
summary = "# MISP Playbook summary\nCurate MISP events \n\n"
current_date = datetime.now()
formatted_date = current_date.strftime("%Y-%m-%d")
summary += "## Overview\n\n"
summary += "- Date: **{}**\n".format(formatted_date)
summary += "- Events reviewed: **{}**\n".format(processed_events)
summary += "- Events **automatically published**: **{}**\n".format(count_events_autopublish)
summary += "- Events published **after curation**: **{}**\n".format(count_events_publish_after_curation)
summary += "- Events with additional **TTP**: **{}**\n".format(count_events_ttp)
summary += "- Attributes that require a manual **review**: **{}**\n".format(count_attribute_require_review)
summary += "- Attributes that have to_ids disabled because of **warninglist** matches: **{}**\n".format(count_attribute_disable_warninglist)
summary += "- Attributes that have to_ids disabled because of **hashlookup** matches: **{}**\n".format(count_attribute_disable_hashlookup)
summary += "- Attributes with a GalaxyCluster for the **location** of an IP: **{}**\n".format(count_attribute_galaxycluster)
summary += "- Attributes that have an overlap with **MISP feeds**: **{}**\n".format(count_attribute_feedoverlap)
summary += "\n\n"
summary += "## Attributes that require your review\n\n"
misp_attributes_toreview.set_style(MARKDOWN)
summary += misp_attributes_toreview.get_string(sortby="Attribute")
summary += "\n\n"
summary += "## Events\n\n"
summary += "### Events automatically published\n\n"
misp_events_autopublish.set_style(MARKDOWN)
summary += misp_events_autopublish.get_string(sortby="Event ID")
summary += "\n\n"
summary += "### Events automatically published after curation tasks\n\n"
misp_events_publish_after_tasks.set_style(MARKDOWN)
summary += misp_events_publish_after_tasks.get_string(sortby="Event ID")
summary += "\n\n"
summary += "### Events with additional TTPs\n\n"
misp_events_ttp.set_style(MARKDOWN)
summary += misp_events_ttp.get_string(sortby="Event ID")
summary += "\n\n"
summary += "\n\n"
summary += "## Attributes\n\n"
summary += "## Attributes that have to_ids disabled because of warninglist matches\n\n"
misp_attributes_false_positives_warnings.set_style(MARKDOWN)
summary += misp_attributes_false_positives_warnings.get_string(sortby="Attribute")
summary += "\n\n"
summary += "## Attributes that have to_ids disabled because of hashlookup matches\n\n"
misp_attributes_false_positives_hashlookup.set_style(MARKDOWN)
summary += misp_attributes_false_positives_hashlookup.get_string(sortby="Attribute")
summary += "\n\n"
summary += "## Attributes that have an overlap with MISP feeds\n\n"
misp_attribute_feedoverlap.set_style(MARKDOWN)
summary += misp_attribute_feedoverlap.get_string(sortby="Attribute")
summary += "\n\n"
summary += "## Attributes with a GalaxyCluster for the location** of an IP\n\n"
misp_attribute_galaxycluster.set_style(MARKDOWN)
summary += misp_attribute_galaxycluster.get_string(sortby="Attribute")
summary += "\n\n"
print("The \033[92msummary\033[90m of the playbook is available.\n")
EN:2 Print the summary¶
print(summary)
# Or print with parsed markdown
#display_markdown(summary, raw=True)
EN:3 Send a summary to Mattermost¶
Now you can send the summary to Mattermost. You can send the summary in two ways by selecting one of the options for the variable send_to_mattermost_option
in the next cell.
- The default option where the entire summary is in the chat, or
- a short intro and the summary in a card
For this playbook we rely on a webhook in Mattermost. You can add a webhook by choosing the gear icon in Mattermost, then choose Integrations and then Incoming Webhooks. Set a channel for the webhook and lock the webhook to this channel with "Lock to this channel".
send_to_mattermost_option = "via a chat message"
#send_to_mattermost_option = "via a chat message with card"
message = False
if send_to_mattermost_option == "via a chat message":
message = {"username": mattermost_playbook_user, "text": summary}
elif send_to_mattermost_option == "via a chat message with card":
message = {"username": mattermost_playbook_user, "text": intro, "props": {"card": summary}}
if message:
r = requests.post(mattermost_hook, data=json.dumps(message))
r.raise_for_status()
if message and r.status_code == 200:
print("Summary is \033[92msent to Mattermost.\n")
else:
print("\033[91mFailed to sent summary\033[90m to Mattermost.\n")
EN:4 End of the playbook¶
print("\033[92m End of the playbook")
External references ¶
Technical details¶
Documentation¶
This playbook requires the MISP modules
- mmdb_lookup
- hashlookup
This playbook requires these Python libraries to exist in the environment where the playbook is executed. You can install them with pip install <library>
.
PrettyTable
ipywidgets
Colour codes¶
The output from Python displays some text in different colours. These are the colour codes
Red = '\033[91m'
Green = '\033[92m'
Blue = '\033[94m'
Cyan = '\033[96m'
White = '\033[97m'
Yellow = '\033[93m'
Magenta = '\033[95m'
Grey = '\033[90m'
Black = '\033[90m'
Default = '\033[99m'