The pipeline
Each run flows through a fixed set of stages, every one of which is unit-tested in isolation. build_bundle() is the spine:
```python
from datetime import datetime, timezone

# collect_indicators, deduplicate, filter_by_confidence, extract_techniques,
# the rules module and RuleBundle are defined elsewhere in the project.
def build_bundle(config: dict) -> RuleBundle:
    raw = collect_indicators(config)                 # fetch + parse every feed
    merged = deduplicate(raw)                        # collapse cross-feed duplicates
    kept = filter_by_confidence(merged, config["min_confidence"])
    techniques = extract_techniques(kept)            # per-technique coverage roll-up
    cdb_lists = rules.build_cdb_lists(kept)
    rules_xml = rules.build_rules_xml(kept, base_id=config["rules"]["base_id"])
    # UTC timestamp ID of the form cti-YYYYMMDD-HHMMSS
    bundle_id = datetime.now(timezone.utc).strftime("cti-%Y%m%d-%H%M%S")
    return RuleBundle(
        bundle_id=bundle_id,
        generated_at=RuleBundle.now_iso(),
        indicators=kept,
        techniques=techniques,
        cdb_lists=cdb_lists,
        rules_xml=rules_xml,
    )
```

A real run over the bundled fixtures produces a 19-indicator bundle (5 IPs, 6 domains, 6 URLs, and 2 hashes) mapping to 15 distinct ATT&CK techniques across the five sources.
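RuleBundle itself isn't shown. The sketch below assumes it is a plain dataclass and that now_iso() returns an ISO-8601 UTC string. The field types and the make_bundle_id() helper are hypothetical. The sketch also shows a useful property of the bundle_id format: zero-padded UTC timestamps sort lexically in chronological order, so a plain sorted() over bundle IDs lists them oldest first.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone


def make_bundle_id(now: datetime) -> str:
    """Hypothetical helper: format a timestamp the way build_bundle() does."""
    return now.astimezone(timezone.utc).strftime("cti-%Y%m%d-%H%M%S")


@dataclass
class RuleBundle:
    # Hypothetical field types; the project's class may differ.
    bundle_id: str
    generated_at: str
    indicators: list = field(default_factory=list)
    techniques: list = field(default_factory=list)
    cdb_lists: dict = field(default_factory=dict)
    rules_xml: str = ""

    @staticmethod
    def now_iso() -> str:
        # Assumed behaviour: current UTC time as an ISO-8601 string.
        return datetime.now(timezone.utc).isoformat(timespec="seconds")


if __name__ == "__main__":
    earlier = make_bundle_id(datetime(2024, 1, 9, 23, 59, 59, tzinfo=timezone.utc))
    later = make_bundle_id(datetime(2024, 1, 10, 0, 0, 0, tzinfo=timezone.utc))
    print(earlier, later)  # cti-20240109-235959 cti-20240110-000000
    # Lexical order matches chronological order.
    assert sorted([later, earlier]) == [earlier, later]
```

The ID has one-second resolution, so two runs started within the same second would produce the same bundle_id.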
Feed connectors
The pipeline pulls from five public sources. Each connector subclasses one abstract Feed base class and keeps a pure parse() separate from the network-bound fetch_raw(). Parsing is therefore tested against fixtures, and the HTTP path stays thin:
| Feed | Provides | Auth |
|---|---|---|
| ThreatFox | IPs, domains, URLs, hashes, malware family | free key (POST) |
| Feodo Tracker | botnet C2 IPs | keyless |
| URLhaus | malware-distribution URLs + host domains | keyless |
| AlienVault OTX | pulse indicators carrying ATT&CK IDs | free key |
| OpenPhish | phishing URLs + host domains | keyless |
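The fetch/parse split can be sketched as follows. Feed's two method names come from the text. The collect() wrapper, the trimmed-down Indicator, and the Feodo-style JSON shape (a list of objects with an ip_address field) are assumptions for illustration. Because parse() takes a string and does no I/O, a test can hand it a fixture directly.

```python
import json
from abc import ABC, abstractmethod
from dataclasses import dataclass, field


@dataclass
class Indicator:
    # Trimmed-down stand-in for the project's Indicator dataclass.
    type: str
    value: str
    source: str
    confidence: int = 0
    techniques: list[str] = field(default_factory=list)


class Feed(ABC):
    name: str = "base"

    @abstractmethod
    def fetch_raw(self) -> str:
        """Network-bound: return the feed body as text."""

    @abstractmethod
    def parse(self, raw: str) -> list[Indicator]:
        """Pure: turn a feed body into indicators, with no I/O."""

    def collect(self) -> list[Indicator]:
        # Hypothetical wrapper: the only place the two halves meet.
        return self.parse(self.fetch_raw())


class FeodoLikeFeed(Feed):
    """Hypothetical keyless feed: a JSON array of {"ip_address": ...} rows."""

    name = "feodo"

    def fetch_raw(self) -> str:
        raise NotImplementedError("HTTP fetch omitted in this sketch")

    def parse(self, raw: str) -> list[Indicator]:
        rows = json.loads(raw)
        return [
            Indicator(type="ip", value=row["ip_address"], source=self.name, confidence=90)
            for row in rows
            if row.get("ip_address")  # skip rows with a missing or empty IP
        ]
```

A fixture test never touches the network. Calling FeodoLikeFeed().parse(fixture_text) is all it needs.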
The ThreatFox connector is representative: it normalizes the API's IOC types, strips ports off ip:port values, and derives techniques from both the malware family and the threat type.
```python
def parse(self, raw: str) -> list[Indicator]:
    payload = load_json(raw)
    if payload.get("query_status") != "ok":
        return []  # API error or empty result: nothing to emit
    indicators: list[Indicator] = []
    for entry in payload.get("data", []):
        # Map ThreatFox IOC types onto the pipeline's own; skip unknown types.
        ioc_type = TYPE_MAP.get(entry.get("ioc_type"))
        if ioc_type is None:
            continue
        value = entry.get("ioc", "")
        if ioc_type == "ip" and ":" in value:
            value = value.split(":", 1)[0]  # "ip:port" -> "ip"
        malware = entry.get("malware_printable") or entry.get("malware")
        threat_type = entry.get("threat_type", "unknown")
        # Techniques come from both the malware family and the threat type.
        techniques = mitre.techniques_for_malware(malware)
        techniques += mitre.techniques_for_threat_type(threat_type)
        indicators.append(Indicator(...))  # remaining fields elided
    return indicators
```

URLhaus and OpenPhish additionally split each URL's host into a separate domain indicator, which is where the cross-feed dedup earns its keep.
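Host splitting can be sketched with the standard library's urllib.parse.urlsplit. Its hostname attribute is already lowercased, with any port and credentials removed. host_indicator() is a hypothetical helper that returns a (type, value) pair instead of a full Indicator. Routing IP-literal hosts to the ip type is an assumption, not documented behavior:

```python
import ipaddress
from typing import Optional
from urllib.parse import urlsplit


def host_indicator(url: str) -> Optional[tuple[str, str]]:
    """Hypothetical helper: derive a (type, value) host indicator from a URL.

    Returns None when the string has no parsable host (e.g. no scheme).
    """
    host = urlsplit(url.strip()).hostname  # lowercased; port and userinfo dropped
    if not host:
        return None
    try:
        ipaddress.ip_address(host)
    except ValueError:
        return ("domain", host)
    # Assumption: an IP-literal host is emitted as an ip indicator instead.
    return ("ip", host)
```

Both http://Evil.Example.com:8080/gate.php and https://evil.example.com/ reduce to ("domain", "evil.example.com"). Two URLs from different feeds therefore yield the same domain key, and deduplicate() merges them.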
Dedup and normalization
Every connector emits the same Indicator dataclass, so downstream code never special-cases a source. Deduplication keys on (type, value.lower()). The same IP seen in ThreatFox, Feodo, and OTX therefore collapses to one indicator carrying the union of all three sources, the maximum confidence, and the union of techniques and tags:
```python
def deduplicate(indicators: list[Indicator]) -> list[Indicator]:
    merged: dict[tuple[str, str], Indicator] = {}
    for indicator in indicators:
        key = indicator.key()  # (type, value.lower())
        existing = merged.get(key)
        if existing is None:
            merged[key] = Indicator(...)  # copy of the first sighting (fields elided)
            continue
        # Later sightings widen the existing record rather than replacing it.
        existing.confidence = max(existing.confidence, indicator.confidence)
        existing.techniques = sorted(set(existing.techniques) | set(indicator.techniques))
        existing.tags = sorted(set(existing.tags) | set(indicator.tags))
        # Scalar fields keep the first non-empty value seen.
        existing.malware = existing.malware or indicator.malware
        existing.reference = existing.reference or indicator.reference
        existing.first_seen = existing.first_seen or indicator.first_seen
        # source is a comma-joined, sorted set of feed names.
        if indicator.source not in existing.source.split(","):
            existing.source = ",".join(sorted(set(existing.source.split(",") + [indicator.source])))
    return list(merged.values())
```

A merge test pins the behavior exactly:
```python
# make() is a test helper building an Indicator from value, source,
# confidence, techniques and (optionally) malware family.
def test_merges_same_indicator_across_sources():
    merged = deduplicate([
        make("45.137.21.9", "threatfox", 100, ["T1071.001"], "Cobalt Strike"),
        make("45.137.21.9", "feodo", 90, ["T1573"]),
        make("45.137.21.9", "otx", 60, ["T1059.001"]),
    ])
    assert len(merged) == 1
    indicator = merged[0]
    assert indicator.confidence == 100
    assert set(indicator.techniques) == {"T1071.001", "T1573", "T1059.001"}
    assert set(indicator.source.split(",")) == {"threatfox", "feodo", "otx"}
```

After dedup, filter_by_confidence() drops anything under the configured threshold before any techniques or rules are derived.
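The key() method and the confidence filter are small enough to sketch. The Indicator below reuses the field names deduplicate() touches, but its types and defaults are assumptions. The inclusive threshold, which keeps an indicator sitting exactly at min_confidence, is a guess at the intended semantics:

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Indicator:
    # Field names follow deduplicate(); types and defaults are assumptions.
    type: str
    value: str
    source: str
    confidence: int = 0
    techniques: list[str] = field(default_factory=list)
    tags: list[str] = field(default_factory=list)
    malware: Optional[str] = None
    reference: Optional[str] = None
    first_seen: Optional[str] = None

    def key(self) -> tuple[str, str]:
        # Case-insensitive identity: "Evil.com" and "evil.com" are one indicator.
        return (self.type, self.value.lower())


def filter_by_confidence(indicators: list[Indicator], min_confidence: int) -> list[Indicator]:
    # Assumed inclusive: confidence == min_confidence is kept.
    return [i for i in indicators if i.confidence >= min_confidence]
```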
MITRE TTP extraction
Indicators arrive with techniques attached in three ways. OTX pulses carry ATT&CK IDs directly. For feeds without IDs, techniques are inferred from the malware family and from the threat type. The mapping tables are hand-built, cover a catalog of 27 techniques, and are keyed to malware families and threat types:
```python
# Keys are pre-normalized (lowercase, no spaces/hyphens/underscores)
# so they match the normalized family name.
MALWARE_TECHNIQUES = {
    "cobaltstrike": ["T1071.001", "T1059.001", "T1055"],
    "agenttesla": ["T1056.001", "T1555", "T1041"],
    "emotet": ["T1566.001", "T1071.001", "T1105"],
    "qakbot": ["T1566.001", "T1055", "T1071.001"],
    # ... further families elided
}

THREAT_TYPE_TECHNIQUES = {
    "phishing": ["T1566.002", "T1204.001"],
    "botnet_cc": ["T1071.001", "T1573"],
    "malware_download": ["T1105", "T1204.002"],
    "ransomware": ["T1486", "T1071.001"],
    "leaked_credentials": ["T1589.001"],
}
```

Family lookup is fuzzy. It lowercases the family name and strips spaces, hyphens and underscores, so Cobalt Strike, cobalt-strike and CobaltStrike all hit the same row:
```python
def techniques_for_malware(malware: str | None) -> list[str]:
    if not malware:
        return []
    # "Cobalt Strike", "cobalt-strike", "CobaltStrike" -> "cobaltstrike"
    key = malware.lower().replace(" ", "").replace("-", "").replace("_", "")
    # Substring match; the first matching row (in dict order) wins.
    for name, techniques in MALWARE_TECHNIQUES.items():
        if name in key:
            return list(techniques)  # a copy, so callers can += without mutating the table
    return []
```

Once collected, extract_techniques() rolls every indicator's techniques into a per-technique record holding its indicator count and contributing sources. It sorts the records by prevalence and renders a coverage table. From the live fixture run, the top of the report reads:
| Technique | Tactic | Name | Indicators | Sources |
|---|---|---|---|---|
| T1204.001 | execution | User Execution: Malicious Link | 10 | openphish, threatfox, urlhaus |
| T1105 | command-and-control | Ingress Tool Transfer | 9 | feodo, openphish, otx, threatfox, urlhaus |
| T1566.002 | initial-access | Phishing: Spearphishing Link | 8 | openphish, urlhaus |
| T1071.001 | command-and-control | Application Layer Protocol: Web Protocols | 6 | feodo, otx, threatfox |
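The roll-up behind this table can be sketched as a single pass that counts, per technique, how many indicators carry it and which feeds contributed. roll_up(), TechniqueRecord, render_row() and the (techniques, sources) input pairs are hypothetical stand-ins for extract_techniques() and Indicator. The tie-break on technique ID is also an assumption:

```python
from dataclasses import dataclass, field


@dataclass
class TechniqueRecord:
    # Hypothetical shape; the report above also carries tactic and name.
    technique: str
    indicators: int = 0
    sources: set[str] = field(default_factory=set)


def roll_up(indicators: list[tuple[list[str], str]]) -> list[TechniqueRecord]:
    """indicators: (techniques, comma-joined sources) pairs standing in for Indicator."""
    records: dict[str, TechniqueRecord] = {}
    for techniques, source in indicators:
        for tid in set(techniques):  # count each indicator once per technique
            rec = records.setdefault(tid, TechniqueRecord(tid))
            rec.indicators += 1
            rec.sources.update(source.split(","))
    # Most prevalent first; ties broken by technique ID.
    return sorted(records.values(), key=lambda r: (-r.indicators, r.technique))


def render_row(rec: TechniqueRecord) -> str:
    return f"| {rec.technique} | {rec.indicators} | {', '.join(sorted(rec.sources))} |"
```

Because a merged indicator already carries the union of its sources, counting each indicator once per technique means a cross-feed IP contributes 1 to the count while still adding all of its feeds to the Sources column.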
Generated Wazuh rules
Indicators are written as five Wazuh CDB lists, one per indicator class, each keyed on the value with a malware-family or threat-type label, sorted and deduplicated. A real cti-malicious-ip list from the active bundle:
```text
185.220.101.45:Dridex
193.149.176.12:AsyncRAT
194.36.191.55:QakBot
45.137.21.9:Cobalt Strike
91.211.88.34:Emotet
```

Labels are sanitized (colons and newlines stripped, length clamped to 48 characters), so a label like weird:type can never break the value:label CDB format. Alongside the lists, build_rules_xml() emits list-lookup rules: outbound and inbound IP matches, a DNS-query match, a URL match, a hash-execution match, and a leaked-credential rule. Each rule is tagged with up to four of the dominant ATT&CK techniques for its bucket. These are the verbatim outbound-IP and DNS rules from the generated local_cti_rules.xml:
```xml
<group name="cti,threat-intel,auto-generated,">
  <rule id="100300" level="12">
    <field name="dstip" type="pcre2">\S+</field>
    <list field="dstip" lookup="address_match_key">etc/lists/cti-malicious-ip</list>
    <description>Outbound connection to CTI-flagged IP: $(dstip)</description>
    <mitre>
      <id>T1071.001</id>
      <id>T1573</id>
      <id>T1566.001</id>
      <id>T1059.001</id>
    </mitre>
  </rule>
  <rule id="100302" level="12">
    <field name="win.eventdata.queryName" type="pcre2">\S+</field>
    <list field="win.eventdata.queryName" lookup="match_key">etc/lists/cti-malicious-domain</list>
    <description>DNS query for CTI-flagged domain: $(win.eventdata.queryName)</description>
    <mitre>
      <id>T1204.001</id>
      <id>T1566.002</id>
    </mitre>
  </rule>
</group>
```

The lookups reference etc/lists/, the relative path Wazuh uses for CDB lists, so the output drops directly into a Wazuh manager with no hand-editing. The two rules above fire at level 12, and the hash-execution rule fires at level 13. A test parses the generated XML with ElementTree to prove it is well-formed and that the <mitre> tags survived.
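The label sanitization and sorted, deduplicated list rendering described above can be sketched in a few lines. sanitize_label() and render_cdb_list() are hypothetical names. Removing carriage returns alongside newlines, and keeping the first label when a value appears twice, are assumptions. The point of the sketch is that each value ends up on exactly one value:label line:

```python
import re

MAX_LABEL = 48  # clamp length stated in the text


def sanitize_label(label: str) -> str:
    # Remove the CDB separator and line breaks, then clamp the length.
    cleaned = re.sub(r"[:\r\n]", "", label)
    return cleaned[:MAX_LABEL]


def render_cdb_list(pairs: list[tuple[str, str]]) -> str:
    """Render (value, label) pairs as sorted, deduplicated 'value:label' lines."""
    labels: dict[str, str] = {}
    for value, label in pairs:
        # One line per key; the first label seen wins (assumed policy).
        labels.setdefault(value, sanitize_label(label))
    return "".join(f"{value}:{labels[value]}\n" for value in sorted(labels))
```

A plain string sort puts 185.220.101.45 ahead of 45.137.21.9, which matches the order of the cti-malicious-ip excerpt above.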
The signed approval gate
The pipeline never deploys on its own. The review link carries an itsdangerous URL-safe timed token, signed with a server secret under a fixed salt. The token cannot be forged without the secret and stops working once the TTL elapses:
```python
from itsdangerous import BadSignature, SignatureExpired, URLSafeTimedSerializer


def serializer(secret: str) -> URLSafeTimedSerializer:
    # The fixed salt scopes these tokens to rule approval only.
    return URLSafeTimedSerializer(secret, salt="cti-rule-approval")


def make_token(secret: str, bundle_id: str) -> str:
    return serializer(secret).dumps({"bundle_id": bundle_id})


def verify_token(secret: str, token: str, max_age: int) -> str | None:
    try:
        data = serializer(secret).loads(token, max_age=max_age)
    except (BadSignature, SignatureExpired):  # SignatureExpired subclasses BadSignature
        return None
    return data.get("bundle_id")
```

A small Flask console serves the gate. Every state-changing route verifies the token first and calls abort(403) on a bad or expired one. Approval promotes the candidate bundle to active and, if a Wazuh path is configured, deploys it to the manager:
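```python
from pathlib import Path

from flask import abort, render_template

# app, secret, ttl, config, output_dir and the approval/pipeline modules
# come from the surrounding application setup.
@app.post("/approve/<token>")
def approve(token):
    bundle_id = approval.verify_token(secret, token, ttl)
    if not bundle_id:
        abort(403)  # forged, tampered or expired link
    active_dir = config.get("wazuh_etc_dir")
    # Promote the candidate; deploy only when a Wazuh etc directory is configured.
    result = pipeline.promote(
        bundle_id, output_dir, Path(active_dir) if active_dir else None
    )
    return render_template("result.html", action="approved", result=result)
```

Reject writes a REJECTED marker carrying the analyst's reason, and the dashboard lists every candidate bundle as pending, approved, or rejected. Promotion records the approved key set in state.json, which is exactly what the next run diffs against.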
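The state.json layout isn't shown. The sketch below assumes it stores the approved (type, value) keys as a JSON list under a hypothetical approved_keys field. Under that assumption, recording a promotion and diffing the next run against it reduce to set operations. All three function names are hypothetical:

```python
import json
from pathlib import Path


def load_approved_keys(state_path: Path) -> set[tuple[str, str]]:
    # Hypothetical layout: {"approved_keys": [["ip", "45.137.21.9"], ...]}
    if not state_path.exists():
        return set()  # first run: everything counts as new
    state = json.loads(state_path.read_text())
    return {tuple(pair) for pair in state.get("approved_keys", [])}


def save_approved_keys(state_path: Path, keys: set[tuple[str, str]]) -> None:
    # Sorted so the file is stable across runs and diffs cleanly in review.
    state_path.write_text(json.dumps({"approved_keys": sorted(map(list, keys))}, indent=2))


def diff_keys(
    previous: set[tuple[str, str]], current: set[tuple[str, str]]
) -> dict[str, list[tuple[str, str]]]:
    # What a reviewer needs to see: new indicators and ones that aged out.
    return {
        "added": sorted(current - previous),
        "removed": sorted(previous - current),
    }
```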
Tests
30 tests across 7 files cover every stage end-to-end: feed parsing against fixtures, cross-source dedup, confidence filtering, TTP extraction, CDB and XML generation, token signing and expiry, email rendering, and the full Flask approve/reject flow. The web tests run the real pipeline and drive the routes with Flask's test client:
```python
# make_token is the approval helper above; app_and_bundle is presumably a
# pytest fixture that runs the real pipeline and returns (app, run result).
def test_review_requires_valid_token(app_and_bundle):
    app, _ = app_and_bundle
    client = app.test_client()
    assert client.get("/review/garbage").status_code == 403


def test_approve_promotes(app_and_bundle):
    app, result = app_and_bundle
    token = make_token("test-secret", result["bundle_id"])
    client = app.test_client()
    resp = client.post(f"/approve/{token}")
    assert resp.status_code == 200
    assert b"approved" in resp.data
    # The same link now reports the bundle as already decided.
    follow = client.get(f"/review/{token}")
    assert b"already been approved" in follow.data
```

Token security is pinned directly. A token signed with one secret will not verify under another, and a token checked with max_age=0 is rejected after a one-second sleep. The sleep is needed because itsdangerous timestamps have one-second resolution:
```python
import time


def test_token_rejects_wrong_secret():
    token = make_token("secret", "bundle")
    assert verify_token("other", token, 60) is None


def test_token_expires():
    token = make_token("secret", "bundle")
    time.sleep(1)  # guarantee a nonzero age at one-second resolution
    assert verify_token("secret", token, 0) is None
```


