447 lines
No EOL
18 KiB
Python
447 lines
No EOL
18 KiB
Python
"""
|
|
CVE2CAPEC client for retrieving MITRE ATT&CK technique mappings.
|
|
Integrates with the CVE2CAPEC repository: https://github.com/Galeax/CVE2CAPEC
|
|
"""
|
|
import json
|
|
import logging
|
|
import requests
|
|
from typing import Dict, List, Optional
|
|
import time
|
|
from datetime import datetime, timedelta
|
|
import os
|
|
|
|
# Module-level logger; records are emitted under this module's import name.
logger = logging.getLogger(__name__)
|
|
|
|
class CVE2CAPECClient:
    """Client for accessing CVE to MITRE ATT&CK technique mappings.

    Mappings are downloaded from the CVE2CAPEC repository as JSONL files
    (one per CVE year plus ``results/new_cves.jsonl``) and cached on disk for
    ``cache_expiry_hours``. Constructing the client loads the cache, or
    downloads fresh data when the cache is missing, stale, empty or
    unreadable, and then builds the technique-name lookup table. Network and
    cache errors are logged rather than raised.
    """

    # First CVE year downloaded from the repository's ``database`` directory.
    FIRST_YEAR = 2018
    # The year range always reaches at least this year, even if the local
    # clock is set in the past.
    MIN_LAST_YEAR = 2025
    # Timeout, in seconds, applied to every HTTP request.
    REQUEST_TIMEOUT = 30
    # Pause, in seconds, after each successful per-year download.
    REQUEST_DELAY = 0.5

    def __init__(self):
        """Initialise the client, loading mappings and technique names."""
        self.base_url = "https://raw.githubusercontent.com/Galeax/CVE2CAPEC/main"
        # NOTE(review): fixed file name in a shared temp directory; any local
        # user can pre-create or replace it. Consider a per-user cache dir.
        self.cache_file = "/tmp/cve2capec_cache.json"
        self.cache_expiry_hours = 24  # Cache for 24 hours
        self.cve_mappings = {}
        self.technique_names = {}  # Map technique IDs to names

        # Load cached data if available (falls back to a download).
        self._load_cache()

        # Load MITRE ATT&CK technique names.
        self._load_technique_names()

    def _load_cache(self):
        """Load cached CVE mappings if fresh, otherwise fetch new data.

        A cache is used only when it is younger than ``cache_expiry_hours``
        and actually contains mappings. When the refresh yields nothing
        (e.g. the network is down), mappings from a stale cache are used
        instead of leaving the client empty.
        """
        cached_mappings = {}
        try:
            if os.path.exists(self.cache_file):
                with open(self.cache_file, 'r', encoding='utf-8') as f:
                    cache_data = json.load(f)

                mappings = cache_data.get('mappings', {})
                if not isinstance(mappings, dict):
                    raise ValueError("cached 'mappings' is not a JSON object")
                cached_mappings = mappings

                # Check if cache is still fresh. An empty cache is never
                # considered fresh so that a previously stored failed
                # download does not block retries.
                cache_time = datetime.fromisoformat(cache_data.get('timestamp', '2000-01-01'))
                age = datetime.now() - cache_time
                if mappings and age < timedelta(hours=self.cache_expiry_hours):
                    self.cve_mappings = mappings
                    logger.info(f"Loaded {len(self.cve_mappings)} CVE mappings from cache")
                    return
        except (OSError, ValueError, TypeError, AttributeError) as e:
            # Unreadable, malformed or unexpectedly shaped cache file.
            logger.error(f"Error loading CVE2CAPEC cache: {e}")

        # Cache is stale, empty, broken or doesn't exist: fetch fresh data.
        self._fetch_fresh_data()

        if not self.cve_mappings and cached_mappings:
            logger.warning(
                f"Refresh failed; using {len(cached_mappings)} CVE mappings from stale cache"
            )
            self.cve_mappings = cached_mappings

    @staticmethod
    def _parse_jsonl(text: str, source: str) -> Dict:
        """Merge every JSON object found in JSONL ``text`` into one dict.

        Blank lines are ignored. Lines that are not valid JSON, or that are
        valid JSON but not an object, are logged and skipped. ``source`` is
        only used to label those log messages.
        """
        merged = {}
        # Split on "\n" only: str.splitlines() would also break on characters
        # such as U+2028 that may legally appear inside JSON strings.
        for raw_line in text.split('\n'):
            line = raw_line.strip()
            if not line:
                continue
            try:
                record = json.loads(line)
            except json.JSONDecodeError as e:
                logger.warning(f"Failed to parse line in {source}: {e}")
                continue
            if not isinstance(record, dict):
                logger.warning(f"Skipping non-object line in {source}")
                continue
            merged.update(record)
        return merged

    def _fetch_jsonl(self, url: str, source: str) -> Dict:
        """Download ``url`` and return its JSONL content as one merged dict.

        Raises:
            requests.RequestException: on connection errors, timeouts or a
                non-success HTTP status.
        """
        response = requests.get(url, timeout=self.REQUEST_TIMEOUT)
        response.raise_for_status()
        return self._parse_jsonl(response.text, source)

    def _save_cache(self, mappings: Dict, years_fetched: List[int]) -> bool:
        """Write ``mappings`` to the cache file; return True on success.

        The data is written to a temporary sibling file and then moved into
        place with ``os.replace`` so readers never see a half-written cache.
        Failures are logged and reported through the return value.
        """
        cache_data = {
            'timestamp': datetime.now().isoformat(),
            'mappings': mappings,
            'years_fetched': years_fetched,
        }
        tmp_path = f"{self.cache_file}.{os.getpid()}.tmp"
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(cache_data, f)
            os.replace(tmp_path, self.cache_file)
        except (OSError, TypeError, ValueError) as e:
            logger.warning(f"Failed to write CVE2CAPEC cache: {e}")
            try:
                os.remove(tmp_path)
            except OSError:
                # Best-effort cleanup; the temp file may never have existed.
                pass
            return False
        return True

    def _fetch_fresh_data(self):
        """Fetch fresh CVE mappings from the repository.

        Downloads one file per year from ``FIRST_YEAR`` through the current
        year, then ``results/new_cves.jsonl``. A file that cannot be fetched
        or processed is logged and skipped. The merged result replaces
        ``self.cve_mappings``. It is written to the cache only when it is
        non-empty, and a cache-write failure does not discard the data.
        """
        logger.info("Fetching fresh CVE2CAPEC data from all database files...")

        last_year = max(self.MIN_LAST_YEAR, datetime.now().year)
        years_to_fetch = list(range(self.FIRST_YEAR, last_year + 1))

        all_mappings = {}

        for year in years_to_fetch:
            logger.info(f"Fetching CVE mappings for year {year}...")
            try:
                year_mappings = self._fetch_jsonl(
                    f"{self.base_url}/database/CVE-{year}.jsonl", f"{year} data"
                )
            except requests.RequestException as e:
                logger.warning(f"Failed to fetch CVE-{year}.jsonl: {e}")
                continue
            except Exception as e:
                logger.warning(f"Error processing CVE-{year}.jsonl: {e}")
                continue

            all_mappings.update(year_mappings)
            logger.info(f"Loaded {len(year_mappings)} CVE mappings from {year}")

            # Add a small delay to be respectful to the server.
            time.sleep(self.REQUEST_DELAY)

        # Also try to fetch new_cves.jsonl for the latest data; merged last
        # so its entries take precedence over the per-year files.
        try:
            logger.info("Fetching latest CVE mappings from new_cves.jsonl...")
            latest_mappings = self._fetch_jsonl(
                f"{self.base_url}/results/new_cves.jsonl", "new_cves.jsonl"
            )
            all_mappings.update(latest_mappings)
            logger.info(f"Added {len(latest_mappings)} latest CVE mappings")
        except Exception as e:
            logger.warning(f"Failed to fetch new_cves.jsonl: {e}")

        self.cve_mappings = all_mappings

        if not all_mappings:
            # Do not cache an empty result: it would look "fresh" and block
            # retries until the cache expires.
            logger.error("Error fetching CVE2CAPEC data: no mappings could be retrieved")
            return

        if self._save_cache(all_mappings, years_to_fetch):
            logger.info(f"Successfully fetched and cached {len(all_mappings)} total CVE mappings")
        else:
            logger.info(f"Successfully fetched {len(all_mappings)} total CVE mappings (not cached)")

    def _load_technique_names(self):
        """Load MITRE ATT&CK technique names for better rule descriptions.

        Keys are technique IDs without the leading "T". A technique that
        belongs to several tactics is listed once, under the first tactic in
        which it appears.
        """
        self.technique_names = {
            # Initial Access
            "1189": "Drive-by Compromise",
            "1190": "Exploit Public-Facing Application",
            "1133": "External Remote Services",
            "1200": "Hardware Additions",
            "1566": "Phishing",
            "1091": "Replication Through Removable Media",
            "1195": "Supply Chain Compromise",
            "1199": "Trusted Relationship",
            "1078": "Valid Accounts",

            # Execution
            "1059": "Command and Scripting Interpreter",
            "1059.001": "PowerShell",
            "1059.003": "Windows Command Shell",
            "1059.005": "Visual Basic",
            "1059.006": "Python",
            "1203": "Exploitation for Client Execution",
            "1559": "Inter-Process Communication",
            "1106": "Native API",
            "1053": "Scheduled Task/Job",
            "1129": "Shared Modules",
            "1204": "User Execution",
            "1047": "Windows Management Instrumentation",

            # Persistence
            "1098": "Account Manipulation",
            "1197": "BITS Jobs",
            "1547": "Boot or Logon Autostart Execution",
            "1037": "Boot or Logon Initialization Scripts",
            "1176": "Browser Extensions",
            "1554": "Compromise Client Software Binary",
            "1136": "Create Account",
            "1543": "Create or Modify System Process",
            "1546": "Event Triggered Execution",
            "1574": "Hijack Execution Flow",
            "1525": "Implant Internal Image",
            "1556": "Modify Authentication Process",
            "1137": "Office Application Startup",
            "1542": "Pre-OS Boot",
            "1505": "Server Software Component",
            "1205": "Traffic Signaling",

            # Privilege Escalation
            "1548": "Abuse Elevation Control Mechanism",
            "1134": "Access Token Manipulation",
            "1484": "Domain Policy Modification",
            "1068": "Exploitation for Privilege Escalation",
            "1055": "Process Injection",

            # Defense Evasion
            "1610": "Deploy Container",
            "1140": "Deobfuscate/Decode Files or Information",
            "1006": "Direct Volume Access",
            "1480": "Execution Guardrails",
            "1211": "Exploitation for Defense Evasion",
            "1222": "File and Directory Permissions Modification",
            "1564": "Hide Artifacts",
            "1562": "Impair Defenses",
            "1070": "Indicator Removal on Host",
            "1202": "Indirect Command Execution",
            "1036": "Masquerading",
            "1112": "Modify Registry",
            "1207": "Rogue Domain Controller",
            "1014": "Rootkit",
            "1218": "Signed Binary Proxy Execution",
            "1216": "Signed Script Proxy Execution",
            "1553": "Subvert Trust Controls",
            "1221": "Template Injection",
            "1535": "Unused/Unsupported Cloud Regions",
            "1497": "Virtualization/Sandbox Evasion",
            "1220": "XSL Script Processing",

            # Credential Access
            "1557": "Adversary-in-the-Middle",
            "1110": "Brute Force",
            "1555": "Credentials from Password Stores",
            "1212": "Exploitation for Credential Access",
            "1187": "Forced Authentication",
            "1606": "Forge Web Credentials",
            "1056": "Input Capture",
            "1040": "Network Sniffing",
            "1003": "OS Credential Dumping",
            "1528": "Steal Application Access Token",
            "1558": "Steal or Forge Kerberos Tickets",
            "1111": "Two-Factor Authentication Interception",
            "1552": "Unsecured Credentials",

            # Discovery
            "1087": "Account Discovery",
            "1010": "Application Window Discovery",
            "1217": "Browser Bookmark Discovery",
            "1580": "Cloud Infrastructure Discovery",
            "1538": "Cloud Service Dashboard",
            "1526": "Cloud Service Discovery",
            "1613": "Container and Resource Discovery",
            "1482": "Domain Trust Discovery",
            "1083": "File and Directory Discovery",
            "1615": "Group Policy Discovery",
            "1046": "Network Service Scanning",
            "1135": "Network Share Discovery",
            "1201": "Password Policy Discovery",
            "1069": "Permission Groups Discovery",
            "1057": "Process Discovery",
            "1012": "Query Registry",
            "1018": "Remote System Discovery",
            "1518": "Software Discovery",
            "1082": "System Information Discovery",
            "1614": "System Location Discovery",
            "1016": "System Network Configuration Discovery",
            "1049": "System Network Connections Discovery",
            "1033": "System Owner/User Discovery",
            "1007": "System Service Discovery",
            "1124": "System Time Discovery",

            # Lateral Movement
            "1210": "Exploitation of Remote Services",
            "1534": "Internal Spearphishing",
            "1570": "Lateral Tool Transfer",
            "1021": "Remote Services",
            "1021.001": "Remote Desktop Protocol",
            "1021.002": "SMB/Windows Admin Shares",
            "1021.004": "SSH",
            "1021.005": "VNC",
            "1080": "Taint Shared Content",
            "1550": "Use Alternate Authentication Material",

            # Collection
            "1560": "Archive Collected Data",
            "1123": "Audio Capture",
            "1119": "Automated Collection",
            "1185": "Browser Session Hijacking",
            "1115": "Clipboard Data",
            "1530": "Data from Cloud Storage Object",
            "1602": "Data from Configuration Repository",
            "1213": "Data from Information Repositories",
            "1005": "Data from Local System",
            "1039": "Data from Network Shared Drive",
            "1025": "Data from Removable Media",
            "1074": "Data Staged",
            "1114": "Email Collection",
            "1113": "Screen Capture",
            "1125": "Video Capture",

            # Command and Control
            "1071": "Application Layer Protocol",
            "1092": "Communication Through Removable Media",
            "1132": "Data Encoding",
            "1001": "Data Obfuscation",
            "1568": "Dynamic Resolution",
            "1573": "Encrypted Channel",
            "1008": "Fallback Channels",
            "1105": "Ingress Tool Transfer",
            "1104": "Multi-Stage Channels",
            "1095": "Non-Application Layer Protocol",
            "1571": "Non-Standard Port",
            "1572": "Protocol Tunneling",
            "1090": "Proxy",
            "1219": "Remote Access Software",
            "1102": "Web Service",

            # Exfiltration
            "1020": "Automated Exfiltration",
            "1030": "Data Transfer Size Limits",
            "1048": "Exfiltration Over Alternative Protocol",
            "1041": "Exfiltration Over C2 Channel",
            "1011": "Exfiltration Over Other Network Medium",
            "1052": "Exfiltration Over Physical Medium",
            "1567": "Exfiltration Over Web Service",
            "1029": "Scheduled Transfer",
            "1537": "Transfer Data to Cloud Account",

            # Impact
            "1531": "Account Access Removal",
            "1485": "Data Destruction",
            "1486": "Data Encrypted for Impact",
            "1565": "Data Manipulation",
            "1491": "Defacement",
            "1561": "Disk Wipe",
            "1499": "Endpoint Denial of Service",
            "1495": "Firmware Corruption",
            "1490": "Inhibit System Recovery",
            "1498": "Network Denial of Service",
            "1496": "Resource Hijacking",
            "1489": "Service Stop",
            "1529": "System Shutdown/Reboot",
        }

    def _get_entry(self, cve_id: str) -> Dict:
        """Return the mapping entry for ``cve_id``, or ``{}`` if unusable.

        An entry is unusable when the CVE is unknown, when the stored value
        is not a dict, or when ``cve_id`` cannot be used as a dict key.
        """
        try:
            entry = self.cve_mappings.get(cve_id)
        except TypeError as e:
            logger.error(f"Error looking up mapping for {cve_id!r}: {e}")
            return {}
        return entry if isinstance(entry, dict) else {}

    def _get_formatted_ids(self, cve_id: str, field: str, prefix: str) -> List[str]:
        """Return the values stored under ``field`` for ``cve_id``, prefixed.

        A single scalar value is treated as a one-element list; values that
        are neither ``int`` nor ``str`` are dropped.
        """
        values = self._get_entry(cve_id).get(field) or []
        if isinstance(values, (int, str)):
            # Avoid iterating a string character by character.
            values = [values]
        elif not isinstance(values, (list, tuple)):
            logger.error(f"Error getting {field} for {cve_id}: unexpected value {values!r}")
            return []
        return [f"{prefix}{value}" for value in values if isinstance(value, (int, str))]

    def get_mitre_techniques_for_cve(self, cve_id: str) -> List[str]:
        """Get MITRE ATT&CK techniques for a given CVE ID.

        Returns:
            Technique IDs in T-prefixed form (e.g. ``"T1190"``), or an empty
            list when the CVE has no known techniques.
        """
        return self._get_formatted_ids(cve_id, 'TECHNIQUES', 'T')

    def get_technique_name(self, technique_id: str) -> str:
        """Get the name for a MITRE ATT&CK technique ID.

        Accepts the ID with or without a leading "T"/"t". Unknown IDs yield
        ``"Technique <technique_id>"`` using the ID exactly as passed in.
        """
        clean_id = technique_id.strip()
        # Remove only the leading prefix, not every "T" in the string.
        if clean_id[:1] in ('T', 't'):
            clean_id = clean_id[1:]
        return self.technique_names.get(clean_id, f"Technique {technique_id}")

    def get_cwe_for_cve(self, cve_id: str) -> List[str]:
        """Get CWE codes for a given CVE ID, formatted as ``"CWE-<id>"``."""
        return self._get_formatted_ids(cve_id, 'CWE', 'CWE-')

    def get_capec_for_cve(self, cve_id: str) -> List[str]:
        """Get CAPEC codes for a given CVE ID, formatted as ``"CAPEC-<id>"``."""
        return self._get_formatted_ids(cve_id, 'CAPEC', 'CAPEC-')

    def get_full_mapping_for_cve(self, cve_id: str) -> Dict:
        """Get complete CVE mapping including CWE, CAPEC, and MITRE techniques.

        Returns:
            A dict with keys ``cve_id``, ``mitre_techniques``, ``cwe_codes``,
            ``capec_codes`` and ``has_mappings`` (True when a non-empty
            mapping entry exists for the CVE).
        """
        return {
            'cve_id': cve_id,
            'mitre_techniques': self.get_mitre_techniques_for_cve(cve_id),
            'cwe_codes': self.get_cwe_for_cve(cve_id),
            'capec_codes': self.get_capec_for_cve(cve_id),
            'has_mappings': bool(self._get_entry(cve_id)),
        }

    def get_stats(self) -> Dict:
        """Get statistics about the CVE2CAPEC dataset.

        ``coverage_percentage`` is the share of all loaded CVEs that have at
        least one technique; it is 0 when no CVEs are loaded.
        """
        total_cves = len(self.cve_mappings)
        # Entries that are not dicts cannot carry any of the fields counted.
        entries = [data for data in self.cve_mappings.values() if isinstance(data, dict)]

        cves_with_techniques = sum(1 for data in entries if data.get('TECHNIQUES'))
        cves_with_cwe = sum(1 for data in entries if data.get('CWE'))
        cves_with_capec = sum(1 for data in entries if data.get('CAPEC'))

        return {
            'total_cves': total_cves,
            'cves_with_mitre_techniques': cves_with_techniques,
            'cves_with_cwe': cves_with_cwe,
            'cves_with_capec': cves_with_capec,
            'coverage_percentage': (cves_with_techniques / total_cves * 100) if total_cves > 0 else 0,
        }