add poc analyzer code

This commit is contained in:
Brendan McDevitt 2025-07-16 10:15:55 -05:00
parent 06c4ed74b8
commit cf57944c7f
3 changed files with 1083 additions and 52 deletions

View file

@ -11,6 +11,7 @@ from sqlalchemy.orm import Session
import re
from llm_client import LLMClient
from cve2capec_client import CVE2CAPECClient
from poc_analyzer import PoCAnalyzer
# Configure logging
logging.basicConfig(level=logging.INFO)
@ -23,6 +24,7 @@ class EnhancedSigmaGenerator:
self.db_session = db_session
self.llm_client = LLMClient(provider=llm_provider, model=llm_model)
self.cve2capec_client = CVE2CAPECClient()
self.poc_analyzer = PoCAnalyzer()
async def generate_enhanced_rule(self, cve, use_llm: bool = True) -> dict:
"""Generate enhanced SIGMA rule for a CVE using PoC data"""
@ -134,10 +136,17 @@ class EnhancedSigmaGenerator:
logger.warning(f"No PoC content available for {cve.cve_id}")
return None
# Generate rule using LLM
# Analyze PoC content with the PoC analyzer
logger.info(f"Analyzing PoC content for {cve.cve_id} with PoCAnalyzer")
poc_analysis = self.poc_analyzer.analyze_poc(poc_content, cve.cve_id)
# Enhance the PoC content with structured analysis
enhanced_poc_content = self._format_poc_analysis_for_llm(poc_content, poc_analysis)
# Generate rule using LLM with enhanced PoC content
rule_content = await self.llm_client.generate_sigma_rule(
cve_id=cve.cve_id,
poc_content=poc_content,
poc_content=enhanced_poc_content,
cve_description=cve.description or "",
existing_rule=None
)
@ -234,6 +243,120 @@ class EnhancedSigmaGenerator:
return None
def _format_poc_analysis_for_llm(self, original_poc_content: str, poc_analysis: dict) -> str:
    """Combine the raw PoC and its structured analysis into one prompt section.

    Args:
        original_poc_content: PoC source text; only the first 2000 characters
            are embedded in the prompt.
        poc_analysis: Analysis dict. The keys read here are ``language``,
            ``quality_score``, ``mitre_techniques``, ``behaviors``,
            ``processes``, ``files``, ``network``, ``registry``, ``commands``,
            ``total_indicators`` and ``high_confidence_indicators``; missing
            keys fall back to empty/neutral defaults.

    Returns:
        The formatted text handed to the LLM as PoC content.
    """
    lookup = poc_analysis.get
    render_indicators = self._format_indicators_for_display

    # Scalar findings
    detected_language = lookup('language', 'unknown')
    quality = lookup('quality_score', {})

    # Pre-render every list section so the template below stays readable
    process_section = render_indicators(lookup('processes', []))
    file_section = render_indicators(lookup('files', []))
    network_section = render_indicators(lookup('network', []))
    registry_section = render_indicators(lookup('registry', []))
    command_section = render_indicators(lookup('commands', []))
    technique_section = self._format_mitre_techniques_for_display(lookup('mitre_techniques', []))
    behavior_section = self._format_behaviors_for_display(lookup('behaviors', []))

    return f"""**ORIGINAL POC CODE:**
{original_poc_content[:2000]}
**STRUCTURED POC ANALYSIS:**
**Language Detected:** {detected_language}
**Security Indicators Extracted:**
**Process Execution Indicators:**
{process_section}
**File System Indicators:**
{file_section}
**Network Communication Indicators:**
{network_section}
**Registry Modification Indicators:**
{registry_section}
**Command Execution Indicators:**
{command_section}
**MITRE ATT&CK Techniques Detected:**
{technique_section}
**Attack Behaviors Identified:**
{behavior_section}
**Analysis Quality:**
- Overall Score: {quality.get('overall_score', 0)}/1.0
- Total Indicators: {lookup('total_indicators', 0)}
- High Confidence Indicators: {lookup('high_confidence_indicators', 0)}
- Recommendation: {quality.get('recommendation', 'Unknown')}
**DETECTION GUIDANCE:**
Use the above structured indicators to create specific SIGMA detection patterns. Focus on the high-confidence indicators and behaviors for the most accurate detection rules."""
def _format_indicators_for_display(self, indicators: list) -> str:
    """Render up to five indicators as a bullet list for the LLM prompt.

    Args:
        indicators: Indicator entries. Each entry may be a dict with
            ``value`` / ``confidence`` / ``attack_technique`` keys, an object
            exposing ``value`` and ``confidence`` attributes (for example a
            dataclass-style indicator), or any other value, which is rendered
            with ``str()``.

    Returns:
        One ``- ...`` line per shown indicator, a trailing "... and N more"
        line when more than five were supplied, or ``"- None detected"`` for
        empty input.
    """
    if not indicators:
        return "- None detected"

    max_shown = 5  # keep the prompt compact
    formatted = []
    for indicator in indicators[:max_shown]:
        if isinstance(indicator, dict):
            value = indicator.get('value', str(indicator))
            confidence = indicator.get('confidence', 0)
            attack_technique = indicator.get('attack_technique')
        elif hasattr(indicator, 'value') and hasattr(indicator, 'confidence'):
            # Attribute-style indicators: show the same fields as the dict
            # form instead of dumping the object's full repr into the prompt.
            value = indicator.value
            confidence = indicator.confidence
            attack_technique = getattr(indicator, 'attack_technique', None)
        else:
            formatted.append(f"- {indicator}")
            continue

        # Enum members carry the technique ID in ``.value``; plain strings
        # have no such attribute and pass through unchanged.
        attack_technique = getattr(attack_technique, 'value', attack_technique)

        # A missing or non-numeric confidence must not abort prompt building.
        try:
            confidence = float(confidence)
        except (TypeError, ValueError):
            confidence = 0.0

        technique_info = f" (MITRE: {attack_technique})" if attack_technique else ""
        formatted.append(f"- {value} (confidence: {confidence:.2f}){technique_info}")

    if len(indicators) > max_shown:
        formatted.append(f"- ... and {len(indicators) - max_shown} more indicators")
    return "\n".join(formatted)
def _format_mitre_techniques_for_display(self, techniques: list) -> str:
    """Render MITRE ATT&CK technique IDs as a bullet list.

    Each ID is followed by the name returned by
    ``self.cve2capec_client.get_technique_name`` when that client attribute
    exists and yields a non-empty name; otherwise only the ID is shown.

    Returns:
        One ``- ...`` line per technique, or ``"- None detected"`` when the
        list is empty.
    """
    if not techniques:
        return "- None detected"

    def bullet(technique_id):
        # Resolve a human-readable name only if the mapping client is present
        if hasattr(self, 'cve2capec_client'):
            name = self.cve2capec_client.get_technique_name(technique_id)
        else:
            name = ""
        return f"- {technique_id}: {name}" if name else f"- {technique_id}"

    return "\n".join(bullet(technique_id) for technique_id in techniques)
