"""
Process Commands

Commands for processing CVEs and generating SIGMA rules in the file-based system.
"""

import asyncio
import json
import os
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Dict, List, Optional, Any, Tuple

import click

# Import the base command class
from .base_command import BaseCommand

# Import processing components from the existing backend
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', 'backend'))


class _MetadataCVE:
    """Lightweight stand-in for a database CVE record, built from file metadata.

    Only the attributes this module hands to the rule generators are populated:
    ``cve_id``, ``description``, ``severity``, ``affected_products`` and
    ``poc_data``.
    """

    def __init__(self, meta: Dict, default_description: Optional[str] = None):
        cve_info = meta.get('cve_info', {})
        self.cve_id = cve_info.get('cve_id')
        # The template path historically left a missing description as None
        # while the LLM path used ''; the caller picks the default.
        self.description = cve_info.get('description', default_description)
        self.severity = cve_info.get('severity')
        self.affected_products = cve_info.get('affected_products', [])
        self.poc_data = meta.get('poc_data', {}).get('poc_data', {})


def _parse_naive_utc(value: str) -> datetime:
    """Parse an ISO-8601 timestamp and return it as a naive UTC ``datetime``.

    A trailing ``Z`` is rewritten to ``+00:00`` before parsing. Timezone-aware
    results are converted to UTC and stripped of ``tzinfo`` so they can be
    compared with ``datetime.utcnow()``; naive results are returned unchanged.

    Raises:
        ValueError: if the string is not a valid ISO-8601 timestamp.
        TypeError / AttributeError: if ``value`` is not a string.
    """
    parsed = datetime.fromisoformat(value.replace('Z', '+00:00'))
    if parsed.tzinfo is not None:
        parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None)
    return parsed


