""" CVE2CAPEC client for retrieving MITRE ATT&CK technique mappings. Integrates with the CVE2CAPEC repository: https://github.com/Galeax/CVE2CAPEC """ import json import logging import requests from typing import Dict, List, Optional import time from datetime import datetime, timedelta import os logger = logging.getLogger(__name__) class CVE2CAPECClient: """Client for accessing CVE to MITRE ATT&CK technique mappings.""" def __init__(self, lazy_load: bool = True): self.base_url = "https://raw.githubusercontent.com/Galeax/CVE2CAPEC/main" self.cache_file = "/tmp/cve2capec_cache.json" self.cache_expiry_hours = 24 # Cache for 24 hours self.cve_mappings = {} self.technique_names = {} # Map technique IDs to names self._data_loaded = False # Load MITRE ATT&CK technique names (lightweight) self._load_technique_names() # Only load cached data if not lazy loading if not lazy_load: self._load_cache() def _load_cache(self): """Load cached CVE mappings if they exist and are fresh.""" try: if os.path.exists(self.cache_file): with open(self.cache_file, 'r') as f: cache_data = json.load(f) # Check if cache is still fresh cache_time = datetime.fromisoformat(cache_data.get('timestamp', '2000-01-01')) if datetime.now() - cache_time < timedelta(hours=self.cache_expiry_hours): self.cve_mappings = cache_data.get('mappings', {}) self._data_loaded = True logger.info(f"Loaded {len(self.cve_mappings)} CVE mappings from cache") return # Cache is stale or doesn't exist, fetch fresh data self._fetch_fresh_data() self._data_loaded = True except Exception as e: logger.error(f"Error loading CVE2CAPEC cache: {e}") self._fetch_fresh_data() self._data_loaded = True def _fetch_fresh_data(self): """Fetch fresh CVE mappings from the repository.""" try: logger.info("Fetching fresh CVE2CAPEC data from all database files...") # Define year range to fetch (focusing on recent years first for better performance) # Start with recent years that are most likely to be relevant years_to_fetch = list(range(2018, 2026)) # 2018-2025 all_mappings = {} for year in years_to_fetch: try: url = f"{self.base_url}/database/CVE-{year}.jsonl" logger.info(f"Fetching CVE mappings for year {year}...") response = requests.get(url, timeout=30) response.raise_for_status() # Parse JSONL format year_mappings = {} for line in response.text.strip().split('\n'): if line.strip(): try: data = json.loads(line) year_mappings.update(data) except json.JSONDecodeError as e: logger.warning(f"Failed to parse line in {year} data: {e}") continue all_mappings.update(year_mappings) logger.info(f"Loaded {len(year_mappings)} CVE mappings from {year}") # Add a small delay to be respectful to the server time.sleep(0.5) except requests.RequestException as e: logger.warning(f"Failed to fetch CVE-{year}.jsonl: {e}") continue except Exception as e: logger.warning(f"Error processing CVE-{year}.jsonl: {e}") continue # Also try to fetch the new_cves.jsonl for the latest data try: logger.info("Fetching latest CVE mappings from new_cves.jsonl...") url = f"{self.base_url}/results/new_cves.jsonl" response = requests.get(url, timeout=30) response.raise_for_status() latest_mappings = {} for line in response.text.strip().split('\n'): if line.strip(): try: data = json.loads(line) latest_mappings.update(data) except json.JSONDecodeError: continue all_mappings.update(latest_mappings) logger.info(f"Added {len(latest_mappings)} latest CVE mappings") except Exception as e: logger.warning(f"Failed to fetch new_cves.jsonl: {e}") self.cve_mappings = all_mappings # Save to cache cache_data = { 'timestamp': datetime.now().isoformat(), 'mappings': all_mappings, 'years_fetched': years_to_fetch } with open(self.cache_file, 'w') as f: json.dump(cache_data, f) logger.info(f"Successfully fetched and cached {len(all_mappings)} total CVE mappings") except Exception as e: logger.error(f"Error fetching CVE2CAPEC data: {e}") # Continue with empty mappings if fetch fails self.cve_mappings = {} def _ensure_data_loaded(self): """Ensure CVE mappings are loaded, loading from cache if needed.""" if not self._data_loaded: logger.info("CVE2CAPEC data not loaded, loading from cache...") self._load_cache() def _load_technique_names(self): """Load MITRE ATT&CK technique names for better rule descriptions.""" # Common MITRE ATT&CK techniques and their names self.technique_names = { # Initial Access "1189": "Drive-by Compromise", "1190": "Exploit Public-Facing Application", "1133": "External Remote Services", "1200": "Hardware Additions", "1566": "Phishing", "1091": "Replication Through Removable Media", "1195": "Supply Chain Compromise", "1199": "Trusted Relationship", "1078": "Valid Accounts", # Execution "1059": "Command and Scripting Interpreter", "1059.001": "PowerShell", "1059.003": "Windows Command Shell", "1059.005": "Visual Basic", "1059.006": "Python", "1203": "Exploitation for Client Execution", "1559": "Inter-Process Communication", "1106": "Execution through Module Load", "1053": "Scheduled Task/Job", "1129": "Shared Modules", "1204": "User Execution", "1047": "Windows Management Instrumentation", # Persistence "1098": "Account Manipulation", "1197": "BITS Jobs", "1547": "Boot or Logon Autostart Execution", "1037": "Boot or Logon Initialization Scripts", "1176": "Browser Extensions", "1554": "Compromise Client Software Binary", "1136": "Create Account", "1543": "Create or Modify System Process", "1546": "Event Triggered Execution", "1133": "External Remote Services", "1574": "Hijack Execution Flow", "1525": "Implant Internal Image", "1556": "Modify Authentication Process", "1137": "Office Application Startup", "1542": "Pre-OS Boot", "1053": "Scheduled Task/Job", "1505": "Server Software Component", "1205": "Traffic Signaling", "1078": "Valid Accounts", # Privilege Escalation "1548": "Abuse Elevation Control Mechanism", "1134": "Access Token Manipulation", "1547": "Boot or Logon Autostart Execution", "1037": "Boot or Logon Initialization Scripts", "1543": "Create or Modify System Process", "1484": "Domain Policy Modification", "1546": "Event Triggered Execution", "1068": "Exploitation for Privilege Escalation", "1574": "Hijack Execution Flow", "1055": "Process Injection", "1053": "Scheduled Task/Job", "1078": "Valid Accounts", # Defense Evasion "1548": "Abuse Elevation Control Mechanism", "1134": "Access Token Manipulation", "1197": "BITS Jobs", "1610": "Deploy Container", "1140": "Deobfuscate/Decode Files or Information", "1006": "Direct Volume Access", "1484": "Domain Policy Modification", "1480": "Execution Guardrails", "1211": "Exploitation for Defense Evasion", "1222": "File and Directory Permissions Modification", "1564": "Hide Artifacts", "1574": "Hijack Execution Flow", "1562": "Impair Defenses", "1070": "Indicator Removal on Host", "1202": "Indirect Command Execution", "1036": "Masquerading", "1556": "Modify Authentication Process", "1112": "Modify Registry", "1207": "Rogue Domain Controller", "1014": "Rootkit", "1218": "Signed Binary Proxy Execution", "1216": "Signed Script Proxy Execution", "1553": "Subvert Trust Controls", "1221": "Template Injection", "1205": "Traffic Signaling", "1535": "Unused/Unsupported Cloud Regions", "1078": "Valid Accounts", "1497": "Virtualization/Sandbox Evasion", "1220": "XSL Script Processing", # Credential Access "1557": "Adversary-in-the-Middle", "1110": "Brute Force", "1555": "Credentials from Password Stores", "1212": "Exploitation for Credential Access", "1187": "Forced Authentication", "1606": "Forge Web Credentials", "1056": "Input Capture", "1556": "Modify Authentication Process", "1040": "Network Sniffing", "1003": "OS Credential Dumping", "1528": "Steal Application Access Token", "1558": "Steal or Forge Kerberos Tickets", "1111": "Two-Factor Authentication Interception", "1552": "Unsecured Credentials", # Discovery "1087": "Account Discovery", "1010": "Application Window Discovery", "1217": "Browser Bookmark Discovery", "1580": "Cloud Infrastructure Discovery", "1538": "Cloud Service Dashboard", "1526": "Cloud Service Discovery", "1613": "Container and Resource Discovery", "1482": "Domain Trust Discovery", "1083": "File and Directory Discovery", "1615": "Group Policy Discovery", "1046": "Network Service Scanning", "1135": "Network Share Discovery", "1201": "Password Policy Discovery", "1069": "Permission Groups Discovery", "1057": "Process Discovery", "1012": "Query Registry", "1018": "Remote System Discovery", "1518": "Software Discovery", "1082": "System Information Discovery", "1614": "System Location Discovery", "1016": "System Network Configuration Discovery", "1049": "System Network Connections Discovery", "1033": "System Owner/User Discovery", "1007": "System Service Discovery", "1124": "System Time Discovery", "1497": "Virtualization/Sandbox Evasion", # Lateral Movement "1210": "Exploitation of Remote Services", "1534": "Internal Spearphishing", "1570": "Lateral Tool Transfer", "1021": "Remote Service Session Hijacking", "1021.001": "RDP Hijacking", "1021.002": "SSH Hijacking", "1021.004": "Tty Shell Hijacking", "1021.005": "VNC Hijacking", "1080": "Taint Shared Content", "1550": "Use Alternate Authentication Material", # Collection "1557": "Adversary-in-the-Middle", "1560": "Archive Collected Data", "1123": "Audio Capture", "1119": "Automated Collection", "1185": "Browser Session Hijacking", "1115": "Clipboard Data", "1530": "Data from Cloud Storage Object", "1602": "Data from Configuration Repository", "1213": "Data from Information Repositories", "1005": "Data from Local System", "1039": "Data from Network Shared Drive", "1025": "Data from Removable Media", "1074": "Data Staged", "1114": "Email Collection", "1056": "Input Capture", "1113": "Screen Capture", "1125": "Video Capture", # Command and Control "1071": "Application Layer Protocol", "1092": "Communication Through Removable Media", "1132": "Data Encoding", "1001": "Data Obfuscation", "1568": "Dynamic Resolution", "1573": "Encrypted Channel", "1008": "Fallback Channels", "1105": "Ingress Tool Transfer", "1104": "Multi-Stage Channels", "1095": "Non-Application Layer Protocol", "1571": "Non-Standard Port", "1572": "Protocol Tunneling", "1090": "Proxy", "1219": "Remote Access Software", "1102": "Web Service", # Exfiltration "1020": "Automated Exfiltration", "1030": "Data Transfer Size Limits", "1048": "Exfiltration Over Alternative Protocol", "1041": "Exfiltration Over C2 Channel", "1011": "Exfiltration Over Other Network Medium", "1052": "Exfiltration Over Physical Medium", "1567": "Exfiltration Over Web Service", "1029": "Scheduled Transfer", "1537": "Transfer Data to Cloud Account", # Impact "1531": "Account Access Removal", "1485": "Data Destruction", "1486": "Data Encrypted for Impact", "1565": "Data Manipulation", "1491": "Defacement", "1561": "Disk Wipe", "1499": "Endpoint Denial of Service", "1495": "Firmware Corruption", "1490": "Inhibit System Recovery", "1498": "Network Denial of Service", "1496": "Resource Hijacking", "1489": "Service Stop", "1529": "System Shutdown/Reboot" } def get_mitre_techniques_for_cve(self, cve_id: str) -> List[str]: """Get MITRE ATT&CK techniques for a given CVE ID.""" try: self._ensure_data_loaded() cve_data = self.cve_mappings.get(cve_id, {}) techniques = cve_data.get('TECHNIQUES', []) # Convert technique IDs to T-prefixed format formatted_techniques = [] for tech in techniques: if isinstance(tech, (int, str)): formatted_techniques.append(f"T{tech}") return formatted_techniques except Exception as e: logger.error(f"Error getting MITRE techniques for {cve_id}: {e}") return [] def get_technique_name(self, technique_id: str) -> str: """Get the name for a MITRE ATT&CK technique ID.""" # Remove T prefix if present clean_id = technique_id.replace('T', '') return self.technique_names.get(clean_id, f"Technique {technique_id}") def get_cwe_for_cve(self, cve_id: str) -> List[str]: """Get CWE codes for a given CVE ID.""" try: self._ensure_data_loaded() cve_data = self.cve_mappings.get(cve_id, {}) cwes = cve_data.get('CWE', []) # Format CWE IDs formatted_cwes = [] for cwe in cwes: if isinstance(cwe, (int, str)): formatted_cwes.append(f"CWE-{cwe}") return formatted_cwes except Exception as e: logger.error(f"Error getting CWEs for {cve_id}: {e}") return [] def get_capec_for_cve(self, cve_id: str) -> List[str]: """Get CAPEC codes for a given CVE ID.""" try: self._ensure_data_loaded() cve_data = self.cve_mappings.get(cve_id, {}) capecs = cve_data.get('CAPEC', []) # Format CAPEC IDs formatted_capecs = [] for capec in capecs: if isinstance(capec, (int, str)): formatted_capecs.append(f"CAPEC-{capec}") return formatted_capecs except Exception as e: logger.error(f"Error getting CAPECs for {cve_id}: {e}") return [] def get_full_mapping_for_cve(self, cve_id: str) -> Dict: """Get complete CVE mapping including CWE, CAPEC, and MITRE techniques.""" try: return { 'cve_id': cve_id, 'mitre_techniques': self.get_mitre_techniques_for_cve(cve_id), 'cwe_codes': self.get_cwe_for_cve(cve_id), 'capec_codes': self.get_capec_for_cve(cve_id), 'has_mappings': bool(self.cve_mappings.get(cve_id, {})) } except Exception as e: logger.error(f"Error getting full mapping for {cve_id}: {e}") return { 'cve_id': cve_id, 'mitre_techniques': [], 'cwe_codes': [], 'capec_codes': [], 'has_mappings': False } def get_stats(self) -> Dict: """Get statistics about the CVE2CAPEC dataset.""" self._ensure_data_loaded() total_cves = len(self.cve_mappings) cves_with_techniques = len([cve for cve, data in self.cve_mappings.items() if data.get('TECHNIQUES')]) cves_with_cwe = len([cve for cve, data in self.cve_mappings.items() if data.get('CWE')]) cves_with_capec = len([cve for cve, data in self.cve_mappings.items() if data.get('CAPEC')]) return { 'total_cves': total_cves, 'cves_with_mitre_techniques': cves_with_techniques, 'cves_with_cwe': cves_with_cwe, 'cves_with_capec': cves_with_capec, 'coverage_percentage': (cves_with_techniques / total_cves * 100) if total_cves > 0 else 0 }