diff --git a/backend/enhanced_sigma_generator.py b/backend/enhanced_sigma_generator.py index cab9472..19393d5 100644 --- a/backend/enhanced_sigma_generator.py +++ b/backend/enhanced_sigma_generator.py @@ -11,6 +11,7 @@ from sqlalchemy.orm import Session import re from llm_client import LLMClient from cve2capec_client import CVE2CAPECClient +from poc_analyzer import PoCAnalyzer # Configure logging logging.basicConfig(level=logging.INFO) @@ -23,6 +24,7 @@ class EnhancedSigmaGenerator: self.db_session = db_session self.llm_client = LLMClient(provider=llm_provider, model=llm_model) self.cve2capec_client = CVE2CAPECClient() + self.poc_analyzer = PoCAnalyzer() async def generate_enhanced_rule(self, cve, use_llm: bool = True) -> dict: """Generate enhanced SIGMA rule for a CVE using PoC data""" @@ -134,10 +136,17 @@ class EnhancedSigmaGenerator: logger.warning(f"No PoC content available for {cve.cve_id}") return None - # Generate rule using LLM + # Analyze PoC content with the PoC analyzer + logger.info(f"Analyzing PoC content for {cve.cve_id} with PoCAnalyzer") + poc_analysis = self.poc_analyzer.analyze_poc(poc_content, cve.cve_id) + + # Enhance the PoC content with structured analysis + enhanced_poc_content = self._format_poc_analysis_for_llm(poc_content, poc_analysis) + + # Generate rule using LLM with enhanced PoC content rule_content = await self.llm_client.generate_sigma_rule( cve_id=cve.cve_id, - poc_content=poc_content, + poc_content=enhanced_poc_content, cve_description=cve.description or "", existing_rule=None ) @@ -234,6 +243,120 @@ class EnhancedSigmaGenerator: return None + def _format_poc_analysis_for_llm(self, original_poc_content: str, poc_analysis: dict) -> str: + """Format PoC analysis results for LLM consumption""" + + # Extract key findings from analysis + language = poc_analysis.get('language', 'unknown') + quality_score = poc_analysis.get('quality_score', {}) + mitre_techniques = poc_analysis.get('mitre_techniques', []) + behaviors = 
poc_analysis.get('behaviors', []) + + # Extract indicators + processes = poc_analysis.get('processes', []) + files = poc_analysis.get('files', []) + network = poc_analysis.get('network', []) + registry = poc_analysis.get('registry', []) + commands = poc_analysis.get('commands', []) + + # Build enhanced content for LLM + enhanced_content = f"""**ORIGINAL POC CODE:** +{original_poc_content[:2000]} + +**STRUCTURED POC ANALYSIS:** + +**Language Detected:** {language} + +**Security Indicators Extracted:** + +**Process Execution Indicators:** +{self._format_indicators_for_display(processes)} + +**File System Indicators:** +{self._format_indicators_for_display(files)} + +**Network Communication Indicators:** +{self._format_indicators_for_display(network)} + +**Registry Modification Indicators:** +{self._format_indicators_for_display(registry)} + +**Command Execution Indicators:** +{self._format_indicators_for_display(commands)} + +**MITRE ATT&CK Techniques Detected:** +{self._format_mitre_techniques_for_display(mitre_techniques)} + +**Attack Behaviors Identified:** +{self._format_behaviors_for_display(behaviors)} + +**Analysis Quality:** +- Overall Score: {quality_score.get('overall_score', 0)}/1.0 +- Total Indicators: {poc_analysis.get('total_indicators', 0)} +- High Confidence Indicators: {poc_analysis.get('high_confidence_indicators', 0)} +- Recommendation: {quality_score.get('recommendation', 'Unknown')} + +**DETECTION GUIDANCE:** +Use the above structured indicators to create specific SIGMA detection patterns. 
Focus on the high-confidence indicators and behaviors for the most accurate detection rules.""" + + return enhanced_content + + def _format_indicators_for_display(self, indicators: list) -> str: + """Format indicators for LLM display""" + if not indicators: + return "- None detected" + + formatted = [] + for indicator in indicators[:5]: # Limit to top 5 indicators + if isinstance(indicator, dict): + value = indicator.get('value', str(indicator)) + confidence = indicator.get('confidence', 0) + attack_technique = indicator.get('attack_technique') + technique_info = f" (MITRE: {attack_technique})" if attack_technique else "" + formatted.append(f"- {value} (confidence: {confidence:.2f}){technique_info}") + else: + formatted.append(f"- {indicator}") + + if len(indicators) > 5: + formatted.append(f"- ... and {len(indicators) - 5} more indicators") + + return "\n".join(formatted) + + def _format_mitre_techniques_for_display(self, techniques: list) -> str: + """Format MITRE ATT&CK techniques for display""" + if not techniques: + return "- None detected" + + formatted = [] + for technique in techniques: + # Get technique name if available + technique_name = self.cve2capec_client.get_technique_name(technique) if hasattr(self, 'cve2capec_client') else "" + if technique_name: + formatted.append(f"- {technique}: {technique_name}") + else: + formatted.append(f"- {technique}") + + return "\n".join(formatted) + + def _format_behaviors_for_display(self, behaviors: list) -> str: + """Format attack behaviors for display""" + if not behaviors: + return "- None detected" + + formatted = [] + for behavior in behaviors: + if isinstance(behavior, dict): + behavior_type = behavior.get('behavior', 'unknown') + confidence = behavior.get('confidence', 0) + indicators = behavior.get('indicators', []) + formatted.append(f"- {behavior_type.replace('_', ' ').title()} (confidence: {confidence:.2f})") + if indicators: + formatted.append(f" Indicators: {', '.join(indicators[:3])}") + else: + 
formatted.append(f"- {behavior}") + + return "\n".join(formatted) + def _extract_log_source_from_content(self, rule_content: str) -> str: """Extract log source from the generated rule content""" try: @@ -467,7 +590,7 @@ class EnhancedSigmaGenerator: if poc.get('html_url'): refs.append(poc['html_url']) - return '\\n'.join(f" - {ref}" for ref in refs) + return '\n'.join(f" - {ref}" for ref in refs) def _generate_tags(self, cve, poc_data: list) -> str: """Generate MITRE ATT&CK tags and other tags using CVE2CAPEC mappings""" @@ -488,21 +611,9 @@ class EnhancedSigmaGenerator: if attack_tag not in tags: tags.append(attack_tag) else: - # Fallback to indicator-based technique detection - logger.info(f"No CVE2CAPEC mapping found for {cve.cve_id}, using indicator-based detection") - combined_indicators = self._combine_exploit_indicators(poc_data) - - if combined_indicators.get('processes'): - tags.append('attack.t1059') # Command and Scripting Interpreter - - if combined_indicators.get('network'): - tags.append('attack.t1071') # Application Layer Protocol - - if combined_indicators.get('files'): - tags.append('attack.t1105') # Ingress Tool Transfer - - if any('powershell' in p.lower() for p in combined_indicators.get('processes', [])): - tags.append('attack.t1059.001') # PowerShell + # No CVE2CAPEC mapping found - do not add fallback techniques + logger.warning(f"No CVE2CAPEC mapping found for {cve.cve_id}, no MITRE techniques will be added") + # Note: LLM will rely on the PoC analysis to determine appropriate techniques # Get CWE codes for additional context cwe_codes = self.cve2capec_client.get_cwe_for_cve(cve.cve_id) @@ -518,17 +629,18 @@ class EnhancedSigmaGenerator: quality_tier = best_poc.get('quality_analysis', {}).get('quality_tier', 'poor') tags.append(f'poc.quality.{quality_tier}') - # Return tags as a single line for first tag, then additional tags on new lines + # Return tags as YAML array format if not tags: return "unknown" if len(tags) == 1: return tags[0] 
else: - # First tag goes directly after the dash, rest are on new lines - first_tag = tags[0] - additional_tags = '\\n'.join(f" - {tag}" for tag in tags[1:]) - return f"{first_tag}\\n{additional_tags}" + # Format as proper YAML array + formatted_tags = [] + for tag in tags: + formatted_tags.append(f" - {tag}") + return '\n'.join(formatted_tags) def _format_indicators(self, indicators: list) -> str: """Format indicators for SIGMA rule""" @@ -546,7 +658,7 @@ class EnhancedSigmaGenerator: escaped = cleaned.replace('\\\\', '\\\\\\\\').replace('*', '\\\\*').replace('?', '\\\\?') formatted.append(f' - "{escaped}"') - return '\\n'.join(formatted) if formatted else ' - "*" # No valid indicators' + return '\n'.join(formatted) if formatted else ' - "*" # No valid indicators' def _enhance_detection_logic(self, rule_content: str, indicators: dict, poc_data: list) -> str: """Enhance detection logic based on PoC quality and indicators""" @@ -566,7 +678,7 @@ class EnhancedSigmaGenerator: # Insert before the condition line rule_content = rule_content.replace( 'condition: selection', - additional_condition + '\\n condition: selection or process_and_command' + additional_condition + '\n condition: selection or process_and_command' ) return rule_content diff --git a/backend/llm_client.py b/backend/llm_client.py index f946ab1..88805ad 100644 --- a/backend/llm_client.py +++ b/backend/llm_client.py @@ -278,13 +278,47 @@ class LLMClient: - status: experimental - description: Specific description based on CVE and PoC analysis - author: 'AI Generated' -- date: Current date (2025/01/14) +- date: Current date (2025/01/16) - references: Include the EXACT CVE URL with the CVE ID provided by the user - tags: Relevant MITRE ATT&CK techniques based on PoC analysis - logsource: Appropriate category based on exploit type - detection: Specific indicators from PoC analysis (NOT generic examples) - condition: Logic connecting the detection selections +**MITRE ATT&CK TAGS FORMAT REQUIREMENTS:** +- Use 
ONLY the MITRE ATT&CK techniques provided in the "MITRE ATT&CK TECHNIQUE MAPPINGS" section above +- Convert technique IDs to lowercase attack.t format (e.g., T1134 becomes attack.t1134) +- Include specific sub-techniques when available (e.g., T1134.001 becomes attack.t1134.001) +- DO NOT use generic techniques not listed in the mappings +- DO NOT add additional techniques based on your training data + +**CRITICAL:** ONLY use the MITRE ATT&CK techniques explicitly provided in the technique mappings above. Do not add any other techniques. + +**COMPLETE SIGMA RULE EXAMPLE (TECHNIQUE TAGS MUST MATCH PROVIDED MAPPINGS):** +```yaml +title: 'CVE-2024-XXXX Detection Rule' +id: a1b2c3d4-e5f6-7890-abcd-ef1234567890 +status: experimental +description: 'Detection for CVE-2024-XXXX vulnerability' +author: 'AI Generated' +date: 2025/01/16 +references: + - https://nvd.nist.gov/vuln/detail/CVE-2024-XXXX +tags: + - attack.t1134 # Access Token Manipulation (example - use actual mappings) + - attack.t1134.001 # Token Impersonation/Theft (example - use actual mappings) +logsource: + category: process_creation + product: windows +detection: + selection: + Image|contains: 'specific_indicator' + condition: selection +level: medium +``` + +**IMPORTANT:** The tags section above is just an example format. You MUST use the exact techniques provided in the MITRE ATT&CK TECHNIQUE MAPPINGS section for the specific CVE you're analyzing. + **CRITICAL ANTI-HALLUCINATION RULES:** 1. You MUST use the EXACT CVE ID provided in the user input - NEVER generate a different CVE ID 2. NEVER use example CVE IDs like CVE-2022-1234, CVE-2023-5678, or CVE-2024-1234 @@ -323,7 +357,14 @@ Enhance this rule with PoC insights. Output only valid SIGMA YAML starting with **MITRE ATT&CK TECHNIQUE MAPPINGS FOR {cve_id}:** {chr(10).join(technique_details)} -**IMPORTANT:** Use these exact MITRE ATT&CK techniques in your tags section. 
Convert them to lowercase attack.t format (e.g., T1059 becomes attack.t1059).""" +**CRITICAL REQUIREMENT:** Use ONLY these exact MITRE ATT&CK techniques in your tags section. Convert them to lowercase attack.t format (e.g., T1134 becomes attack.t1134, T1134.001 becomes attack.t1134.001). + +**ABSOLUTELY FORBIDDEN:** +- Do not use T1059, T1071, T1105, T1055, T1068, T1140, T1036, T1112, T1547 or any other techniques not listed above +- Do not add techniques based on PoC analysis if they're not in the provided mappings +- Do not use generic techniques from your training data + +If no MITRE techniques are provided above, use only CVE and CWE tags.""" if mitre_mappings['cwe_codes']: mitre_suggestions += f""" @@ -344,10 +385,25 @@ Enhance this rule with PoC insights. Output only valid SIGMA YAML starting with 1. Use EXACTLY this CVE ID in the title: {{cve_id}} 2. Use EXACTLY this CVE URL in references: https://nvd.nist.gov/vuln/detail/{{cve_id}} 3. Analyze the CVE description to understand the vulnerability type -4. Extract specific indicators from the PoC code (files, processes, commands, network patterns) -5. Create detection logic based on the actual exploit behavior -6. Use relevant logsource category (process_creation, file_event, network_connection, etc.) -7. Include the MITRE ATT&CK tags listed above in your tags section (convert to attack.t format) +4. If the PoC analysis above contains structured indicators, use those EXACT indicators in your detection rules +5. **USE ONLY THE MITRE ATT&CK TECHNIQUES LISTED IN THE MAPPINGS ABOVE** - Do not add any other techniques +6. Choose the appropriate logsource category based on the primary indicator types (process_creation, file_event, network_connection, registry_event, etc.) +7. 
Convert the mapped MITRE techniques to lowercase attack.t format (T1134 → attack.t1134, T1134.001 → attack.t1134.001) + +**DETECTION PATTERN GUIDANCE:** +- For Process Execution indicators: Use Image, CommandLine, or ProcessName fields +- For File System indicators: Use TargetFilename, SourceFilename, or FilePath fields +- For Network indicators: Use DestinationHostname, DestinationIp, or DestinationPort fields +- For Registry indicators: Use TargetObject, Details, or EventType fields +- For Command indicators: Use CommandLine or ProcessCommandLine fields + +**TAGS FORMATTING REQUIREMENTS:** +- Use ONLY the MITRE ATT&CK techniques provided in the "MITRE ATT&CK TECHNIQUE MAPPINGS" section above +- Convert to lowercase attack.t format: T1134 → attack.t1134, T1134.001 → attack.t1134.001 +- Include comments for clarity: attack.t1134 # Access Token Manipulation +- Use specific sub-techniques when available +- DO NOT add techniques not listed in the provided mappings +- DO NOT use generic techniques from your training data **CRITICAL ANTI-HALLUCINATION REQUIREMENTS:** - THE CVE ID IS: {{cve_id}} @@ -355,6 +411,7 @@ Enhance this rule with PoC insights. 
Output only valid SIGMA YAML starting with - DO NOT generate a different CVE ID from your training data - You MUST use the exact CVE ID "{{cve_id}}" - this is the ONLY acceptable CVE ID for this rule - Base your analysis ONLY on the provided CVE description and PoC code above +- If structured indicators are provided in the PoC analysis, use those exact values - Do not reference other vulnerabilities or exploits not mentioned in the provided content - NEVER use placeholder CVE IDs like CVE-YYYY-NNNN or CVE-2022-1234 @@ -741,30 +798,36 @@ Output ONLY valid SIGMA YAML starting with 'title:' that includes the exact CVE stripped = line.strip() # Check for orphaned list items (lines starting with - but not part of an array) + # But be more careful - don't remove items that are properly indented under a parent if (stripped.startswith('- ') and i > 0 and not lines[i-1].strip().endswith(':') and ':' not in stripped and - not stripped.startswith('- https://')): # Don't remove reference URLs + not stripped.startswith('- https://') and # Don't remove reference URLs + not stripped.startswith('- attack.') and # Don't remove MITRE ATT&CK tags + not re.match(r'- [a-z0-9._-]+$', stripped)): # Don't remove simple tags - # Check if this looks like a MITRE ATT&CK tag - if re.match(r'- T\d{4}', stripped): - # Try to find the tags section and add it there - tags_line_found = False - for j in range(len(fixed_lines)-1, -1, -1): - if fixed_lines[j].strip().startswith('tags:'): - # This is an orphaned tag, add it to the tags array - fixed_lines.append(f" {stripped}") - fixes_applied.append(f"Fixed orphaned MITRE tag: {stripped}") - tags_line_found = True - break + # Check if this is properly indented under a parent (like tags:) + is_properly_indented = False + current_indent = len(line) - len(line.lstrip()) + + # Look backwards to find a parent with less indentation + for j in range(i-1, -1, -1): + prev_line = lines[j] + prev_stripped = prev_line.strip() + prev_indent = len(prev_line) - 
len(prev_line.lstrip()) - if not tags_line_found: - # No tags section found, remove the orphaned item - fixes_applied.append(f"Removed orphaned tag (no tags section): {stripped}") - continue - else: - # Other orphaned list items, remove them + if prev_stripped and prev_indent < current_indent: + # Found a parent with less indentation + if prev_stripped.endswith(':'): + is_properly_indented = True + break + else: + # This is likely orphaned + break + + if not is_properly_indented: + # This is truly orphaned, remove it fixes_applied.append(f"Removed orphaned list item: {stripped}") continue @@ -825,8 +888,17 @@ Output ONLY valid SIGMA YAML starting with 'title:' that includes the exact CVE except yaml.YAMLError as e2: logger.warning(f"YAML repair attempt failed: {e2}") - # Last resort: try to build a minimal valid SIGMA rule - return self._build_minimal_valid_rule(content, fixes_applied) + # Try a more aggressive repair before falling back to minimal rule + aggressive_repair = self._aggressive_yaml_repair(content) + try: + yaml.safe_load(aggressive_repair) + fixes_applied.append("Applied aggressive YAML repair") + logger.info("Successfully repaired YAML with aggressive method") + return aggressive_repair + except yaml.YAMLError as e3: + logger.warning(f"Aggressive repair also failed: {e3}") + # Last resort: try to build a minimal valid SIGMA rule + return self._build_minimal_valid_rule(content, fixes_applied) def _repair_yaml_structure(self, content: str, error_msg: str) -> str: """Attempt to repair common YAML structural issues.""" @@ -837,6 +909,8 @@ Output ONLY valid SIGMA YAML starting with 'title:' that includes the exact CVE expected_indent = 0 in_detection = False detection_indent = 0 + in_tags = False + tags_indent = 0 for i, line in enumerate(lines): stripped = line.strip() @@ -847,6 +921,24 @@ Output ONLY valid SIGMA YAML starting with 'title:' that includes the exact CVE repaired_lines.append(line) continue + # Track if we're in the tags section + if 
stripped.startswith('tags:'): + in_tags = True + tags_indent = current_indent + repaired_lines.append(line) + continue + elif in_tags and current_indent <= tags_indent and not stripped.startswith('-'): + # We've left the tags section + in_tags = False + + # Fix tags section indentation + if in_tags and stripped.startswith('-'): + # Ensure proper indentation for tag items + if current_indent <= tags_indent: + corrected_line = ' ' * (tags_indent + 2) + stripped + repaired_lines.append(corrected_line) + continue + # Track if we're in the detection section if stripped.startswith('detection:'): in_detection = True @@ -875,6 +967,21 @@ Output ONLY valid SIGMA YAML starting with 'title:' that includes the exact CVE repaired_lines.append(corrected_line) continue + # Fix logsource section indentation + if stripped.startswith('logsource:'): + # Logsource should be at root level (no indentation) + if current_indent > 0: + corrected_line = stripped + repaired_lines.append(corrected_line) + continue + elif line.lstrip().startswith(('category:', 'product:', 'service:')) and i > 0: + # These should be indented under logsource + prev_line = lines[i-1].strip() + if prev_line.startswith('logsource:') or any('logsource' in repaired_lines[j] for j in range(max(0, len(repaired_lines)-5), len(repaired_lines))): + corrected_line = ' ' + stripped + repaired_lines.append(corrected_line) + continue + # Fix lines that start with wrong indentation if ':' in stripped and not stripped.startswith('-'): # This is a key-value pair @@ -891,6 +998,85 @@ Output ONLY valid SIGMA YAML starting with 'title:' that includes the exact CVE return '\n'.join(repaired_lines) + def _aggressive_yaml_repair(self, content: str) -> str: + """Aggressive YAML repair that reconstructs the document structure.""" + lines = content.split('\n') + + # Extract key components + title = "Generated SIGMA Rule" + rule_id = "00000000-0000-0000-0000-000000000000" + description = "Generated detection rule" + author = "AI 
Generated" + date = "2025/01/16" + references = [] + tags = [] + logsource_category = "process_creation" + logsource_product = "windows" + detection_rules = [] + condition = "selection" + level = "medium" + + # Parse existing content + for i, line in enumerate(lines): + stripped = line.strip() + + if stripped.startswith('title:'): + title = stripped.split(':', 1)[1].strip().strip('"\'') + elif stripped.startswith('id:'): + rule_id = stripped.split(':', 1)[1].strip().strip('"\'') + elif stripped.startswith('description:'): + description = stripped.split(':', 1)[1].strip().strip('"\'') + elif stripped.startswith('author:'): + author = stripped.split(':', 1)[1].strip().strip('"\'') + elif stripped.startswith('date:'): + date = stripped.split(':', 1)[1].strip().strip('"\'') + elif stripped.startswith('level:'): + level = stripped.split(':', 1)[1].strip().strip('"\'') + elif stripped.startswith('condition:'): + condition = stripped.split(':', 1)[1].strip().strip('"\'') + elif stripped.startswith('- http'): + references.append(stripped[2:].strip()) + elif stripped.startswith('- attack.') or stripped.startswith('- cve-') or stripped.startswith('- exploit.') or stripped.startswith('- poc.') or stripped.startswith('- cwe.'): + tags.append(stripped[2:].strip()) + elif 'category:' in stripped: + logsource_category = stripped.split(':', 1)[1].strip().strip('"\'') + elif 'product:' in stripped: + logsource_product = stripped.split(':', 1)[1].strip().strip('"\'') + + # Build a clean YAML structure + yaml_content = f"""title: '{title}' +id: {rule_id} +status: experimental +description: '{description}' +author: '{author}' +date: {date} +references:""" + + if references: + for ref in references: + yaml_content += f"\n - {ref}" + else: + yaml_content += "\n - https://example.com" + + yaml_content += "\ntags:" + if tags: + for tag in tags: + yaml_content += f"\n - {tag}" + else: + yaml_content += "\n - unknown" + + yaml_content += f""" +logsource: + category: {logsource_category} + 
product: {logsource_product} +detection: + selection: + Image: '*' + condition: {condition} +level: {level}""" + + return yaml_content + def _build_minimal_valid_rule(self, content: str, fixes_applied: list) -> str: """Build a minimal valid SIGMA rule from the content.""" lines = content.split('\n') @@ -915,7 +1101,7 @@ id: {rule_id} status: experimental description: '{description}' author: 'AI Generated' -date: 2025/01/14 +date: 2025/01/16 references: - https://example.com logsource: diff --git a/backend/poc_analyzer.py b/backend/poc_analyzer.py new file mode 100755 index 0000000..d6b7a47 --- /dev/null +++ b/backend/poc_analyzer.py @@ -0,0 +1,733 @@ +""" +Advanced PoC (Proof of Concept) analyzer for extracting security indicators +from exploit code across multiple programming languages and attack vectors. +""" + +import re +import base64 +import binascii +from typing import Dict, List, Set, Optional, Tuple +from dataclasses import dataclass +from enum import Enum +import logging + +logger = logging.getLogger(__name__) + +class AttackTechnique(Enum): + PROCESS_INJECTION = "T1055" + COMMAND_EXECUTION = "T1059" + POWERSHELL = "T1059.001" + COMMAND_LINE = "T1059.003" + FILE_CREATION = "T1105" + REGISTRY_MODIFICATION = "T1112" + NETWORK_CONNECTION = "T1071" + PRIVILEGE_ESCALATION = "T1068" + DLL_INJECTION = "T1055.001" + PROCESS_HOLLOWING = "T1055.012" + SERVICE_CREATION = "T1543.003" + +@dataclass +class SecurityIndicator: + """Represents a security indicator extracted from PoC code.""" + type: str # process, file, network, registry, command + value: str + confidence: float # 0.0 to 1.0 + context: str # surrounding code context + attack_technique: Optional[AttackTechnique] = None + metadata: Dict = None + +class PoCAnalyzer: + """Advanced analyzer for extracting security indicators from PoC code.""" + + def __init__(self): + self.indicators: List[SecurityIndicator] = [] + self.language_patterns = self._initialize_language_patterns() + self.attack_patterns = 
self._initialize_attack_patterns() + self.false_positive_filters = self._initialize_fp_filters() + + def analyze_poc(self, poc_content: str, cve_id: str = None) -> Dict[str, any]: + """ + Main analysis function that extracts all security indicators. + + Args: + poc_content: The PoC source code + cve_id: Optional CVE identifier for context + + Returns: + Dictionary containing categorized indicators and analysis + """ + self.indicators = [] + + # Detect programming language + language = self._detect_language(poc_content) + + # Extract indicators by category + processes = self._extract_process_indicators(poc_content, language) + files = self._extract_file_indicators(poc_content, language) + network = self._extract_network_indicators(poc_content, language) + registry = self._extract_registry_indicators(poc_content, language) + commands = self._extract_command_indicators(poc_content, language) + + # Extract encoded/obfuscated content + decoded_content = self._extract_encoded_content(poc_content) + if decoded_content: + # Recursively analyze decoded content + for content in decoded_content: + sub_analysis = self.analyze_poc(content) + processes.extend(sub_analysis['processes']) + files.extend(sub_analysis['files']) + network.extend(sub_analysis['network']) + registry.extend(sub_analysis['registry']) + commands.extend(sub_analysis['commands']) + + # Behavioral analysis + behaviors = self._analyze_attack_behaviors(poc_content, language) + + # MITRE ATT&CK technique mapping + techniques = self._map_to_mitre_attack( + processes + files + network + registry + commands + ) + + # Quality assessment + analysis_quality = self._assess_analysis_quality(poc_content) + + return { + 'language': language, + 'processes': self._deduplicate_and_rank(processes), + 'files': self._deduplicate_and_rank(files), + 'network': self._deduplicate_and_rank(network), + 'registry': self._deduplicate_and_rank(registry), + 'commands': self._deduplicate_and_rank(commands), + 'behaviors': behaviors, + 
'mitre_techniques': techniques, + 'quality_score': analysis_quality, + 'total_indicators': len(self.indicators), + 'high_confidence_indicators': len([i for i in self.indicators if i.confidence > 0.7]) + } + + def _detect_language(self, content: str) -> str: + """Detect the primary programming language of the PoC.""" + language_indicators = { + 'powershell': [ + r'\$[a-zA-Z_][a-zA-Z0-9_]*', r'Get-\w+', r'Set-\w+', r'New-\w+', + r'Invoke-\w+', r'Add-Type', r'\[System\.\w+\]' + ], + 'python': [ + r'import\s+\w+', r'from\s+\w+\s+import', r'def\s+\w+\(', + r'subprocess\.', r'os\.system', r'__name__\s*==\s*["\']__main__["\']' + ], + 'bash': [ + r'#!/bin/bash', r'#!/bin/sh', r'\$\{[^}]+\}', r'chmod\s+\+x', + r'wget\s+', r'curl\s+', r'echo\s+.*\|' + ], + 'batch': [ + r'@echo\s+off', r'%[^%]+%', r'goto\s+\w+', r'if\s+exist', + r'cmd\s*/c', r'start\s+' + ], + 'c_cpp': [ + r'#include\s*<[^>]+>', r'int\s+main\s*\(', r'printf\s*\(', + r'malloc\s*\(', r'free\s*\(', r'system\s*\(' + ], + 'csharp': [ + r'using\s+System', r'namespace\s+\w+', r'class\s+\w+', + r'Process\.Start', r'Registry\.', r'new\s+ProcessStartInfo' + ], + 'javascript': [ + r'function\s+\w+\s*\(', r'var\s+\w+\s*=', r'console\.log', + r'require\s*\(', r'=>', r'new\s+XMLHttpRequest' + ], + 'php': [ + r'<\?php', r'\$[a-zA-Z_][a-zA-Z0-9_]*', r'echo\s+', + r'exec\s*\(', r'system\s*\(', r'shell_exec' + ] + } + + scores = {} + content_lower = content.lower() + + for lang, patterns in language_indicators.items(): + score = 0 + for pattern in patterns: + matches = len(re.findall(pattern, content, re.IGNORECASE | re.MULTILINE)) + score += matches + scores[lang] = score + + if not scores or max(scores.values()) == 0: + return 'unknown' + + return max(scores, key=scores.get) + + def _extract_process_indicators(self, content: str, language: str) -> List[SecurityIndicator]: + """Extract process execution indicators.""" + indicators = [] + + patterns = { + 'powershell': [ + r'Start-Process\s+["\']?([^"\';\s]+)', + 
r'Invoke-Expression\s+["\']?([^"\';\s]+)', + r'&\s+["\']?([^"\';\s]+\.exe)', + r'\.\s+["\']?([^"\';\s]+\.exe)' + ], + 'python': [ + r'subprocess\.call\(\s*["\']([^"\']+)', + r'subprocess\.Popen\(\s*["\']([^"\']+)', + r'os\.system\(\s*["\']([^"\']+)', + r'os\.exec[vl]?p?\(\s*["\']([^"\']+)' + ], + 'bash': [ + r'exec\s+([^;\s&|]+)', + r'/bin/sh\s+-c\s+["\']([^"\']+)', + r'system\(\s*["\']([^"\']+)' + ], + 'batch': [ + r'start\s+["\']?([^"\';\s]+)', + r'cmd\s*/c\s+["\']?([^"\']+)', + r'call\s+["\']?([^"\';\s]+)' + ], + 'c_cpp': [ + r'system\(\s*["\']([^"\']+)', + r'execve?\(\s*["\']([^"\']+)', + r'CreateProcess[AW]?\([^,]*["\']([^"\']+)' + ], + 'csharp': [ + r'Process\.Start\(\s*["\']([^"\']+)', + r'ProcessStartInfo.*FileName\s*=\s*["\']([^"\']+)', + r'new\s+Process.*["\']([^"\']+)' + ] + } + + if language in patterns: + for pattern in patterns[language]: + matches = re.finditer(pattern, content, re.IGNORECASE | re.MULTILINE) + for match in matches: + process_name = match.group(1) + context = self._get_context(content, match.start(), match.end()) + + confidence = self._calculate_confidence(process_name, 'process', context) + if confidence > 0.3: # Filter low confidence matches + indicators.append(SecurityIndicator( + type='process', + value=process_name, + confidence=confidence, + context=context, + attack_technique=AttackTechnique.PROCESS_INJECTION if 'inject' in context.lower() else AttackTechnique.COMMAND_EXECUTION + )) + + return indicators + + def _extract_file_indicators(self, content: str, language: str) -> List[SecurityIndicator]: + """Extract file system indicators.""" + indicators = [] + + # File path patterns + file_patterns = [ + r'["\']([a-zA-Z]:\\[^"\'<>|*?]+\.[a-zA-Z0-9]+)["\']', # Windows paths + r'["\']([/][^"\'<>|*?\s]+\.[a-zA-Z0-9]+)["\']', # Unix paths + r'["\'](\./[^"\'<>|*?\s]+\.[a-zA-Z0-9]+)["\']', # Relative paths + r'%TEMP%\\([^"\'<>|*?\s]+\.[a-zA-Z0-9]+)', # Windows temp + r'/tmp/([^"\'<>|*?\s]+\.[a-zA-Z0-9]+)', # Unix temp + ] + + # 
Language-specific file operations + operation_patterns = { + 'powershell': [ + r'New-Item.*Path.*["\']([^"\']+)["\']', + r'Out-File.*["\']([^"\']+)["\']', + r'Set-Content.*["\']([^"\']+)["\']', + r'\|\s*Out-File\s+["\']([^"\']+)["\']' + ], + 'python': [ + r'open\(\s*["\']([^"\']+)["\']', + r'with\s+open\(\s*["\']([^"\']+)["\']', + r'shutil\.copy.*["\']([^"\']+)["\']' + ], + 'bash': [ + r'touch\s+["\']?([^"\';\s]+)', + r'cp\s+[^"\';\s]+\s+["\']?([^"\';\s]+)', + r'mv\s+[^"\';\s]+\s+["\']?([^"\';\s]+)', + r'echo.*>\s*["\']?([^"\';\s]+)' + ], + 'c_cpp': [ + r'fopen\(\s*["\']([^"\']+)["\']', + r'CreateFile[AW]?\([^,]*["\']([^"\']+)["\']', + r'WriteFile.*["\']([^"\']+)["\']' + ] + } + + # Extract file paths + for pattern in file_patterns: + matches = re.finditer(pattern, content, re.IGNORECASE) + for match in matches: + file_path = match.group(1) + context = self._get_context(content, match.start(), match.end()) + + confidence = self._calculate_confidence(file_path, 'file', context) + if confidence > 0.4: + indicators.append(SecurityIndicator( + type='file', + value=file_path, + confidence=confidence, + context=context, + attack_technique=AttackTechnique.FILE_CREATION + )) + + # Extract file operations + if language in operation_patterns: + for pattern in operation_patterns[language]: + matches = re.finditer(pattern, content, re.IGNORECASE | re.MULTILINE) + for match in matches: + file_path = match.group(1) + context = self._get_context(content, match.start(), match.end()) + + confidence = self._calculate_confidence(file_path, 'file', context) + if confidence > 0.4: + indicators.append(SecurityIndicator( + type='file', + value=file_path, + confidence=confidence, + context=context, + attack_technique=AttackTechnique.FILE_CREATION + )) + + return indicators + + def _extract_network_indicators(self, content: str, language: str) -> List[SecurityIndicator]: + """Extract network communication indicators.""" + indicators = [] + + # Network patterns + network_patterns = [ + 
def _extract_registry_indicators(self, content: str, language: str) -> List[SecurityIndicator]:
    """Extract Windows registry indicators.

    Scans ``content`` in two passes:

    1. Generic registry-key patterns (``HKEY_*``, ``HKLM``, ``HKCU``,
       ``SOFTWARE`` and ``SYSTEM`` prefixes), applied for every language.
    2. Registry API / command patterns registered for ``language``
       (powershell, batch, c_cpp, csharp); other languages add nothing.

    Capture group 1 of every match is scored with
    ``self._calculate_confidence(..., 'registry', context)`` and kept when
    the score is strictly greater than 0.4.

    Args:
        content: Raw PoC text to scan.
        language: Language key selecting the operation patterns.

    Returns:
        Indicators in discovery order; duplicates are not removed here.
    """
    indicators = []

    # Registry key patterns
    registry_patterns = [
        r'(HKEY_[A-Z_]+\\[^"\';\s\]]+)',
        r'(HKLM\\[^"\';\s\]]+)',
        r'(HKCU\\[^"\';\s\]]+)',
        r'(SOFTWARE\\[^"\';\s\]]+)',
        r'(SYSTEM\\[^"\';\s\]]+)',
    ]

    # Language-specific registry operations.
    # NOTE: this table must be scanned below (as _extract_file_indicators
    # does with its own table); otherwise ``language`` has no effect.
    operation_patterns = {
        'powershell': [
            r'New-ItemProperty.*Path.*["\']([^"\']+)["\']',
            r'Set-ItemProperty.*Path.*["\']([^"\']+)["\']',
            r'Get-ItemProperty.*Path.*["\']([^"\']+)["\']',
            r'Remove-ItemProperty.*Path.*["\']([^"\']+)["\']',
        ],
        'batch': [
            r'reg\s+add\s+["\']?([^"\';\s]+)',
            r'reg\s+query\s+["\']?([^"\';\s]+)',
            r'reg\s+delete\s+["\']?([^"\';\s]+)',
        ],
        'c_cpp': [
            # "(?:Ex)?" is the optional "Ex" suffix; a character class
            # "[Ex]?" would only match a single 'E' or 'x'.
            r'RegCreateKey(?:Ex)?[AW]?.*["\']([^"\']+)["\']',
            r'RegSetValue(?:Ex)?[AW]?.*["\']([^"\']+)["\']',
            r'RegOpenKey(?:Ex)?[AW]?.*["\']([^"\']+)["\']',
        ],
        'csharp': [
            r'Registry\.[^.]+\.OpenSubKey\(["\']([^"\']+)["\']',
            r'RegistryKey.*["\']([^"\']+)["\']',
        ],
    }

    # Generic keys first, then the operations for the detected language.
    scan_plan = [
        (registry_patterns, re.IGNORECASE),
        (operation_patterns.get(language, []), re.IGNORECASE | re.MULTILINE),
    ]

    for patterns, flags in scan_plan:
        for pattern in patterns:
            for match in re.finditer(pattern, content, flags):
                reg_key = match.group(1)
                context = self._get_context(content, match.start(), match.end())

                confidence = self._calculate_confidence(reg_key, 'registry', context)
                if confidence > 0.4:
                    indicators.append(SecurityIndicator(
                        type='registry',
                        value=reg_key,
                        confidence=confidence,
                        context=context,
                        attack_technique=AttackTechnique.REGISTRY_MODIFICATION,
                    ))

    return indicators
confidence > 0.4: + # Determine attack technique based on command content + technique = AttackTechnique.COMMAND_EXECUTION + if 'powershell' in command.lower(): + technique = AttackTechnique.POWERSHELL + elif any(word in command.lower() for word in ['cmd', 'bat', 'com']): + technique = AttackTechnique.COMMAND_LINE + + indicators.append(SecurityIndicator( + type='command', + value=command, + confidence=confidence, + context=context, + attack_technique=technique + )) + + return indicators + + def _extract_encoded_content(self, content: str) -> List[str]: + """Extract and decode obfuscated/encoded content.""" + decoded_content = [] + + # Base64 patterns + base64_patterns = [ + r'["\']([A-Za-z0-9+/]{20,}={0,2})["\']', # Base64 strings + r'FromBase64String\(["\']([^"\']+)["\']', # PowerShell + r'base64\.b64decode\(["\']([^"\']+)["\']', # Python + ] + + for pattern in base64_patterns: + matches = re.finditer(pattern, content, re.IGNORECASE) + for match in matches: + try: + encoded_str = match.group(1) + if len(encoded_str) > 20: # Only decode substantial content + decoded = base64.b64decode(encoded_str + '===').decode('utf-8', errors='ignore') + if decoded and len(decoded) > 10: + decoded_content.append(decoded) + except: + continue + + # Hex patterns + hex_patterns = [ + r'0x([0-9a-fA-F]{20,})', + r'["\']([0-9a-fA-F]{20,})["\']' + ] + + for pattern in hex_patterns: + matches = re.finditer(pattern, content) + for match in matches: + try: + hex_str = match.group(1) + if len(hex_str) % 2 == 0 and len(hex_str) > 20: + decoded = binascii.unhexlify(hex_str).decode('utf-8', errors='ignore') + if decoded and len(decoded) > 10: + decoded_content.append(decoded) + except: + continue + + return decoded_content + + def _calculate_confidence(self, indicator: str, indicator_type: str, context: str) -> float: + """Calculate confidence score for an indicator.""" + confidence = 0.5 # Base confidence + + # Length and complexity scoring + if len(indicator) > 5: + confidence += 0.1 + if 
len(indicator) > 20: + confidence += 0.1 + + # Context-based scoring + high_confidence_keywords = [ + 'exploit', 'payload', 'shell', 'inject', 'execute', 'run', + 'attack', 'malware', 'backdoor', 'trojan', 'virus' + ] + + context_lower = context.lower() + for keyword in high_confidence_keywords: + if keyword in context_lower: + confidence += 0.1 + break + + # Type-specific scoring + if indicator_type == 'process': + if indicator.endswith('.exe') or indicator.endswith('.dll'): + confidence += 0.2 + if any(word in indicator.lower() for word in ['cmd', 'powershell', 'bash', 'sh']): + confidence += 0.1 + + elif indicator_type == 'file': + if any(ext in indicator.lower() for ext in ['.exe', '.dll', '.bat', '.ps1', '.sh']): + confidence += 0.2 + if any(path in indicator.lower() for path in ['temp', 'tmp', 'appdata']): + confidence += 0.1 + + elif indicator_type == 'network': + if re.match(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', indicator): + confidence += 0.2 + if any(tld in indicator.lower() for tld in ['.com', '.net', '.org', '.ru', '.cn']): + confidence += 0.1 + + # Apply false positive filters + if self._is_false_positive(indicator, indicator_type): + confidence *= 0.3 + + return min(confidence, 1.0) + + def _is_false_positive(self, indicator: str, indicator_type: str) -> bool: + """Check if indicator is likely a false positive.""" + + if indicator_type in self.false_positive_filters: + fp_patterns = self.false_positive_filters[indicator_type] + for pattern in fp_patterns: + if re.search(pattern, indicator, re.IGNORECASE): + return True + + return False + + def _get_context(self, content: str, start: int, end: int, window: int = 100) -> str: + """Get context around a match.""" + context_start = max(0, start - window) + context_end = min(len(content), end + window) + return content[context_start:context_end].strip() + + def _deduplicate_and_rank(self, indicators: List[SecurityIndicator]) -> List[Dict]: + """Remove duplicates and rank indicators by confidence.""" + + # 
Deduplicate by value + seen = set() + unique_indicators = [] + + for indicator in sorted(indicators, key=lambda x: x.confidence, reverse=True): + if indicator.value not in seen: + seen.add(indicator.value) + unique_indicators.append(indicator) + + # Convert to dict format and return top indicators + return [ + { + 'value': ind.value, + 'confidence': round(ind.confidence, 2), + 'context': ind.context[:200] + '...' if len(ind.context) > 200 else ind.context, + 'attack_technique': ind.attack_technique.value if ind.attack_technique else None + } + for ind in unique_indicators[:10] # Top 10 indicators + ] + + def _analyze_attack_behaviors(self, content: str, language: str) -> List[Dict]: + """Analyze attack behaviors and patterns.""" + behaviors = [] + + behavior_patterns = { + 'persistence': [ + r'(?:startup|autorun|registry.*run)', + r'(?:scheduled.*task|cron|at\s+\d)', + r'(?:service.*create|sc.*create)' + ], + 'defense_evasion': [ + r'(?:disable.*antivirus|kill.*av)', + r'(?:encode|encrypt|obfuscat)', + r'(?:hide|stealth|invisible)' + ], + 'credential_access': [ + r'(?:password|credential|token)', + r'(?:keylog|steal.*key)', + r'(?:mimikatz|lsass)' + ], + 'lateral_movement': [ + r'(?:psexec|wmi.*exec|remote.*exec)', + r'(?:net\s+use|mount|smb)', + r'(?:ssh|rdp|vnc)' + ], + 'exfiltration': [ + r'(?:upload|ftp|http.*post)', + r'(?:compress|zip|archive)', + r'(?:steal|exfil|extract)' + ] + } + + content_lower = content.lower() + + for behavior, patterns in behavior_patterns.items(): + score = 0 + matches = [] + + for pattern in patterns: + pattern_matches = re.findall(pattern, content_lower) + if pattern_matches: + score += len(pattern_matches) + matches.extend(pattern_matches) + + if score > 0: + behaviors.append({ + 'behavior': behavior, + 'confidence': min(score * 0.2, 1.0), + 'indicators': matches[:5] # Top 5 matches + }) + + return sorted(behaviors, key=lambda x: x['confidence'], reverse=True) + + def _map_to_mitre_attack(self, indicators: List[SecurityIndicator]) 
-> List[str]: + """Map indicators to MITRE ATT&CK techniques.""" + techniques = set() + + for indicator in indicators: + if indicator.attack_technique: + techniques.add(indicator.attack_technique.value) + + return sorted(list(techniques)) + + def _assess_analysis_quality(self, content: str) -> Dict[str, any]: + """Assess the quality and completeness of the analysis.""" + + # Content metrics + lines = len(content.split('\n')) + chars = len(content) + + # Indicator density + total_indicators = len(self.indicators) + high_conf_indicators = len([i for i in self.indicators if i.confidence > 0.7]) + + # Calculate quality score + content_score = min(lines / 50, 1.0) * 0.3 # More lines = better + indicator_score = min(total_indicators / 20, 1.0) * 0.4 # More indicators = better + confidence_score = (high_conf_indicators / max(total_indicators, 1)) * 0.3 # Higher confidence = better + + overall_score = content_score + indicator_score + confidence_score + + return { + 'overall_score': round(overall_score, 2), + 'content_lines': lines, + 'content_chars': chars, + 'total_indicators': total_indicators, + 'high_confidence_indicators': high_conf_indicators, + 'recommendation': self._get_quality_recommendation(overall_score) + } + + def _get_quality_recommendation(self, score: float) -> str: + """Get recommendation based on quality score.""" + if score >= 0.8: + return "High quality PoC with excellent indicator extraction" + elif score >= 0.6: + return "Good quality PoC with adequate indicators" + elif score >= 0.4: + return "Moderate quality PoC, may need additional analysis" + else: + return "Low quality PoC, limited indicators extracted" + + def _initialize_language_patterns(self) -> Dict: + """Initialize language-specific patterns.""" + return { + # Patterns for different languages will be expanded + } + + def _initialize_attack_patterns(self) -> Dict: + """Initialize attack pattern recognition.""" + return { + # Attack patterns will be expanded + } + + def 
# Example usage
if __name__ == "__main__":
    # Synthetic exploit script used as demo input: it contains a Base64
    # payload, a process launch, a file write and a socket connection.
    demo_source = """
    import subprocess
    import base64

    # CVE-2024-1234 exploit
    payload = base64.b64decode("Y21kIC9jIGVjaG8gSGVsbG8gV29ybGQ=")
    subprocess.call("powershell.exe -enc " + payload.decode(), shell=True)

    # Create persistence
    with open("C:\\temp\\malware.exe", "wb") as f:
        f.write(malicious_bytes)

    # Network connection
    import socket
    s = socket.socket()
    s.connect(("192.168.1.100", 4444))
    """

    # Run the analyzer once over the sample and print the result.
    report = PoCAnalyzer().analyze_poc(demo_source, "CVE-2024-1234")
    print(f"Analysis result: {report}")