def _format_behaviors_for_display(self, behaviors: list) -> str:
    """Render detected attack behaviors as a bullet list for the LLM prompt.

    Args:
        behaviors: Behavior entries. Dict entries are read through their
            ``behavior``, ``confidence`` and ``indicators`` keys; anything
            else is rendered with ``str()``.

    Returns:
        One ``- ...`` line per behavior, followed by an ``Indicators:`` line
        listing at most three of its indicators when any exist, or
        ``"- None detected"`` for empty input.
    """
    if not behaviors:
        return "- None detected"

    formatted = []
    for behavior in behaviors:
        if not isinstance(behavior, dict):
            formatted.append(f"- {behavior}")
            continue

        # ``or`` also covers an explicit None stored under the key.
        behavior_type = str(behavior.get('behavior') or 'unknown')

        # A missing or non-numeric confidence must not abort prompt building.
        try:
            confidence = float(behavior.get('confidence', 0))
        except (TypeError, ValueError):
            confidence = 0.0

        indicators = behavior.get('indicators') or []
        if isinstance(indicators, str):
            # A bare string would otherwise be sliced and joined per character.
            indicators = [indicators]

        formatted.append(f"- {behavior_type.replace('_', ' ').title()} (confidence: {confidence:.2f})")
        if indicators:
            # str() each item: str.join raises TypeError on non-string entries.
            shown = ', '.join(str(item) for item in list(indicators)[:3])
            formatted.append(f" Indicators: {shown}")
    return "\n".join(formatted)
def _extract_log_source_from_content(self, rule_content: str) -> str:
"""Extract log source from the generated rule content"""
try:
@ -467,7 +590,7 @@ class EnhancedSigmaGenerator:
if poc.get('html_url'):
refs.append(poc['html_url'])
return '\\n'.join(f" - {ref}" for ref in refs)
return '\n'.join(f" - {ref}" for ref in refs)
def _generate_tags(self, cve, poc_data: list) -> str:
"""Generate MITRE ATT&CK tags and other tags using CVE2CAPEC mappings"""
@ -488,21 +611,9 @@ class EnhancedSigmaGenerator:
if attack_tag not in tags:
tags.append(attack_tag)
else:
# Fallback to indicator-based technique detection
logger.info(f"No CVE2CAPEC mapping found for {cve.cve_id}, using indicator-based detection")
combined_indicators = self._combine_exploit_indicators(poc_data)
if combined_indicators.get('processes'):
tags.append('attack.t1059') # Command and Scripting Interpreter
if combined_indicators.get('network'):
tags.append('attack.t1071') # Application Layer Protocol
if combined_indicators.get('files'):
tags.append('attack.t1105') # Ingress Tool Transfer
if any('powershell' in p.lower() for p in combined_indicators.get('processes', [])):
tags.append('attack.t1059.001') # PowerShell
# No CVE2CAPEC mapping found - do not add fallback techniques
logger.warning(f"No CVE2CAPEC mapping found for {cve.cve_id}, no MITRE techniques will be added")
# Note: LLM will rely on the PoC analysis to determine appropriate techniques
# Get CWE codes for additional context
cwe_codes = self.cve2capec_client.get_cwe_for_cve(cve.cve_id)
@ -518,17 +629,18 @@ class EnhancedSigmaGenerator:
quality_tier = best_poc.get('quality_analysis', {}).get('quality_tier', 'poor')
tags.append(f'poc.quality.{quality_tier}')
# Return tags as a single line for first tag, then additional tags on new lines
# Return tags as YAML array format
if not tags:
return "unknown"
if len(tags) == 1:
return tags[0]
else:
# First tag goes directly after the dash, rest are on new lines
first_tag = tags[0]
additional_tags = '\\n'.join(f" - {tag}" for tag in tags[1:])
return f"{first_tag}\\n{additional_tags}"
# Format as proper YAML array
formatted_tags = []
for tag in tags:
formatted_tags.append(f" - {tag}")
return '\n'.join(formatted_tags)
def _format_indicators(self, indicators: list) -> str:
"""Format indicators for SIGMA rule"""
@ -546,7 +658,7 @@ class EnhancedSigmaGenerator:
escaped = cleaned.replace('\\\\', '\\\\\\\\').replace('*', '\\\\*').replace('?', '\\\\?')
formatted.append(f' - "{escaped}"')
return '\\n'.join(formatted) if formatted else ' - "*" # No valid indicators'
return '\n'.join(formatted) if formatted else ' - "*" # No valid indicators'
def _enhance_detection_logic(self, rule_content: str, indicators: dict, poc_data: list) -> str:
"""Enhance detection logic based on PoC quality and indicators"""
@ -566,7 +678,7 @@ class EnhancedSigmaGenerator:
# Insert before the condition line
rule_content = rule_content.replace(
'condition: selection',
additional_condition + '\\n condition: selection or process_and_command'
additional_condition + '\n condition: selection or process_and_command'
)
return rule_content

View file