class ProcessCommands(BaseCommand):
    """Commands for processing CVEs and generating rules"""

    def __init__(self, config):
        super().__init__(config)
        self._initialize_processors()

    def _initialize_processors(self):
        """Initialize the processing components.

        Imports the backend modules lazily (they live on the ``backend`` path
        inserted at module import time). Exits the process with status 1 if
        any of them cannot be imported.
        """
        try:
            # Import core processing modules
            from nvd_bulk_processor import NVDBulkProcessor
            from nomi_sec_client import NomiSecClient
            from enhanced_sigma_generator import EnhancedSigmaGenerator
            from poc_analyzer import PoCAnalyzer
            from yaml_metadata_generator import YAMLMetadataGenerator

            # Create processors (will be initialized per operation due to session requirements)
            self.nvd_processor_class = NVDBulkProcessor
            self.nomi_sec_client_class = NomiSecClient
            self.sigma_generator_class = EnhancedSigmaGenerator
            self.poc_analyzer = PoCAnalyzer()
            self.yaml_generator_class = YAMLMetadataGenerator

        except ImportError as e:
            self.error(f"Could not import processing modules: {e}")
            self.error("Make sure you're running from the project root directory")
            sys.exit(1)

    async def process_year(self, year: int, methods: List[str], force: bool, batch_size: int):
        """Process all CVEs for a specific year.

        Fetches/updates the year's CVE data, then runs ``_process_single_cve``
        on every CVE returned by ``get_all_cves(year)`` in slices of
        ``batch_size``. Errors are logged rather than raised.

        Args:
            year: Year whose CVEs are processed.
            methods: Generation methods passed through to ``_process_single_cve``.
            force: Regenerate rules even when some already exist.
            batch_size: Number of CVEs handled between pauses.
        """
        self.info(f"Processing CVEs for year {year}")
        self.info(f"Methods: {', '.join(methods)}")
        self.info(f"Batch size: {batch_size}")

        if force:
            self.warning("Force mode enabled - will regenerate existing rules")

        try:
            # First, fetch/update CVE data for the year
            await self._fetch_cve_data_for_year(year, batch_size)

            # Get all CVEs for the year
            cves = self.get_all_cves(year)
            if not cves:
                self.warning(f"No CVEs found for year {year}")
                return

            self.info(f"Found {len(cves)} CVEs for {year}")

            # Process in batches
            processed = 0
            failed = 0

            for i in range(0, len(cves), batch_size):
                batch = cves[i:i + batch_size]

                for cve_id in batch:
                    try:
                        success = await self._process_single_cve(cve_id, methods, force)
                        if success:
                            processed += 1
                        else:
                            failed += 1

                        if (processed + failed) % 10 == 0:
                            self.info(f"Processed {processed + failed}/{len(cves)} CVEs...")

                    except Exception as e:
                        self.error(f"Error processing {cve_id}: {e}")
                        failed += 1

                # Small delay between batches; nothing follows the last batch,
                # so there is no point sleeping after it.
                if i + batch_size < len(cves):
                    await asyncio.sleep(1)

            self.success(f"Year {year} processing completed!")
            self.success(f"Processed: {processed}, Failed: {failed}")

        except Exception as e:
            self.error(f"Error processing year {year}: {e}")
            import traceback
            traceback.print_exc()

    async def process_cve(self, cve_id: str, methods: List[str], force: bool):
        """Process a specific CVE.

        Validates the ID, makes sure metadata for it exists locally (fetching
        it if needed) and then generates rules with the requested methods.

        Args:
            cve_id: CVE identifier; the year is taken from its second
                dash-separated field.
            methods: Generation methods passed through to ``_process_single_cve``.
            force: Regenerate rules even when some already exist.
        """
        if not self.validate_cve_id(cve_id):
            self.error(f"Invalid CVE ID format: {cve_id}")
            return

        self.info(f"Processing CVE: {cve_id}")
        self.info(f"Methods: {', '.join(methods)}")

        try:
            # First ensure we have the CVE data
            year = int(cve_id.split('-')[1])
            await self._fetch_specific_cve_data(cve_id, year)

            # Process the CVE
            success = await self._process_single_cve(cve_id, methods, force)

            if success:
                self.success(f"Successfully processed {cve_id}")
            else:
                self.error(f"Failed to process {cve_id}")

        except Exception as e:
            self.error(f"Error processing {cve_id}: {e}")
            import traceback
            traceback.print_exc()

    async def process_bulk(self, start_year: int, end_year: int, methods: List[str], batch_size: int):
        """Bulk process CVEs across multiple years.

        Runs ``process_year`` (without force) for every year in the inclusive
        range. The reported CVE total is approximate: it is the number of CVEs
        present for each year after processing, not a count of successes.

        Args:
            start_year: First year to process (inclusive).
            end_year: Last year to process (inclusive).
            methods: Generation methods passed through to ``process_year``.
            batch_size: Batch size passed through to ``process_year``.
        """
        self.info(f"Bulk processing CVEs from {start_year} to {end_year}")
        self.info(f"Methods: {', '.join(methods)}")

        total_processed = 0
        total_failed = 0
        years = range(start_year, end_year + 1)

        for year in years:
            try:
                self.info(f"\n--- Processing Year {year} ---")
                await self.process_year(year, methods, False, batch_size)

                # Update totals (approximate, since process_year doesn't return counts)
                total_processed += len(self.get_all_cves(year))

            except Exception as e:
                self.error(f"Error processing year {year}: {e}")
                total_failed += 1

        self.success("\nBulk processing completed!")
        # len(range) is 0 for a reversed range instead of a negative number.
        self.success(f"Years processed: {len(years)}")
        self.success(f"Approximate CVEs processed: {total_processed}")
        if total_failed:
            self.warning(f"Years with errors: {total_failed}")

    async def process_incremental(self, days: int, methods: List[str]):
        """Process recently modified CVEs.

        Selects every CVE whose ``cve_info.modified_date`` metadata is at or
        after ``utcnow() - days`` and processes it without force.

        Args:
            days: Size of the look-back window in days.
            methods: Generation methods passed through to ``_process_single_cve``.
        """
        self.info(f"Processing CVEs modified in the last {days} days")

        cutoff_date = datetime.utcnow() - timedelta(days=days)
        self.info(f"Cutoff date: {cutoff_date.isoformat()}")

        # Find CVEs modified since cutoff date
        recent_cves = []

        for cve_id in self.get_all_cves():
            metadata = self.load_cve_metadata(cve_id)
            if not metadata or 'cve_info' not in metadata:
                continue

            modified_date_str = metadata['cve_info'].get('modified_date')
            if not modified_date_str:
                continue

            try:
                # Normalise to naive UTC: comparing an aware timestamp with the
                # naive cutoff raises TypeError, which used to drop the CVE.
                modified_date = _parse_naive_utc(modified_date_str)
            except (ValueError, TypeError, AttributeError):
                self.warning(f"Skipping {cve_id}: unparseable modified_date {modified_date_str!r}")
                continue

            if modified_date >= cutoff_date:
                recent_cves.append(cve_id)

        if not recent_cves:
            self.warning("No recently modified CVEs found")
            return

        self.info(f"Found {len(recent_cves)} recently modified CVEs")

        processed = 0
        failed = 0

        for cve_id in recent_cves:
            try:
                success = await self._process_single_cve(cve_id, methods, False)
                if success:
                    processed += 1
                else:
                    failed += 1

            except Exception as e:
                self.error(f"Error processing {cve_id}: {e}")
                failed += 1

        self.success("Incremental processing completed!")
        self.success(f"Processed: {processed}, Failed: {failed}")

    async def _fetch_cve_data_for_year(self, year: int, batch_size: int):
        """Fetch CVE data for a specific year from NVD.

        Uses the NVD bulk processor with a fresh database session and, on
        success, mirrors the year's database records into the file structure.
        Errors are logged, not raised.

        Args:
            year: Year to download.
            batch_size: Accepted for interface compatibility; not used here.
        """
        self.info(f"Fetching CVE data for year {year}...")

        try:
            # Use the existing NVD bulk processor
            from database_models import SessionLocal  # Import session factory

            db_session = SessionLocal()

            try:
                processor = self.nvd_processor_class(db_session)

                # Download and process NVD data for the year
                result = await processor.download_and_process_year(year)

                if result.get('success'):
                    self.info(f"Successfully fetched {result.get('processed_cves', 0)} CVEs for {year}")

                    # Convert database records to file structure
                    await self._sync_database_to_files(db_session, year)
                else:
                    self.warning(f"Issues fetching CVE data for {year}: {result.get('error', 'Unknown error')}")

            finally:
                db_session.close()

        except Exception as e:
            self.error(f"Error fetching CVE data for year {year}: {e}")

    async def _fetch_specific_cve_data(self, cve_id: str, year: int):
        """Fetch data for a specific CVE.

        Does nothing when metadata for the CVE already exists; otherwise asks
        the NVD processor for the single CVE and mirrors it to the file
        structure. Errors are logged, not raised.

        Args:
            cve_id: CVE identifier to fetch.
            year: Year of the CVE; not used by this method.
        """
        # Check if we already have metadata for this CVE
        if self.load_cve_metadata(cve_id):
            return  # Already have the data

        # Fetch from NVD if not already present
        self.info(f"Fetching data for {cve_id}...")

        try:
            from database_models import SessionLocal

            db_session = SessionLocal()

            try:
                processor = self.nvd_processor_class(db_session)

                # Fetch single CVE data
                result = await processor.fetch_single_cve(cve_id)

                if result:
                    # Convert to file structure
                    await self._sync_single_cve_to_files(db_session, cve_id)
                    self.info(f"Successfully fetched data for {cve_id}")
                else:
                    self.warning(f"Could not fetch data for {cve_id}")

            finally:
                db_session.close()

        except Exception as e:
            self.error(f"Error fetching data for {cve_id}: {e}")

    async def _sync_database_to_files(self, db_session, year: int):
        """Sync database records to file structure for a specific year.

        Selects every ``CVE`` row whose ``cve_id`` matches ``CVE-<year>-%`` and
        writes each one out via ``_convert_cve_to_file``.
        """
        try:
            from database_models import CVE

            # Get all CVEs for the year from database
            year_pattern = f"CVE-{year}-%"
            cves = db_session.query(CVE).filter(CVE.cve_id.like(year_pattern)).all()

            for cve in cves:
                await self._convert_cve_to_file(cve)

        except Exception as e:
            self.error(f"Error syncing database to files for year {year}: {e}")

    async def _sync_single_cve_to_files(self, db_session, cve_id: str):
        """Sync a single CVE from database to file structure."""
        try:
            from database_models import CVE

            cve = db_session.query(CVE).filter(CVE.cve_id == cve_id).first()
            if cve:
                await self._convert_cve_to_file(cve)

        except Exception as e:
            self.error(f"Error syncing {cve_id} to files: {e}")

    async def _convert_cve_to_file(self, cve):
        """Convert a database CVE record to file structure.

        Builds the metadata dictionary from the record's attributes and stores
        it with ``save_cve_metadata``. Errors are logged, not raised.
        """
        try:
            # Create metadata structure
            metadata = {
                "cve_info": {
                    "cve_id": cve.cve_id,
                    "description": cve.description,
                    # Test against None so a legitimate score of 0.0 is kept.
                    "cvss_score": float(cve.cvss_score) if cve.cvss_score is not None else None,
                    "severity": cve.severity,
                    "published_date": cve.published_date.isoformat() if cve.published_date else None,
                    "modified_date": cve.modified_date.isoformat() if cve.modified_date else None,
                    "affected_products": cve.affected_products or [],
                    "reference_urls": cve.reference_urls or []
                },
                "poc_data": {
                    "poc_count": getattr(cve, 'poc_count', 0),
                    "poc_data": getattr(cve, 'poc_data', {}),
                },
                "processing": {
                    "data_source": getattr(cve, 'data_source', 'nvd_api'),
                    "bulk_processed": getattr(cve, 'bulk_processed', False),
                    "reference_sync_status": getattr(cve, 'reference_sync_status', 'pending')
                },
                "file_manifest": [],
                "rule_generation": {},
                "created_at": cve.created_at.isoformat() if cve.created_at else datetime.utcnow().isoformat(),
                "updated_at": datetime.utcnow().isoformat()
            }

            # Save metadata
            self.save_cve_metadata(cve.cve_id, metadata)

        except Exception as e:
            self.error(f"Error converting CVE {cve.cve_id} to file: {e}")

    async def _process_single_cve(self, cve_id: str, methods: List[str], force: bool) -> bool:
        """Process a single CVE with specified methods.

        Args:
            cve_id: CVE identifier whose metadata is loaded.
            methods: Any of ``'all'``, ``'template'``, ``'llm'``, ``'hybrid'``.
                Unrecognised names are skipped with a warning.
            force: Regenerate even when rules already exist for the CVE.

        Returns:
            ``False`` when metadata is missing, an exception occurs, or every
            generator that ran failed. ``True`` when existing rules were kept,
            when no generator ran at all, or when at least one generator
            produced a rule.
        """
        try:
            # Load CVE metadata
            metadata = self.load_cve_metadata(cve_id)
            if not metadata:
                self.error(f"No metadata found for {cve_id}")
                return False

            # Check if processing is needed
            existing_rules = self.list_cve_rules(cve_id)
            if existing_rules and not force:
                self.info(f"Rules already exist for {cve_id}, skipping (use --force to regenerate)")
                return True

            # One entry per generator invocation, so the result reflects what
            # was actually produced instead of being unconditionally True.
            outcomes: List[bool] = []

            # Process with each requested method
            for method in methods:
                if method == 'all':
                    # Generate with all available methods
                    outcomes.append(await self._generate_template_rule(cve_id, metadata))
                    outcomes.append(await self._generate_llm_rule(cve_id, metadata, 'openai'))
                    outcomes.append(await self._generate_llm_rule(cve_id, metadata, 'anthropic'))
                    outcomes.append(await self._generate_hybrid_rule(cve_id, metadata))
                elif method == 'template':
                    outcomes.append(await self._generate_template_rule(cve_id, metadata))
                elif method == 'llm':
                    outcomes.append(await self._generate_llm_rule(cve_id, metadata))
                elif method == 'hybrid':
                    outcomes.append(await self._generate_hybrid_rule(cve_id, metadata))
                else:
                    self.warning(f"Unknown generation method '{method}' for {cve_id}, skipping")

            # Nothing attempted keeps the historical "success" result.
            return any(outcomes) if outcomes else True

        except Exception as e:
            self.error(f"Error processing {cve_id}: {e}")
            return False

    async def _generate_template_rule(self, cve_id: str, metadata: Dict) -> bool:
        """Generate template-based SIGMA rule.

        Saves the generated content as ``rule_template.sigma``.

        Returns:
            ``True`` if a rule was generated and saved, ``False`` otherwise
            (including on any exception, which is logged).
        """
        try:
            from database_models import SessionLocal

            db_session = SessionLocal()
            try:
                generator = self.sigma_generator_class(db_session)

                # Create mock CVE object from metadata
                mock_cve = _MetadataCVE(metadata)

                # Generate rule using template method
                # NOTE(review): the two None arguments are passed as in the
                # original call; their meaning is defined by the generator.
                rule_content = await generator._generate_template_based_rule(mock_cve, None, None)

                if rule_content:
                    self.save_sigma_rule(cve_id, "rule_template.sigma", rule_content)
                    self.info(f"Generated template rule for {cve_id}")
                    return True

                self.warning(f"Failed to generate template rule for {cve_id}")
                return False

            finally:
                db_session.close()

        except Exception as e:
            self.error(f"Error generating template rule for {cve_id}: {e}")
            return False

    async def _generate_llm_rule(self, cve_id: str, metadata: Dict, provider: str = 'openai') -> bool:
        """Generate LLM-based SIGMA rule.

        Saves the generated content as ``rule_llm_<provider>.sigma``. The first
        ``nomi_sec`` PoC entry in the metadata, if any, supplies PoC content.

        Args:
            cve_id: CVE identifier the rule is generated for.
            metadata: CVE metadata dictionary as stored on disk.
            provider: LLM provider name handed to the generator.

        Returns:
            ``True`` if a rule was generated and saved, ``False`` if the
            provider is unavailable, nothing was generated, or an exception
            occurred (logged).
        """
        try:
            from database_models import SessionLocal

            db_session = SessionLocal()
            try:
                generator = self.sigma_generator_class(db_session, llm_provider=provider)

                # Check if LLM is available
                if not generator.llm_client.is_available():
                    self.warning(f"LLM provider {provider} not available for {cve_id}")
                    return False

                # Create mock CVE object
                mock_cve = _MetadataCVE(metadata, default_description='')

                # Get PoC data for enhanced generation
                poc_data = metadata.get('poc_data', {}).get('poc_data', {})
                poc_content = ""

                # Try to find best PoC content
                if poc_data and 'nomi_sec' in poc_data:
                    nomi_pocs = poc_data['nomi_sec']
                    if nomi_pocs:
                        # Use first PoC
                        poc_content = nomi_pocs[0].get('content', '')

                # Generate LLM-enhanced rule
                rule_content = await generator.llm_client.generate_sigma_rule(
                    cve_id=cve_id,
                    poc_content=poc_content,
                    cve_description=mock_cve.description
                )

                if rule_content:
                    filename = f"rule_llm_{provider}.sigma"
                    self.save_sigma_rule(cve_id, filename, rule_content)
                    self.info(f"Generated {provider} LLM rule for {cve_id}")
                    return True

                self.warning(f"Failed to generate {provider} LLM rule for {cve_id}")
                return False

            finally:
                db_session.close()

        except Exception as e:
            self.error(f"Error generating {provider} LLM rule for {cve_id}: {e}")
            return False

    async def _generate_hybrid_rule(self, cve_id: str, metadata: Dict) -> bool:
        """Generate hybrid SIGMA rule (template + LLM enhancement).

        Generates the template rule first, then the ``openai`` LLM rule. When
        both exist, the LLM rule is saved as ``rule_hybrid.sigma``.

        Returns:
            ``False`` if the template rule could not be generated or an
            exception occurred; otherwise ``True`` (the template rule is
            considered sufficient when LLM enhancement fails).
        """
        try:
            # First generate template-based rule
            template_success = await self._generate_template_rule(cve_id, metadata)

            if not template_success:
                return False

            # Then enhance with LLM if available
            llm_success = await self._generate_llm_rule(cve_id, metadata, 'openai')

            if llm_success:
                # Load both rules and create hybrid version
                template_rule = self.load_sigma_rule(cve_id, "rule_template.sigma")
                llm_rule = self.load_sigma_rule(cve_id, "rule_llm_openai.sigma")

                if template_rule and llm_rule:
                    # Simple hybrid: use LLM rule but keep template metadata structure
                    # This is a simplified approach - could be made more sophisticated
                    hybrid_rule = llm_rule  # For now, just use the LLM rule as hybrid

                    self.save_sigma_rule(cve_id, "rule_hybrid.sigma", hybrid_rule)
                    self.info(f"Generated hybrid rule for {cve_id}")
                    return True

            # If LLM enhancement failed, template rule is still valid
            return template_success

        except Exception as e:
            self.error(f"Error generating hybrid rule for {cve_id}: {e}")
            return False