# auto_sigma_rule_generator/backend/reference_client.py
"""
Reference Data Extraction Client
Extracts and analyzes text content from CVE references and KEV records
"""
import aiohttp
import asyncio
import json
import logging
import re
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Any, Callable
from urllib.parse import urlparse, urljoin
from sqlalchemy.orm import Session
from sqlalchemy.orm.attributes import flag_modified
from sqlalchemy import text, func, or_
import hashlib
from bs4 import BeautifulSoup
import ssl
import certifi
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ReferenceClient:
"""Client for extracting and analyzing reference content from CVE and KEV records"""
def __init__(self, db_session: Session):
self.db_session = db_session
# Rate limiting
self.rate_limit_delay = 2.0 # 2 seconds between requests
self.last_request_time = 0
# Cache for processed URLs
self.url_cache = {}
self.cache_ttl = 86400 # 24 hours cache
# SSL context for secure requests
self.ssl_context = ssl.create_default_context(cafile=certifi.where())
        # NOTE: this disables certificate verification entirely (not just for
        # self-signed certificates), trading TLS security for fetch coverage
self.ssl_context.check_hostname = False
self.ssl_context.verify_mode = ssl.CERT_NONE
# Common headers to avoid being blocked
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.5',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
'Sec-Fetch-Dest': 'document',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-Site': 'none'
}
# Supported reference types
self.reference_types = {
'security_advisory': ['security', 'advisory', 'bulletin', 'alert', 'cve', 'vulnerability'],
'patch': ['patch', 'fix', 'update', 'hotfix', 'security-update'],
'exploit': ['exploit', 'poc', 'proof-of-concept', 'github.com', 'exploit-db'],
'technical_analysis': ['analysis', 'research', 'technical', 'writeup', 'blog'],
'vendor_advisory': ['microsoft', 'apple', 'oracle', 'cisco', 'vmware', 'adobe'],
'cve_database': ['cve.mitre.org', 'nvd.nist.gov', 'cve.org']
}
async def _make_request(self, session: aiohttp.ClientSession, url: str) -> Optional[Tuple[str, str]]:
"""Make a rate-limited request to fetch URL content"""
try:
# Rate limiting
current_time = asyncio.get_event_loop().time()
time_since_last = current_time - self.last_request_time
if time_since_last < self.rate_limit_delay:
await asyncio.sleep(self.rate_limit_delay - time_since_last)
# Check cache first
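            # MD5 is used purely as a cache key here, not as a security control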
url_hash = hashlib.md5(url.encode()).hexdigest()
if url_hash in self.url_cache:
cache_entry = self.url_cache[url_hash]
if datetime.now().timestamp() - cache_entry['timestamp'] < self.cache_ttl:
logger.info(f"Using cached content for {url}")
return cache_entry['content'], cache_entry['content_type']
async with session.get(url, headers=self.headers) as response:
self.last_request_time = asyncio.get_event_loop().time()
if response.status == 200:
content_type = response.headers.get('content-type', '').lower()
# Only process text content
if 'text/html' in content_type or 'text/plain' in content_type or 'application/json' in content_type:
try:
content = await response.text(encoding='utf-8', errors='ignore')
except UnicodeDecodeError:
# Fallback to binary content if text decode fails
content_bytes = await response.read()
content = content_bytes.decode('utf-8', errors='ignore')
# Cache the result
self.url_cache[url_hash] = {
'content': content,
'content_type': content_type,
'timestamp': datetime.now().timestamp()
}
return content, content_type
else:
logger.warning(f"Unsupported content type {content_type} for {url}")
return None, None
                    elif response.status in [301, 302, 303, 307, 308]:
                        # aiohttp follows redirects automatically, so this branch only
                        # fires when redirection was disabled or exhausted
                        logger.info(f"Redirect response {response.status} for {url}")
                        return None, None
else:
logger.warning(f"Request failed: {response.status} for {url}")
return None, None
except aiohttp.ClientError as e:
logger.warning(f"Client error fetching {url}: {e}")
return None, None
except asyncio.TimeoutError:
logger.warning(f"Timeout fetching {url}")
return None, None
except Exception as e:
logger.error(f"Unexpected error fetching {url}: {e}")
return None, None
def _extract_text_from_html(self, html_content: str) -> str:
"""Extract meaningful text from HTML content"""
try:
soup = BeautifulSoup(html_content, 'html.parser')
# Remove script and style elements
for script in soup(["script", "style"]):
script.decompose()
# Extract text from common content areas
content_selectors = [
'article', 'main', '.content', '#content',
'.post-content', '.entry-content', 'section'
]
text_content = ""
for selector in content_selectors:
elements = soup.select(selector)
if elements:
for element in elements:
text_content += element.get_text(separator=' ', strip=True) + '\n'
break
# If no structured content found, get all text
if not text_content.strip():
text_content = soup.get_text(separator=' ', strip=True)
# Clean up the text
text_content = re.sub(r'\s+', ' ', text_content)
text_content = text_content.strip()
return text_content
except Exception as e:
logger.error(f"Error extracting text from HTML: {e}")
return ""
def _analyze_reference_content(self, url: str, content: str) -> Dict[str, Any]:
"""Analyze reference content to extract security-relevant information"""
analysis = {
'url': url,
'content_length': len(content),
'reference_type': 'unknown',
'security_keywords': [],
'technical_indicators': [],
'patch_information': [],
'exploit_indicators': [],
'cve_mentions': [],
'severity_indicators': [],
'mitigation_steps': [],
'affected_products': [],
'attack_vectors': [],
'confidence_score': 0
}
if not content:
return analysis
content_lower = content.lower()
# Classify reference type
domain = urlparse(url).netloc.lower()
for ref_type, keywords in self.reference_types.items():
if any(keyword in domain or keyword in content_lower for keyword in keywords):
analysis['reference_type'] = ref_type
break
# Extract CVE mentions
cve_pattern = r'(CVE-\d{4}-\d{4,7})'
cve_matches = re.findall(cve_pattern, content, re.IGNORECASE)
analysis['cve_mentions'] = list(set(cve_matches))
# Security keywords
security_keywords = [
'vulnerability', 'exploit', 'attack', 'malware', 'backdoor',
'privilege escalation', 'remote code execution', 'rce',
'sql injection', 'xss', 'csrf', 'buffer overflow',
'authentication bypass', 'authorization', 'injection',
'denial of service', 'dos', 'ddos', 'ransomware'
]
for keyword in security_keywords:
if keyword in content_lower:
analysis['security_keywords'].append(keyword)
# Technical indicators
        technical_patterns = [
            # Non-capturing groups so re.findall() returns the full match
            # (e.g. "port 443") rather than just the leading keyword
            r'\b(?:function|method|class|variable)\s+\w+',
            r'\b(?:file|directory|path|folder)\s+[^\s]+',
            r'\b(?:port|service|protocol)\s+\d+',
            r'\b(?:registry|key|value)\s+[^\s]+',
            r'\b(?:process|executable|binary)\s+[^\s]+',
            r'\b(?:dll|exe|bat|ps1|sh|py|jar)\b',
            r'\b(?:http|https|ftp|smb|tcp|udp)://[^\s]+',
            r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b'
        ]
for pattern in technical_patterns:
matches = re.findall(pattern, content, re.IGNORECASE)
if matches:
analysis['technical_indicators'].extend(matches[:10]) # Limit to 10 per pattern
        # Patch information (plain keywords and regex patterns, applied with re.search)
        patch_keywords = [
            'patch', 'fix', 'update', 'hotfix', 'security update',
            r'kb\d+', r'ms\d+-\d+', r'version \d+\.\d+',
            'download', 'install', 'upgrade'
        ]
for keyword in patch_keywords:
if re.search(keyword, content_lower):
analysis['patch_information'].append(keyword)
# Exploit indicators
exploit_keywords = [
'proof of concept', 'poc', 'exploit code', 'payload',
'shellcode', 'reverse shell', 'metasploit', 'nmap',
'vulnerability assessment', 'penetration test', 'bypass'
]
for keyword in exploit_keywords:
if keyword in content_lower:
analysis['exploit_indicators'].append(keyword)
# Severity indicators
severity_patterns = [
r'\b(critical|high|medium|low)\s+(severity|risk|priority)',
r'\b(cvss|score)\s*[:=]?\s*(\d+\.\d+|\d+)',
r'\b(exploitability|impact)\s*[:=]?\s*(high|medium|low)'
]
for pattern in severity_patterns:
matches = re.findall(pattern, content, re.IGNORECASE)
if matches:
analysis['severity_indicators'].extend([' '.join(match) if isinstance(match, tuple) else match for match in matches])
# Mitigation steps
mitigation_keywords = [
'mitigation', 'workaround', 'prevention', 'remediation',
'disable', 'block', 'restrict', 'configure', 'setting'
]
# Find sentences containing mitigation keywords
sentences = re.split(r'[.!?]+', content)
for sentence in sentences:
if any(keyword in sentence.lower() for keyword in mitigation_keywords):
if len(sentence.strip()) > 20: # Avoid very short sentences
analysis['mitigation_steps'].append(sentence.strip()[:200]) # Limit length
        # Calculate a heuristic confidence score: each signal is weighted,
        # capped per category, and the total is clamped to 100
score = 0
score += min(len(analysis['security_keywords']) * 5, 25)
score += min(len(analysis['technical_indicators']) * 2, 20)
score += min(len(analysis['cve_mentions']) * 10, 30)
score += min(len(analysis['patch_information']) * 3, 15)
score += min(len(analysis['exploit_indicators']) * 4, 20)
if analysis['reference_type'] != 'unknown':
score += 10
analysis['confidence_score'] = min(score, 100)
        # Deduplicate, sort for deterministic output, and cap each list at 10 items
        for key in ['security_keywords', 'technical_indicators', 'patch_information',
                    'exploit_indicators', 'severity_indicators', 'mitigation_steps']:
            analysis[key] = sorted(set(analysis[key]))[:10]
return analysis
async def extract_reference_content(self, url: str) -> Optional[Dict[str, Any]]:
"""Extract and analyze content from a single reference URL"""
try:
connector = aiohttp.TCPConnector(
ssl=self.ssl_context,
limit=100,
limit_per_host=30,
ttl_dns_cache=300,
use_dns_cache=True,
)
timeout = aiohttp.ClientTimeout(total=60, connect=30)
async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
content, content_type = await self._make_request(session, url)
if not content:
return None
# Extract text from HTML if needed
if content_type and 'text/html' in content_type:
text_content = self._extract_text_from_html(content)
else:
text_content = content
# Analyze the content
analysis = self._analyze_reference_content(url, text_content)
# Add metadata
analysis.update({
'extracted_at': datetime.utcnow().isoformat(),
'content_type': content_type,
'text_length': len(text_content),
'source': 'reference_extraction'
})
return analysis
except Exception as e:
logger.error(f"Error extracting reference content from {url}: {e}")
return None
async def sync_cve_references(self, cve_id: str) -> Dict[str, Any]:
"""Sync reference data for a specific CVE"""
from main import CVE, SigmaRule
# Get existing CVE
cve = self.db_session.query(CVE).filter(CVE.cve_id == cve_id).first()
if not cve:
logger.warning(f"CVE {cve_id} not found in database")
return {"error": "CVE not found"}
if not cve.reference_urls:
logger.info(f"No reference URLs found for CVE {cve_id}")
return {"cve_id": cve_id, "references_processed": 0}
logger.info(f"Processing {len(cve.reference_urls)} references for CVE {cve_id}")
processed_references = []
successful_extractions = 0
for url in cve.reference_urls:
try:
# Extract reference content
ref_analysis = await self.extract_reference_content(url)
if ref_analysis:
processed_references.append(ref_analysis)
successful_extractions += 1
logger.info(f"Successfully extracted content from {url}")
else:
logger.warning(f"Failed to extract content from {url}")
# Small delay between requests
await asyncio.sleep(1)
except Exception as e:
logger.error(f"Error processing reference {url}: {e}")
# Update CVE with reference data
cve.reference_data = {
'reference_analysis': {
'references': processed_references,
'total_references': len(cve.reference_urls),
'successful_extractions': successful_extractions,
'extraction_rate': successful_extractions / len(cve.reference_urls) if cve.reference_urls else 0,
'extracted_at': datetime.utcnow().isoformat(),
'source': 'reference_extraction'
}
}
cve.reference_sync_status = 'completed' if successful_extractions > 0 else 'failed'
cve.reference_last_synced = datetime.utcnow()
cve.updated_at = datetime.utcnow()
# Update SIGMA rule with reference data
sigma_rule = self.db_session.query(SigmaRule).filter(
SigmaRule.cve_id == cve_id
).first()
if sigma_rule:
# Aggregate indicators from all references
aggregated_indicators = {
'security_keywords': [],
'technical_indicators': [],
'exploit_indicators': [],
'patch_information': [],
'attack_vectors': [],
'mitigation_steps': []
}
for ref in processed_references:
for key in aggregated_indicators.keys():
if key in ref:
aggregated_indicators[key].extend(ref[key])
# Deduplicate
for key in aggregated_indicators:
aggregated_indicators[key] = list(set(aggregated_indicators[key]))
# Update rule with reference data
if not sigma_rule.nomi_sec_data:
sigma_rule.nomi_sec_data = {}
sigma_rule.nomi_sec_data['reference_analysis'] = {
'aggregated_indicators': aggregated_indicators,
'reference_count': len(processed_references),
'high_confidence_references': len([r for r in processed_references if r.get('confidence_score', 0) > 70]),
'reference_types': list(set([r.get('reference_type') for r in processed_references if r.get('reference_type') != 'unknown'])),
'source': 'reference_extraction'
}
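            # Note: if nomi_sec_data is a plain JSON column (not a MutableDict),
            # the in-place key assignment above is not change-tracked by SQLAlchemy;
            # flag it explicitly so the update is persisted on commit.
            flag_modified(sigma_rule, 'nomi_sec_data')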
# Update exploit indicators
if sigma_rule.exploit_indicators:
existing_indicators = json.loads(sigma_rule.exploit_indicators)
else:
existing_indicators = {}
for key, values in aggregated_indicators.items():
if key not in existing_indicators:
existing_indicators[key] = []
existing_indicators[key].extend(values)
existing_indicators[key] = list(set(existing_indicators[key]))
sigma_rule.exploit_indicators = json.dumps(existing_indicators)
sigma_rule.updated_at = datetime.utcnow()
self.db_session.commit()
logger.info(f"Successfully synchronized reference data for {cve_id}")
return {
"cve_id": cve_id,
"references_processed": len(processed_references),
"successful_extractions": successful_extractions,
"extraction_rate": successful_extractions / len(cve.reference_urls) if cve.reference_urls else 0,
"high_confidence_references": len([r for r in processed_references if r.get('confidence_score', 0) > 70]),
"source": "reference_extraction"
}
    async def bulk_sync_references(self, batch_size: int = 50, max_cves: Optional[int] = None,
                                   force_resync: bool = False,
                                   cancellation_flag: Optional[Callable[[], bool]] = None) -> Dict[str, Any]:
"""Bulk synchronize reference data for multiple CVEs"""
from main import CVE, BulkProcessingJob
# Create bulk processing job
job = BulkProcessingJob(
job_type='reference_sync',
status='running',
started_at=datetime.utcnow(),
job_metadata={'batch_size': batch_size, 'max_cves': max_cves}
)
self.db_session.add(job)
self.db_session.commit()
total_processed = 0
total_references = 0
successful_extractions = 0
try:
# Get CVEs that have reference URLs but no reference analysis
query = self.db_session.query(CVE).filter(
CVE.reference_urls.isnot(None),
func.array_length(CVE.reference_urls, 1) > 0
)
            # Skip CVEs that already completed reference analysis unless force_resync
            # is set; a NULL sync status means the CVE has never been processed
            if not force_resync:
                query = query.filter(
                    or_(
                        CVE.reference_sync_status.is_(None),
                        CVE.reference_sync_status != 'completed'
                    )
                )
if max_cves:
cves = query.limit(max_cves).all()
else:
cves = query.all()
job.total_items = len(cves)
self.db_session.commit()
logger.info(f"Starting bulk reference sync for {len(cves)} CVEs")
# Process in batches
for i in range(0, len(cves), batch_size):
# Check for cancellation
if cancellation_flag and cancellation_flag():
logger.info("Bulk reference sync cancelled by user")
job.status = 'cancelled'
job.cancelled_at = datetime.utcnow()
job.error_message = "Job cancelled by user"
break
batch = cves[i:i + batch_size]
for cve in batch:
# Check for cancellation
if cancellation_flag and cancellation_flag():
logger.info("Bulk reference sync cancelled by user")
job.status = 'cancelled'
job.cancelled_at = datetime.utcnow()
job.error_message = "Job cancelled by user"
break
try:
result = await self.sync_cve_references(cve.cve_id)
if "error" not in result:
total_processed += 1
total_references += result.get("references_processed", 0)
successful_extractions += result.get("successful_extractions", 0)
else:
job.failed_items += 1
job.processed_items += 1
# Longer delay for reference extraction to be respectful
await asyncio.sleep(2)
except Exception as e:
logger.error(f"Error processing references for {cve.cve_id}: {e}")
job.failed_items += 1
# Break out of outer loop if cancelled
if job.status == 'cancelled':
break
# Commit after each batch
self.db_session.commit()
logger.info(f"Processed reference batch {i//batch_size + 1}/{(len(cves) + batch_size - 1)//batch_size}")
# Update job status
if job.status != 'cancelled':
job.status = 'completed'
job.completed_at = datetime.utcnow()
job.job_metadata.update({
'total_processed': total_processed,
'total_references': total_references,
'successful_extractions': successful_extractions,
'extraction_rate': successful_extractions / total_references if total_references > 0 else 0,
'source': 'reference_extraction'
})
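                # Note: job_metadata is mutated in place; if it is a plain JSON
                # column, flag the change so it is persisted on commit.
                flag_modified(job, 'job_metadata')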
except Exception as e:
job.status = 'failed'
job.error_message = str(e)
job.completed_at = datetime.utcnow()
logger.error(f"Bulk reference sync job failed: {e}")
finally:
self.db_session.commit()
return {
'job_id': str(job.id),
'status': job.status,
'total_processed': total_processed,
'total_references': total_references,
'successful_extractions': successful_extractions,
'extraction_rate': successful_extractions / total_references if total_references > 0 else 0,
'source': 'reference_extraction'
}
async def get_reference_sync_status(self) -> Dict[str, Any]:
"""Get reference synchronization status"""
from main import CVE
# Count CVEs with reference URLs
total_cves = self.db_session.query(CVE).count()
cves_with_refs = self.db_session.query(CVE).filter(
CVE.reference_urls.isnot(None),
func.array_length(CVE.reference_urls, 1) > 0
).count()
# Count CVEs with reference analysis
cves_with_analysis = self.db_session.query(CVE).filter(
CVE.reference_sync_status == 'completed'
).count()
return {
'total_cves': total_cves,
'cves_with_references': cves_with_refs,
'cves_with_analysis': cves_with_analysis,
'reference_coverage': (cves_with_refs / total_cves * 100) if total_cves > 0 else 0,
'analysis_coverage': (cves_with_analysis / cves_with_refs * 100) if cves_with_refs > 0 else 0,
'sync_status': 'active' if cves_with_analysis > 0 else 'pending',
'source': 'reference_extraction'
}
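

# Minimal usage sketch (illustration only): assumes `main` exposes a
# SessionLocal session factory, which is a common but hypothetical name here.
if __name__ == "__main__":
    from main import SessionLocal  # hypothetical session factory

    async def _demo():
        db = SessionLocal()
        try:
            client = ReferenceClient(db)
            # Print current reference sync coverage as formatted JSON
            status = await client.get_reference_sync_status()
            print(json.dumps(status, indent=2))
        finally:
            db.close()

    asyncio.run(_demo())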