auto_sigma_rule_generator/backend/cisa_kev_client.py

"""
CISA Known Exploited Vulnerabilities (KEV) Integration Client
Interfaces with the CISA KEV catalog at https://www.cisa.gov/sites/default/files/feeds/known_exploited_vulnerabilities.json
"""
import aiohttp
import asyncio
import json
import logging
from datetime import datetime, timedelta
from typing import Callable, Dict, List, Optional, Tuple
from sqlalchemy.orm import Session
from sqlalchemy import text
import re

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class CISAKEVClient:
    """Client for interfacing with CISA Known Exploited Vulnerabilities catalog"""

    def __init__(self, db_session: Session):
        self.db_session = db_session

        # CISA KEV URLs
        self.kev_catalog_url = "https://www.cisa.gov/sites/default/files/feeds/known_exploited_vulnerabilities.json"
        self.kev_schema_url = "https://www.cisa.gov/sites/default/files/feeds/known_exploited_vulnerabilities_schema.json"

        # Cache for KEV data
        self.kev_cache = None
        self.cache_timestamp = None
        self.cache_ttl = 3600  # 1 hour cache

        # Rate limiting
        self.rate_limit_delay = 1.0  # 1 second between requests
        self.last_request_time = 0

    async def _make_request(self, session: aiohttp.ClientSession, url: str) -> Optional[dict]:
        """Make a rate-limited request to CISA"""
        try:
            # Rate limiting: wait out the remainder of the delay window if needed
            current_time = asyncio.get_running_loop().time()
            time_since_last = current_time - self.last_request_time
            if time_since_last < self.rate_limit_delay:
                await asyncio.sleep(self.rate_limit_delay - time_since_last)

            async with session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as response:
                self.last_request_time = asyncio.get_running_loop().time()

                if response.status == 200:
                    return await response.json()
                else:
                    logger.warning(f"CISA KEV request failed: {response.status} for {url}")
                    return None
        except Exception as e:
            logger.error(f"Error making request to {url}: {e}")
            return None

    async def fetch_kev_catalog(self, force_refresh: bool = False) -> Optional[dict]:
        """Fetch the CISA KEV catalog with caching"""
        # Check cache first
        if not force_refresh and self.kev_cache and self.cache_timestamp:
            cache_age = datetime.now().timestamp() - self.cache_timestamp
            if cache_age < self.cache_ttl:
                logger.info("Using cached CISA KEV data")
                return self.kev_cache

        logger.info("Fetching fresh CISA KEV catalog...")

        async with aiohttp.ClientSession() as session:
            data = await self._make_request(session, self.kev_catalog_url)
            if data:
                self.kev_cache = data
                self.cache_timestamp = datetime.now().timestamp()
                logger.info(f"Fetched CISA KEV catalog with {data.get('count', 0)} vulnerabilities")
                return data
            else:
                logger.error("Failed to fetch CISA KEV catalog")
                return None

    def analyze_kev_vulnerability(self, vuln: dict) -> dict:
        """Analyze a CISA KEV vulnerability entry to extract threat intelligence"""
        analysis = {
            'cve_id': vuln.get('cveID'),
            'vendor_project': vuln.get('vendorProject'),
            'product': vuln.get('product'),
            'vulnerability_name': vuln.get('vulnerabilityName'),
            'date_added': vuln.get('dateAdded'),
            'short_description': vuln.get('shortDescription'),
            'required_action': vuln.get('requiredAction'),
            'due_date': vuln.get('dueDate'),
            'known_ransomware_use': vuln.get('knownRansomwareCampaignUse', 'Unknown'),
            'notes': vuln.get('notes', ''),
            'cwes': vuln.get('cwes', [])
        }

        # Calculate threat priority score based on various factors
        threat_score = self._calculate_threat_score(vuln)
        analysis['threat_score'] = threat_score
        analysis['threat_level'] = self._get_threat_level(threat_score)

        # Extract indicators from descriptions and required actions
        indicators = self._extract_kev_indicators(vuln)
        analysis['indicators'] = indicators

        # Categorize the vulnerability type
        analysis['vulnerability_category'] = self._categorize_vulnerability(vuln)

        return analysis

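    # Illustrative example (hypothetical, abbreviated KEV entry):
    #   analyze_kev_vulnerability({'cveID': 'CVE-2021-44228', 'vendorProject': 'Apache',
    #                              'product': 'Log4j2', 'knownRansomwareCampaignUse': 'Known', ...})
    # returns the raw KEV fields plus the derived keys 'threat_score', 'threat_level',
    # 'indicators' and 'vulnerability_category'.
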
    def _calculate_threat_score(self, vuln: dict) -> int:
        """Calculate threat priority score (0-100) based on KEV data"""
        score = 0

        # Base score for being in CISA KEV
        score += 50  # High base score since it's actively exploited

        # Ransomware usage factor (0-25 points)
        ransomware_use = vuln.get('knownRansomwareCampaignUse', 'Unknown').lower()
        if ransomware_use == 'known':
            score += 25
        elif ransomware_use == 'unknown':
            score += 10  # Still concerning but less certain

        # Recency factor (0-15 points)
        try:
            date_added = datetime.strptime(vuln.get('dateAdded', ''), '%Y-%m-%d')
            days_since_added = (datetime.now() - date_added).days
            if days_since_added <= 30:  # Very recent
                score += 15
            elif days_since_added <= 90:  # Recent
                score += 10
            elif days_since_added <= 365:  # This year
                score += 5
        except (ValueError, TypeError):
            pass

        # Urgency based on due date (0-10 points)
        try:
            due_date = datetime.strptime(vuln.get('dueDate', ''), '%Y-%m-%d')
            days_to_due = (due_date - datetime.now()).days
            if days_to_due <= 0:  # Overdue
                score += 10
            elif days_to_due <= 7:  # Due soon
                score += 8
            elif days_to_due <= 30:  # Due this month
                score += 5
        except (ValueError, TypeError):
            pass

        return min(score, 100)  # Cap at 100

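    # Worked example (hypothetical entry): an entry added 10 days ago, with known
    # ransomware use and a remediation due date 3 days out, scores
    # 50 (base) + 25 (ransomware) + 15 (recency) + 8 (due soon) = 98 -> 'critical'.
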
    def _get_threat_level(self, score: int) -> str:
        """Get threat level based on score"""
        if score >= 85:
            return 'critical'
        elif score >= 70:
            return 'high'
        elif score >= 55:
            return 'medium'
        else:
            return 'low'

    def _extract_kev_indicators(self, vuln: dict) -> dict:
        """Extract threat indicators from CISA KEV vulnerability data"""
        indicators = {
            'attack_vectors': [],
            'affected_products': [],
            'required_actions': [],
            'ransomware_indicators': [],
            'vulnerability_types': [],
            'mitigation_techniques': [],
            'technical_details': [],
            'impact_analysis': [],
            'urgency_indicators': []
        }

        # Extract from vulnerability name and description
        text_sources = [
            vuln.get('vulnerabilityName', ''),
            vuln.get('shortDescription', ''),
            vuln.get('requiredAction', ''),
            vuln.get('notes', '')
        ]
        full_text = ' '.join(text_sources).lower()

        # Attack vector patterns
        attack_vector_patterns = [
            r'\b(remote code execution|rce)\b',
            r'\b(sql injection|sqli)\b',
            r'\b(cross.?site scripting|xss)\b',
            r'\b(buffer overflow)\b',
            r'\b(privilege escalation)\b',
            r'\b(authentication bypass)\b',
            r'\b(directory traversal|path traversal)\b',
            r'\b(file upload)\b',
            r'\b(command injection)\b',
            r'\b(deserialization)\b',
            r'\b(memory corruption)\b',
            r'\b(use.?after.?free)\b',
            r'\b(heap overflow)\b',
            r'\b(stack overflow)\b'
        ]

        for pattern in attack_vector_patterns:
            matches = re.findall(pattern, full_text, re.IGNORECASE)
            indicators['attack_vectors'].extend(matches)

        # Product and vendor information
        vendor = vuln.get('vendorProject', '')
        product = vuln.get('product', '')
        if vendor and product:
            indicators['affected_products'].append(f"{vendor} {product}")

        # Required action keywords
        required_action = vuln.get('requiredAction', '')
        action_patterns = [
            r'\b(patch|update|upgrade)\b',
            r'\b(disable|remove|uninstall)\b',
            r'\b(configure|reconfigure)\b',
            r'\b(firewall|network segmentation)\b',
            r'\b(monitor|logging)\b',
            r'\b(backup|restore)\b'
        ]

        for pattern in action_patterns:
            matches = re.findall(pattern, required_action, re.IGNORECASE)
            indicators['required_actions'].extend(matches)

        # Ransomware indicators
        if vuln.get('knownRansomwareCampaignUse') == 'Known':
            indicators['ransomware_indicators'].append('known_ransomware_use')

        # Vulnerability type classification
        vuln_type_patterns = [
            r'\b(zero.?day|0.?day)\b',
            r'\b(denial.?of.?service|dos)\b',
            r'\b(information disclosure)\b',
            r'\b(data breach)\b',
            r'\b(backdoor)\b',
            r'\b(trojan)\b',
            r'\b(webshell)\b'
        ]

        for pattern in vuln_type_patterns:
            matches = re.findall(pattern, full_text, re.IGNORECASE)
            indicators['vulnerability_types'].extend(matches)

        # Technical details extraction
        technical_patterns = [
            r'\b(port|service)\s+(\d+)\b',
            r'\b(protocol)\s+(\w+)\b',
            r'\b(version)\s+([\d\.]+)\b',
            r'\b(cve-\d{4}-\d{4,7})\b',
            r'\b(application|software|system)\s+(\w+)\b'
        ]

        for pattern in technical_patterns:
            matches = re.findall(pattern, full_text, re.IGNORECASE)
            for match in matches:
                if isinstance(match, tuple):
                    indicators['technical_details'].append(' '.join(match))
                else:
                    indicators['technical_details'].append(match)

        # Impact analysis
        impact_keywords = [
            'critical system', 'data exfiltration', 'system compromise',
            'unauthorized access', 'privilege escalation', 'lateral movement',
            'ransomware deployment', 'data encryption', 'service disruption'
        ]

        for keyword in impact_keywords:
            if keyword in full_text:
                indicators['impact_analysis'].append(keyword)

        # Urgency indicators
        urgency_patterns = [
            r'\b(immediate|urgent|critical|emergency)\b',
            r'\b(actively exploited|in-the-wild|widespread)\b',
            r'\b(patch.{0,10}available|fix.{0,10}available)\b',
            r'\b(due.{0,10}date|deadline|must.{0,10}complete)\b'
        ]

        for pattern in urgency_patterns:
            matches = re.findall(pattern, full_text, re.IGNORECASE)
            indicators['urgency_indicators'].extend(matches)

        # Clean up and deduplicate
        for key in indicators:
            indicators[key] = list(set([item.strip() for item in indicators[key] if item and len(item.strip()) > 2]))
            indicators[key] = indicators[key][:10]  # Limit to 10 items per category

        return indicators

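    # Illustrative example (hypothetical entry): for a shortDescription of
    # "Actively exploited remote code execution via deserialization" and a
    # requiredAction of "Apply the vendor patch", the extraction above yields roughly
    # attack_vectors=['remote code execution', 'deserialization'],
    # required_actions=['patch'] and urgency_indicators=['actively exploited'].
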
    def _categorize_vulnerability(self, vuln: dict) -> str:
        """Categorize the vulnerability based on CISA KEV data"""
        vuln_name = vuln.get('vulnerabilityName', '').lower()
        description = vuln.get('shortDescription', '').lower()
        full_text = f"{vuln_name} {description}"

        # Check for specific vulnerability categories
        if re.search(r'\b(remote code execution|rce)\b', full_text):
            return 'remote_code_execution'
        elif re.search(r'\b(sql injection|sqli)\b', full_text):
            return 'injection'
        elif re.search(r'\b(cross.?site scripting|xss)\b', full_text):
            return 'cross_site_scripting'
        elif re.search(r'\b(privilege escalation)\b', full_text):
            return 'privilege_escalation'
        elif re.search(r'\b(authentication bypass)\b', full_text):
            return 'authentication_bypass'
        elif re.search(r'\b(buffer overflow|memory corruption)\b', full_text):
            return 'memory_corruption'
        elif re.search(r'\b(directory traversal|path traversal)\b', full_text):
            return 'path_traversal'
        elif re.search(r'\b(denial.?of.?service|dos)\b', full_text):
            return 'denial_of_service'
        elif re.search(r'\b(information disclosure)\b', full_text):
            return 'information_disclosure'
        else:
            return 'other'

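    # Example: a vulnerabilityName such as "Example Server Remote Code Execution
    # Vulnerability" (hypothetical) matches the first pattern and is categorized as
    # 'remote_code_execution'; entries matching no pattern fall back to 'other'.
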
    async def sync_cve_kev_data(self, cve_id: str) -> dict:
        """Synchronize CISA KEV data for a specific CVE"""
        from main import CVE, SigmaRule

        # Get existing CVE
        cve = self.db_session.query(CVE).filter(CVE.cve_id == cve_id).first()
        if not cve:
            logger.warning(f"CVE {cve_id} not found in database")
            return {"error": "CVE not found"}

        # Fetch CISA KEV catalog
        kev_catalog = await self.fetch_kev_catalog()
        if not kev_catalog:
            return {"error": "Failed to fetch CISA KEV catalog"}

        # Search for the CVE in the KEV catalog
        kev_entry = None
        for vuln in kev_catalog.get('vulnerabilities', []):
            if vuln.get('cveID') == cve_id:
                kev_entry = vuln
                break

        if not kev_entry:
            logger.info(f"CVE {cve_id} not found in CISA KEV catalog")
            return {"cve_id": cve_id, "kev_found": False}

        # Analyze the KEV entry
        kev_analysis = self.analyze_kev_vulnerability(kev_entry)

        # Update CVE with CISA KEV data
        if not cve.poc_data:
            cve.poc_data = {}

        cve.poc_data['cisa_kev'] = {
            'vulnerability_data': kev_analysis,
            'catalog_version': kev_catalog.get('catalogVersion'),
            'date_released': kev_catalog.get('dateReleased'),
            'synced_at': datetime.utcnow().isoformat(),
            'source': 'cisa_kev'
        }
        cve.updated_at = datetime.utcnow()

        # Update SIGMA rule with CISA KEV data
        sigma_rule = self.db_session.query(SigmaRule).filter(
            SigmaRule.cve_id == cve_id
        ).first()

        if sigma_rule:
            # Update rule with CISA KEV data
            if not sigma_rule.nomi_sec_data:
                sigma_rule.nomi_sec_data = {}

            sigma_rule.nomi_sec_data['cisa_kev'] = {
                'threat_level': kev_analysis['threat_level'],
                'threat_score': kev_analysis['threat_score'],
                'known_ransomware_use': kev_analysis['known_ransomware_use'],
                'vulnerability_category': kev_analysis['vulnerability_category'],
                'due_date': kev_analysis['due_date'],
                'indicators': kev_analysis['indicators'],
                'source': 'cisa_kev'
            }

            # Update exploit indicators with CISA KEV data
            existing_indicators = json.loads(sigma_rule.exploit_indicators) if sigma_rule.exploit_indicators else {}
            kev_indicators = kev_analysis['indicators']

            for key, values in kev_indicators.items():
                if key not in existing_indicators:
                    existing_indicators[key] = []
                existing_indicators[key].extend(values)
                existing_indicators[key] = list(set(existing_indicators[key]))

            sigma_rule.exploit_indicators = json.dumps(existing_indicators)
            sigma_rule.updated_at = datetime.utcnow()

        self.db_session.commit()

        logger.info(f"Synchronized CISA KEV data for {cve_id} (Threat Level: {kev_analysis['threat_level']})")

        return {
            "cve_id": cve_id,
            "kev_found": True,
            "threat_level": kev_analysis['threat_level'],
            "threat_score": kev_analysis['threat_score'],
            "known_ransomware_use": kev_analysis['known_ransomware_use'],
            "vulnerability_category": kev_analysis['vulnerability_category'],
            "source": "cisa_kev"
        }

    async def bulk_sync_kev_data(self, batch_size: int = 100, cancellation_flag: Optional[Callable[[], bool]] = None) -> dict:
        """Synchronize CISA KEV data for all matching CVEs"""
        from main import CVE, BulkProcessingJob

        # Create bulk processing job
        job = BulkProcessingJob(
            job_type='cisa_kev_sync',
            status='running',
            started_at=datetime.utcnow(),
            job_metadata={'batch_size': batch_size}
        )
        self.db_session.add(job)
        self.db_session.commit()

        total_processed = 0
        total_found = 0
        results = []

        try:
            # Fetch CISA KEV catalog first
            kev_catalog = await self.fetch_kev_catalog(force_refresh=True)
            if not kev_catalog:
                raise Exception("Failed to fetch CISA KEV catalog")

            # Extract all CVE IDs from the KEV catalog
            kev_cve_ids = [vuln.get('cveID') for vuln in kev_catalog.get('vulnerabilities', []) if vuln.get('cveID')]

            # Get CVEs that exist in our database and match the KEV catalog
            cves = self.db_session.query(CVE).filter(CVE.cve_id.in_(kev_cve_ids)).all()
            job.total_items = len(cves)
            self.db_session.commit()

            logger.info(f"Found {len(cves)} CVEs matching CISA KEV catalog")

            # Process in batches
            for i in range(0, len(cves), batch_size):
                # Check for cancellation
                if cancellation_flag and cancellation_flag():
                    logger.info("CISA KEV sync cancelled by user")
                    job.status = 'cancelled'
                    job.cancelled_at = datetime.utcnow()
                    job.error_message = "Job cancelled by user"
                    break

                batch = cves[i:i + batch_size]

                for cve in batch:
                    # Check for cancellation
                    if cancellation_flag and cancellation_flag():
                        logger.info("CISA KEV sync cancelled by user")
                        job.status = 'cancelled'
                        job.cancelled_at = datetime.utcnow()
                        job.error_message = "Job cancelled by user"
                        break

                    try:
                        result = await self.sync_cve_kev_data(cve.cve_id)
                        total_processed += 1

                        if result.get("kev_found", False):
                            total_found += 1
                            results.append(result)

                        job.processed_items += 1

                        # Small delay to be respectful to CISA
                        await asyncio.sleep(0.1)

                    except Exception as e:
                        logger.error(f"Error syncing CISA KEV for {cve.cve_id}: {e}")
                        job.failed_items += 1

                # Break out of outer loop if cancelled
                if job.status == 'cancelled':
                    break

                # Commit after each batch
                self.db_session.commit()
                logger.info(f"Processed CISA KEV batch {i//batch_size + 1}/{(len(cves) + batch_size - 1)//batch_size}")

            # Update job status
            if job.status != 'cancelled':
                job.status = 'completed'
                job.completed_at = datetime.utcnow()
                job.job_metadata.update({
                    'total_processed': total_processed,
                    'total_kev_found': total_found,
                    'cves_with_kev': len(results),
                    'catalog_version': kev_catalog.get('catalogVersion'),
                    'source': 'cisa_kev'
                })

        except Exception as e:
            job.status = 'failed'
            job.error_message = str(e)
            job.completed_at = datetime.utcnow()
            logger.error(f"Bulk CISA KEV sync job failed: {e}")

        finally:
            self.db_session.commit()

        return {
            'job_id': str(job.id),
            'status': job.status,
            'total_processed': total_processed,
            'total_kev_found': total_found,
            'cves_with_kev': len(results),
            'source': 'cisa_kev'
        }

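    # Illustrative call (hypothetical cancellation hook): the flag is polled between
    # CVEs, so a caller can stop a long-running sync by flipping shared state, e.g.
    #   cancelled = {"value": False}
    #   await client.bulk_sync_kev_data(batch_size=50, cancellation_flag=lambda: cancelled["value"])
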
    async def get_kev_sync_status(self) -> dict:
        """Get CISA KEV synchronization status"""
        from main import CVE

        # Count CVEs with CISA KEV data
        total_cves = self.db_session.query(CVE).count()

        result = self.db_session.execute(
            text("SELECT COUNT(*) FROM cves WHERE poc_data::text LIKE '%\"cisa_kev\"%'")
        )
        cves_with_kev = result.scalar()

        # Get catalog information if available
        catalog_info = {}
        if self.kev_cache:
            catalog_info = {
                'catalog_version': self.kev_cache.get('catalogVersion'),
                'date_released': self.kev_cache.get('dateReleased'),
                'total_vulnerabilities': self.kev_cache.get('count', 0),
                'cache_age_seconds': int(datetime.now().timestamp() - (self.cache_timestamp or 0))
            }

        return {
            'total_cves': total_cves,
            'cves_with_kev_data': cves_with_kev,
            'kev_coverage': (cves_with_kev / total_cves * 100) if total_cves > 0 else 0,
            'kev_sync_status': 'active' if cves_with_kev > 0 else 'pending',
            'catalog_info': catalog_info,
            'source': 'cisa_kev'
        }
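

# Minimal usage sketch, not part of the client itself. It assumes the same environment
# as the rest of this module (SQLAlchemy available, the `main` models importable for the
# sync helpers). The database URL below is hypothetical; fetch_kev_catalog() performs no
# database access, so this demo only exercises the HTTP and caching path.
if __name__ == "__main__":
    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker

    async def _demo() -> None:
        engine = create_engine("postgresql://user:password@localhost/cve_db")  # hypothetical DSN
        session = sessionmaker(bind=engine)()
        client = CISAKEVClient(session)

        catalog = await client.fetch_kev_catalog()
        if catalog:
            print(f"KEV catalog {catalog.get('catalogVersion')}: "
                  f"{catalog.get('count', 0)} vulnerabilities")

    asyncio.run(_demo())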