- Extract database models from monolithic main.py (2,373 lines) into organized modules
- Implement service layer pattern with dedicated business logic classes
- Split API endpoints into modular FastAPI routers by functionality
- Add centralized configuration management with environment variable handling
- Create proper separation of concerns across data, service, and presentation layers

**Architecture Changes:**
- models/: SQLAlchemy database models (CVE, SigmaRule, RuleTemplate, BulkProcessingJob)
- config/: Centralized settings and database configuration
- services/: Business logic (CVEService, SigmaRuleService, GitHubExploitAnalyzer)
- routers/: Modular API endpoints (cves, sigma_rules, bulk_operations, llm_operations)
- schemas/: Pydantic request/response models

**Key Improvements:**
- 95% reduction in main.py size (2,373 → 120 lines)
- Updated 15+ backend files with proper import structure
- Eliminated circular dependencies and tight coupling
- Enhanced testability with isolated service components
- Better code organization for team collaboration

**Backward Compatibility:**
- All API endpoints maintain same URLs and behavior
- Zero breaking changes to existing functionality
- Database schema unchanged
- Environment variables preserved

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
268 lines
No EOL
12 KiB
Python
268 lines
No EOL
12 KiB
Python
import re
|
|
import uuid
|
|
from datetime import datetime
|
|
from typing import List, Optional
|
|
from sqlalchemy.orm import Session
|
|
|
|
import sys
|
|
import os
|
|
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
from models import CVE, SigmaRule, RuleTemplate
|
|
from config.settings import settings
|
|
|
|
|
|
class SigmaRuleService:
    """Service for managing SIGMA rule generation and operations.

    The service holds a database session and uses it to read rule templates,
    query stored rules, and stage newly generated rules. Project/ORM type
    names in annotations are written as strings so they are not evaluated
    when the class is defined.
    """

    # Patterns used to pull indicators out of free-text CVE descriptions.
    # They are raw strings with single backslashes so that \w, \s and \d are
    # regex character classes rather than a literal backslash.
    _EXE_PATTERN = re.compile(r'(\w+\.exe)', re.IGNORECASE)
    _PORT_PATTERN = re.compile(r'port\s+(\d+)', re.IGNORECASE)
    _FILE_PATTERN = re.compile(r'(\w+\.\w{3,4})', re.IGNORECASE)

    def __init__(self, db: "Session"):
        """Store the database session used by every query in this service."""
        self.db = db

    def generate_sigma_rule(
        self,
        cve: "CVE",
        exploit_indicators: Optional[dict] = None,
    ) -> Optional["SigmaRule"]:
        """Generate SIGMA rule based on CVE data and optional exploit indicators.

        Args:
            cve: CVE record; its ``description``, ``affected_products``,
                ``cvss_score`` and ``cve_id`` attributes are read.
            exploit_indicators: Optional mapping of indicator category
                (``powershell``, ``network``, ``files``, ``processes``,
                ``commands``) to a list of values.

        Returns:
            The new ``SigmaRule`` after it has been added to the session, or
            ``None`` when the CVE has no description, no template matches, or
            the template could not be populated. The session is not committed
            here.
        """
        if not cve.description:
            return None

        # Analyze CVE to determine appropriate template
        description_lower = cve.description.lower()
        affected_products = [p.lower() for p in (cve.affected_products or [])]

        template = self._select_template(description_lower, affected_products, exploit_indicators)
        if not template:
            return None

        # Generate rule content
        rule_content = self._populate_template(cve, template, exploit_indicators)
        if not rule_content:
            return None

        # Determine detection type and confidence
        detection_type = self._determine_detection_type(description_lower, exploit_indicators)
        confidence_level = self._calculate_confidence(cve, bool(exploit_indicators))

        # Rebuild "CVE-<year>-<number>" from the id; fall back to the raw id
        # instead of raising IndexError when it has fewer than three parts.
        id_parts = cve.cve_id.split('-')
        if len(id_parts) >= 3:
            rule_name = f"CVE-{id_parts[1]}-{id_parts[2]} Detection"
        else:
            rule_name = f"{cve.cve_id} Detection"

        sigma_rule = SigmaRule(
            cve_id=cve.cve_id,
            rule_name=rule_name,
            rule_content=rule_content,
            detection_type=detection_type,
            log_source=template.template_name.lower().replace(" ", "_"),
            confidence_level=confidence_level,
            auto_generated=True,
            exploit_based=bool(exploit_indicators),
        )

        if exploit_indicators:
            # Stored as the Python repr of the dict (str()), not as JSON.
            sigma_rule.exploit_indicators = str(exploit_indicators)

        self.db.add(sigma_rule)
        return sigma_rule

    def get_rules_by_cve(self, cve_id: str) -> List["SigmaRule"]:
        """Get all SIGMA rules for a specific CVE."""
        return self.db.query(SigmaRule).filter(SigmaRule.cve_id == cve_id).all()

    def get_all_rules(self, limit: int = 100, offset: int = 0) -> List["SigmaRule"]:
        """Get all SIGMA rules with pagination (``offset`` rows skipped, at most ``limit`` returned)."""
        return self.db.query(SigmaRule).offset(offset).limit(limit).all()

    def get_rule_stats(self) -> dict:
        """Get SIGMA rule statistics.

        Returns:
            A dict with the counts ``total_rules``, ``exploit_based`` and
            ``high_confidence`` (rules whose confidence level is ``'high'``).
        """
        total_rules = self.db.query(SigmaRule).count()
        # "== True" is intentional: it builds a SQL expression on the column.
        exploit_based = self.db.query(SigmaRule).filter(SigmaRule.exploit_based == True).count()  # noqa: E712
        high_confidence = self.db.query(SigmaRule).filter(SigmaRule.confidence_level == 'high').count()

        return {
            "total_rules": total_rules,
            "exploit_based": exploit_based,
            "high_confidence": high_confidence,
        }

    @staticmethod
    def _find_template(templates, name_fragment: str):
        """Return the first template whose name contains ``name_fragment``, else ``None``."""
        return next((t for t in templates if name_fragment in t.template_name), None)

    def _select_template(
        self,
        description: str,
        affected_products: List[str],
        exploit_indicators: Optional[dict] = None,
    ) -> Optional["RuleTemplate"]:
        """Select appropriate SIGMA rule template based on CVE and exploit analysis.

        Args:
            description: CVE description; callers in this class pass it
                lower-cased, and the keyword checks below are lower-case.
            affected_products: Lower-cased affected product strings.
            exploit_indicators: Optional indicator mapping; a non-empty
                category takes precedence over description keywords.

        Returns:
            The chosen template, or ``None`` if no template with the required
            name fragment exists.
        """
        templates = self.db.query(RuleTemplate).all()

        # If we have exploit indicators, use them to determine the best template.
        # Order matters: the first category that is present AND has a matching
        # template wins; otherwise the next category is tried.
        if exploit_indicators:
            indicator_templates = (
                (('powershell',), "PowerShell"),
                (('network',), "Network Connection"),
                (('files',), "File Modification"),
                (('processes', 'commands'), "Process Execution"),
            )
            for categories, name_fragment in indicator_templates:
                if any(exploit_indicators.get(category) for category in categories):
                    candidate = self._find_template(templates, name_fragment)
                    if candidate:
                        return candidate

        # Fallback to original logic: keyword match, only for Windows/Microsoft products
        if any("windows" in p or "microsoft" in p for p in affected_products):
            if "process" in description or "execution" in description:
                return self._find_template(templates, "Process Execution")
            elif "network" in description or "remote" in description:
                return self._find_template(templates, "Network Connection")
            elif "file" in description or "write" in description:
                return self._find_template(templates, "File Modification")

        # Default to process execution template
        return self._find_template(templates, "Process Execution")

    def _populate_template(
        self,
        cve: "CVE",
        template: "RuleTemplate",
        exploit_indicators: Optional[dict] = None,
    ) -> Optional[str]:
        """Populate template with CVE-specific data and exploit indicators.

        Args:
            cve: CVE record; ``cve_id``, ``description`` and ``cvss_score`` are read.
            template: Object whose ``template_content`` is a ``str.format``
                template.
            exploit_indicators: Optional indicator mapping. When truthy it
                replaces the indicators extracted from the description.

        Returns:
            The formatted rule text, or ``None`` if anything raised while
            building it (the error is printed).
        """
        try:
            # Use exploit indicators if available, otherwise extract from description
            if exploit_indicators:
                # "or []" tolerates categories that are present but None.
                suspicious_processes = (
                    (exploit_indicators.get('processes') or [])
                    + (exploit_indicators.get('commands') or [])
                )
                file_patterns = exploit_indicators.get('files') or []

                # Extract ports from network indicators of the form "<host>:<port>"
                suspicious_ports = []
                for net_indicator in exploit_indicators.get('network') or []:
                    indicator_text = str(net_indicator)
                    if ':' not in indicator_text:
                        continue
                    try:
                        suspicious_ports.append(int(indicator_text.split(':')[-1]))
                    except ValueError:
                        # Text after the last ':' is not a number; skip this indicator.
                        continue
            else:
                # Fallback to original extraction
                suspicious_processes = self._extract_suspicious_indicators(cve.description, "process")
                suspicious_ports = self._extract_suspicious_indicators(cve.description, "port")
                file_patterns = self._extract_suspicious_indicators(cve.description, "file")

            # Determine severity level
            level = "high" if cve.cvss_score and cve.cvss_score >= 7.0 else "medium"

            # Create enhanced description (truncated to 200 characters plus an ellipsis)
            if len(cve.description) > 200:
                enhanced_description = cve.description[:200] + "..."
            else:
                enhanced_description = cve.description
            if exploit_indicators:
                enhanced_description += " [Enhanced with GitHub exploit analysis]"

            # Build tags
            tags = [f"attack.{self._get_mitre_technique(cve.description, exploit_indicators)}", cve.cve_id.lower()]
            if exploit_indicators:
                tags.append("exploit.github")

            rule_content = template.template_content.format(
                # cve_id already carries the "CVE-" prefix, so it is not repeated here.
                title=f"{cve.cve_id} {'Exploit-Based ' if exploit_indicators else ''}Detection",
                description=enhanced_description,
                rule_id=str(uuid.uuid4()),
                date=datetime.utcnow().strftime("%Y/%m/%d"),
                cve_url=f"https://nvd.nist.gov/vuln/detail/{cve.cve_id}",
                cve_id=cve.cve_id.lower(),
                # NOTE(review): this separator is a literal backslash followed by
                # "n", not a newline, so all tags land on one line. It looks like
                # a newline plus list indentation was intended - confirm against
                # the template's YAML layout before changing.
                tags="\\n - ".join(tags),
                # Placeholder indicators are used when nothing specific was found.
                suspicious_processes=suspicious_processes or ["suspicious.exe", "malware.exe"],
                suspicious_ports=suspicious_ports or [4444, 8080, 9999],
                file_patterns=file_patterns or ["temp", "malware", "exploit"],
                level=level,
            )

            return rule_content

        except Exception as e:
            # Best-effort: a broken template must not abort the caller.
            print(f"Error populating template: {str(e)}")
            return None

    def _get_mitre_technique(self, description: str, exploit_indicators: Optional[dict] = None) -> str:
        """Map CVE and exploit indicators to MITRE ATT&CK techniques.

        Returns a lower-case technique id, or ``"execution"`` when neither the
        indicators nor the description match a known keyword.
        """
        desc_lower = description.lower()

        # Check exploit indicators first
        if exploit_indicators:
            if exploit_indicators.get('powershell'):
                return "t1059.001"  # PowerShell
            elif exploit_indicators.get('commands'):
                return "t1059.003"  # Windows Command Shell
            elif exploit_indicators.get('network'):
                return "t1071.001"  # Web Protocols
            elif exploit_indicators.get('files'):
                return "t1105"  # Ingress Tool Transfer
            elif exploit_indicators.get('processes'):
                return "t1106"  # Native API

        # Fallback to description analysis
        if "powershell" in desc_lower:
            return "t1059.001"
        elif "command" in desc_lower or "cmd" in desc_lower:
            return "t1059.003"
        elif "network" in desc_lower or "remote" in desc_lower:
            return "t1071.001"
        elif "file" in desc_lower or "upload" in desc_lower:
            return "t1105"
        elif "process" in desc_lower or "execution" in desc_lower:
            return "t1106"
        else:
            return "execution"  # Generic

    def _extract_suspicious_indicators(self, description: str, indicator_type: str) -> Optional[List]:
        """Extract suspicious indicators from CVE description.

        Args:
            description: Free-text CVE description.
            indicator_type: ``"process"``, ``"port"`` or ``"file"``.

        Returns:
            Up to 5 executable names, up to 3 port numbers (as ints), or up to
            5 file names respectively; ``None`` when nothing matches or the
            indicator type is unknown.
        """
        if indicator_type == "process":
            # Look for executable names or process patterns
            exe_matches = self._EXE_PATTERN.findall(description)
            return exe_matches[:5] if exe_matches else None

        elif indicator_type == "port":
            # Look for port numbers
            port_matches = self._PORT_PATTERN.findall(description)
            return [int(p) for p in port_matches[:3]] if port_matches else None

        elif indicator_type == "file":
            # Look for file extensions or paths
            file_matches = self._FILE_PATTERN.findall(description)
            return file_matches[:5] if file_matches else None

        return None

    def _determine_detection_type(self, description: str, exploit_indicators: Optional[dict] = None) -> str:
        """Determine detection type based on CVE description and exploit indicators.

        Returns one of ``"powershell"``, ``"network"``, ``"file"``,
        ``"process"`` or ``"general"``. The description keyword checks are
        lower-case, so the description should be lower-cased by the caller.
        """
        if exploit_indicators:
            if exploit_indicators.get('powershell'):
                return "powershell"
            elif exploit_indicators.get('network'):
                return "network"
            elif exploit_indicators.get('files'):
                return "file"
            elif exploit_indicators.get('processes') or exploit_indicators.get('commands'):
                return "process"

        # Fallback to original logic
        if "remote" in description or "network" in description:
            return "network"
        elif "process" in description or "execution" in description:
            return "process"
        elif "file" in description or "filesystem" in description:
            return "file"
        else:
            return "general"

    def _calculate_confidence(self, cve: "CVE", exploit_based: bool = False) -> str:
        """Calculate confidence level for the generated rule.

        Scoring: CVSS >= 9.0 adds 3, >= 7.0 adds 2, any other non-zero score
        adds 1; an exploit-based rule adds 2. A total of 4 or more is
        ``"high"``, 2 or 3 is ``"medium"``, anything lower is ``"low"``.
        """
        base_confidence = 0

        # CVSS score contributes to confidence
        if cve.cvss_score:
            if cve.cvss_score >= 9.0:
                base_confidence += 3
            elif cve.cvss_score >= 7.0:
                base_confidence += 2
            else:
                base_confidence += 1

        # Exploit-based rules get higher confidence
        if exploit_based:
            base_confidence += 2

        # Map to confidence levels
        if base_confidence >= 4:
            return "high"
        elif base_confidence >= 2:
            return "medium"
        else:
            return "low"