559 lines
No EOL
22 KiB
Python
559 lines
No EOL
22 KiB
Python
"""
|
|
CISA Known Exploited Vulnerabilities (KEV) Integration Client
|
|
Interfaces with the CISA KEV catalog at https://www.cisa.gov/sites/default/files/feeds/known_exploited_vulnerabilities.json
|
|
"""
|
|
|
|
import aiohttp
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
from typing import Dict, List, Optional, Tuple
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy import text
|
|
import re
|
|
|
|
# Configure logging
|
|
logging.basicConfig(level=logging.INFO)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class CISAKEVClient:
|
|
"""Client for interfacing with CISA Known Exploited Vulnerabilities catalog"""
|
|
|
|
def __init__(self, db_session: Session):
|
|
self.db_session = db_session
|
|
|
|
# CISA KEV URLs
|
|
self.kev_catalog_url = "https://www.cisa.gov/sites/default/files/feeds/known_exploited_vulnerabilities.json"
|
|
self.kev_schema_url = "https://www.cisa.gov/sites/default/files/feeds/known_exploited_vulnerabilities_schema.json"
|
|
|
|
# Cache for KEV data
|
|
self.kev_cache = None
|
|
self.cache_timestamp = None
|
|
self.cache_ttl = 3600 # 1 hour cache
|
|
|
|
# Rate limiting
|
|
self.rate_limit_delay = 1.0 # 1 second between requests
|
|
self.last_request_time = 0
|
|
|
|
async def _make_request(self, session: aiohttp.ClientSession, url: str) -> Optional[dict]:
|
|
"""Make a rate-limited request to CISA"""
|
|
try:
|
|
# Rate limiting
|
|
current_time = asyncio.get_event_loop().time()
|
|
time_since_last = current_time - self.last_request_time
|
|
if time_since_last < self.rate_limit_delay:
|
|
await asyncio.sleep(self.rate_limit_delay - time_since_last)
|
|
|
|
async with session.get(url, timeout=30) as response:
|
|
self.last_request_time = asyncio.get_event_loop().time()
|
|
|
|
if response.status == 200:
|
|
return await response.json()
|
|
else:
|
|
logger.warning(f"CISA KEV request failed: {response.status} for {url}")
|
|
return None
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error making request to {url}: {e}")
|
|
return None
|
|
|
|
async def fetch_kev_catalog(self, force_refresh: bool = False) -> Optional[dict]:
|
|
"""Fetch the CISA KEV catalog with caching"""
|
|
# Check cache first
|
|
if not force_refresh and self.kev_cache and self.cache_timestamp:
|
|
cache_age = datetime.now().timestamp() - self.cache_timestamp
|
|
if cache_age < self.cache_ttl:
|
|
logger.info("Using cached CISA KEV data")
|
|
return self.kev_cache
|
|
|
|
logger.info("Fetching fresh CISA KEV catalog...")
|
|
|
|
async with aiohttp.ClientSession() as session:
|
|
data = await self._make_request(session, self.kev_catalog_url)
|
|
|
|
if data:
|
|
self.kev_cache = data
|
|
self.cache_timestamp = datetime.now().timestamp()
|
|
logger.info(f"Fetched CISA KEV catalog with {data.get('count', 0)} vulnerabilities")
|
|
return data
|
|
else:
|
|
logger.error("Failed to fetch CISA KEV catalog")
|
|
return None
|
|
|
|
def analyze_kev_vulnerability(self, vuln: dict) -> dict:
|
|
"""Analyze a CISA KEV vulnerability entry to extract threat intelligence"""
|
|
analysis = {
|
|
'cve_id': vuln.get('cveID'),
|
|
'vendor_project': vuln.get('vendorProject'),
|
|
'product': vuln.get('product'),
|
|
'vulnerability_name': vuln.get('vulnerabilityName'),
|
|
'date_added': vuln.get('dateAdded'),
|
|
'short_description': vuln.get('shortDescription'),
|
|
'required_action': vuln.get('requiredAction'),
|
|
'due_date': vuln.get('dueDate'),
|
|
'known_ransomware_use': vuln.get('knownRansomwareCampaignUse', 'Unknown'),
|
|
'notes': vuln.get('notes', ''),
|
|
'cwes': vuln.get('cwes', [])
|
|
}
|
|
|
|
# Calculate threat priority score based on various factors
|
|
threat_score = self._calculate_threat_score(vuln)
|
|
analysis['threat_score'] = threat_score
|
|
analysis['threat_level'] = self._get_threat_level(threat_score)
|
|
|
|
# Extract indicators from descriptions and required actions
|
|
indicators = self._extract_kev_indicators(vuln)
|
|
analysis['indicators'] = indicators
|
|
|
|
# Categorize the vulnerability type
|
|
analysis['vulnerability_category'] = self._categorize_vulnerability(vuln)
|
|
|
|
return analysis
|
|
|
|
def _calculate_threat_score(self, vuln: dict) -> int:
|
|
"""Calculate threat priority score (0-100) based on KEV data"""
|
|
score = 0
|
|
|
|
# Base score for being in CISA KEV
|
|
score += 50 # High base score since it's actively exploited
|
|
|
|
# Ransomware usage factor (0-25 points)
|
|
ransomware_use = vuln.get('knownRansomwareCampaignUse', 'Unknown').lower()
|
|
if ransomware_use == 'known':
|
|
score += 25
|
|
elif ransomware_use == 'unknown':
|
|
score += 10 # Still concerning but less certain
|
|
|
|
# Recency factor (0-15 points)
|
|
try:
|
|
date_added = datetime.strptime(vuln.get('dateAdded', ''), '%Y-%m-%d')
|
|
days_since_added = (datetime.now() - date_added).days
|
|
if days_since_added <= 30: # Very recent
|
|
score += 15
|
|
elif days_since_added <= 90: # Recent
|
|
score += 10
|
|
elif days_since_added <= 365: # This year
|
|
score += 5
|
|
except:
|
|
pass
|
|
|
|
# Urgency based on due date (0-10 points)
|
|
try:
|
|
due_date = datetime.strptime(vuln.get('dueDate', ''), '%Y-%m-%d')
|
|
days_to_due = (due_date - datetime.now()).days
|
|
if days_to_due <= 0: # Overdue
|
|
score += 10
|
|
elif days_to_due <= 7: # Due soon
|
|
score += 8
|
|
elif days_to_due <= 30: # Due this month
|
|
score += 5
|
|
except:
|
|
pass
|
|
|
|
return min(score, 100) # Cap at 100
|
|
|
|
def _get_threat_level(self, score: int) -> str:
|
|
"""Get threat level based on score"""
|
|
if score >= 85:
|
|
return 'critical'
|
|
elif score >= 70:
|
|
return 'high'
|
|
elif score >= 55:
|
|
return 'medium'
|
|
else:
|
|
return 'low'
|
|
|
|
def _extract_kev_indicators(self, vuln: dict) -> dict:
|
|
"""Extract threat indicators from CISA KEV vulnerability data"""
|
|
indicators = {
|
|
'attack_vectors': [],
|
|
'affected_products': [],
|
|
'required_actions': [],
|
|
'ransomware_indicators': [],
|
|
'vulnerability_types': [],
|
|
'mitigation_techniques': [],
|
|
'technical_details': [],
|
|
'impact_analysis': [],
|
|
'urgency_indicators': []
|
|
}
|
|
|
|
# Extract from vulnerability name and description
|
|
text_sources = [
|
|
vuln.get('vulnerabilityName', ''),
|
|
vuln.get('shortDescription', ''),
|
|
vuln.get('requiredAction', ''),
|
|
vuln.get('notes', '')
|
|
]
|
|
|
|
full_text = ' '.join(text_sources).lower()
|
|
|
|
# Attack vector patterns
|
|
attack_vector_patterns = [
|
|
r'\b(remote code execution|rce)\b',
|
|
r'\b(sql injection|sqli)\b',
|
|
r'\b(cross.?site scripting|xss)\b',
|
|
r'\b(buffer overflow)\b',
|
|
r'\b(privilege escalation)\b',
|
|
r'\b(authentication bypass)\b',
|
|
r'\b(directory traversal|path traversal)\b',
|
|
r'\b(file upload)\b',
|
|
r'\b(command injection)\b',
|
|
r'\b(deserialization)\b',
|
|
r'\b(memory corruption)\b',
|
|
r'\b(use.?after.?free)\b',
|
|
r'\b(heap overflow)\b',
|
|
r'\b(stack overflow)\b'
|
|
]
|
|
|
|
for pattern in attack_vector_patterns:
|
|
matches = re.findall(pattern, full_text, re.IGNORECASE)
|
|
indicators['attack_vectors'].extend(matches)
|
|
|
|
# Product and vendor information
|
|
vendor = vuln.get('vendorProject', '')
|
|
product = vuln.get('product', '')
|
|
if vendor and product:
|
|
indicators['affected_products'].append(f"{vendor} {product}")
|
|
|
|
# Required action keywords
|
|
required_action = vuln.get('requiredAction', '')
|
|
action_patterns = [
|
|
r'\b(patch|update|upgrade)\b',
|
|
r'\b(disable|remove|uninstall)\b',
|
|
r'\b(configure|reconfigure)\b',
|
|
r'\b(firewall|network segmentation)\b',
|
|
r'\b(monitor|logging)\b',
|
|
r'\b(backup|restore)\b'
|
|
]
|
|
|
|
for pattern in action_patterns:
|
|
matches = re.findall(pattern, required_action, re.IGNORECASE)
|
|
indicators['required_actions'].extend(matches)
|
|
|
|
# Ransomware indicators
|
|
if vuln.get('knownRansomwareCampaignUse') == 'Known':
|
|
indicators['ransomware_indicators'].append('known_ransomware_use')
|
|
|
|
# Vulnerability type classification
|
|
vuln_type_patterns = [
|
|
r'\b(zero.?day|0.?day)\b',
|
|
r'\b(denial.?of.?service|dos)\b',
|
|
r'\b(information disclosure)\b',
|
|
r'\b(data breach)\b',
|
|
r'\b(backdoor)\b',
|
|
r'\b(trojan)\b',
|
|
r'\b(webshell)\b'
|
|
]
|
|
|
|
for pattern in vuln_type_patterns:
|
|
matches = re.findall(pattern, full_text, re.IGNORECASE)
|
|
indicators['vulnerability_types'].extend(matches)
|
|
|
|
# Technical details extraction
|
|
technical_patterns = [
|
|
r'\b(port|service)\s+(\d+)\b',
|
|
r'\b(protocol)\s+(\w+)\b',
|
|
r'\b(version)\s+([\d\.]+)\b',
|
|
r'\b(cve-\d{4}-\d{4,7})\b',
|
|
r'\b(application|software|system)\s+(\w+)\b'
|
|
]
|
|
|
|
for pattern in technical_patterns:
|
|
matches = re.findall(pattern, full_text, re.IGNORECASE)
|
|
for match in matches:
|
|
if isinstance(match, tuple):
|
|
indicators['technical_details'].append(' '.join(match))
|
|
else:
|
|
indicators['technical_details'].append(match)
|
|
|
|
# Impact analysis
|
|
impact_keywords = [
|
|
'critical system', 'data exfiltration', 'system compromise',
|
|
'unauthorized access', 'privilege escalation', 'lateral movement',
|
|
'ransomware deployment', 'data encryption', 'service disruption'
|
|
]
|
|
|
|
for keyword in impact_keywords:
|
|
if keyword in full_text:
|
|
indicators['impact_analysis'].append(keyword)
|
|
|
|
# Urgency indicators
|
|
urgency_patterns = [
|
|
r'\b(immediate|urgent|critical|emergency)\b',
|
|
r'\b(actively exploited|in-the-wild|widespread)\b',
|
|
r'\b(patch.{0,10}available|fix.{0,10}available)\b',
|
|
r'\b(due.{0,10}date|deadline|must.{0,10}complete)\b'
|
|
]
|
|
|
|
for pattern in urgency_patterns:
|
|
matches = re.findall(pattern, full_text, re.IGNORECASE)
|
|
indicators['urgency_indicators'].extend(matches)
|
|
|
|
# Clean up and deduplicate
|
|
for key in indicators:
|
|
indicators[key] = list(set([item.strip() for item in indicators[key] if item and len(item.strip()) > 2]))
|
|
indicators[key] = indicators[key][:10] # Limit to 10 items per category
|
|
|
|
return indicators
|
|
|
|
def _categorize_vulnerability(self, vuln: dict) -> str:
|
|
"""Categorize the vulnerability based on CISA KEV data"""
|
|
vuln_name = vuln.get('vulnerabilityName', '').lower()
|
|
description = vuln.get('shortDescription', '').lower()
|
|
full_text = f"{vuln_name} {description}"
|
|
|
|
# Check for specific vulnerability categories
|
|
if re.search(r'\b(remote code execution|rce)\b', full_text):
|
|
return 'remote_code_execution'
|
|
elif re.search(r'\b(sql injection|sqli)\b', full_text):
|
|
return 'injection'
|
|
elif re.search(r'\b(cross.?site scripting|xss)\b', full_text):
|
|
return 'cross_site_scripting'
|
|
elif re.search(r'\b(privilege escalation)\b', full_text):
|
|
return 'privilege_escalation'
|
|
elif re.search(r'\b(authentication bypass)\b', full_text):
|
|
return 'authentication_bypass'
|
|
elif re.search(r'\b(buffer overflow|memory corruption)\b', full_text):
|
|
return 'memory_corruption'
|
|
elif re.search(r'\b(directory traversal|path traversal)\b', full_text):
|
|
return 'path_traversal'
|
|
elif re.search(r'\b(denial.?of.?service|dos)\b', full_text):
|
|
return 'denial_of_service'
|
|
elif re.search(r'\b(information disclosure)\b', full_text):
|
|
return 'information_disclosure'
|
|
else:
|
|
return 'other'
|
|
|
|
async def sync_cve_kev_data(self, cve_id: str) -> dict:
|
|
"""Synchronize CISA KEV data for a specific CVE"""
|
|
from main import CVE, SigmaRule
|
|
|
|
# Get existing CVE
|
|
cve = self.db_session.query(CVE).filter(CVE.cve_id == cve_id).first()
|
|
if not cve:
|
|
logger.warning(f"CVE {cve_id} not found in database")
|
|
return {"error": "CVE not found"}
|
|
|
|
# Fetch CISA KEV catalog
|
|
kev_catalog = await self.fetch_kev_catalog()
|
|
if not kev_catalog:
|
|
return {"error": "Failed to fetch CISA KEV catalog"}
|
|
|
|
# Search for the CVE in the KEV catalog
|
|
kev_entry = None
|
|
for vuln in kev_catalog.get('vulnerabilities', []):
|
|
if vuln.get('cveID') == cve_id:
|
|
kev_entry = vuln
|
|
break
|
|
|
|
if not kev_entry:
|
|
logger.info(f"CVE {cve_id} not found in CISA KEV catalog")
|
|
return {"cve_id": cve_id, "kev_found": False}
|
|
|
|
# Analyze the KEV entry
|
|
kev_analysis = self.analyze_kev_vulnerability(kev_entry)
|
|
|
|
# Update CVE with CISA KEV data
|
|
if not cve.poc_data:
|
|
cve.poc_data = {}
|
|
|
|
cve.poc_data['cisa_kev'] = {
|
|
'vulnerability_data': kev_analysis,
|
|
'catalog_version': kev_catalog.get('catalogVersion'),
|
|
'date_released': kev_catalog.get('dateReleased'),
|
|
'synced_at': datetime.utcnow().isoformat(),
|
|
'source': 'cisa_kev'
|
|
}
|
|
|
|
cve.updated_at = datetime.utcnow()
|
|
|
|
# Update SIGMA rule with CISA KEV data
|
|
sigma_rule = self.db_session.query(SigmaRule).filter(
|
|
SigmaRule.cve_id == cve_id
|
|
).first()
|
|
|
|
if sigma_rule:
|
|
# Update rule with CISA KEV data
|
|
if not sigma_rule.nomi_sec_data:
|
|
sigma_rule.nomi_sec_data = {}
|
|
|
|
sigma_rule.nomi_sec_data['cisa_kev'] = {
|
|
'threat_level': kev_analysis['threat_level'],
|
|
'threat_score': kev_analysis['threat_score'],
|
|
'known_ransomware_use': kev_analysis['known_ransomware_use'],
|
|
'vulnerability_category': kev_analysis['vulnerability_category'],
|
|
'due_date': kev_analysis['due_date'],
|
|
'indicators': kev_analysis['indicators'],
|
|
'source': 'cisa_kev'
|
|
}
|
|
|
|
# Update exploit indicators with CISA KEV data
|
|
existing_indicators = json.loads(sigma_rule.exploit_indicators) if sigma_rule.exploit_indicators else {}
|
|
kev_indicators = kev_analysis['indicators']
|
|
|
|
for key, values in kev_indicators.items():
|
|
if key not in existing_indicators:
|
|
existing_indicators[key] = []
|
|
existing_indicators[key].extend(values)
|
|
existing_indicators[key] = list(set(existing_indicators[key]))
|
|
|
|
sigma_rule.exploit_indicators = json.dumps(existing_indicators)
|
|
sigma_rule.updated_at = datetime.utcnow()
|
|
|
|
self.db_session.commit()
|
|
|
|
logger.info(f"Synchronized CISA KEV data for {cve_id} (Threat Level: {kev_analysis['threat_level']})")
|
|
|
|
return {
|
|
"cve_id": cve_id,
|
|
"kev_found": True,
|
|
"threat_level": kev_analysis['threat_level'],
|
|
"threat_score": kev_analysis['threat_score'],
|
|
"known_ransomware_use": kev_analysis['known_ransomware_use'],
|
|
"vulnerability_category": kev_analysis['vulnerability_category'],
|
|
"source": "cisa_kev"
|
|
}
|
|
|
|
async def bulk_sync_kev_data(self, batch_size: int = 100, cancellation_flag: Optional[callable] = None) -> dict:
|
|
"""Synchronize CISA KEV data for all matching CVEs"""
|
|
from main import CVE, BulkProcessingJob
|
|
|
|
# Create bulk processing job
|
|
job = BulkProcessingJob(
|
|
job_type='cisa_kev_sync',
|
|
status='running',
|
|
started_at=datetime.utcnow(),
|
|
job_metadata={'batch_size': batch_size}
|
|
)
|
|
self.db_session.add(job)
|
|
self.db_session.commit()
|
|
|
|
total_processed = 0
|
|
total_found = 0
|
|
results = []
|
|
|
|
try:
|
|
# Fetch CISA KEV catalog first
|
|
kev_catalog = await self.fetch_kev_catalog(force_refresh=True)
|
|
if not kev_catalog:
|
|
raise Exception("Failed to fetch CISA KEV catalog")
|
|
|
|
# Extract all CVE IDs from the KEV catalog
|
|
kev_cve_ids = [vuln.get('cveID') for vuln in kev_catalog.get('vulnerabilities', []) if vuln.get('cveID')]
|
|
|
|
# Get CVEs that exist in our database and match KEV catalog
|
|
cves = self.db_session.query(CVE).filter(CVE.cve_id.in_(kev_cve_ids)).all()
|
|
|
|
job.total_items = len(cves)
|
|
self.db_session.commit()
|
|
|
|
logger.info(f"Found {len(cves)} CVEs matching CISA KEV catalog")
|
|
|
|
# Process in batches
|
|
for i in range(0, len(cves), batch_size):
|
|
# Check for cancellation
|
|
if cancellation_flag and cancellation_flag():
|
|
logger.info("CISA KEV sync cancelled by user")
|
|
job.status = 'cancelled'
|
|
job.cancelled_at = datetime.utcnow()
|
|
job.error_message = "Job cancelled by user"
|
|
break
|
|
|
|
batch = cves[i:i + batch_size]
|
|
|
|
for cve in batch:
|
|
# Check for cancellation
|
|
if cancellation_flag and cancellation_flag():
|
|
logger.info("CISA KEV sync cancelled by user")
|
|
job.status = 'cancelled'
|
|
job.cancelled_at = datetime.utcnow()
|
|
job.error_message = "Job cancelled by user"
|
|
break
|
|
|
|
try:
|
|
result = await self.sync_cve_kev_data(cve.cve_id)
|
|
total_processed += 1
|
|
|
|
if result.get("kev_found", False):
|
|
total_found += 1
|
|
results.append(result)
|
|
|
|
job.processed_items += 1
|
|
|
|
# Small delay to be respectful to CISA
|
|
await asyncio.sleep(0.1)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error syncing CISA KEV for {cve.cve_id}: {e}")
|
|
job.failed_items += 1
|
|
|
|
# Break out of outer loop if cancelled
|
|
if job.status == 'cancelled':
|
|
break
|
|
|
|
# Commit after each batch
|
|
self.db_session.commit()
|
|
logger.info(f"Processed CISA KEV batch {i//batch_size + 1}/{(len(cves) + batch_size - 1)//batch_size}")
|
|
|
|
# Update job status
|
|
if job.status != 'cancelled':
|
|
job.status = 'completed'
|
|
job.completed_at = datetime.utcnow()
|
|
|
|
job.job_metadata.update({
|
|
'total_processed': total_processed,
|
|
'total_kev_found': total_found,
|
|
'cves_with_kev': len(results),
|
|
'catalog_version': kev_catalog.get('catalogVersion'),
|
|
'source': 'cisa_kev'
|
|
})
|
|
|
|
except Exception as e:
|
|
job.status = 'failed'
|
|
job.error_message = str(e)
|
|
job.completed_at = datetime.utcnow()
|
|
logger.error(f"Bulk CISA KEV sync job failed: {e}")
|
|
|
|
finally:
|
|
self.db_session.commit()
|
|
|
|
return {
|
|
'job_id': str(job.id),
|
|
'status': job.status,
|
|
'total_processed': total_processed,
|
|
'total_kev_found': total_found,
|
|
'cves_with_kev': len(results),
|
|
'source': 'cisa_kev'
|
|
}
|
|
|
|
async def get_kev_sync_status(self) -> dict:
|
|
"""Get CISA KEV synchronization status"""
|
|
from main import CVE
|
|
|
|
# Count CVEs with CISA KEV data
|
|
total_cves = self.db_session.query(CVE).count()
|
|
|
|
result = self.db_session.execute(
|
|
text("SELECT COUNT(*) FROM cves WHERE poc_data::text LIKE '%\"cisa_kev\"%'")
|
|
)
|
|
cves_with_kev = result.scalar()
|
|
|
|
# Get catalog information if available
|
|
catalog_info = {}
|
|
if self.kev_cache:
|
|
catalog_info = {
|
|
'catalog_version': self.kev_cache.get('catalogVersion'),
|
|
'date_released': self.kev_cache.get('dateReleased'),
|
|
'total_vulnerabilities': self.kev_cache.get('count', 0),
|
|
'cache_age_seconds': int(datetime.now().timestamp() - (self.cache_timestamp or 0))
|
|
}
|
|
|
|
return {
|
|
'total_cves': total_cves,
|
|
'cves_with_kev_data': cves_with_kev,
|
|
'kev_coverage': (cves_with_kev / total_cves * 100) if total_cves > 0 else 0,
|
|
'kev_sync_status': 'active' if cves_with_kev > 0 else 'pending',
|
|
'catalog_info': catalog_info,
|
|
'source': 'cisa_kev'
|
|
} |