auto_sigma_rule_generator/cli/commands/process_commands.py
bpmcdevitt de30d4ce99 CLEANUP: Remove legacy web application components and streamline for CLI-first architecture
This commit completes the transformation to a CLI-first SIGMA rule generator by removing all legacy web application components:

REMOVED COMPONENTS:
- Frontend React application (frontend/ directory)
- Docker Compose web orchestration (docker-compose.yml, Dockerfiles)
- FastAPI web backend (main.py, celery_config.py, bulk_seeder.py)
- Web-specific task schedulers and executors
- Initialization scripts for web deployment (start.sh, init.sql, Makefile)

SIMPLIFIED ARCHITECTURE:
- Created backend/database_models.py for migration-only database access
- Updated CLI commands to use simplified database models
- Retained core processing modules (sigma generator, PoC clients, NVD processor)
- Fixed import paths in CLI migration and process commands

The application now operates as a streamlined CLI tool with file-based SIGMA rule storage,
eliminating web application complexity while maintaining all core CVE processing capabilities.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-07-21 13:24:38 -05:00
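
The commit references backend/database_models.py as a migration-only database access layer. That module is not shown here; a minimal sketch of what the CLI code below appears to expect from it (a SessionLocal factory plus a CVE SQLAlchemy model with the attributes read in _convert_cve_to_file) might look like the following. The table name, column types, and connection string are assumptions, not the actual module:

    # backend/database_models.py (illustrative sketch, not the actual module)
    from sqlalchemy import create_engine, Column, String, Text, Numeric, DateTime, JSON
    from sqlalchemy.orm import declarative_base, sessionmaker

    engine = create_engine("postgresql://user:pass@localhost/cve_db")  # assumed DSN
    SessionLocal = sessionmaker(bind=engine)
    Base = declarative_base()

    class CVE(Base):
        __tablename__ = "cves"  # assumed table name
        cve_id = Column(String, primary_key=True)
        description = Column(Text)
        cvss_score = Column(Numeric)
        severity = Column(String)
        published_date = Column(DateTime)
        modified_date = Column(DateTime)
        affected_products = Column(JSON)
        reference_urls = Column(JSON)
        created_at = Column(DateTime)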


"""
Process Commands
Commands for processing CVEs and generating SIGMA rules in the file-based system.
"""
import asyncio
import json
import os
import sys
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Any, Tuple
import click
# Import the base command class
from .base_command import BaseCommand
# Import processing components from the existing backend
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', 'backend'))
class ProcessCommands(BaseCommand):
    """Commands for processing CVEs and generating rules"""

    def __init__(self, config):
        super().__init__(config)
        self._initialize_processors()

    def _initialize_processors(self):
        """Initialize the processing components"""
        try:
            # Import core processing modules from the backend package
            from nvd_bulk_processor import NVDBulkProcessor
            from nomi_sec_client import NomiSecClient
            from enhanced_sigma_generator import EnhancedSigmaGenerator
            from poc_analyzer import PoCAnalyzer
            from yaml_metadata_generator import YAMLMetadataGenerator

            # Store classes rather than instances: each operation opens its own
            # database session, so processors are created per operation
            self.nvd_processor_class = NVDBulkProcessor
            self.nomi_sec_client_class = NomiSecClient
            self.sigma_generator_class = EnhancedSigmaGenerator
            self.poc_analyzer = PoCAnalyzer()
            self.yaml_generator_class = YAMLMetadataGenerator
        except ImportError as e:
            self.error(f"Could not import processing modules: {e}")
            self.error("Make sure you're running from the project root directory")
            sys.exit(1)

    async def process_year(self, year: int, methods: List[str], force: bool, batch_size: int):
        """Process all CVEs for a specific year"""
        self.info(f"Processing CVEs for year {year}")
        self.info(f"Methods: {', '.join(methods)}")
        self.info(f"Batch size: {batch_size}")
        if force:
            self.warning("Force mode enabled - will regenerate existing rules")

        try:
            # First, fetch/update CVE data for the year
            await self._fetch_cve_data_for_year(year, batch_size)

            # Get all CVEs for the year
            cves = self.get_all_cves(year)
            if not cves:
                self.warning(f"No CVEs found for year {year}")
                return
            self.info(f"Found {len(cves)} CVEs for {year}")

            # Process in batches
            processed = 0
            failed = 0
            for i in range(0, len(cves), batch_size):
                batch = cves[i:i + batch_size]
                for cve_id in batch:
                    try:
                        success = await self._process_single_cve(cve_id, methods, force)
                        if success:
                            processed += 1
                        else:
                            failed += 1
                        if (processed + failed) % 10 == 0:
                            self.info(f"Processed {processed + failed}/{len(cves)} CVEs...")
                    except Exception as e:
                        self.error(f"Error processing {cve_id}: {e}")
                        failed += 1
                # Small delay between batches
                await asyncio.sleep(1)

            self.success(f"Year {year} processing completed!")
            self.success(f"Processed: {processed}, Failed: {failed}")
        except Exception as e:
            self.error(f"Error processing year {year}: {e}")
            import traceback
            traceback.print_exc()

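    # Illustrative usage from a synchronous entry point (assumes a config object
    # compatible with BaseCommand; not part of this module):
    #
    #   cmd = ProcessCommands(config)
    #   asyncio.run(cmd.process_year(2024, methods=['template'], force=False, batch_size=50))
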
    async def process_cve(self, cve_id: str, methods: List[str], force: bool):
        """Process a specific CVE"""
        if not self.validate_cve_id(cve_id):
            self.error(f"Invalid CVE ID format: {cve_id}")
            return
        self.info(f"Processing CVE: {cve_id}")
        self.info(f"Methods: {', '.join(methods)}")

        try:
            # First ensure we have the CVE data; the year is encoded in the
            # CVE ID (e.g. CVE-2024-12345 -> 2024)
            year = int(cve_id.split('-')[1])
            await self._fetch_specific_cve_data(cve_id, year)

            # Process the CVE
            success = await self._process_single_cve(cve_id, methods, force)
            if success:
                self.success(f"Successfully processed {cve_id}")
            else:
                self.error(f"Failed to process {cve_id}")
        except Exception as e:
            self.error(f"Error processing {cve_id}: {e}")
            import traceback
            traceback.print_exc()

    async def process_bulk(self, start_year: int, end_year: int, methods: List[str], batch_size: int):
        """Bulk process CVEs across multiple years"""
        self.info(f"Bulk processing CVEs from {start_year} to {end_year}")
        self.info(f"Methods: {', '.join(methods)}")
        total_processed = 0
        failed_years = 0
        for year in range(start_year, end_year + 1):
            try:
                self.info(f"\n--- Processing Year {year} ---")
                await self.process_year(year, methods, False, batch_size)
                # Update totals (approximate, since process_year doesn't return counts)
                total_processed += len(self.get_all_cves(year))
            except Exception as e:
                self.error(f"Error processing year {year}: {e}")
                failed_years += 1
        self.success("\nBulk processing completed!")
        self.success(f"Years processed: {end_year - start_year + 1} ({failed_years} with errors)")
        self.success(f"Approximate CVEs processed: {total_processed}")

    async def process_incremental(self, days: int, methods: List[str]):
        """Process recently modified CVEs"""
        self.info(f"Processing CVEs modified in the last {days} days")
        cutoff_date = datetime.now(timezone.utc) - timedelta(days=days)
        self.info(f"Cutoff date: {cutoff_date.isoformat()}")

        # Find CVEs modified since the cutoff date
        recent_cves = []
        for cve_id in self.get_all_cves():
            metadata = self.load_cve_metadata(cve_id)
            if metadata and 'cve_info' in metadata:
                modified_date_str = metadata['cve_info'].get('modified_date')
                if modified_date_str:
                    try:
                        modified_date = datetime.fromisoformat(modified_date_str.replace('Z', '+00:00'))
                        # Treat naive timestamps as UTC so the comparison below
                        # never mixes aware and naive datetimes (which raises TypeError)
                        if modified_date.tzinfo is None:
                            modified_date = modified_date.replace(tzinfo=timezone.utc)
                        if modified_date >= cutoff_date:
                            recent_cves.append(cve_id)
                    except (ValueError, TypeError):
                        pass  # Skip if date parsing fails

        if not recent_cves:
            self.warning("No recently modified CVEs found")
            return
        self.info(f"Found {len(recent_cves)} recently modified CVEs")

        processed = 0
        failed = 0
        for cve_id in recent_cves:
            try:
                success = await self._process_single_cve(cve_id, methods, False)
                if success:
                    processed += 1
                else:
                    failed += 1
            except Exception as e:
                self.error(f"Error processing {cve_id}: {e}")
                failed += 1
        self.success("Incremental processing completed!")
        self.success(f"Processed: {processed}, Failed: {failed}")

    async def _fetch_cve_data_for_year(self, year: int, batch_size: int):
        """Fetch CVE data for a specific year from NVD"""
        self.info(f"Fetching CVE data for year {year}...")
        try:
            # Use the existing NVD bulk processor with a short-lived session
            from database_models import SessionLocal  # Import session factory
            db_session = SessionLocal()
            try:
                processor = self.nvd_processor_class(db_session)
                # Download and process NVD data for the year
                result = await processor.download_and_process_year(year)
                if result.get('success'):
                    self.info(f"Successfully fetched {result.get('processed_cves', 0)} CVEs for {year}")
                    # Convert database records to file structure
                    await self._sync_database_to_files(db_session, year)
                else:
                    self.warning(f"Issues fetching CVE data for {year}: {result.get('error', 'Unknown error')}")
            finally:
                db_session.close()
        except Exception as e:
            self.error(f"Error fetching CVE data for year {year}: {e}")

    async def _fetch_specific_cve_data(self, cve_id: str, year: int):
        """Fetch data for a specific CVE"""
        # Check if we already have metadata for this CVE
        existing_metadata = self.load_cve_metadata(cve_id)
        if existing_metadata:
            return  # Already have the data

        # Fetch from NVD if not already present
        self.info(f"Fetching data for {cve_id}...")
        try:
            from database_models import SessionLocal
            db_session = SessionLocal()
            try:
                processor = self.nvd_processor_class(db_session)
                # Fetch single CVE data
                result = await processor.fetch_single_cve(cve_id)
                if result:
                    # Convert to file structure
                    await self._sync_single_cve_to_files(db_session, cve_id)
                    self.info(f"Successfully fetched data for {cve_id}")
                else:
                    self.warning(f"Could not fetch data for {cve_id}")
            finally:
                db_session.close()
        except Exception as e:
            self.error(f"Error fetching data for {cve_id}: {e}")

    async def _sync_database_to_files(self, db_session, year: int):
        """Sync database records to file structure for a specific year"""
        try:
            from database_models import CVE
            # Get all CVEs for the year from the database
            year_pattern = f"CVE-{year}-%"
            cves = db_session.query(CVE).filter(CVE.cve_id.like(year_pattern)).all()
            for cve in cves:
                await self._convert_cve_to_file(cve)
        except Exception as e:
            self.error(f"Error syncing database to files for year {year}: {e}")

    async def _sync_single_cve_to_files(self, db_session, cve_id: str):
        """Sync a single CVE from database to file structure"""
        try:
            from database_models import CVE
            cve = db_session.query(CVE).filter(CVE.cve_id == cve_id).first()
            if cve:
                await self._convert_cve_to_file(cve)
        except Exception as e:
            self.error(f"Error syncing {cve_id} to files: {e}")

    async def _convert_cve_to_file(self, cve):
        """Convert a database CVE record to file structure"""
        try:
            # Create metadata structure
            metadata = {
                "cve_info": {
                    "cve_id": cve.cve_id,
                    "description": cve.description,
                    "cvss_score": float(cve.cvss_score) if cve.cvss_score else None,
                    "severity": cve.severity,
                    "published_date": cve.published_date.isoformat() if cve.published_date else None,
                    "modified_date": cve.modified_date.isoformat() if cve.modified_date else None,
                    "affected_products": cve.affected_products or [],
                    "reference_urls": cve.reference_urls or []
                },
                "poc_data": {
                    "poc_count": getattr(cve, 'poc_count', 0),
                    "poc_data": getattr(cve, 'poc_data', {}),
                },
                "processing": {
                    "data_source": getattr(cve, 'data_source', 'nvd_api'),
                    "bulk_processed": getattr(cve, 'bulk_processed', False),
                    "reference_sync_status": getattr(cve, 'reference_sync_status', 'pending')
                },
                "file_manifest": [],
                "rule_generation": {},
                "created_at": cve.created_at.isoformat() if cve.created_at else datetime.utcnow().isoformat(),
                "updated_at": datetime.utcnow().isoformat()
            }
            # Save metadata
            self.save_cve_metadata(cve.cve_id, metadata)
        except Exception as e:
            self.error(f"Error converting CVE {cve.cve_id} to file: {e}")

    async def _process_single_cve(self, cve_id: str, methods: List[str], force: bool) -> bool:
        """Process a single CVE with specified methods"""
        try:
            # Load CVE metadata
            metadata = self.load_cve_metadata(cve_id)
            if not metadata:
                self.error(f"No metadata found for {cve_id}")
                return False

            # Check if processing is needed
            existing_rules = self.list_cve_rules(cve_id)
            if existing_rules and not force:
                self.info(f"Rules already exist for {cve_id}, skipping (use --force to regenerate)")
                return True

            # Process with each requested method, collecting per-method results
            results: List[bool] = []
            for method in methods:
                if method == 'all':
                    # Generate with all available methods
                    results.append(await self._generate_template_rule(cve_id, metadata))
                    results.append(await self._generate_llm_rule(cve_id, metadata, 'openai'))
                    results.append(await self._generate_llm_rule(cve_id, metadata, 'anthropic'))
                    results.append(await self._generate_hybrid_rule(cve_id, metadata))
                elif method == 'template':
                    results.append(await self._generate_template_rule(cve_id, metadata))
                elif method == 'llm':
                    results.append(await self._generate_llm_rule(cve_id, metadata))
                elif method == 'hybrid':
                    results.append(await self._generate_hybrid_rule(cve_id, metadata))
                else:
                    self.warning(f"Unknown generation method: {method}")

            # Treat the CVE as processed if at least one method produced a rule
            return any(results)
        except Exception as e:
            self.error(f"Error processing {cve_id}: {e}")
            return False

    async def _generate_template_rule(self, cve_id: str, metadata: Dict) -> bool:
        """Generate template-based SIGMA rule"""
        try:
            from database_models import SessionLocal
            db_session = SessionLocal()
            try:
                generator = self.sigma_generator_class(db_session)

                # Create mock CVE object from metadata
                class MockCVE:
                    def __init__(self, meta):
                        cve_info = meta.get('cve_info', {})
                        self.cve_id = cve_info.get('cve_id')
                        self.description = cve_info.get('description')
                        self.severity = cve_info.get('severity')
                        self.affected_products = cve_info.get('affected_products', [])
                        self.poc_data = meta.get('poc_data', {}).get('poc_data', {})

                mock_cve = MockCVE(metadata)
                # Generate rule using the template method
                rule_content = await generator._generate_template_based_rule(mock_cve, None, None)
                if rule_content:
                    self.save_sigma_rule(cve_id, "rule_template.sigma", rule_content)
                    self.info(f"Generated template rule for {cve_id}")
                    return True
                else:
                    self.warning(f"Failed to generate template rule for {cve_id}")
                    return False
            finally:
                db_session.close()
        except Exception as e:
            self.error(f"Error generating template rule for {cve_id}: {e}")
            return False

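    # For reference, the saved rule_template.sigma is standard SIGMA YAML. A
    # minimal illustrative example (not actual generator output, CVE ID invented):
    #
    #   title: Potential Exploitation of CVE-2024-12345
    #   status: experimental
    #   description: Detects activity associated with CVE-2024-12345
    #   logsource:
    #       category: process_creation
    #       product: windows
    #   detection:
    #       selection:
    #           CommandLine|contains: 'exploit_marker'
    #       condition: selection
    #   level: high
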
    async def _generate_llm_rule(self, cve_id: str, metadata: Dict, provider: str = 'openai') -> bool:
        """Generate LLM-based SIGMA rule"""
        try:
            from database_models import SessionLocal
            db_session = SessionLocal()
            try:
                generator = self.sigma_generator_class(db_session, llm_provider=provider)
                # Check if the LLM provider is available
                if not generator.llm_client.is_available():
                    self.warning(f"LLM provider {provider} not available for {cve_id}")
                    return False

                # Create mock CVE object
                class MockCVE:
                    def __init__(self, meta):
                        cve_info = meta.get('cve_info', {})
                        self.cve_id = cve_info.get('cve_id')
                        self.description = cve_info.get('description', '')
                        self.severity = cve_info.get('severity')
                        self.affected_products = cve_info.get('affected_products', [])
                        self.poc_data = meta.get('poc_data', {}).get('poc_data', {})

                mock_cve = MockCVE(metadata)

                # Get PoC data for enhanced generation
                poc_data = metadata.get('poc_data', {}).get('poc_data', {})
                best_poc = None
                poc_content = ""
                # Try to find the best PoC content
                if poc_data and 'nomi_sec' in poc_data:
                    nomi_pocs = poc_data['nomi_sec']
                    if nomi_pocs:
                        best_poc = nomi_pocs[0]  # Use the first PoC
                        poc_content = best_poc.get('content', '')

                # Generate LLM-enhanced rule
                rule_content = await generator.llm_client.generate_sigma_rule(
                    cve_id=cve_id,
                    poc_content=poc_content,
                    cve_description=mock_cve.description
                )
                if rule_content:
                    filename = f"rule_llm_{provider}.sigma"
                    self.save_sigma_rule(cve_id, filename, rule_content)
                    self.info(f"Generated {provider} LLM rule for {cve_id}")
                    return True
                else:
                    self.warning(f"Failed to generate {provider} LLM rule for {cve_id}")
                    return False
            finally:
                db_session.close()
        except Exception as e:
            self.error(f"Error generating {provider} LLM rule for {cve_id}: {e}")
            return False

    async def _generate_hybrid_rule(self, cve_id: str, metadata: Dict) -> bool:
        """Generate hybrid SIGMA rule (template + LLM enhancement)"""
        try:
            # First generate the template-based rule
            template_success = await self._generate_template_rule(cve_id, metadata)
            if not template_success:
                return False

            # Then enhance with LLM if available
            llm_success = await self._generate_llm_rule(cve_id, metadata, 'openai')
            if llm_success:
                # Load both rules and create a hybrid version
                template_rule = self.load_sigma_rule(cve_id, "rule_template.sigma")
                llm_rule = self.load_sigma_rule(cve_id, "rule_llm_openai.sigma")
                if template_rule and llm_rule:
                    # Simple hybrid: use the LLM rule but keep the template metadata
                    # structure. This is a simplified approach - could be made more
                    # sophisticated.
                    hybrid_rule = llm_rule  # For now, just use the LLM rule as the hybrid
                    self.save_sigma_rule(cve_id, "rule_hybrid.sigma", hybrid_rule)
                    self.info(f"Generated hybrid rule for {cve_id}")
                    return True

            # If LLM enhancement failed, the template rule is still valid
            return template_success
        except Exception as e:
            self.error(f"Error generating hybrid rule for {cve_id}: {e}")
            return False
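

# `click` is imported at the top of this module for the CLI layer. A minimal
# sketch of how these commands might be wired into a click entry point
# (hypothetical option names and config helper; the real wiring lives in the
# CLI package, not in this file):
#
#   @click.command()
#   @click.argument('cve_id')
#   @click.option('--method', 'methods', multiple=True, default=('template',))
#   @click.option('--force', is_flag=True)
#   def process_cve_command(cve_id, methods, force):
#       config = load_config()  # hypothetical helper
#       commands = ProcessCommands(config)
#       asyncio.run(commands.process_cve(cve_id, list(methods), force))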