""" Generate Commands Commands for generating SIGMA rules for existing CVEs. """ import asyncio from typing import Dict, List, Optional from .base_command import BaseCommand from .process_commands import ProcessCommands class GenerateCommands(BaseCommand): """Commands for generating SIGMA rules""" def __init__(self, config): super().__init__(config) self.process_commands = ProcessCommands(config) async def generate_cve(self, cve_id: str, method: str, provider: Optional[str], model: Optional[str], force: bool): """Generate SIGMA rules for a specific CVE""" if not self.validate_cve_id(cve_id): self.error(f"Invalid CVE ID format: {cve_id}") return # Check if CVE exists metadata = self.load_cve_metadata(cve_id) if not metadata: self.error(f"CVE {cve_id} not found. Run 'sigma-cli process cve {cve_id}' first to fetch data.") return self.info(f"Generating rules for {cve_id} using method: {method}") if provider: self.info(f"Using LLM provider: {provider}") if model: self.info(f"Using model: {model}") # Use the process command logic methods = [method] if method != 'all' else ['template', 'llm', 'hybrid'] success = await self.process_commands._process_single_cve(cve_id, methods, force) if success: rules = self.list_cve_rules(cve_id) self.success(f"Generated {len(rules)} rules for {cve_id}") for rule in rules: self.info(f" - {rule}") else: self.error(f"Failed to generate rules for {cve_id}") async def regenerate_rules(self, year: Optional[int], method: str, filter_quality: Optional[str]): """Regenerate existing SIGMA rules""" self.info(f"Regenerating rules with method: {method}") if year: self.info(f"Filtering by year: {year}") if filter_quality: self.info(f"Filtering by quality: {filter_quality}") # Get CVEs to regenerate cves_to_process = [] if year: cves = self.get_all_cves(year) else: cves = self.get_all_cves() # Filter by quality if specified for cve_id in cves: if filter_quality: metadata = self.load_cve_metadata(cve_id) if metadata: poc_data = metadata.get('poc_data', {}) # Simple quality filter based on PoC count poc_count = poc_data.get('poc_count', 0) quality_meets_filter = False if filter_quality == 'excellent' and poc_count >= 5: quality_meets_filter = True elif filter_quality == 'good' and poc_count >= 3: quality_meets_filter = True elif filter_quality == 'fair' and poc_count >= 1: quality_meets_filter = True if quality_meets_filter: cves_to_process.append(cve_id) else: cves_to_process.append(cve_id) if not cves_to_process: self.warning("No CVEs found matching the criteria") return self.info(f"Will regenerate rules for {len(cves_to_process)} CVEs") # Regenerate rules methods = [method] if method != 'all' else ['template', 'llm', 'hybrid'] processed = 0 failed = 0 for cve_id in cves_to_process: try: success = await self.process_commands._process_single_cve(cve_id, methods, True) # Force=True if success: processed += 1 else: failed += 1 if (processed + failed) % 10 == 0: self.info(f"Regenerated {processed + failed}/{len(cves_to_process)} CVEs...") except Exception as e: self.error(f"Error regenerating {cve_id}: {e}") failed += 1 self.success(f"Regeneration completed!") self.success(f"Processed: {processed}, Failed: {failed}")