""" Enhanced SIGMA Rule Generator Generates improved SIGMA rules using nomi-sec PoC data and traditional indicators """ import json import logging from datetime import datetime from typing import Dict, List, Optional, Tuple from sqlalchemy.orm import Session import re # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class EnhancedSigmaGenerator: """Enhanced SIGMA rule generator using nomi-sec PoC data""" def __init__(self, db_session: Session): self.db_session = db_session async def generate_enhanced_rule(self, cve) -> dict: """Generate enhanced SIGMA rule for a CVE using PoC data""" from main import SigmaRule, RuleTemplate try: # Get PoC data poc_data = cve.poc_data or [] # Find the best quality PoC best_poc = None if poc_data: best_poc = max(poc_data, key=lambda x: x.get('quality_analysis', {}).get('quality_score', 0)) # Select appropriate template based on PoC analysis template = await self._select_template(cve, best_poc) if not template: logger.warning(f"No suitable template found for {cve.cve_id}") return {'success': False, 'error': 'No suitable template'} # Generate rule content rule_content = await self._generate_rule_content(cve, template, poc_data) # Calculate confidence level confidence_level = self._calculate_confidence_level(cve, poc_data) # Store or update SIGMA rule existing_rule = self.db_session.query(SigmaRule).filter( SigmaRule.cve_id == cve.cve_id ).first() rule_data = { 'cve_id': cve.cve_id, 'rule_name': f"{cve.cve_id} Enhanced Detection", 'rule_content': rule_content, 'detection_type': template.template_name, 'log_source': self._extract_log_source(template.template_name), 'confidence_level': confidence_level, 'auto_generated': True, 'exploit_based': len(poc_data) > 0, 'poc_source': 'nomi_sec', 'poc_quality_score': best_poc.get('quality_analysis', {}).get('quality_score', 0) if best_poc else 0, 'nomi_sec_data': { 'total_pocs': len(poc_data), 'best_poc_quality': best_poc.get('quality_analysis', {}).get('quality_score', 0) if best_poc else 0, 'total_stars': sum(p.get('stargazers_count', 0) for p in poc_data), 'avg_stars': sum(p.get('stargazers_count', 0) for p in poc_data) / len(poc_data) if poc_data else 0 }, 'github_repos': [p.get('html_url', '') for p in poc_data], 'exploit_indicators': json.dumps(self._combine_exploit_indicators(poc_data)), 'updated_at': datetime.utcnow() } if existing_rule: # Update existing rule for key, value in rule_data.items(): setattr(existing_rule, key, value) logger.info(f"Updated SIGMA rule for {cve.cve_id}") else: # Create new rule new_rule = SigmaRule(**rule_data) self.db_session.add(new_rule) logger.info(f"Created new SIGMA rule for {cve.cve_id}") self.db_session.commit() return { 'success': True, 'cve_id': cve.cve_id, 'template': template.template_name, 'confidence_level': confidence_level, 'poc_count': len(poc_data), 'quality_score': best_poc.get('quality_analysis', {}).get('quality_score', 0) if best_poc else 0 } except Exception as e: logger.error(f"Error generating enhanced rule for {cve.cve_id}: {e}") return {'success': False, 'error': str(e)} async def _select_template(self, cve, best_poc: Optional[dict]) -> Optional[object]: """Select the most appropriate template based on CVE and PoC analysis""" from main import RuleTemplate templates = self.db_session.query(RuleTemplate).all() if not templates: logger.warning("No rule templates found in database") return None # Score templates based on relevance template_scores = {} for template in templates: score = 0 # Score based on PoC indicators (highest priority) if best_poc: indicators = best_poc.get('exploit_indicators', {}) score += self._score_template_poc_match(template, indicators) # Score based on CVE description score += self._score_template_cve_match(template, cve) # Score based on affected products if cve.affected_products: score += self._score_template_product_match(template, cve.affected_products) template_scores[template] = score # Return template with highest score if template_scores: best_template = max(template_scores, key=template_scores.get) logger.info(f"Selected template {best_template.template_name} with score {template_scores[best_template]}") return best_template return None def _score_template_poc_match(self, template: object, indicators: dict) -> int: """Score template based on PoC indicators""" score = 0 template_name = template.template_name.lower() # Process-based templates if 'process' in template_name or 'execution' in template_name: if indicators.get('processes') or indicators.get('commands'): score += 30 # Network-based templates if 'network' in template_name or 'connection' in template_name: if indicators.get('network') or indicators.get('urls'): score += 30 # File-based templates if 'file' in template_name or 'modification' in template_name: if indicators.get('files'): score += 30 # PowerShell templates if 'powershell' in template_name: processes = indicators.get('processes', []) if any('powershell' in p.lower() for p in processes): score += 35 return score def _score_template_cve_match(self, template: object, cve) -> int: """Score template based on CVE description""" score = 0 template_name = template.template_name.lower() description = (cve.description or '').lower() # Keyword matching if 'remote' in description and 'execution' in description: if 'process' in template_name or 'execution' in template_name: score += 20 if 'powershell' in description: if 'powershell' in template_name: score += 25 if 'network' in description or 'http' in description: if 'network' in template_name: score += 20 if 'file' in description or 'upload' in description: if 'file' in template_name: score += 20 return score def _score_template_product_match(self, template: object, affected_products: list) -> int: """Score template based on affected products""" score = 0 if not template.applicable_product_patterns: return 0 for pattern in template.applicable_product_patterns: pattern_lower = pattern.lower() for product in affected_products: product_lower = product.lower() if pattern_lower in product_lower: score += 10 break return score async def _generate_rule_content(self, cve, template: object, poc_data: list) -> str: """Generate the actual SIGMA rule content""" # Combine all exploit indicators combined_indicators = self._combine_exploit_indicators(poc_data) # Get base template content rule_content = template.template_content # Replace template placeholders replacements = { '{{CVE_ID}}': cve.cve_id, '{{TITLE}}': f"{cve.cve_id} Enhanced Detection", '{{DESCRIPTION}}': self._generate_description(cve, poc_data), '{{LEVEL}}': self._calculate_confidence_level(cve, poc_data).lower(), '{{REFERENCES}}': self._generate_references(cve, poc_data), '{{TAGS}}': self._generate_tags(cve, poc_data), '{{PROCESSES}}': self._format_indicators(combined_indicators.get('processes', [])), '{{FILES}}': self._format_indicators(combined_indicators.get('files', [])), '{{COMMANDS}}': self._format_indicators(combined_indicators.get('commands', [])), '{{NETWORK}}': self._format_indicators(combined_indicators.get('network', [])), '{{URLS}}': self._format_indicators(combined_indicators.get('urls', [])), '{{REGISTRY}}': self._format_indicators(combined_indicators.get('registry', [])) } # Apply replacements for placeholder, value in replacements.items(): rule_content = rule_content.replace(placeholder, value) # Add enhanced detection based on PoC quality if poc_data: rule_content = self._enhance_detection_logic(rule_content, combined_indicators, poc_data) return rule_content def _combine_exploit_indicators(self, poc_data: list) -> dict: """Combine exploit indicators from all PoCs""" combined = { 'processes': [], 'files': [], 'commands': [], 'network': [], 'urls': [], 'registry': [] } for poc in poc_data: indicators = poc.get('exploit_indicators', {}) for key in combined.keys(): if key in indicators: combined[key].extend(indicators[key]) # Deduplicate and filter for key in combined.keys(): combined[key] = list(set(combined[key])) # Remove empty and invalid entries combined[key] = [item for item in combined[key] if item and len(item) > 2] return combined def _generate_description(self, cve, poc_data: list) -> str: """Generate enhanced rule description""" base_desc = f"Detection for {cve.cve_id}" if cve.description: # Extract key terms from CVE description desc_words = cve.description.lower().split() key_terms = [word for word in desc_words if word in [ 'remote', 'execution', 'injection', 'bypass', 'privilege', 'escalation', 'overflow', 'disclosure', 'traversal', 'deserialization' ]] if key_terms: base_desc += f" involving {', '.join(set(key_terms[:3]))}" if poc_data: total_pocs = len(poc_data) total_stars = sum(p.get('stargazers_count', 0) for p in poc_data) base_desc += f" [Enhanced with {total_pocs} PoC(s), {total_stars} stars]" return base_desc def _generate_references(self, cve, poc_data: list) -> str: """Generate references section""" refs = [] # Add CVE reference refs.append(f"https://nvd.nist.gov/vuln/detail/{cve.cve_id}") # Add top PoC references (max 3) if poc_data: sorted_pocs = sorted(poc_data, key=lambda x: x.get('stargazers_count', 0), reverse=True) for poc in sorted_pocs[:3]: if poc.get('html_url'): refs.append(poc['html_url']) return '\\n'.join(f" - {ref}" for ref in refs) def _generate_tags(self, cve, poc_data: list) -> str: """Generate MITRE ATT&CK tags and other tags""" tags = [] # CVE tag tags.append(cve.cve_id.lower()) # Add technique tags based on indicators combined_indicators = self._combine_exploit_indicators(poc_data) if combined_indicators.get('processes'): tags.append('attack.t1059') # Command and Scripting Interpreter if combined_indicators.get('network'): tags.append('attack.t1071') # Application Layer Protocol if combined_indicators.get('files'): tags.append('attack.t1105') # Ingress Tool Transfer if any('powershell' in p.lower() for p in combined_indicators.get('processes', [])): tags.append('attack.t1059.001') # PowerShell # Add PoC quality tags if poc_data: tags.append('exploit.poc') best_poc = max(poc_data, key=lambda x: x.get('quality_analysis', {}).get('quality_score', 0)) quality_tier = best_poc.get('quality_analysis', {}).get('quality_tier', 'poor') tags.append(f'poc.quality.{quality_tier}') return '\\n'.join(f" - {tag}" for tag in tags) def _format_indicators(self, indicators: list) -> str: """Format indicators for SIGMA rule""" if not indicators: return '' # Limit indicators to avoid overly complex rules limited_indicators = indicators[:10] formatted = [] for indicator in limited_indicators: # Escape special characters for SIGMA escaped = indicator.replace('\\\\', '\\\\\\\\').replace('*', '\\\\*').replace('?', '\\\\?') formatted.append(f' - "{escaped}"') return '\\n'.join(formatted) def _enhance_detection_logic(self, rule_content: str, indicators: dict, poc_data: list) -> str: """Enhance detection logic based on PoC quality and indicators""" # If we have high-quality PoCs, add additional detection conditions best_poc = max(poc_data, key=lambda x: x.get('quality_analysis', {}).get('quality_score', 0)) quality_score = best_poc.get('quality_analysis', {}).get('quality_score', 0) if quality_score > 60: # High quality PoC # Add more specific detection conditions if indicators.get('processes') and indicators.get('commands'): additional_condition = """ process_and_command: Image|contains: {{PROCESSES}} CommandLine|contains: {{COMMANDS}}""" # Insert before the condition line rule_content = rule_content.replace( 'condition: selection', additional_condition + '\\n condition: selection or process_and_command' ) return rule_content def _calculate_confidence_level(self, cve, poc_data: list) -> str: """Calculate confidence level based on CVE and PoC data""" score = 0 # CVSS score factor if cve.cvss_score: if cve.cvss_score >= 9.0: score += 40 elif cve.cvss_score >= 7.0: score += 30 elif cve.cvss_score >= 5.0: score += 20 else: score += 10 # PoC quality factor if poc_data: total_stars = sum(p.get('stargazers_count', 0) for p in poc_data) poc_count = len(poc_data) score += min(total_stars, 30) # Max 30 points for stars score += min(poc_count * 5, 20) # Max 20 points for PoC count # Quality tier bonus best_poc = max(poc_data, key=lambda x: x.get('quality_analysis', {}).get('quality_score', 0)) quality_tier = best_poc.get('quality_analysis', {}).get('quality_tier', 'poor') tier_bonus = { 'excellent': 20, 'good': 15, 'fair': 10, 'poor': 5, 'very_poor': 0 } score += tier_bonus.get(quality_tier, 0) # Determine confidence level if score >= 80: return 'HIGH' elif score >= 60: return 'MEDIUM' elif score >= 40: return 'LOW' else: return 'INFORMATIONAL' def _extract_log_source(self, template_name: str) -> str: """Extract log source from template name""" template_lower = template_name.lower() if 'process' in template_lower or 'execution' in template_lower: return 'process_creation' elif 'network' in template_lower: return 'network_connection' elif 'file' in template_lower: return 'file_event' elif 'powershell' in template_lower: return 'powershell' elif 'registry' in template_lower: return 'registry_event' else: return 'generic'