This commit completes the transformation to a CLI-first SIGMA rule generator by removing all legacy web application components:

REMOVED COMPONENTS:
- Frontend React application (frontend/ directory)
- Docker Compose web orchestration (docker-compose.yml, Dockerfiles)
- FastAPI web backend (main.py, celery_config.py, bulk_seeder.py)
- Web-specific task schedulers and executors
- Initialization scripts for web deployment (start.sh, init.sql, Makefile)

SIMPLIFIED ARCHITECTURE:
- Created backend/database_models.py for migration-only database access
- Updated CLI commands to use simplified database models
- Retained core processing modules (sigma generator, PoC clients, NVD processor)
- Fixed import paths in CLI migration and process commands

The application now operates as a streamlined CLI tool with file-based SIGMA rule storage, eliminating web application complexity while maintaining all core CVE processing capabilities.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
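
The file-based storage written by the process commands looks roughly like
this (the rule filenames match the code below; the directory layout and
metadata filename are illustrative, since they live in BaseCommand's helpers):

    CVE-2024-12345/
        metadata.json            # written via save_cve_metadata()
        rule_template.sigma
        rule_llm_openai.sigma
        rule_llm_anthropic.sigma
        rule_hybrid.sigma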
"""
|
|
Process Commands
|
|
|
|
Commands for processing CVEs and generating SIGMA rules in the file-based system.
|
|
"""

import asyncio
import json
import os
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Dict, List, Optional, Any, Tuple

import click

# Import the base command class
from .base_command import BaseCommand

# Import processing components from the existing backend
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', 'backend'))


class ProcessCommands(BaseCommand):
    """Commands for processing CVEs and generating rules"""

    def __init__(self, config):
        super().__init__(config)
        self._initialize_processors()

    def _initialize_processors(self):
        """Initialize the processing components"""
        try:
            # Import core processing modules
            from nvd_bulk_processor import NVDBulkProcessor
            from nomi_sec_client import NomiSecClient
            from enhanced_sigma_generator import EnhancedSigmaGenerator
            from poc_analyzer import PoCAnalyzer
            from yaml_metadata_generator import YAMLMetadataGenerator

            # Create processors (will be initialized per operation due to session requirements)
            self.nvd_processor_class = NVDBulkProcessor
            self.nomi_sec_client_class = NomiSecClient
            self.sigma_generator_class = EnhancedSigmaGenerator
            self.poc_analyzer = PoCAnalyzer()
            self.yaml_generator_class = YAMLMetadataGenerator

        except ImportError as e:
            self.error(f"Could not import processing modules: {e}")
            self.error("Make sure you're running from the project root directory")
            sys.exit(1)

    async def process_year(self, year: int, methods: List[str], force: bool, batch_size: int) -> Tuple[int, int]:
        """Process all CVEs for a specific year; returns (processed, failed) counts"""
        self.info(f"Processing CVEs for year {year}")
        self.info(f"Methods: {', '.join(methods)}")
        self.info(f"Batch size: {batch_size}")

        if force:
            self.warning("Force mode enabled - will regenerate existing rules")

        processed = 0
        failed = 0

        try:
            # First, fetch/update CVE data for the year
            await self._fetch_cve_data_for_year(year, batch_size)

            # Get all CVEs for the year
            cves = self.get_all_cves(year)
            if not cves:
                self.warning(f"No CVEs found for year {year}")
                return processed, failed

            self.info(f"Found {len(cves)} CVEs for {year}")

            # Process in batches
            for i in range(0, len(cves), batch_size):
                batch = cves[i:i + batch_size]

                for cve_id in batch:
                    try:
                        success = await self._process_single_cve(cve_id, methods, force)
                        if success:
                            processed += 1
                        else:
                            failed += 1

                        if (processed + failed) % 10 == 0:
                            self.info(f"Processed {processed + failed}/{len(cves)} CVEs...")

                    except Exception as e:
                        self.error(f"Error processing {cve_id}: {e}")
                        failed += 1

                # Small delay between batches
                await asyncio.sleep(1)

            self.success(f"Year {year} processing completed!")
            self.success(f"Processed: {processed}, Failed: {failed}")

        except Exception as e:
            self.error(f"Error processing year {year}: {e}")
            import traceback
            traceback.print_exc()

        return processed, failed
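
    # Sketch of consuming the counts returned above (hypothetical caller;
    # the real invocation goes through the CLI layer):
    #
    #   processed, failed = await commands.process_year(2024, ['template'], False, 100)
    #   print(f"{processed} rules generated, {failed} failures")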

    async def process_cve(self, cve_id: str, methods: List[str], force: bool):
        """Process a specific CVE"""
        if not self.validate_cve_id(cve_id):
            self.error(f"Invalid CVE ID format: {cve_id}")
            return

        self.info(f"Processing CVE: {cve_id}")
        self.info(f"Methods: {', '.join(methods)}")

        try:
            # First ensure we have the CVE data
            year = int(cve_id.split('-')[1])
            await self._fetch_specific_cve_data(cve_id, year)

            # Process the CVE
            success = await self._process_single_cve(cve_id, methods, force)

            if success:
                self.success(f"Successfully processed {cve_id}")
            else:
                self.error(f"Failed to process {cve_id}")

        except Exception as e:
            self.error(f"Error processing {cve_id}: {e}")
            import traceback
            traceback.print_exc()

    async def process_bulk(self, start_year: int, end_year: int, methods: List[str], batch_size: int):
        """Bulk process CVEs across multiple years"""
        self.info(f"Bulk processing CVEs from {start_year} to {end_year}")
        self.info(f"Methods: {', '.join(methods)}")

        total_processed = 0
        total_failed = 0

        for year in range(start_year, end_year + 1):
            try:
                self.info(f"\n--- Processing Year {year} ---")

                # process_year returns exact per-year counts
                processed, failed = await self.process_year(year, methods, False, batch_size)
                total_processed += processed
                total_failed += failed

            except Exception as e:
                self.error(f"Error processing year {year}: {e}")

        self.success("\nBulk processing completed!")
        self.success(f"Years processed: {end_year - start_year + 1}")
        self.success(f"CVEs processed: {total_processed}, Failed: {total_failed}")

    async def process_incremental(self, days: int, methods: List[str]):
        """Process recently modified CVEs"""
        self.info(f"Processing CVEs modified in the last {days} days")

        # Use an aware (UTC) datetime: parsed modification dates may carry a
        # UTC offset, and comparing naive to aware datetimes raises TypeError.
        cutoff_date = datetime.now(timezone.utc) - timedelta(days=days)
        self.info(f"Cutoff date: {cutoff_date.isoformat()}")

        # Find CVEs modified since cutoff date
        recent_cves = []

        for cve_id in self.get_all_cves():
            metadata = self.load_cve_metadata(cve_id)
            if metadata and 'cve_info' in metadata:
                modified_date_str = metadata['cve_info'].get('modified_date')
                if modified_date_str:
                    try:
                        # fromisoformat() rejects a trailing "Z" before Python 3.11
                        modified_date = datetime.fromisoformat(modified_date_str.replace('Z', '+00:00'))
                        if modified_date.tzinfo is None:
                            modified_date = modified_date.replace(tzinfo=timezone.utc)
                        if modified_date >= cutoff_date:
                            recent_cves.append(cve_id)
                    except (ValueError, TypeError):
                        pass  # Skip if date parsing fails

        if not recent_cves:
            self.warning("No recently modified CVEs found")
            return

        self.info(f"Found {len(recent_cves)} recently modified CVEs")

        processed = 0
        failed = 0

        for cve_id in recent_cves:
            try:
                success = await self._process_single_cve(cve_id, methods, False)
                if success:
                    processed += 1
                else:
                    failed += 1

            except Exception as e:
                self.error(f"Error processing {cve_id}: {e}")
                failed += 1

        self.success("Incremental processing completed!")
        self.success(f"Processed: {processed}, Failed: {failed}")

    async def _fetch_cve_data_for_year(self, year: int, batch_size: int):
        """Fetch CVE data for a specific year from NVD"""
        # Note: batch_size governs rule-generation batching in the callers;
        # the NVD download itself is processed per year.
        self.info(f"Fetching CVE data for year {year}...")

        try:
            # Use the existing NVD bulk processor
            from database_models import SessionLocal  # Import session factory
            db_session = SessionLocal()

            try:
                processor = self.nvd_processor_class(db_session)

                # Download and process NVD data for the year
                result = await processor.download_and_process_year(year)

                if result.get('success'):
                    self.info(f"Successfully fetched {result.get('processed_cves', 0)} CVEs for {year}")

                    # Convert database records to file structure
                    await self._sync_database_to_files(db_session, year)
                else:
                    self.warning(f"Issues fetching CVE data for {year}: {result.get('error', 'Unknown error')}")

            finally:
                db_session.close()

        except Exception as e:
            self.error(f"Error fetching CVE data for year {year}: {e}")

    async def _fetch_specific_cve_data(self, cve_id: str, year: int):
        """Fetch data for a specific CVE"""
        # Check if we already have metadata for this CVE
        existing_metadata = self.load_cve_metadata(cve_id)
        if existing_metadata:
            return  # Already have the data

        # Fetch from NVD if not already present
        self.info(f"Fetching data for {cve_id}...")

        try:
            from database_models import SessionLocal
            db_session = SessionLocal()

            try:
                processor = self.nvd_processor_class(db_session)

                # Fetch single CVE data
                result = await processor.fetch_single_cve(cve_id)

                if result:
                    # Convert to file structure
                    await self._sync_single_cve_to_files(db_session, cve_id)
                    self.info(f"Successfully fetched data for {cve_id}")
                else:
                    self.warning(f"Could not fetch data for {cve_id}")

            finally:
                db_session.close()

        except Exception as e:
            self.error(f"Error fetching data for {cve_id}: {e}")

    async def _sync_database_to_files(self, db_session, year: int):
        """Sync database records to file structure for a specific year"""
        try:
            from database_models import CVE

            # Get all CVEs for the year from database
            year_pattern = f"CVE-{year}-%"
            cves = db_session.query(CVE).filter(CVE.cve_id.like(year_pattern)).all()

            for cve in cves:
                await self._convert_cve_to_file(cve)

        except Exception as e:
            self.error(f"Error syncing database to files for year {year}: {e}")

    async def _sync_single_cve_to_files(self, db_session, cve_id: str):
        """Sync a single CVE from database to file structure"""
        try:
            from database_models import CVE

            cve = db_session.query(CVE).filter(CVE.cve_id == cve_id).first()
            if cve:
                await self._convert_cve_to_file(cve)

        except Exception as e:
            self.error(f"Error syncing {cve_id} to files: {e}")

    async def _convert_cve_to_file(self, cve):
        """Convert a database CVE record to file structure"""
        try:
            now_iso = datetime.now(timezone.utc).isoformat()

            # Create metadata structure
            metadata = {
                "cve_info": {
                    "cve_id": cve.cve_id,
                    "description": cve.description,
                    "cvss_score": float(cve.cvss_score) if cve.cvss_score else None,
                    "severity": cve.severity,
                    "published_date": cve.published_date.isoformat() if cve.published_date else None,
                    "modified_date": cve.modified_date.isoformat() if cve.modified_date else None,
                    "affected_products": cve.affected_products or [],
                    "reference_urls": cve.reference_urls or []
                },
                "poc_data": {
                    "poc_count": getattr(cve, 'poc_count', 0),
                    "poc_data": getattr(cve, 'poc_data', {}),
                },
                "processing": {
                    "data_source": getattr(cve, 'data_source', 'nvd_api'),
                    "bulk_processed": getattr(cve, 'bulk_processed', False),
                    "reference_sync_status": getattr(cve, 'reference_sync_status', 'pending')
                },
                "file_manifest": [],
                "rule_generation": {},
                "created_at": cve.created_at.isoformat() if cve.created_at else now_iso,
                "updated_at": now_iso
            }

            # Save metadata
            self.save_cve_metadata(cve.cve_id, metadata)

        except Exception as e:
            self.error(f"Error converting CVE {cve.cve_id} to file: {e}")

    async def _process_single_cve(self, cve_id: str, methods: List[str], force: bool) -> bool:
        """Process a single CVE with specified methods"""
        try:
            # Load CVE metadata
            metadata = self.load_cve_metadata(cve_id)
            if not metadata:
                self.error(f"No metadata found for {cve_id}")
                return False

            # Check if processing is needed
            existing_rules = self.list_cve_rules(cve_id)
            if existing_rules and not force:
                self.info(f"Rules already exist for {cve_id}, skipping (use --force to regenerate)")
                return True

            results = []

            # Process with each requested method
            for method in methods:
                if method == 'all':
                    # Generate with all available methods
                    results.append(await self._generate_template_rule(cve_id, metadata))
                    results.append(await self._generate_llm_rule(cve_id, metadata, 'openai'))
                    results.append(await self._generate_llm_rule(cve_id, metadata, 'anthropic'))
                    results.append(await self._generate_hybrid_rule(cve_id, metadata))
                elif method == 'template':
                    results.append(await self._generate_template_rule(cve_id, metadata))
                elif method == 'llm':
                    results.append(await self._generate_llm_rule(cve_id, metadata))
                elif method == 'hybrid':
                    results.append(await self._generate_hybrid_rule(cve_id, metadata))
                else:
                    self.warning(f"Unknown generation method: {method}")

            # Succeed only if at least one generator produced a rule
            return any(results)

        except Exception as e:
            self.error(f"Error processing {cve_id}: {e}")
            return False
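
    # Each method maps to a rule file written by the generators below:
    #   template -> rule_template.sigma
    #   llm      -> rule_llm_<provider>.sigma (e.g. rule_llm_openai.sigma)
    #   hybrid   -> rule_hybrid.sigma (built from the template and LLM rules)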

    async def _generate_template_rule(self, cve_id: str, metadata: Dict) -> bool:
        """Generate template-based SIGMA rule"""
        try:
            from database_models import SessionLocal

            db_session = SessionLocal()
            try:
                generator = self.sigma_generator_class(db_session)

                # Duck-typed stand-in for the SQLAlchemy CVE model, built from
                # the file-based metadata
                class MockCVE:
                    def __init__(self, meta):
                        cve_info = meta.get('cve_info', {})
                        self.cve_id = cve_info.get('cve_id')
                        self.description = cve_info.get('description')
                        self.severity = cve_info.get('severity')
                        self.affected_products = cve_info.get('affected_products', [])
                        self.poc_data = meta.get('poc_data', {}).get('poc_data', {})

                mock_cve = MockCVE(metadata)

                # Generate rule using template method
                rule_content = await generator._generate_template_based_rule(mock_cve, None, None)

                if rule_content:
                    self.save_sigma_rule(cve_id, "rule_template.sigma", rule_content)
                    self.info(f"Generated template rule for {cve_id}")
                    return True
                else:
                    self.warning(f"Failed to generate template rule for {cve_id}")
                    return False

            finally:
                db_session.close()

        except Exception as e:
            self.error(f"Error generating template rule for {cve_id}: {e}")
            return False

    async def _generate_llm_rule(self, cve_id: str, metadata: Dict, provider: str = 'openai') -> bool:
        """Generate LLM-based SIGMA rule"""
        try:
            from database_models import SessionLocal

            db_session = SessionLocal()
            try:
                generator = self.sigma_generator_class(db_session, llm_provider=provider)

                # Check if LLM is available
                if not generator.llm_client.is_available():
                    self.warning(f"LLM provider {provider} not available for {cve_id}")
                    return False

                # Duck-typed stand-in for the SQLAlchemy CVE model
                class MockCVE:
                    def __init__(self, meta):
                        cve_info = meta.get('cve_info', {})
                        self.cve_id = cve_info.get('cve_id')
                        self.description = cve_info.get('description', '')
                        self.severity = cve_info.get('severity')
                        self.affected_products = cve_info.get('affected_products', [])
                        self.poc_data = meta.get('poc_data', {}).get('poc_data', {})

                mock_cve = MockCVE(metadata)

                # Get PoC data for enhanced generation
                poc_data = metadata.get('poc_data', {}).get('poc_data', {})
                best_poc = None
                poc_content = ""

                # Try to find best PoC content
                if poc_data and 'nomi_sec' in poc_data:
                    nomi_pocs = poc_data['nomi_sec']
                    if nomi_pocs:
                        best_poc = nomi_pocs[0]  # Use first PoC
                        poc_content = best_poc.get('content', '')

                # Generate LLM-enhanced rule
                rule_content = await generator.llm_client.generate_sigma_rule(
                    cve_id=cve_id,
                    poc_content=poc_content,
                    cve_description=mock_cve.description
                )

                if rule_content:
                    filename = f"rule_llm_{provider}.sigma"
                    self.save_sigma_rule(cve_id, filename, rule_content)
                    self.info(f"Generated {provider} LLM rule for {cve_id}")
                    return True
                else:
                    self.warning(f"Failed to generate {provider} LLM rule for {cve_id}")
                    return False

            finally:
                db_session.close()

        except Exception as e:
            self.error(f"Error generating {provider} LLM rule for {cve_id}: {e}")
            return False

    async def _generate_hybrid_rule(self, cve_id: str, metadata: Dict) -> bool:
        """Generate hybrid SIGMA rule (template + LLM enhancement)"""
        try:
            # First generate template-based rule
            template_success = await self._generate_template_rule(cve_id, metadata)

            if not template_success:
                return False

            # Then enhance with LLM if available
            llm_success = await self._generate_llm_rule(cve_id, metadata, 'openai')

            if llm_success:
                # Load both rules and create hybrid version
                template_rule = self.load_sigma_rule(cve_id, "rule_template.sigma")
                llm_rule = self.load_sigma_rule(cve_id, "rule_llm_openai.sigma")

                if template_rule and llm_rule:
                    # Simplified hybrid: for now the LLM rule is saved as the
                    # hybrid output; a real merge could combine the template's
                    # metadata with the LLM's detection logic.
                    hybrid_rule = llm_rule

                    self.save_sigma_rule(cve_id, "rule_hybrid.sigma", hybrid_rule)
                    self.info(f"Generated hybrid rule for {cve_id}")
                    return True

            # If LLM enhancement failed, the template rule is still valid
            return template_success

        except Exception as e:
            self.error(f"Error generating hybrid rule for {cve_id}: {e}")
            return False
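
    # A fuller merge is sketched below (an assumption, not current behavior;
    # it requires PyYAML and single-document SIGMA rules):
    #
    #   import yaml
    #   t = yaml.safe_load(template_rule)
    #   l = yaml.safe_load(llm_rule)
    #   # Keep the template's metadata fields, adopt the LLM detection logic
    #   for key in ('title', 'id', 'references', 'tags'):
    #       if key in t:
    #           l[key] = t[key]
    #   hybrid_rule = yaml.safe_dump(l, sort_keys=False)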