@ -278,13 +278,47 @@ class LLMClient:
- status: experimental
- description: Specific description based on CVE and PoC analysis
- author: 'AI Generated'
- date: Current date (2025/01/14)
- date: Current date (2025/01/16)
- references: Include the EXACT CVE URL with the CVE ID provided by the user
- tags: Relevant MITRE ATT&CK techniques based on PoC analysis
- logsource: Appropriate category based on exploit type
- detection: Specific indicators from PoC analysis (NOT generic examples)
- condition: Logic connecting the detection selections
**MITRE ATT&CK TAGS FORMAT REQUIREMENTS:**
- Use ONLY the MITRE ATT&CK techniques provided in the "MITRE ATT&CK TECHNIQUE MAPPINGS" section above
- Convert technique IDs to lowercase attack.t format (e.g., T1134 becomes attack.t1134)
- Include specific sub-techniques when available (e.g., T1134.001 becomes attack.t1134.001)
- DO NOT use generic techniques not listed in the mappings
- DO NOT add additional techniques based on your training data
**CRITICAL:** ONLY use the MITRE ATT&CK techniques explicitly provided in the technique mappings above. Do not add any other techniques.
**COMPLETE SIGMA RULE EXAMPLE (TECHNIQUE TAGS MUST MATCH PROVIDED MAPPINGS):**
```yaml
title: 'CVE-2024-XXXX Detection Rule'
id: a1b2c3d4-e5f6-7890-abcd-ef1234567890
status: experimental
description: 'Detection for CVE-2024-XXXX vulnerability'
author: 'AI Generated'
date: 2025/01/16
references:
- https://nvd.nist.gov/vuln/detail/CVE-2024-XXXX
tags:
- attack.t1134 # Access Token Manipulation (example - use actual mappings)
- attack.t1134.001 # Token Impersonation/Theft (example - use actual mappings)
logsource:
category: process_creation
product: windows
detection:
selection:
Image|contains: 'specific_indicator'
condition: selection
level: medium
```
**IMPORTANT:** The tags section above is just an example format. You MUST use the exact techniques provided in the MITRE ATT&CK TECHNIQUE MAPPINGS section for the specific CVE you're analyzing.
**CRITICAL ANTI-HALLUCINATION RULES:**
1. You MUST use the EXACT CVE ID provided in the user input - NEVER generate a different CVE ID
2. NEVER use example CVE IDs like CVE-2022-1234, CVE-2023-5678, or CVE-2024-1234
@ -323,7 +357,14 @@ Enhance this rule with PoC insights. Output only valid SIGMA YAML starting with
**MITRE ATT&CK TECHNIQUE MAPPINGS FOR {cve_id}:**
{chr(10).join(technique_details)}
**IMPORTANT:** Use these exact MITRE ATT&CK techniques in your tags section. Convert them to lowercase attack.t format (e.g., T1059 becomes attack.t1059)."""
**CRITICAL REQUIREMENT:** Use ONLY these exact MITRE ATT&CK techniques in your tags section. Convert them to lowercase attack.t format (e.g., T1134 becomes attack.t1134, T1134.001 becomes attack.t1134.001).
**ABSOLUTELY FORBIDDEN:**
- Do not use T1059, T1071, T1105, T1055, T1068, T1140, T1036, T1112, T1547 or any other techniques not listed above
- Do not add techniques based on PoC analysis if they're not in the provided mappings
- Do not use generic techniques from your training data
If no MITRE techniques are provided above, use only CVE and CWE tags."""
if mitre_mappings['cwe_codes']:
mitre_suggestions += f"""
@ -344,10 +385,25 @@ Enhance this rule with PoC insights. Output only valid SIGMA YAML starting with
1. Use EXACTLY this CVE ID in the title: {{cve_id}}
2. Use EXACTLY this CVE URL in references: https://nvd.nist.gov/vuln/detail/{{cve_id}}
3. Analyze the CVE description to understand the vulnerability type
4. Extract specific indicators from the PoC code (files, processes, commands, network patterns)
5. Create detection logic based on the actual exploit behavior
6. Use relevant logsource category (process_creation, file_event, network_connection, etc.)
7. Include the MITRE ATT&CK tags listed above in your tags section (convert to attack.t format)
4. If the PoC analysis above contains structured indicators, use those EXACT indicators in your detection rules
5. **USE ONLY THE MITRE ATT&CK TECHNIQUES LISTED IN THE MAPPINGS ABOVE** - Do not add any other techniques
6. Choose the appropriate logsource category based on the primary indicator types (process_creation, file_event, network_connection, registry_event, etc.)
7. Convert the mapped MITRE techniques to lowercase attack.t format (T1134 attack.t1134, T1134.001 attack.t1134.001)
**DETECTION PATTERN GUIDANCE:**
- For Process Execution indicators: Use Image, CommandLine, or ProcessName fields
- For File System indicators: Use TargetFilename, SourceFilename, or FilePath fields
- For Network indicators: Use DestinationHostname, DestinationIp, or DestinationPort fields
- For Registry indicators: Use TargetObject, Details, or EventType fields
- For Command indicators: Use CommandLine or ProcessCommandLine fields
**TAGS FORMATTING REQUIREMENTS:**
- Use ONLY the MITRE ATT&CK techniques provided in the "MITRE ATT&CK TECHNIQUE MAPPINGS" section above
- Convert to lowercase attack.t format: T1134 attack.t1134, T1134.001 attack.t1134.001
- Include comments for clarity: attack.t1134 # Access Token Manipulation
- Use specific sub-techniques when available
- DO NOT add techniques not listed in the provided mappings
- DO NOT use generic techniques from your training data
**CRITICAL ANTI-HALLUCINATION REQUIREMENTS:**
- THE CVE ID IS: {{cve_id}}
@ -355,6 +411,7 @@ Enhance this rule with PoC insights. Output only valid SIGMA YAML starting with
- DO NOT generate a different CVE ID from your training data
- You MUST use the exact CVE ID "{{cve_id}}" - this is the ONLY acceptable CVE ID for this rule
- Base your analysis ONLY on the provided CVE description and PoC code above
- If structured indicators are provided in the PoC analysis, use those exact values
- Do not reference other vulnerabilities or exploits not mentioned in the provided content
- NEVER use placeholder CVE IDs like CVE-YYYY-NNNN or CVE-2022-1234
@ -741,30 +798,36 @@ Output ONLY valid SIGMA YAML starting with 'title:' that includes the exact CVE
stripped = line.strip()
# Check for orphaned list items (lines starting with - but not part of an array)
# But be more careful - don't remove items that are properly indented under a parent
if (stripped.startswith('- ') and
i > 0 and
not lines[i-1].strip().endswith(':') and
':' not in stripped and
not stripped.startswith('- https://')): # Don't remove reference URLs
not stripped.startswith('- https://') and # Don't remove reference URLs
not stripped.startswith('- attack.') and # Don't remove MITRE ATT&CK tags
not re.match(r'- [a-z0-9._-]+$', stripped)): # Don't remove simple tags
# Check if this looks like a MITRE ATT&CK tag
if re.match(r'- T\d{4}', stripped):
# Try to find the tags section and add it there
tags_line_found = False
for j in range(len(fixed_lines)-1, -1, -1):
if fixed_lines[j].strip().startswith('tags:'):
# This is an orphaned tag, add it to the tags array
fixed_lines.append(f" {stripped}")
fixes_applied.append(f"Fixed orphaned MITRE tag: {stripped}")
tags_line_found = True
break
# Check if this is properly indented under a parent (like tags:)
is_properly_indented = False
current_indent = len(line) - len(line.lstrip())
# Look backwards to find a parent with less indentation
for j in range(i-1, -1, -1):
prev_line = lines[j]
prev_stripped = prev_line.strip()
prev_indent = len(prev_line) - len(prev_line.lstrip())
if not tags_line_found:
# No tags section found, remove the orphaned item
fixes_applied.append(f"Removed orphaned tag (no tags section): {stripped}")
continue
else:
# Other orphaned list items, remove them
if prev_stripped and prev_indent < current_indent:
# Found a parent with less indentation
if prev_stripped.endswith(':'):
is_properly_indented = True
break
else:
# This is likely orphaned
break
if not is_properly_indented:
# This is truly orphaned, remove it
fixes_applied.append(f"Removed orphaned list item: {stripped}")
continue
@ -825,8 +888,17 @@ Output ONLY valid SIGMA YAML starting with 'title:' that includes the exact CVE
except yaml.YAMLError as e2:
logger.warning(f"YAML repair attempt failed: {e2}")
# Last resort: try to build a minimal valid SIGMA rule
return self._build_minimal_valid_rule(content, fixes_applied)
# Try a more aggressive repair before falling back to minimal rule
aggressive_repair = self._aggressive_yaml_repair(content)
try:
yaml.safe_load(aggressive_repair)
fixes_applied.append("Applied aggressive YAML repair")
logger.info("Successfully repaired YAML with aggressive method")
return aggressive_repair
except yaml.YAMLError as e3:
logger.warning(f"Aggressive repair also failed: {e3}")
# Last resort: try to build a minimal valid SIGMA rule
return self._build_minimal_valid_rule(content, fixes_applied)
def _repair_yaml_structure(self, content: str, error_msg: str) -> str:
"""Attempt to repair common YAML structural issues."""
@ -837,6 +909,8 @@ Output ONLY valid SIGMA YAML starting with 'title:' that includes the exact CVE
expected_indent = 0
in_detection = False
detection_indent = 0
in_tags = False
tags_indent = 0
for i, line in enumerate(lines):
stripped = line.strip()
@ -847,6 +921,24 @@ Output ONLY valid SIGMA YAML starting with 'title:' that includes the exact CVE
repaired_lines.append(line)
continue
# Track if we're in the tags section
if stripped.startswith('tags:'):
in_tags = True
tags_indent = current_indent
repaired_lines.append(line)
continue
elif in_tags and current_indent <= tags_indent and not stripped.startswith('-'):
# We've left the tags section
in_tags = False
# Fix tags section indentation
if in_tags and stripped.startswith('-'):
# Ensure proper indentation for tag items
if current_indent <= tags_indent:
corrected_line = ' ' * (tags_indent + 2) + stripped
repaired_lines.append(corrected_line)
continue
# Track if we're in the detection section
if stripped.startswith('detection:'):
in_detection = True
@ -875,6 +967,21 @@ Output ONLY valid SIGMA YAML starting with 'title:' that includes the exact CVE
repaired_lines.append(corrected_line)
continue
# Fix logsource section indentation
if stripped.startswith('logsource:'):
# Logsource should be at root level (no indentation)
if current_indent > 0:
corrected_line = stripped
repaired_lines.append(corrected_line)
continue
elif line.lstrip().startswith(('category:', 'product:', 'service:')) and i > 0:
# These should be indented under logsource
prev_line = lines[i-1].strip()
if prev_line.startswith('logsource:') or any('logsource' in repaired_lines[j] for j in range(max(0, len(repaired_lines)-5), len(repaired_lines))):
corrected_line = ' ' + stripped
repaired_lines.append(corrected_line)
continue
# Fix lines that start with wrong indentation
if ':' in stripped and not stripped.startswith('-'):
# This is a key-value pair
@ -891,6 +998,85 @@ Output ONLY valid SIGMA YAML starting with 'title:' that includes the exact CVE
return '\n'.join(repaired_lines)
def _aggressive_yaml_repair(self, content: str) -> str:
    """Rebuild a SIGMA rule from scratch using whatever fields can be salvaged.

    The input is scanned line by line for recognizable scalar fields
    (``title``, ``id``, ``description``, ``author``, ``date``, ``level``,
    ``condition``, ``category``, ``product``), reference URLs (``- http...``)
    and tags (``- attack.``, ``- cve-``, ``- exploit.``, ``- poc.``,
    ``- cwe.``). Anything not found keeps a built-in default. The original
    detection logic is NOT preserved: the rebuilt rule always carries a single
    placeholder ``selection``.

    Args:
        content: The malformed rule text.

    Returns:
        A freshly assembled rule document (no trailing newline).
    """
    def field_value(text: str) -> str:
        """Return the text after the first ':' without whitespace and quotes."""
        return text.split(':', 1)[1].strip().strip('"\'')

    def quoted(text: str) -> str:
        """Single-quote a YAML scalar, doubling embedded single quotes.

        Already-doubled quotes are collapsed first so a correctly escaped
        source value is not escaped twice.
        """
        return "'" + text.replace("''", "'").replace("'", "''") + "'"

    # Defaults used for every field that cannot be recovered from the input.
    fields = {
        'title': "Generated SIGMA Rule",
        'id': "00000000-0000-0000-0000-000000000000",
        'description': "Generated detection rule",
        'author': "AI Generated",
        'date': "2025/01/16",
        'level': "medium",
        'condition': "selection",
        'category': "process_creation",
        'product': "windows",
    }
    references = []
    tags = []
    tag_prefixes = ('- attack.', '- cve-', '- exploit.', '- poc.', '- cwe.')

    for line in content.split('\n'):
        stripped = line.strip()
        if stripped.startswith('- http'):
            references.append(stripped[2:].strip())
        elif stripped.startswith(tag_prefixes):
            tags.append(stripped[2:].strip())
        else:
            # Only accept a key at the start of the line, so text such as
            # "subcategory:" or a value containing "product:" is not mistaken
            # for a logsource field.
            key, separator, _ = stripped.partition(':')
            if separator and key in fields:
                value = field_value(stripped)
                if value:  # an empty value must not wipe out the default
                    fields[key] = value

    # The rebuilt detection block defines only ``selection``; a salvaged
    # condition that names anything else would reference undefined
    # selections, so fall back to the plain condition in that case.
    condition_tokens = fields['condition'].replace('(', ' ').replace(')', ' ').split()
    if not condition_tokens or any(
        token not in ('selection', 'and', 'or', 'not') for token in condition_tokens
    ):
        fields['condition'] = "selection"

    # Assemble line by line with explicit indentation so the nesting of
    # logsource/detection does not depend on a multi-line literal's layout.
    rule_lines = [
        f"title: {quoted(fields['title'])}",
        f"id: {fields['id']}",
        "status: experimental",
        f"description: {quoted(fields['description'])}",
        f"author: {quoted(fields['author'])}",
        f"date: {fields['date']}",
        "references:",
    ]
    rule_lines.extend(f"  - {ref}" for ref in (references or ["https://example.com"]))
    rule_lines.append("tags:")
    rule_lines.extend(f"  - {tag}" for tag in (tags or ["unknown"]))
    rule_lines.extend([
        "logsource:",
        f"  category: {fields['category']}",
        f"  product: {fields['product']}",
        "detection:",
        "  selection:",
        "    Image: '*'",
        f"  condition: {fields['condition']}",
        f"level: {fields['level']}",
    ])
    return "\n".join(rule_lines)
def _build_minimal_valid_rule(self, content: str, fixes_applied: list) -> str:
"""Build a minimal valid SIGMA rule from the content."""
lines = content.split('\n')
@ -915,7 +1101,7 @@ id: {rule_id}
status: experimental
description: '{description}'
author: 'AI Generated'
date: 2025/01/14
date: 2025/01/16
references:
- https://example.com
logsource:

733
backend/poc_analyzer.py Executable file
View file

@ -0,0 +1,733 @@
"""
Advanced PoC (Proof of Concept) analyzer for extracting security indicators
from exploit code across multiple programming languages and attack vectors.
"""
import re
import base64
import binascii
from typing import Dict, List, Set, Optional, Tuple
from dataclasses import dataclass
from enum import Enum
import logging
logger = logging.getLogger(__name__)
class AttackTechnique(Enum):
# MITRE ATT&CK technique identifiers that extracted indicators are tagged with.
# Each member's value is the technique ID string; values containing a dot
# (e.g. "T1059.001") are sub-technique IDs of the technique before the dot.
PROCESS_INJECTION = "T1055"
COMMAND_EXECUTION = "T1059"
POWERSHELL = "T1059.001"  # sub-technique of T1059
COMMAND_LINE = "T1059.003"  # sub-technique of T1059
FILE_CREATION = "T1105"
REGISTRY_MODIFICATION = "T1112"
NETWORK_CONNECTION = "T1071"
PRIVILEGE_ESCALATION = "T1068"
DLL_INJECTION = "T1055.001"  # sub-technique of T1055
PROCESS_HOLLOWING = "T1055.012"  # sub-technique of T1055
SERVICE_CREATION = "T1543.003"
@dataclass
class SecurityIndicator:
    """Represents a security indicator extracted from PoC code."""

    type: str  # process, file, network, registry, command
    value: str
    confidence: float  # 0.0 to 1.0
    context: str  # surrounding code context
    attack_technique: Optional[AttackTechnique] = None
    # Defaults to None rather than an empty dict, so the annotation must be
    # Optional; callers that attach metadata pass their own dict.
    metadata: Optional[Dict] = None
class PoCAnalyzer:
"""Advanced analyzer for extracting security indicators from PoC code."""
def __init__(self):
# Indicators from the most recent analysis; analyze_poc resets this list
# at the start of every call.
self.indicators: List[SecurityIndicator] = []
# Lookup tables built once per analyzer instance by the _initialize_*
# helpers (not visible in this part of the file; presumably defined
# further down in this class).
self.language_patterns = self._initialize_language_patterns()
self.attack_patterns = self._initialize_attack_patterns()
self.false_positive_filters = self._initialize_fp_filters()
def analyze_poc(self, poc_content: str, cve_id: Optional[str] = None) -> Dict[str, object]:
    """
    Main analysis function that extracts all security indicators.

    Args:
        poc_content: The PoC source code
        cve_id: Optional CVE identifier for context (not used by the
            analysis steps in this method)

    Returns:
        Dictionary with the keys ``language``, ``processes``, ``files``,
        ``network``, ``registry``, ``commands``, ``behaviors``,
        ``mitre_techniques``, ``quality_score``, ``total_indicators`` and
        ``high_confidence_indicators``.
    """
    def confidence_of(indicator) -> float:
        """Read a confidence from an attribute- or dict-style indicator."""
        if isinstance(indicator, dict):
            value = indicator.get('confidence', 0)
        else:
            value = getattr(indicator, 'confidence', 0)
        return value if isinstance(value, (int, float)) else 0

    self.indicators = []

    # Detect programming language
    language = self._detect_language(poc_content)

    # Extract indicators by category
    processes = self._extract_process_indicators(poc_content, language)
    files = self._extract_file_indicators(poc_content, language)
    network = self._extract_network_indicators(poc_content, language)
    registry = self._extract_registry_indicators(poc_content, language)
    commands = self._extract_command_indicators(poc_content, language)

    # Extract encoded/obfuscated content
    decoded_content = self._extract_encoded_content(poc_content)
    if decoded_content:
        # Recursively analyze decoded content and merge its findings
        for content in decoded_content:
            sub_analysis = self.analyze_poc(content)
            processes.extend(sub_analysis['processes'])
            files.extend(sub_analysis['files'])
            network.extend(sub_analysis['network'])
            registry.extend(sub_analysis['registry'])
            commands.extend(sub_analysis['commands'])

    all_indicators = processes + files + network + registry + commands

    # The extractors return their findings rather than appending to
    # self.indicators, and every recursive call above resets that attribute.
    # Publish the combined list here so the totals reported below are not
    # computed from an empty list.
    self.indicators = list(all_indicators)

    # Behavioral analysis
    behaviors = self._analyze_attack_behaviors(poc_content, language)

    # MITRE ATT&CK technique mapping
    techniques = self._map_to_mitre_attack(all_indicators)

    # Quality assessment
    analysis_quality = self._assess_analysis_quality(poc_content)

    return {
        'language': language,
        'processes': self._deduplicate_and_rank(processes),
        'files': self._deduplicate_and_rank(files),
        'network': self._deduplicate_and_rank(network),
        'registry': self._deduplicate_and_rank(registry),
        'commands': self._deduplicate_and_rank(commands),
        'behaviors': behaviors,
        'mitre_techniques': techniques,
        'quality_score': analysis_quality,
        'total_indicators': len(self.indicators),
        'high_confidence_indicators': sum(
            1 for indicator in self.indicators if confidence_of(indicator) > 0.7
        ),
    }
def _detect_language(self, content: str) -> str:
    """Detect the primary programming language of the PoC.

    Every language has a list of regex hints; a language's score is the total
    number of matches of its hints in ``content`` (case-insensitive). The
    highest score wins, with ties going to the language listed first.

    Args:
        content: The PoC source text.

    Returns:
        One of ``powershell``, ``python``, ``bash``, ``batch``, ``c_cpp``,
        ``csharp``, ``javascript``, ``php``, or ``'unknown'`` when the content
        is empty/None or no hint matches.
    """
    # Nothing to scan; also avoids passing None to the regex engine.
    if not content:
        return 'unknown'

    language_indicators = {
        'powershell': [
            r'\$[a-zA-Z_][a-zA-Z0-9_]*', r'Get-\w+', r'Set-\w+', r'New-\w+',
            r'Invoke-\w+', r'Add-Type', r'\[System\.\w+\]'
        ],
        'python': [
            r'import\s+\w+', r'from\s+\w+\s+import', r'def\s+\w+\(',
            r'subprocess\.', r'os\.system', r'__name__\s*==\s*["\']__main__["\']'
        ],
        'bash': [
            r'#!/bin/bash', r'#!/bin/sh', r'\$\{[^}]+\}', r'chmod\s+\+x',
            r'wget\s+', r'curl\s+', r'echo\s+.*\|'
        ],
        'batch': [
            r'@echo\s+off', r'%[^%]+%', r'goto\s+\w+', r'if\s+exist',
            r'cmd\s*/c', r'start\s+'
        ],
        'c_cpp': [
            r'#include\s*<[^>]+>', r'int\s+main\s*\(', r'printf\s*\(',
            r'malloc\s*\(', r'free\s*\(', r'system\s*\('
        ],
        'csharp': [
            r'using\s+System', r'namespace\s+\w+', r'class\s+\w+',
            r'Process\.Start', r'Registry\.', r'new\s+ProcessStartInfo'
        ],
        'javascript': [
            r'function\s+\w+\s*\(', r'var\s+\w+\s*=', r'console\.log',
            r'require\s*\(', r'=>', r'new\s+XMLHttpRequest'
        ],
        'php': [
            r'<\?php', r'\$[a-zA-Z_][a-zA-Z0-9_]*', r'echo\s+',
            r'exec\s*\(', r'system\s*\(', r'shell_exec'
        ]
    }

    flags = re.IGNORECASE | re.MULTILINE
    scores = {
        lang: sum(len(re.findall(pattern, content, flags)) for pattern in patterns)
        for lang, patterns in language_indicators.items()
    }

    # max() returns the first maximal key, i.e. the earliest language above.
    best = max(scores, key=scores.get)
    return best if scores[best] > 0 else 'unknown'
def _extract_process_indicators(self, content: str, language: str) -> List[SecurityIndicator]:
    """Collect process-execution indicators using the detected language's patterns.

    Each regex captures the launched program or command in group 1. A match
    is kept only when ``self._calculate_confidence`` scores it above 0.3.
    Kept matches are tagged PROCESS_INJECTION when the surrounding context
    mentions "inject" (case-insensitive), otherwise COMMAND_EXECUTION.
    Languages without an entry in the table yield an empty list.
    """
    patterns = {
        'powershell': [
            r'Start-Process\s+["\']?([^"\';\s]+)',
            r'Invoke-Expression\s+["\']?([^"\';\s]+)',
            r'&\s+["\']?([^"\';\s]+\.exe)',
            r'\.\s+["\']?([^"\';\s]+\.exe)'
        ],
        'python': [
            r'subprocess\.call\(\s*["\']([^"\']+)',
            r'subprocess\.Popen\(\s*["\']([^"\']+)',
            r'os\.system\(\s*["\']([^"\']+)',
            r'os\.exec[vl]?p?\(\s*["\']([^"\']+)'
        ],
        'bash': [
            r'exec\s+([^;\s&|]+)',
            r'/bin/sh\s+-c\s+["\']([^"\']+)',
            r'system\(\s*["\']([^"\']+)'
        ],
        'batch': [
            r'start\s+["\']?([^"\';\s]+)',
            r'cmd\s*/c\s+["\']?([^"\']+)',
            r'call\s+["\']?([^"\';\s]+)'
        ],
        'c_cpp': [
            r'system\(\s*["\']([^"\']+)',
            r'execve?\(\s*["\']([^"\']+)',
            r'CreateProcess[AW]?\([^,]*["\']([^"\']+)'
        ],
        'csharp': [
            r'Process\.Start\(\s*["\']([^"\']+)',
            r'ProcessStartInfo.*FileName\s*=\s*["\']([^"\']+)',
            r'new\s+Process.*["\']([^"\']+)'
        ]
    }

    found = []
    search_flags = re.IGNORECASE | re.MULTILINE
    for regex in patterns.get(language, ()):
        for hit in re.finditer(regex, content, search_flags):
            program = hit.group(1)
            snippet = self._get_context(content, hit.start(), hit.end())
            score = self._calculate_confidence(program, 'process', snippet)
            if not score > 0.3:  # Filter low confidence matches
                continue
            if 'inject' in snippet.lower():
                technique = AttackTechnique.PROCESS_INJECTION
            else:
                technique = AttackTechnique.COMMAND_EXECUTION
            found.append(SecurityIndicator(
                type='process',
                value=program,
                confidence=score,
                context=snippet,
                attack_technique=technique
            ))
    return found
def _extract_file_indicators(self, content: str, language: str) -> List[SecurityIndicator]:
    """Collect file-system indicators from the PoC.

    Two passes run in order: language-independent path patterns first, then
    (when the language has an entry) language-specific file-operation
    patterns. Group 1 of each match is the path; it is kept when
    ``self._calculate_confidence`` scores it above 0.4 and is tagged
    FILE_CREATION.
    """
    # File path patterns
    file_patterns = [
        r'["\']([a-zA-Z]:\\[^"\'<>|*?]+\.[a-zA-Z0-9]+)["\']',  # Windows paths
        r'["\']([/][^"\'<>|*?\s]+\.[a-zA-Z0-9]+)["\']',  # Unix paths
        r'["\'](\./[^"\'<>|*?\s]+\.[a-zA-Z0-9]+)["\']',  # Relative paths
        r'%TEMP%\\([^"\'<>|*?\s]+\.[a-zA-Z0-9]+)',  # Windows temp
        r'/tmp/([^"\'<>|*?\s]+\.[a-zA-Z0-9]+)',  # Unix temp
    ]

    # Language-specific file operations
    operation_patterns = {
        'powershell': [
            r'New-Item.*Path.*["\']([^"\']+)["\']',
            r'Out-File.*["\']([^"\']+)["\']',
            r'Set-Content.*["\']([^"\']+)["\']',
            r'\|\s*Out-File\s+["\']([^"\']+)["\']'
        ],
        'python': [
            r'open\(\s*["\']([^"\']+)["\']',
            r'with\s+open\(\s*["\']([^"\']+)["\']',
            r'shutil\.copy.*["\']([^"\']+)["\']'
        ],
        'bash': [
            r'touch\s+["\']?([^"\';\s]+)',
            r'cp\s+[^"\';\s]+\s+["\']?([^"\';\s]+)',
            r'mv\s+[^"\';\s]+\s+["\']?([^"\';\s]+)',
            r'echo.*>\s*["\']?([^"\';\s]+)'
        ],
        'c_cpp': [
            r'fopen\(\s*["\']([^"\']+)["\']',
            r'CreateFile[AW]?\([^,]*["\']([^"\']+)["\']',
            r'WriteFile.*["\']([^"\']+)["\']'
        ]
    }

    # Each pass pairs a pattern list with the regex flags it is searched with.
    passes = [(file_patterns, re.IGNORECASE)]
    if language in operation_patterns:
        passes.append((operation_patterns[language], re.IGNORECASE | re.MULTILINE))

    found = []
    for regexes, search_flags in passes:
        for regex in regexes:
            for hit in re.finditer(regex, content, search_flags):
                path = hit.group(1)
                snippet = self._get_context(content, hit.start(), hit.end())
                score = self._calculate_confidence(path, 'file', snippet)
                if not score > 0.4:
                    continue
                found.append(SecurityIndicator(
                    type='file',
                    value=path,
                    confidence=score,
                    context=snippet,
                    attack_technique=AttackTechnique.FILE_CREATION
                ))
    return found
def _extract_network_indicators(self, content: str, language: str) -> List[SecurityIndicator]:
    """Extract network communication indicators.

    Two passes run in order: language-independent patterns (URL hosts, IPv4
    addresses, ports, Host and User-Agent headers) first, then the
    language-specific network-operation patterns for ``language`` when the
    table has an entry for it. The second pass mirrors
    ``_extract_file_indicators``; previously the language-specific table was
    built but never searched.

    Args:
        content: The PoC source text.
        language: Language key as returned by ``_detect_language``.

    Returns:
        Indicators of type ``network`` whose confidence (from
        ``self._calculate_confidence``) exceeds 0.3, tagged
        NETWORK_CONNECTION.
    """
    indicators = []

    # Network patterns
    network_patterns = [
        r'(?:http[s]?://)([^/\s"\']+)',  # URLs
        r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',  # IP addresses
        r':(\d{2,5})\b',  # Port numbers
        r'Host:\s*([^\s\r\n]+)',  # HTTP Host headers
        r'User-Agent:\s*([^\r\n]+)',  # User agents
    ]

    # Language-specific network operations
    operation_patterns = {
        'powershell': [
            r'Invoke-WebRequest.*Uri.*["\']([^"\']+)["\']',
            r'New-Object.*WebClient.*DownloadString.*["\']([^"\']+)["\']',
            r'System\.Net\.Sockets\.TcpClient.*(\d+)',
            r'Connect.*(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}).*(\d+)'
        ],
        'python': [
            r'requests\.get\(\s*["\']([^"\']+)["\']',
            r'urllib\.request\.urlopen\(\s*["\']([^"\']+)["\']',
            r'socket\.connect\(\s*\(["\']([^"\']+)["\'],\s*(\d+)',
            r'http\.client\.HTTPConnection\(\s*["\']([^"\']+)["\']'
        ],
        'bash': [
            r'wget\s+["\']?([^"\';\s]+)',
            r'curl\s+["\']?([^"\';\s]+)',
            r'nc\s+([^\s]+)\s+(\d+)',
            r'netcat\s+([^\s]+)\s+(\d+)'
        ],
        'c_cpp': [
            r'connect\([^,]*inet_addr\(["\']([^"\']+)["\']',
            r'gethostbyname\(["\']([^"\']+)["\']',
            r'socket\(.*SOCK_STREAM'
        ]
    }

    # Each pass pairs a pattern list with the regex flags it is searched with.
    passes = [(network_patterns, re.IGNORECASE)]
    if language in operation_patterns:
        passes.append((operation_patterns[language], re.IGNORECASE | re.MULTILINE))

    for patterns, flags in passes:
        for pattern in patterns:
            for match in re.finditer(pattern, content, flags):
                # Use the first capture group (host/URL/port); patterns
                # without a group, such as the SOCK_STREAM one, contribute
                # the whole match.
                network_indicator = match.group(1) if match.groups() else match.group(0)
                context = self._get_context(content, match.start(), match.end())
                confidence = self._calculate_confidence(network_indicator, 'network', context)
                if confidence > 0.3:
                    indicators.append(SecurityIndicator(
                        type='network',
                        value=network_indicator,
                        confidence=confidence,
                        context=context,
                        attack_technique=AttackTechnique.NETWORK_CONNECTION
                    ))
    return indicators
def _extract_registry_indicators(self, content: str, language: str) -> List[SecurityIndicator]:
"""Extract Windows registry indicators."""
indicators = []
# Registry key patterns
registry_patterns = [
r'(HKEY_[A-Z_]+\\[^"\';\s\]]+)',
r'(HKLM\\[^"\';\s\]]+)',
r'(HKCU\\[^"\';\s\]]+)',
r'(SOFTWARE\\[^"\';\s\]]+)',
r'(SYSTEM\\[^"\';\s\]]+)'
]
# Language-specific registry operations
operation_patterns = {
'powershell': [
r'New-ItemProperty.*Path.*["\']([^"\']+)["\']',
r'Set-ItemProperty.*Path.*["\']([^"\']+)["\']',
r'Get-ItemProperty.*Path.*["\']([^"\']+)["\']',
r'Remove-ItemProperty.*Path.*["\']([^"\']+)["\']'
],
'batch': [
r'reg\s+add\s+["\']?([^"\';\s]+)',
r'reg\s+query\s+["\']?([^"\';\s]+)',
r'reg\s+delete\s+["\']?([^"\';\s]+)'
],
'c_cpp': [
r'RegCreateKey[Ex]?[AW]?.*["\']([^"\']+)["\']',
r'RegSetValue[Ex]?[AW]?.*["\']([^"\']+)["\']',
r'RegOpenKey[Ex]?[AW]?.*["\']([^"\']+)["\']'
],
'csharp': [
r'Registry\.[^.]+\.OpenSubKey\(["\']([^"\']+)["\']',
r'RegistryKey.*["\']([^"\']+)["\']'
]
}
# Extract registry keys
for pattern in registry_patterns:
matches = re.finditer(pattern, content, re.IGNORECASE)
for match in matches:
reg_key = match.group(1)
context = self._get_context(content, match.start(), match.end())
confidence = self._calculate_confidence(reg_key, 'registry', context)
if confidence > 0.4:
indicators.append(SecurityIndicator(
type='registry',
value=reg_key,
confidence=confidence,
context=context,
attack_technique=AttackTechnique.REGISTRY_MODIFICATION
))
return indicators
def _extract_command_indicators(self, content: str, language: str) -> List[SecurityIndicator]:
"""Extract command-line execution indicators."""
indicators = []
# Command patterns
command_patterns = [
r'(?:cmd|powershell|bash|sh)\s+[/-]c\s+["\']?([^"\';\n]+)',
r'(?:system|exec|shell_exec)\(\s*["\']([^"\']+)["\']',
r'[`]([^`]+)[`]', # Backticks
r'\$\(([^)]+)\)', # Command substitution
]
for pattern in command_patterns:
matches = re.finditer(pattern, content, re.IGNORECASE | re.MULTILINE)
for match in matches:
command = match.group(1)
context = self._get_context(content, match.start(), match.end())
confidence = self._calculate_confidence(command, 'command', context)
if confidence > 0.4:
# Determine attack technique based on command content
technique = AttackTechnique.COMMAND_EXECUTION
if 'powershell' in command.lower():
technique = AttackTechnique.POWERSHELL
elif any(word in command.lower() for word in ['cmd', 'bat', 'com']):
technique = AttackTechnique.COMMAND_LINE
indicators.append(SecurityIndicator(
type='command',
value=command,
confidence=confidence,
context=context,
attack_technique=technique
))
return indicators
def _extract_encoded_content(self, content: str) -> List[str]:
"""Extract and decode obfuscated/encoded content."""
decoded_content = []
# Base64 patterns
base64_patterns = [
r'["\']([A-Za-z0-9+/]{20,}={0,2})["\']', # Base64 strings
r'FromBase64String\(["\']([^"\']+)["\']', # PowerShell
r'base64\.b64decode\(["\']([^"\']+)["\']', # Python
]
for pattern in base64_patterns:
matches = re.finditer(pattern, content, re.IGNORECASE)
for match in matches:
try:
encoded_str = match.group(1)
if len(encoded_str) > 20: # Only decode substantial content
decoded = base64.b64decode(encoded_str + '===').decode('utf-8', errors='ignore')
if decoded and len(decoded) > 10:
decoded_content.append(decoded)
except:
continue
# Hex patterns
hex_patterns = [
r'0x([0-9a-fA-F]{20,})',
r'["\']([0-9a-fA-F]{20,})["\']'
]
for pattern in hex_patterns:
matches = re.finditer(pattern, content)
for match in matches:
try:
hex_str = match.group(1)
if len(hex_str) % 2 == 0 and len(hex_str) > 20:
decoded = binascii.unhexlify(hex_str).decode('utf-8', errors='ignore')
if decoded and len(decoded) > 10:
decoded_content.append(decoded)
except:
continue
return decoded_content
def _calculate_confidence(self, indicator: str, indicator_type: str, context: str) -> float:
"""Calculate confidence score for an indicator."""
confidence = 0.5 # Base confidence
# Length and complexity scoring
if len(indicator) > 5:
confidence += 0.1
if len(indicator) > 20:
confidence += 0.1
# Context-based scoring
high_confidence_keywords = [
'exploit', 'payload', 'shell', 'inject', 'execute', 'run',
'attack', 'malware', 'backdoor', 'trojan', 'virus'
]
context_lower = context.lower()
for keyword in high_confidence_keywords:
if keyword in context_lower:
confidence += 0.1
break
# Type-specific scoring
if indicator_type == 'process':
if indicator.endswith('.exe') or indicator.endswith('.dll'):
confidence += 0.2
if any(word in indicator.lower() for word in ['cmd', 'powershell', 'bash', 'sh']):
confidence += 0.1
elif indicator_type == 'file':
if any(ext in indicator.lower() for ext in ['.exe', '.dll', '.bat', '.ps1', '.sh']):
confidence += 0.2
if any(path in indicator.lower() for path in ['temp', 'tmp', 'appdata']):
confidence += 0.1
elif indicator_type == 'network':
if re.match(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', indicator):
confidence += 0.2
if any(tld in indicator.lower() for tld in ['.com', '.net', '.org', '.ru', '.cn']):
confidence += 0.1
# Apply false positive filters
if self._is_false_positive(indicator, indicator_type):
confidence *= 0.3
return min(confidence, 1.0)
def _is_false_positive(self, indicator: str, indicator_type: str) -> bool:
"""Check if indicator is likely a false positive."""
if indicator_type in self.false_positive_filters:
fp_patterns = self.false_positive_filters[indicator_type]
for pattern in fp_patterns:
if re.search(pattern, indicator, re.IGNORECASE):
return True
return False
def _get_context(self, content: str, start: int, end: int, window: int = 100) -> str:
"""Get context around a match."""
context_start = max(0, start - window)
context_end = min(len(content), end + window)
return content[context_start:context_end].strip()
def _deduplicate_and_rank(self, indicators: List[SecurityIndicator]) -> List[Dict]:
"""Remove duplicates and rank indicators by confidence."""
# Deduplicate by value
seen = set()
unique_indicators = []
for indicator in sorted(indicators, key=lambda x: x.confidence, reverse=True):
if indicator.value not in seen:
seen.add(indicator.value)
unique_indicators.append(indicator)
# Convert to dict format and return top indicators
return [
{
'value': ind.value,
'confidence': round(ind.confidence, 2),
'context': ind.context[:200] + '...' if len(ind.context) > 200 else ind.context,
'attack_technique': ind.attack_technique.value if ind.attack_technique else None
}
for ind in unique_indicators[:10] # Top 10 indicators
]
def _analyze_attack_behaviors(self, content: str, language: str) -> List[Dict]:
"""Analyze attack behaviors and patterns."""
behaviors = []
behavior_patterns = {
'persistence': [
r'(?:startup|autorun|registry.*run)',
r'(?:scheduled.*task|cron|at\s+\d)',
r'(?:service.*create|sc.*create)'
],
'defense_evasion': [
r'(?:disable.*antivirus|kill.*av)',
r'(?:encode|encrypt|obfuscat)',
r'(?:hide|stealth|invisible)'
],
'credential_access': [
r'(?:password|credential|token)',
r'(?:keylog|steal.*key)',
r'(?:mimikatz|lsass)'
],
'lateral_movement': [
r'(?:psexec|wmi.*exec|remote.*exec)',
r'(?:net\s+use|mount|smb)',
r'(?:ssh|rdp|vnc)'
],
'exfiltration': [
r'(?:upload|ftp|http.*post)',
r'(?:compress|zip|archive)',
r'(?:steal|exfil|extract)'
]
}
content_lower = content.lower()
for behavior, patterns in behavior_patterns.items():
score = 0
matches = []
for pattern in patterns:
pattern_matches = re.findall(pattern, content_lower)
if pattern_matches:
score += len(pattern_matches)
matches.extend(pattern_matches)
if score > 0:
behaviors.append({
'behavior': behavior,
'confidence': min(score * 0.2, 1.0),
'indicators': matches[:5] # Top 5 matches
})
return sorted(behaviors, key=lambda x: x['confidence'], reverse=True)
def _map_to_mitre_attack(self, indicators: List[SecurityIndicator]) -> List[str]:
"""Map indicators to MITRE ATT&CK techniques."""
techniques = set()
for indicator in indicators:
if indicator.attack_technique:
techniques.add(indicator.attack_technique.value)
return sorted(list(techniques))
def _assess_analysis_quality(self, content: str) -> Dict[str, any]:
"""Assess the quality and completeness of the analysis."""
# Content metrics
lines = len(content.split('\n'))
chars = len(content)
# Indicator density
total_indicators = len(self.indicators)
high_conf_indicators = len([i for i in self.indicators if i.confidence > 0.7])
# Calculate quality score
content_score = min(lines / 50, 1.0) * 0.3 # More lines = better
indicator_score = min(total_indicators / 20, 1.0) * 0.4 # More indicators = better
confidence_score = (high_conf_indicators / max(total_indicators, 1)) * 0.3 # Higher confidence = better
overall_score = content_score + indicator_score + confidence_score
return {
'overall_score': round(overall_score, 2),
'content_lines': lines,
'content_chars': chars,
'total_indicators': total_indicators,
'high_confidence_indicators': high_conf_indicators,
'recommendation': self._get_quality_recommendation(overall_score)
}
def _get_quality_recommendation(self, score: float) -> str:
"""Get recommendation based on quality score."""
if score >= 0.8:
return "High quality PoC with excellent indicator extraction"
elif score >= 0.6:
return "Good quality PoC with adequate indicators"
elif score >= 0.4:
return "Moderate quality PoC, may need additional analysis"
else:
return "Low quality PoC, limited indicators extracted"
def _initialize_language_patterns(self) -> Dict:
"""Initialize language-specific patterns."""
return {
# Patterns for different languages will be expanded
}
def _initialize_attack_patterns(self) -> Dict:
"""Initialize attack pattern recognition."""
return {
# Attack patterns will be expanded
}
def _initialize_fp_filters(self) -> Dict:
"""Initialize false positive filters."""
return {
'process': [
r'^(explorer|notepad|calc|windir|system32)\.exe$',
r'^[a-z]$', # Single characters
r'^\d+$' # Pure numbers
],
'file': [
r'^[a-z]$',
r'^\d+$',
r'^(con|aux|prn|nul)$'
],
'network': [
r'^(localhost|127\.0\.0\.1|0\.0\.0\.0)$',
r'^\d{1,2}$', # Port numbers without context
r'^(example\.com|test\.com|localhost)$'
]
}
# Example usage
if __name__ == "__main__":
    # Manual smoke run: analyse a small hard-coded PoC and print the result.
    # Executes only when this file is run as a script, not when imported.
    analyzer = PoCAnalyzer()
    # Example PoC content
    # The sample is inert text handed to the analyzer. It contains a Base64
    # literal, a powershell invocation, a file path under C:\temp and an
    # IP/port pair for the analyzer to scan.
    sample_poc = """
import subprocess
import base64
# CVE-2024-1234 exploit
payload = base64.b64decode("Y21kIC9jIGVjaG8gSGVsbG8gV29ybGQ=")
subprocess.call("powershell.exe -enc " + payload.decode(), shell=True)
# Create persistence
with open("C:\\temp\\malware.exe", "wb") as f:
f.write(malicious_bytes)
# Network connection
import socket
s = socket.socket()
s.connect(("192.168.1.100", 4444))
"""
    # The second argument is the CVE identifier the sample is attributed to.
    result = analyzer.analyze_poc(sample_poc, "CVE-2024-1234")
    print(f"Analysis result: {result}")