# Source: auto_sigma_rule_generator/backend/cve2capec_client.py
# (repository viewer header: 447 lines, no EOL at end of file, 18 KiB, Python)

"""
CVE2CAPEC client for retrieving MITRE ATT&CK technique mappings.
Integrates with the CVE2CAPEC repository: https://github.com/Galeax/CVE2CAPEC
"""
import json
import logging
import requests
from typing import Dict, List, Optional
import time
from datetime import datetime, timedelta
import os
# Module-level logger named after this module; CVE2CAPECClient reports all of
# its cache, download and lookup diagnostics through it.
logger = logging.getLogger(__name__)
class CVE2CAPECClient:
    """Client for accessing CVE to MITRE ATT&CK technique mappings.

    The mappings come from the CVE2CAPEC repository as JSONL files (one JSON
    object per line, keyed by CVE ID; the entries are read through their
    ``CWE``, ``CAPEC`` and ``TECHNIQUES`` keys). Downloaded data is cached in
    a local JSON file and reused while it is younger than
    ``cache_expiry_hours``.

    Note: constructing the client loads the cache and, when no usable fresh
    cache exists, performs blocking HTTP downloads.
    """

    # Earliest CVE year whose database file is downloaded.
    FIRST_YEAR = 2018
    # The downloaded year range always reaches at least this year and extends
    # automatically once the current calendar year passes it.
    MIN_LAST_YEAR = 2025
    # Per-request HTTP timeout.
    REQUEST_TIMEOUT_SECONDS = 30
    # Pause after each successful yearly download, to be gentle on the server.
    REQUEST_DELAY_SECONDS = 0.5

    def __init__(self):
        """Initialise settings, then load CVE mappings and technique names."""
        self.base_url = "https://raw.githubusercontent.com/Galeax/CVE2CAPEC/main"
        self.cache_file = "/tmp/cve2capec_cache.json"
        self.cache_expiry_hours = 24  # Cache for 24 hours
        self.cve_mappings = {}
        self.technique_names = {}  # Map technique IDs to names
        # Load cached data if available (falls back to a download)
        self._load_cache()
        # Load MITRE ATT&CK technique names
        self._load_technique_names()

    # ------------------------------------------------------------------
    # Cache handling and downloads
    # ------------------------------------------------------------------

    def _load_cache(self):
        """Populate ``self.cve_mappings`` from the cache or a fresh download.

        A cache is used directly only when it is both fresh and non-empty.
        Otherwise fresh data is fetched; if that fetch yields nothing, the
        expired cache contents (when readable) are used as a fallback so a
        temporary network outage does not wipe out known mappings.
        """
        stale_mappings = {}
        try:
            if os.path.exists(self.cache_file):
                with open(self.cache_file, 'r', encoding='utf-8') as f:
                    cache_data = json.load(f)

                cached = cache_data.get('mappings', {})
                if not isinstance(cached, dict):
                    cached = {}

                # Check if cache is still fresh
                cache_time = datetime.fromisoformat(
                    cache_data.get('timestamp', '2000-01-01')
                )
                max_age = timedelta(hours=self.cache_expiry_hours)
                # An empty "fresh" cache is treated as unusable so that a
                # cache written during an outage does not block a retry.
                if cached and datetime.now() - cache_time < max_age:
                    self.cve_mappings = cached
                    logger.info("Loaded %d CVE mappings from cache", len(cached))
                    return
                stale_mappings = cached
        except Exception as e:
            # Unreadable/corrupt cache: report it and fall through to a fetch.
            logger.error("Error loading CVE2CAPEC cache: %s", e)

        # Cache is stale, empty, unreadable or missing: fetch fresh data
        self._fetch_fresh_data()

        if not self.cve_mappings and stale_mappings:
            logger.warning(
                "Fresh CVE2CAPEC data unavailable; using %d mappings from expired cache",
                len(stale_mappings),
            )
            self.cve_mappings = stale_mappings

    def _fetch_fresh_data(self):
        """Download all yearly database files plus ``new_cves.jsonl``.

        Sets ``self.cve_mappings`` to whatever could be downloaded (possibly
        an empty dict) and never raises for download or cache-write problems.
        The cache file is only rewritten when at least one mapping was
        obtained, and a failure to write it does not discard the data.
        """
        logger.info("Fetching fresh CVE2CAPEC data from all database files...")

        last_year = max(self.MIN_LAST_YEAR, datetime.now().year)
        years_to_fetch = list(range(self.FIRST_YEAR, last_year + 1))

        all_mappings = {}
        for year in years_to_fetch:
            logger.info("Fetching CVE mappings for year %s...", year)
            file_name = f"CVE-{year}.jsonl"
            year_mappings = self._fetch_jsonl(
                f"{self.base_url}/database/{file_name}", file_name
            )
            if year_mappings is None:
                continue
            all_mappings.update(year_mappings)
            logger.info("Loaded %d CVE mappings from %s", len(year_mappings), year)
            # Add a small delay to be respectful to the server
            time.sleep(self.REQUEST_DELAY_SECONDS)

        # Also try to fetch the new_cves.jsonl for the latest data; it is
        # applied last so its entries win over the yearly files.
        logger.info("Fetching latest CVE mappings from new_cves.jsonl...")
        latest_mappings = self._fetch_jsonl(
            f"{self.base_url}/results/new_cves.jsonl", "new_cves.jsonl"
        )
        if latest_mappings is not None:
            all_mappings.update(latest_mappings)
            logger.info("Added %d latest CVE mappings", len(latest_mappings))

        self.cve_mappings = all_mappings

        if not all_mappings:
            # Do not cache an empty result: it would look "fresh" and suppress
            # further download attempts until the cache expires.
            logger.error("Error fetching CVE2CAPEC data: no mappings could be downloaded")
            return

        if self._save_cache(all_mappings, years_to_fetch):
            logger.info(
                "Successfully fetched and cached %d total CVE mappings",
                len(all_mappings),
            )

    def _fetch_jsonl(self, url: str, label: str) -> Optional[Dict]:
        """Download and parse one JSONL file; return ``None`` on any failure."""
        try:
            response = requests.get(url, timeout=self.REQUEST_TIMEOUT_SECONDS)
            response.raise_for_status()
            return self._parse_jsonl(response.text, label)
        except requests.RequestException as e:
            logger.warning("Failed to fetch %s: %s", label, e)
        except Exception as e:
            # Best-effort boundary: one bad file must not abort the whole run.
            logger.warning("Error processing %s: %s", label, e)
        return None

    @staticmethod
    def _parse_jsonl(text: str, label: str) -> Dict:
        """Merge every JSON-object line of ``text`` into a single dict.

        Blank lines are ignored. Lines that are not valid JSON, or that do
        not decode to a JSON object, are skipped with a warning.
        """
        mappings = {}
        for line_number, line in enumerate(text.split('\n'), start=1):
            if not line.strip():
                continue
            try:
                record = json.loads(line)
            except json.JSONDecodeError as e:
                logger.warning(
                    "Failed to parse line %d in %s data: %s", line_number, label, e
                )
                continue
            if not isinstance(record, dict):
                logger.warning(
                    "Skipping non-object line %d in %s data", line_number, label
                )
                continue
            mappings.update(record)
        return mappings

    def _save_cache(self, mappings: Dict, years_fetched: List[int]) -> bool:
        """Write the cache atomically; return ``True`` when it was written."""
        cache_data = {
            'timestamp': datetime.now().isoformat(),
            'mappings': mappings,
            'years_fetched': years_fetched
        }
        # Write to a sibling temp file first so readers never see a
        # half-written cache, then swap it into place.
        tmp_path = f"{self.cache_file}.{os.getpid()}.tmp"
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(cache_data, f)
            os.replace(tmp_path, self.cache_file)
            return True
        except (OSError, TypeError, ValueError) as e:
            logger.warning("Failed to write CVE2CAPEC cache %s: %s", self.cache_file, e)
            try:
                os.remove(tmp_path)
            except OSError:
                # Nothing to clean up (or cleanup impossible); the in-memory
                # mappings remain valid either way.
                pass
            return False

    # ------------------------------------------------------------------
    # Technique names
    # ------------------------------------------------------------------

    def _load_technique_names(self):
        """Load MITRE ATT&CK technique names for better rule descriptions.

        Keys are technique IDs without the leading "T". A technique that
        belongs to several tactics is listed once, under the first tactic in
        which it appears.
        """
        self.technique_names = {
            # Initial Access
            "1189": "Drive-by Compromise",
            "1190": "Exploit Public-Facing Application",
            "1133": "External Remote Services",
            "1200": "Hardware Additions",
            "1566": "Phishing",
            "1091": "Replication Through Removable Media",
            "1195": "Supply Chain Compromise",
            "1199": "Trusted Relationship",
            "1078": "Valid Accounts",
            # Execution
            "1059": "Command and Scripting Interpreter",
            "1059.001": "PowerShell",
            "1059.003": "Windows Command Shell",
            "1059.005": "Visual Basic",
            "1059.006": "Python",
            "1203": "Exploitation for Client Execution",
            "1559": "Inter-Process Communication",
            "1106": "Native API",
            "1053": "Scheduled Task/Job",
            "1129": "Shared Modules",
            "1204": "User Execution",
            "1047": "Windows Management Instrumentation",
            # Persistence
            "1098": "Account Manipulation",
            "1197": "BITS Jobs",
            "1547": "Boot or Logon Autostart Execution",
            "1037": "Boot or Logon Initialization Scripts",
            "1176": "Browser Extensions",
            "1554": "Compromise Client Software Binary",
            "1136": "Create Account",
            "1543": "Create or Modify System Process",
            "1546": "Event Triggered Execution",
            "1574": "Hijack Execution Flow",
            "1525": "Implant Internal Image",
            "1556": "Modify Authentication Process",
            "1137": "Office Application Startup",
            "1542": "Pre-OS Boot",
            "1505": "Server Software Component",
            "1205": "Traffic Signaling",
            # Privilege Escalation
            "1548": "Abuse Elevation Control Mechanism",
            "1134": "Access Token Manipulation",
            "1484": "Domain Policy Modification",
            "1068": "Exploitation for Privilege Escalation",
            "1055": "Process Injection",
            # Defense Evasion
            "1610": "Deploy Container",
            "1140": "Deobfuscate/Decode Files or Information",
            "1006": "Direct Volume Access",
            "1480": "Execution Guardrails",
            "1211": "Exploitation for Defense Evasion",
            "1222": "File and Directory Permissions Modification",
            "1564": "Hide Artifacts",
            "1562": "Impair Defenses",
            "1070": "Indicator Removal on Host",
            "1202": "Indirect Command Execution",
            "1036": "Masquerading",
            "1112": "Modify Registry",
            "1207": "Rogue Domain Controller",
            "1014": "Rootkit",
            "1218": "Signed Binary Proxy Execution",
            "1216": "Signed Script Proxy Execution",
            "1553": "Subvert Trust Controls",
            "1221": "Template Injection",
            "1535": "Unused/Unsupported Cloud Regions",
            "1497": "Virtualization/Sandbox Evasion",
            "1220": "XSL Script Processing",
            # Credential Access
            "1557": "Adversary-in-the-Middle",
            "1110": "Brute Force",
            "1555": "Credentials from Password Stores",
            "1212": "Exploitation for Credential Access",
            "1187": "Forced Authentication",
            "1606": "Forge Web Credentials",
            "1056": "Input Capture",
            "1040": "Network Sniffing",
            "1003": "OS Credential Dumping",
            "1528": "Steal Application Access Token",
            "1558": "Steal or Forge Kerberos Tickets",
            "1111": "Two-Factor Authentication Interception",
            "1552": "Unsecured Credentials",
            # Discovery
            "1087": "Account Discovery",
            "1010": "Application Window Discovery",
            "1217": "Browser Bookmark Discovery",
            "1580": "Cloud Infrastructure Discovery",
            "1538": "Cloud Service Dashboard",
            "1526": "Cloud Service Discovery",
            "1613": "Container and Resource Discovery",
            "1482": "Domain Trust Discovery",
            "1083": "File and Directory Discovery",
            "1615": "Group Policy Discovery",
            "1046": "Network Service Scanning",
            "1135": "Network Share Discovery",
            "1201": "Password Policy Discovery",
            "1069": "Permission Groups Discovery",
            "1057": "Process Discovery",
            "1012": "Query Registry",
            "1018": "Remote System Discovery",
            "1518": "Software Discovery",
            "1082": "System Information Discovery",
            "1614": "System Location Discovery",
            "1016": "System Network Configuration Discovery",
            "1049": "System Network Connections Discovery",
            "1033": "System Owner/User Discovery",
            "1007": "System Service Discovery",
            "1124": "System Time Discovery",
            # Lateral Movement
            "1210": "Exploitation of Remote Services",
            "1534": "Internal Spearphishing",
            "1570": "Lateral Tool Transfer",
            # T1021 is "Remote Services"; the "...Hijacking" names belong to
            # T1563 (Remote Service Session Hijacking), a different technique.
            "1021": "Remote Services",
            "1021.001": "Remote Desktop Protocol",
            "1021.002": "SMB/Windows Admin Shares",
            "1021.004": "SSH",
            "1021.005": "VNC",
            "1080": "Taint Shared Content",
            "1550": "Use Alternate Authentication Material",
            # Collection
            "1560": "Archive Collected Data",
            "1123": "Audio Capture",
            "1119": "Automated Collection",
            "1185": "Browser Session Hijacking",
            "1115": "Clipboard Data",
            "1530": "Data from Cloud Storage Object",
            "1602": "Data from Configuration Repository",
            "1213": "Data from Information Repositories",
            "1005": "Data from Local System",
            "1039": "Data from Network Shared Drive",
            "1025": "Data from Removable Media",
            "1074": "Data Staged",
            "1114": "Email Collection",
            "1113": "Screen Capture",
            "1125": "Video Capture",
            # Command and Control
            "1071": "Application Layer Protocol",
            "1092": "Communication Through Removable Media",
            "1132": "Data Encoding",
            "1001": "Data Obfuscation",
            "1568": "Dynamic Resolution",
            "1573": "Encrypted Channel",
            "1008": "Fallback Channels",
            "1105": "Ingress Tool Transfer",
            "1104": "Multi-Stage Channels",
            "1095": "Non-Application Layer Protocol",
            "1571": "Non-Standard Port",
            "1572": "Protocol Tunneling",
            "1090": "Proxy",
            "1219": "Remote Access Software",
            "1102": "Web Service",
            # Exfiltration
            "1020": "Automated Exfiltration",
            "1030": "Data Transfer Size Limits",
            "1048": "Exfiltration Over Alternative Protocol",
            "1041": "Exfiltration Over C2 Channel",
            "1011": "Exfiltration Over Other Network Medium",
            "1052": "Exfiltration Over Physical Medium",
            "1567": "Exfiltration Over Web Service",
            "1029": "Scheduled Transfer",
            "1537": "Transfer Data to Cloud Account",
            # Impact
            "1531": "Account Access Removal",
            "1485": "Data Destruction",
            "1486": "Data Encrypted for Impact",
            "1565": "Data Manipulation",
            "1491": "Defacement",
            "1561": "Disk Wipe",
            "1499": "Endpoint Denial of Service",
            "1495": "Firmware Corruption",
            "1490": "Inhibit System Recovery",
            "1498": "Network Denial of Service",
            "1496": "Resource Hijacking",
            "1489": "Service Stop",
            "1529": "System Shutdown/Reboot",
        }

    # ------------------------------------------------------------------
    # Lookups
    # ------------------------------------------------------------------

    def _get_cve_entry(self, cve_id) -> Dict:
        """Return the mapping entry for ``cve_id`` or ``{}`` when unavailable.

        The ID is looked up exactly as given first; if that misses and the ID
        is a string, a trimmed, upper-cased form is tried as well. Never
        raises: unhashable IDs and non-dict entries yield ``{}``.
        """
        try:
            entry = self.cve_mappings.get(cve_id)
            if entry is None and isinstance(cve_id, str):
                entry = self.cve_mappings.get(cve_id.strip().upper())
        except TypeError as e:
            # Unhashable cve_id (e.g. a list) cannot be a dict key.
            logger.error("Error looking up mapping for %s: %s", cve_id, e)
            return {}
        return entry if isinstance(entry, dict) else {}

    @staticmethod
    def _format_ids(raw_ids, prefix: str) -> List[str]:
        """Return ``raw_ids`` as strings carrying ``prefix`` exactly once.

        Accepts a list/tuple/set of ints or strings, or a single int/string.
        Booleans, other types and empty strings are skipped. A value that
        already starts with ``prefix`` (case-insensitively) is not prefixed a
        second time.
        """
        if isinstance(raw_ids, (int, str)):
            raw_ids = [raw_ids]
        elif not isinstance(raw_ids, (list, tuple, set)):
            return []

        formatted = []
        for raw in raw_ids:
            # bool is a subclass of int but is never a meaningful ID.
            if isinstance(raw, bool) or not isinstance(raw, (int, str)):
                continue
            text = str(raw).strip()
            if not text:
                continue
            if text[:len(prefix)].upper() == prefix:
                text = text[len(prefix):]
            formatted.append(f"{prefix}{text}")
        return formatted

    def get_mitre_techniques_for_cve(self, cve_id: str) -> List[str]:
        """Get MITRE ATT&CK techniques for a given CVE ID.

        Returns T-prefixed technique IDs (e.g. ``"T1190"``), or an empty list
        when the CVE is unknown or has no techniques.
        """
        return self._format_ids(self._get_cve_entry(cve_id).get('TECHNIQUES', []), "T")

    def get_technique_name(self, technique_id: str) -> str:
        """Get the name for a MITRE ATT&CK technique ID.

        Accepts the ID with or without a leading "T"/"t". Unknown IDs yield
        ``"Technique <technique_id>"`` using the ID exactly as passed in.
        """
        clean_id = str(technique_id).strip()
        # Remove a single leading T prefix if present
        if clean_id[:1] in ('T', 't'):
            clean_id = clean_id[1:]
        return self.technique_names.get(clean_id, f"Technique {technique_id}")

    def get_cwe_for_cve(self, cve_id: str) -> List[str]:
        """Get CWE codes (formatted as ``"CWE-<id>"``) for a given CVE ID."""
        return self._format_ids(self._get_cve_entry(cve_id).get('CWE', []), "CWE-")

    def get_capec_for_cve(self, cve_id: str) -> List[str]:
        """Get CAPEC codes (formatted as ``"CAPEC-<id>"``) for a given CVE ID."""
        return self._format_ids(self._get_cve_entry(cve_id).get('CAPEC', []), "CAPEC-")

    def get_full_mapping_for_cve(self, cve_id: str) -> Dict:
        """Get complete CVE mapping including CWE, CAPEC, and MITRE techniques.

        Returns a dict with the keys ``cve_id`` (as passed in),
        ``mitre_techniques``, ``cwe_codes``, ``capec_codes`` and
        ``has_mappings`` (``True`` when a non-empty entry exists).
        """
        entry = self._get_cve_entry(cve_id)
        return {
            'cve_id': cve_id,
            'mitre_techniques': self._format_ids(entry.get('TECHNIQUES', []), "T"),
            'cwe_codes': self._format_ids(entry.get('CWE', []), "CWE-"),
            'capec_codes': self._format_ids(entry.get('CAPEC', []), "CAPEC-"),
            'has_mappings': bool(entry)
        }

    def get_stats(self) -> Dict:
        """Get statistics about the CVE2CAPEC dataset.

        ``coverage_percentage`` is the share of CVEs that have at least one
        technique, or ``0`` when no CVEs are loaded. Entries that are not
        dicts count towards the total but towards no category.
        """
        total_cves = len(self.cve_mappings)
        cves_with_techniques = 0
        cves_with_cwe = 0
        cves_with_capec = 0
        # Single pass over the entries instead of one pass per category.
        for data in self.cve_mappings.values():
            if not isinstance(data, dict):
                continue
            if data.get('TECHNIQUES'):
                cves_with_techniques += 1
            if data.get('CWE'):
                cves_with_cwe += 1
            if data.get('CAPEC'):
                cves_with_capec += 1

        return {
            'total_cves': total_cves,
            'cves_with_mitre_techniques': cves_with_techniques,
            'cves_with_cwe': cves_with_cwe,
            'cves_with_capec': cves_with_capec,
            'coverage_percentage': (cves_with_techniques / total_cves * 100) if total_cves > 0 else 0
        }