Zion Boggan

In-depth vulnerability research, detection engineering & applied cryptography.

THREAT INTEL

CTI Detection Automation

Pulls indicators from five live threat-intel feeds, dedupes across them, extracts the MITRE techniques, generates Wazuh CDB lists and a tagged XML ruleset, then emails an analyst a signed, single-use review link before anything reaches the SIEM.

Python · ThreatFox / OTX / URLhaus · Wazuh CDB · ATT&CK · Flask · itsdangerous · SMTP
5 live feeds connected
30 tests passing
5 Wazuh CDB list types
27 ATT&CK techniques in catalog
1 signed human gate before deploy
The approval email the analyst receives: bundle ID, indicator counts by type, the diff against the last approved bundle, the top ATT&CK techniques, and the signed review link.

The pipeline

Each run flows through a fixed set of stages, every one of which is unit-tested in isolation. build_bundle() is the spine:

def build_bundle(config: dict) -> RuleBundle:
    raw = collect_indicators(config)
    merged = deduplicate(raw)
    kept = filter_by_confidence(merged, config["min_confidence"])
    techniques = extract_techniques(kept)
    cdb_lists = rules.build_cdb_lists(kept)
    rules_xml = rules.build_rules_xml(kept, base_id=config["rules"]["base_id"])
    bundle_id = datetime.now(timezone.utc).strftime("cti-%Y%m%d-%H%M%S")
    return RuleBundle(
        bundle_id=bundle_id,
        generated_at=RuleBundle.now_iso(),
        indicators=kept,
        techniques=techniques,
        cdb_lists=cdb_lists,
        rules_xml=rules_xml,
    )

A real run over the bundled fixtures produces a 19-indicator bundle: 5 IPs, 6 domains, 6 URLs, and 2 hashes, mapping to 15 distinct ATT&CK techniques across the five sources.

Feed connectors

Five public sources, each a connector subclassing one abstract Feed with a pure parse() split cleanly from its network fetch_raw(), so parsing is tested against fixtures while the HTTP path stays thin:

| Feed | Provides | Auth |
|---|---|---|
| ThreatFox | IPs, domains, URLs, hashes, malware family | free key (POST) |
| Feodo Tracker | botnet C2 IPs | keyless |
| URLhaus | malware-distribution URLs + host domains | keyless |
| AlienVault OTX | pulse indicators carrying ATT&CK IDs | free key |
| OpenPhish | phishing URLs + host domains | keyless |
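The split between a pure parse() and a thin fetch_raw() might look roughly like the sketch below. Only the names Feed, parse(), fetch_raw(), and Indicator come from the write-up; the reduced Indicator fields, the collect() helper, and the PlainIPFeed connector are hypothetical and exist only to show the shape.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field


@dataclass
class Indicator:
    # Hypothetical minimal shape; the project's real dataclass has more fields.
    type: str
    value: str
    source: str
    confidence: int = 50
    techniques: list[str] = field(default_factory=list)


class Feed(ABC):
    """Base connector: network I/O and parsing are kept apart."""

    name = "feed"

    @abstractmethod
    def fetch_raw(self) -> str:
        """Return the raw response body (the only method that touches the network)."""

    @abstractmethod
    def parse(self, raw: str) -> list[Indicator]:
        """Pure function of the raw text, so it can run against fixtures."""

    def collect(self) -> list[Indicator]:
        # Hypothetical convenience wrapper: fetch, then parse.
        return self.parse(self.fetch_raw())


class PlainIPFeed(Feed):
    """Hypothetical connector for a newline-delimited IP blocklist."""

    name = "plain-ip"

    def fetch_raw(self) -> str:
        raise NotImplementedError("network fetch omitted from this sketch")

    def parse(self, raw: str) -> list[Indicator]:
        indicators = []
        for line in raw.splitlines():
            line = line.strip()
            if not line or line.startswith("#"):
                continue  # skip blanks and comment lines
            indicators.append(Indicator("ip", line, self.name, confidence=90))
        return indicators


# Parsing runs against a fixture string; no network access is needed.
fixture = "# comment\n203.0.113.7\n\n198.51.100.23\n"
parsed = PlainIPFeed().parse(fixture)
```

Because parse() takes the raw text as an argument, a fixture file can stand in for the HTTP response in every parser test.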

The ThreatFox connector is representative: it normalizes the API's IOC types, strips ports off ip:port values, and derives techniques from both the malware family and the threat type.

def parse(self, raw: str) -> list[Indicator]:
    payload = load_json(raw)
    if payload.get("query_status") != "ok":
        return []
    indicators: list[Indicator] = []
    for entry in payload.get("data", []):
        ioc_type = TYPE_MAP.get(entry.get("ioc_type"))
        if ioc_type is None:
            continue  # IOC type this pipeline does not handle
        value = entry.get("ioc", "")
        if ioc_type == "ip" and ":" in value:
            value = value.split(":", 1)[0]  # strip the port from ip:port
        malware = entry.get("malware_printable") or entry.get("malware")
        threat_type = entry.get("threat_type", "unknown")
        techniques = mitre.techniques_for_malware(malware)
        techniques += mitre.techniques_for_threat_type(threat_type)
        # ... (excerpt truncated here)

URLhaus and OpenPhish additionally split each URL's host into a separate domain indicator, which is where the cross-feed dedup earns its keep.
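A minimal sketch of that host split, using only the standard library. The function name split_url_indicator and the choice to skip hosts that are IPv4 literals are assumptions for illustration, not the project's confirmed behavior.

```python
from urllib.parse import urlsplit


def split_url_indicator(url: str) -> list[tuple[str, str]]:
    """Return (type, value) pairs: the URL itself plus its host as a domain."""
    pairs = [("url", url)]
    host = urlsplit(url).hostname  # lowercased, with port and credentials removed
    if host and not host.replace(".", "").isdigit():
        # Assumption: hosts that are IPv4 literals are kept out of the domain bucket.
        pairs.append(("domain", host))
    return pairs


pairs = split_url_indicator("http://Bad.Example.com:8080/payload.exe")
ip_only = split_url_indicator("http://192.0.2.10/drop.bin")
```

The derived domain is already lowercased here, so it lines up with a dedup key that lowercases values.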

Dedup and normalization

Every connector emits the same Indicator dataclass, so downstream code never special-cases a source. Deduplication keys on (type, value.lower()), so the same IP seen in ThreatFox, Feodo, and OTX collapses to one indicator carrying the union of all three sources, the maximum confidence, and the union of techniques and tags:

def deduplicate(indicators: list[Indicator]) -> list[Indicator]:
    merged: dict[tuple[str, str], Indicator] = {}
    for indicator in indicators:
        key = indicator.key()
        existing = merged.get(key)
        if existing is None:
            merged[key] = Indicator(...)
            continue
        existing.confidence = max(existing.confidence, indicator.confidence)
        existing.techniques = sorted(set(existing.techniques) | set(indicator.techniques))
        existing.tags = sorted(set(existing.tags) | set(indicator.tags))
        existing.malware = existing.malware or indicator.malware
        existing.reference = existing.reference or indicator.reference
        existing.first_seen = existing.first_seen or indicator.first_seen
        if indicator.source not in existing.source.split(","):
            existing.source = ",".join(sorted(set(existing.source.split(",") + [indicator.source])))
    return list(merged.values())

A merge test pins the behavior exactly:

def test_merges_same_indicator_across_sources():
    merged = deduplicate([
        make("45.137.21.9", "threatfox", 100, ["T1071.001"], "Cobalt Strike"),
        make("45.137.21.9", "feodo", 90, ["T1573"]),
        make("45.137.21.9", "otx", 60, ["T1059.001"]),
    ])
    assert len(merged) == 1
    indicator = merged[0]
    assert indicator.confidence == 100
    assert set(indicator.techniques) == {"T1071.001", "T1573", "T1059.001"}
    assert set(indicator.source.split(",")) == {"threatfox", "feodo", "otx"}

After dedup, filter_by_confidence() drops anything under the configured threshold before any techniques or rules are derived.
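The filter itself is not shown in the write-up; a minimal sketch consistent with "drops anything under the threshold" could look like this. The reduced Indicator shape is hypothetical, and treating the threshold as inclusive follows from the wording rather than from the source code.

```python
from dataclasses import dataclass


@dataclass
class Indicator:
    # Hypothetical minimal shape, just enough for the filter.
    type: str
    value: str
    confidence: int


def filter_by_confidence(indicators: list[Indicator], min_confidence: int) -> list[Indicator]:
    """Keep indicators whose confidence is at or above the threshold."""
    return [i for i in indicators if i.confidence >= min_confidence]


sample = [
    Indicator("ip", "203.0.113.7", 100),
    Indicator("domain", "bad.example.com", 40),
    Indicator("url", "http://bad.example.com/x", 75),
]
kept = filter_by_confidence(sample, 75)
```

Running the filter after dedup matters: a merged indicator carries the maximum confidence of its sources, so a low-confidence sighting can still survive when another feed vouches for it.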

MITRE TTP extraction

Techniques reach an indicator in three ways: OTX pulses carry ATT&CK IDs directly, while feeds without IDs have them inferred from the malware family and from the threat type. The mapping tables are hand-built (27 techniques in the catalog) and keyed to malware families and threat types:

MALWARE_TECHNIQUES = {
    "cobaltstrike": ["T1071.001", "T1059.001", "T1055"],
    "agenttesla": ["T1056.001", "T1555", "T1041"],
    "emotet": ["T1566.001", "T1071.001", "T1105"],
    "qakbot": ["T1566.001", "T1055", "T1071.001"],
    ...
}

THREAT_TYPE_TECHNIQUES = {
    "phishing": ["T1566.002", "T1204.001"],
    "botnet_cc": ["T1071.001", "T1573"],
    "malware_download": ["T1105", "T1204.002"],
    "ransomware": ["T1486", "T1071.001"],
    "leaked_credentials": ["T1589.001"],
}

Family lookup is fuzzy. It lowercases the family, strips spaces, hyphens, and underscores, and then substring-matches against the table keys, so Cobalt Strike, cobalt-strike, and CobaltStrike all hit the same row:

def techniques_for_malware(malware: str | None) -> list[str]:
    if not malware:
        return []
    key = malware.lower().replace(" ", "").replace("-", "").replace("_", "")
    for name, techniques in MALWARE_TECHNIQUES.items():
        if name in key:
            return list(techniques)
    return []

Once collected, extract_techniques() rolls every indicator's techniques into a per-technique record with its indicator count and contributing sources, sorted by prevalence, and renders a coverage table. From the fixture run, the top of the report reads:

| Technique | Tactic | Name | Indicators | Sources |
|---|---|---|---|---|
| T1204.001 | execution | User Execution: Malicious Link | 10 | openphish, threatfox, urlhaus |
| T1105 | command-and-control | Ingress Tool Transfer | 9 | feodo, openphish, otx, threatfox, urlhaus |
| T1566.002 | initial-access | Phishing: Spearphishing Link | 8 | openphish, urlhaus |
| T1071.001 | command-and-control | Application Layer Protocol: Web Protocols | 6 | feodo, otx, threatfox |
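The roll-up behind a table like this can be sketched with two dictionaries. This is an illustration under assumptions, not the project's extract_techniques(): indicators are plain dicts here, the function name roll_up_techniques is hypothetical, and ties are broken by technique ID only to keep the output stable.

```python
from collections import defaultdict


def roll_up_techniques(indicators: list[dict]) -> list[dict]:
    """Aggregate per-technique indicator counts and contributing sources."""
    counts: dict[str, int] = defaultdict(int)
    sources: dict[str, set[str]] = defaultdict(set)
    for indicator in indicators:
        for technique in set(indicator["techniques"]):
            counts[technique] += 1
            # A merged indicator carries its sources as a comma-joined string.
            sources[technique].update(indicator["source"].split(","))
    records = [
        {"technique": t, "indicators": counts[t], "sources": sorted(sources[t])}
        for t in counts
    ]
    # Most prevalent first; ties broken by technique ID for stable output.
    records.sort(key=lambda r: (-r["indicators"], r["technique"]))
    return records


report = roll_up_techniques([
    {"techniques": ["T1071.001", "T1573"], "source": "feodo,threatfox"},
    {"techniques": ["T1071.001"], "source": "otx"},
    {"techniques": ["T1105"], "source": "urlhaus"},
])
```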

Generated Wazuh rules

Indicators are written as five Wazuh CDB lists, one per indicator class, each keyed on the value with a malware-family or threat-type label, sorted and deduplicated. A real cti-malicious-ip list from the active bundle:

185.220.101.45:Dridex
193.149.176.12:AsyncRAT
194.36.191.55:QakBot
45.137.21.9:Cobalt Strike
91.211.88.34:Emotet
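A list in this value:label shape could be rendered along the following lines. The helper names sanitize_label and build_cdb_lines are hypothetical, and the sketch assumes that label sanitization means removing colons and newlines outright and truncating to 48 characters.

```python
def sanitize_label(label: str, max_len: int = 48) -> str:
    """Strip characters that would break the value:label format, then clamp."""
    cleaned = label.replace(":", "").replace("\r", "").replace("\n", "")
    return cleaned.strip()[:max_len]


def build_cdb_lines(entries: list[tuple[str, str]]) -> list[str]:
    """Render (value, label) pairs as sorted, deduplicated value:label lines."""
    lines = {f"{value}:{sanitize_label(label)}" for value, label in entries}
    return sorted(lines)


lines = build_cdb_lines([
    ("91.211.88.34", "Emotet"),
    ("45.137.21.9", "Cobalt Strike"),
    ("45.137.21.9", "Cobalt Strike"),   # duplicate collapses
    ("203.0.113.7", "weird:type\n"),    # colon and newline are removed
])
```

Sorting is plain string order, which matches the ordering of the list above (185… before 45… before 91…).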

Labels are sanitized (colons and newlines stripped, length clamped to 48 characters), so a value like weird:type can never break the value:label CDB format. Alongside the lists, build_rules_xml() emits list-lookup rules: outbound and inbound IP matches, a DNS-query match, a URL match, a hash-execution match, and a leaked-credential rule, each tagged with up to four dominant ATT&CK techniques for that bucket. These are the verbatim outbound-IP and DNS rules from the generated local_cti_rules.xml:

<group name="cti,threat-intel,auto-generated,">
  <rule id="100300" level="12">
    <field name="dstip" type="pcre2">\S+</field>
    <list field="dstip" lookup="address_match_key">etc/lists/cti-malicious-ip</list>
    <description>Outbound connection to CTI-flagged IP: $(dstip)</description>
    <mitre>
      <id>T1071.001</id>
      <id>T1573</id>
      <id>T1566.001</id>
      <id>T1059.001</id>
    </mitre>
  </rule>
  <rule id="100302" level="12">
    <field name="win.eventdata.queryName" type="pcre2">\S+</field>
    <list field="win.eventdata.queryName" lookup="match_key">etc/lists/cti-malicious-domain</list>
    <description>DNS query for CTI-flagged domain: $(win.eventdata.queryName)</description>
    <mitre>
      <id>T1204.001</id>
      <id>T1566.002</id>
    </mitre>
  </rule>
</group>

The lookups reference etc/lists/, so the output drops directly into a Wazuh manager with no hand-editing of the rules. The other rules follow the same pattern, with the hash rule firing at level 13. A test parses the generated XML with ElementTree to prove it is well-formed and that the <mitre> tags survived.
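A well-formedness check of that kind needs nothing beyond the standard library. The sketch below runs against a trimmed copy of the outbound-IP rule; the helper name mitre_ids_by_rule is hypothetical and the project's actual test may assert different things.

```python
import xml.etree.ElementTree as ET

# Trimmed copy of the generated outbound-IP rule, used as a fixture.
RULES_XML = """
<group name="cti,threat-intel,auto-generated,">
  <rule id="100300" level="12">
    <list field="dstip" lookup="address_match_key">etc/lists/cti-malicious-ip</list>
    <description>Outbound connection to CTI-flagged IP: $(dstip)</description>
    <mitre>
      <id>T1071.001</id>
      <id>T1573</id>
    </mitre>
  </rule>
</group>
"""


def mitre_ids_by_rule(xml_text: str) -> dict[str, list[str]]:
    """Parse the ruleset and map each rule id to its ATT&CK IDs.

    ET.fromstring raises ET.ParseError on malformed XML, so a successful
    return already proves the document is well-formed.
    """
    root = ET.fromstring(xml_text.strip())
    return {
        rule.get("id"): [el.text for el in rule.findall("./mitre/id")]
        for rule in root.iter("rule")
    }


ids = mitre_ids_by_rule(RULES_XML)
```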

The signed approval gate

The pipeline never deploys on its own. The review link carries an itsdangerous URL-safe timed token, signed with a server secret under a fixed salt, so it cannot be forged without the secret and stops working once the TTL elapses:

from itsdangerous import BadSignature, SignatureExpired, URLSafeTimedSerializer

def serializer(secret: str) -> URLSafeTimedSerializer:
    return URLSafeTimedSerializer(secret, salt="cti-rule-approval")

def make_token(secret: str, bundle_id: str) -> str:
    return serializer(secret).dumps({"bundle_id": bundle_id})

def verify_token(secret: str, token: str, max_age: int) -> str | None:
    try:
        data = serializer(secret).loads(token, max_age=max_age)
    except (BadSignature, SignatureExpired):
        # Forged, tampered, or expired: all are treated as "no bundle".
        return None
    return data.get("bundle_id")

A small Flask console serves the gate. Every state-changing route verifies the token first and abort(403)s on a bad or expired one; approval promotes the candidate to active and, if a Wazuh path is set, deploys to the manager:

@app.post("/approve/<token>")
def approve(token):
    bundle_id = approval.verify_token(secret, token, ttl)
    if not bundle_id:
        abort(403)
    active_dir = config.get("wazuh_etc_dir")
    result = pipeline.promote(
        bundle_id, output_dir, Path(active_dir) if active_dir else None
    )
    return render_template("result.html", action="approved", result=result)

Reject writes a REJECTED marker carrying the analyst's reason; the dashboard then lists every candidate bundle as pending, approved, or rejected. Promotion records the approved key set in state.json, which is exactly what the next run diffs against.
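The diff against the last approved bundle reduces to two set differences over indicator keys. The sketch below is an assumption about how that could work: the state.json layout, the "approved_keys" field, and all three function names are hypothetical.

```python
import json
import tempfile
from pathlib import Path


def indicator_key(type_: str, value: str) -> str:
    """Serialize the (type, lowercased value) dedup key as a JSON-friendly string."""
    return f"{type_}:{value.lower()}"


def diff_against_state(state_path: Path, current_keys: set[str]) -> dict[str, list[str]]:
    """Compare this run's keys with the approved set recorded at the last promotion."""
    approved: set[str] = set()
    if state_path.exists():
        # "approved_keys" is a hypothetical field name for this sketch.
        approved = set(json.loads(state_path.read_text()).get("approved_keys", []))
    return {
        "added": sorted(current_keys - approved),
        "removed": sorted(approved - current_keys),
    }


def record_approval(state_path: Path, approved_keys: set[str]) -> None:
    """Persist the approved key set so the next run has something to diff against."""
    state_path.write_text(json.dumps({"approved_keys": sorted(approved_keys)}))


with tempfile.TemporaryDirectory() as tmp:
    state = Path(tmp) / "state.json"
    run_one = {indicator_key("ip", "45.137.21.9")}
    first = diff_against_state(state, run_one)    # no state yet: everything is new
    record_approval(state, run_one)               # analyst approves the bundle
    run_two = {indicator_key("ip", "91.211.88.34")}
    second = diff_against_state(state, run_two)   # diff against the approved set
```

Recording the key set only at promotion time means a rejected bundle never shifts the baseline the analyst sees in the next email.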

Tests

30 tests across 7 files cover every stage end-to-end: feed parsing against fixtures, cross-source dedup, confidence filtering, TTP extraction, CDB and XML generation, token signing and expiry, email rendering, and the full Flask approve/reject flow. The web tests run the real pipeline and drive the routes with Flask's test client:

def test_review_requires_valid_token(app_and_bundle):
    app, _ = app_and_bundle
    client = app.test_client()
    assert client.get("/review/garbage").status_code == 403


def test_approve_promotes(app_and_bundle):
    app, result = app_and_bundle
    token = make_token("test-secret", result["bundle_id"])
    client = app.test_client()
    resp = client.post(f"/approve/{token}")
    assert resp.status_code == 200
    assert b"approved" in resp.data
    follow = client.get(f"/review/{token}")
    assert b"already been approved" in follow.data

Token security is pinned directly: a token signed with one secret will not verify under another, and a zero-max-age token is rejected after a one-second sleep:

import time


def test_token_rejects_wrong_secret():
    token = make_token("secret", "bundle")
    assert verify_token("other", token, 60) is None


def test_token_expires():
    token = make_token("secret", "bundle")
    time.sleep(1)  # let at least one whole second pass so age > max_age of 0
    assert verify_token("secret", token, 0) is None

Repository · github.com/zionsworking/cti-detection-automation