# File: auto_sigma_rule_generator/backend/poc_analyzer.py
# Listing metadata from the source viewer: 975 lines, 44 KiB, Python, executable file.

"""
Advanced PoC (Proof of Concept) analyzer for extracting security indicators
from exploit code across multiple programming languages and attack vectors.
"""
import base64
import binascii
import logging
import re
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional, Set, Tuple
logger = logging.getLogger(__name__)
class AttackTechnique(Enum):
    """MITRE ATT&CK technique IDs that extracted indicators can be tagged with.

    The member names are this project's shorthand; the trailing comments give
    the official ATT&CK technique name for each ID, which does not always
    match the shorthand.
    """

    PROCESS_INJECTION = "T1055"        # Process Injection
    COMMAND_EXECUTION = "T1059"        # Command and Scripting Interpreter
    POWERSHELL = "T1059.001"           # ... : PowerShell
    COMMAND_LINE = "T1059.003"         # ... : Windows Command Shell
    FILE_CREATION = "T1105"            # Ingress Tool Transfer
    REGISTRY_MODIFICATION = "T1112"    # Modify Registry
    NETWORK_CONNECTION = "T1071"       # Application Layer Protocol
    PRIVILEGE_ESCALATION = "T1068"     # Exploitation for Privilege Escalation
    DLL_INJECTION = "T1055.001"        # Process Injection: DLL Injection
    PROCESS_HOLLOWING = "T1055.012"    # Process Injection: Process Hollowing
    SERVICE_CREATION = "T1543.003"     # Create or Modify System Process: Windows Service
@dataclass
class SecurityIndicator:
    """Represents a security indicator extracted from PoC code."""

    type: str  # process, file, network, registry, command
    value: str  # the extracted text (path, host, command line, ...)
    confidence: float  # 0.0 to 1.0
    context: str  # surrounding code context
    # Forward reference keeps the dataclass importable on its own.
    attack_technique: Optional["AttackTechnique"] = None
    # Defaults to None (not an empty dict), so callers must check before use.
    metadata: Optional[Dict] = None
class PoCAnalyzer:
    """Advanced analyzer for extracting security indicators from PoC code.

    The analysis is regex driven: the PoC language is guessed, per-category
    extractors (process, file, network, registry, command) scan the text,
    encoded payloads are decoded and scanned as well, and every hit gets a
    heuristic confidence score.
    """

    # Decoded payloads may themselves embed encoded payloads; bound the
    # nesting so the decode step always terminates quickly.
    MAX_DECODE_DEPTH = 3

    def __init__(self):
        """Create an analyzer with no results and the built-in filter tables."""
        self.indicators: List["SecurityIndicator"] = []
        self.language_patterns = self._initialize_language_patterns()
        self.attack_patterns = self._initialize_attack_patterns()
        self.false_positive_filters = self._initialize_fp_filters()

    def analyze_poc(self, poc_content: str, cve_id: Optional[str] = None) -> Dict[str, Any]:
        """
        Main analysis function that extracts all security indicators.

        Args:
            poc_content: The PoC source code
            cve_id: Optional CVE identifier for context (currently unused)

        Returns:
            Dictionary containing categorized indicators and analysis
        """
        # Detect programming language
        language = self._detect_language(poc_content)

        # Raw indicators per category, including those found in decoded payloads
        collected = self._collect_indicators(poc_content, language)
        all_raw = [ind for found in collected.values() for ind in found]

        # self.indicators holds the unique indicators of this run; the quality
        # assessment and the summary counters below are derived from it.
        ranked = {
            category: self._rank_unique(found)
            for category, found in collected.items()
        }
        self.indicators = [ind for found in ranked.values() for ind in found]

        # Behavioral analysis
        behaviors = self._analyze_attack_behaviors(poc_content, language)

        # MITRE ATT&CK technique mapping (uses raw hits so that a technique
        # attached to a lower-confidence duplicate is not lost)
        techniques = self._map_to_mitre_attack(all_raw)

        # Quality assessment
        analysis_quality = self._assess_analysis_quality(poc_content)

        return {
            'language': language,
            'processes': self._deduplicate_and_rank(ranked['processes']),
            'files': self._deduplicate_and_rank(ranked['files']),
            'network': self._deduplicate_and_rank(ranked['network']),
            'registry': self._deduplicate_and_rank(ranked['registry']),
            'commands': self._deduplicate_and_rank(ranked['commands']),
            'behaviors': behaviors,
            'mitre_techniques': techniques,
            'quality_score': analysis_quality,
            'total_indicators': len(self.indicators),
            'high_confidence_indicators': len([i for i in self.indicators if i.confidence > 0.7])
        }

    def _collect_indicators(self, content: str, language: str,
                            depth: int = 0) -> Dict[str, List["SecurityIndicator"]]:
        """Run every extractor on ``content`` and on payloads decoded from it.

        Returns raw ``SecurityIndicator`` lists keyed by category. Decoded
        payloads get their own language detection and are followed at most
        ``MAX_DECODE_DEPTH`` levels deep.
        """
        collected = {
            'processes': self._extract_process_indicators(content, language),
            'files': self._extract_file_indicators(content, language),
            'network': self._extract_network_indicators(content, language),
            'registry': self._extract_registry_indicators(content, language),
            'commands': self._extract_command_indicators(content, language),
        }
        if depth >= self.MAX_DECODE_DEPTH:
            return collected

        # Recursively analyze decoded content
        for decoded in self._extract_encoded_content(content):
            nested = self._collect_indicators(
                decoded, self._detect_language(decoded), depth + 1
            )
            for category, found in nested.items():
                collected[category].extend(found)
        return collected

    @staticmethod
    def _first_group(match) -> Optional[str]:
        """Return capture group 1, or None when the pattern defines no group."""
        return match.group(1) if match.re.groups else None

    def _iter_matches(self, content: str, patterns, flags: int):
        """Yield ``(match, value, context)`` for every hit of every pattern.

        ``value`` is capture group 1 when the pattern has a non-empty one and
        the whole match otherwise, so group-less patterns are safe to use.
        """
        for pattern in patterns:
            for match in re.finditer(pattern, content, flags):
                value = self._first_group(match) or match.group(0)
                context = self._get_context(content, match.start(), match.end())
                yield match, value, context

    def _detect_language(self, content: str) -> str:
        """Detect the primary programming language of the PoC.

        Each language is scored by the number of hits of its marker patterns;
        the best score wins (first listed language on ties) and ``'unknown'``
        is returned when nothing matches.
        """
        language_indicators = {
            'powershell': [
                r'\$[a-zA-Z_][a-zA-Z0-9_]*', r'Get-\w+', r'Set-\w+', r'New-\w+',
                r'Invoke-\w+', r'Add-Type', r'\[System\.\w+\]'
            ],
            'python': [
                r'import\s+\w+', r'from\s+\w+\s+import', r'def\s+\w+\(',
                r'subprocess\.', r'os\.system', r'__name__\s*==\s*["\']__main__["\']'
            ],
            'bash': [
                r'#!/bin/bash', r'#!/bin/sh', r'\$\{[^}]+\}', r'chmod\s+\+x',
                r'wget\s+', r'curl\s+', r'echo\s+.*\|'
            ],
            'batch': [
                r'@echo\s+off', r'%[^%]+%', r'goto\s+\w+', r'if\s+exist',
                r'cmd\s*/c', r'start\s+'
            ],
            'c_cpp': [
                r'#include\s*<[^>]+>', r'int\s+main\s*\(', r'printf\s*\(',
                r'malloc\s*\(', r'free\s*\(', r'system\s*\('
            ],
            'csharp': [
                r'using\s+System', r'namespace\s+\w+', r'class\s+\w+',
                r'Process\.Start', r'Registry\.', r'new\s+ProcessStartInfo'
            ],
            'javascript': [
                r'function\s+\w+\s*\(', r'var\s+\w+\s*=', r'console\.log',
                r'require\s*\(', r'=>', r'new\s+XMLHttpRequest'
            ],
            'php': [
                r'<\?php', r'\$[a-zA-Z_][a-zA-Z0-9_]*', r'echo\s+',
                r'exec\s*\(', r'system\s*\(', r'shell_exec'
            ]
        }
        flags = re.IGNORECASE | re.MULTILINE
        scores = {
            lang: sum(len(re.findall(pattern, content, flags)) for pattern in patterns)
            for lang, patterns in language_indicators.items()
        }
        if not scores or max(scores.values()) == 0:
            return 'unknown'
        return max(scores, key=scores.get)

    def _extract_process_indicators(self, content: str, language: str) -> List["SecurityIndicator"]:
        """Extract process execution indicators for the detected language."""
        patterns = {
            'powershell': [
                r'Start-Process\s+["\']?([^"\';\s]+)',
                r'Invoke-Expression\s+["\']?([^"\';\s]+)',
                r'&\s+["\']?([^"\';\s]+\.exe)',
                r'\.\s+["\']?([^"\';\s]+\.exe)',
                r'Invoke-Command\s+[^}]*ScriptBlock\s*=\s*["\']([^"\']+)',
                r'powershell\.exe\s+[^"\']*["\']([^"\']+)'
            ],
            'python': [
                r'subprocess\.call\(\s*["\']([^"\']+)',
                r'subprocess\.Popen\(\s*["\']([^"\']+)',
                r'subprocess\.run\(\s*["\']([^"\']+)',
                r'subprocess\.check_output\(\s*["\']([^"\']+)',
                r'subprocess\.check_call\(\s*["\']([^"\']+)',
                r'subprocess\.getoutput\(\s*["\']([^"\']+)',
                r'subprocess\.getstatusoutput\(\s*["\']([^"\']+)',
                r'os\.system\(\s*["\']([^"\']+)',
                r'os\.exec[vl]?p?\(\s*["\']([^"\']+)',
                r'os\.spawn[vl]?p?\(\s*[^,]*,\s*["\']([^"\']+)',
                r'os\.popen\(\s*["\']([^"\']+)',
                r'commands\.getoutput\(\s*["\']([^"\']+)',
                r'commands\.getstatusoutput\(\s*["\']([^"\']+)',
                r'pexpect\.spawn\(\s*["\']([^"\']+)',
                r'pexpect\.run\(\s*["\']([^"\']+)',
                r'multiprocessing\.Process\([^)]*target[^,]*,\s*["\']([^"\']+)',
                r'threading\.Thread\([^)]*target[^,]*,\s*["\']([^"\']+)',
                r'eval\(\s*["\']([^"\']+)',
                r'exec\(\s*["\']([^"\']+)',
                r'compile\(\s*["\']([^"\']+)',
                r'__import__\(\s*["\']([^"\']+)',
                r'importlib\.import_module\(\s*["\']([^"\']+)',
                # The next four have no capture group: the matched API prefix
                # itself becomes the indicator value.
                r'ctypes\.windll\.',
                r'ctypes\.cdll\.',
                r'win32api\.',
                r'win32process\.CreateProcess'
            ],
            'bash': [
                r'exec\s+([^;\s&|]+)',
                r'/bin/sh\s+-c\s+["\']([^"\']+)',
                r'system\(\s*["\']([^"\']+)',
                r'bash\s+-c\s+["\']([^"\']+)',
                r'\$\(([^)]+)\)'  # Command substitution
            ],
            'batch': [
                r'start\s+["\']?([^"\';\s]+)',
                r'cmd\s*/c\s+["\']?([^"\']+)',
                r'call\s+["\']?([^"\';\s]+)',
                r'%COMSPEC%\s+[^"\']*["\']([^"\']+)'
            ],
            'c_cpp': [
                r'system\(\s*["\']([^"\']+)',
                r'execve?\(\s*["\']([^"\']+)',
                r'CreateProcess[AW]?\([^,]*["\']([^"\']+)',
                r'WinExec\(\s*["\']([^"\']+)',
                r'ShellExecute[AW]?\([^,]*["\']([^"\']+)'
            ],
            'csharp': [
                r'Process\.Start\(\s*["\']([^"\']+)',
                r'ProcessStartInfo.*FileName\s*=\s*["\']([^"\']+)',
                r'new\s+Process.*["\']([^"\']+)',
                r'Process\.Start\(\s*new\s+ProcessStartInfo[^}]*FileName\s*=\s*["\']([^"\']+)'
            ]
        }
        indicators = []
        flags = re.IGNORECASE | re.MULTILINE
        for _, process_name, context in self._iter_matches(
                content, patterns.get(language, ()), flags):
            confidence = self._calculate_confidence(process_name, 'process', context)
            if confidence <= 0.3:  # Filter low confidence matches
                continue
            if 'inject' in context.lower():
                technique = AttackTechnique.PROCESS_INJECTION
            else:
                technique = AttackTechnique.COMMAND_EXECUTION
            indicators.append(SecurityIndicator(
                type='process',
                value=process_name,
                confidence=confidence,
                context=context,
                attack_technique=technique
            ))
        return indicators

    def _extract_file_indicators(self, content: str, language: str) -> List["SecurityIndicator"]:
        """Extract file system indicators (literal paths and file operations)."""
        # File path patterns
        file_patterns = [
            r'["\']([a-zA-Z]:\\[^"\'<>|*?]+\.[a-zA-Z0-9]+)["\']',  # Windows paths
            r'["\']([/][^"\'<>|*?\s]+\.[a-zA-Z0-9]+)["\']',  # Unix paths
            r'["\'](\./[^"\'<>|*?\s]+\.[a-zA-Z0-9]+)["\']',  # Relative paths
            r'%TEMP%\\([^"\'<>|*?\s]+\.[a-zA-Z0-9]+)',  # Windows temp
            r'/tmp/([^"\'<>|*?\s]+\.[a-zA-Z0-9]+)',  # Unix temp
        ]
        # Language-specific file operations
        operation_patterns = {
            'powershell': [
                r'New-Item.*Path.*["\']([^"\']+)["\']',
                r'Out-File.*["\']([^"\']+)["\']',
                r'Set-Content.*["\']([^"\']+)["\']',
                r'\|\s*Out-File\s+["\']([^"\']+)["\']'
            ],
            'python': [
                r'open\(\s*["\']([^"\']+)["\']',
                r'with\s+open\(\s*["\']([^"\']+)["\']',
                r'shutil\.copy.*["\']([^"\']+)["\']',
                r'shutil\.copyfile\(\s*[^,]*,\s*["\']([^"\']+)["\']',
                r'shutil\.move\(\s*[^,]*,\s*["\']([^"\']+)["\']',
                r'shutil\.copytree\(\s*[^,]*,\s*["\']([^"\']+)["\']',
                r'os\.rename\(\s*[^,]*,\s*["\']([^"\']+)["\']',
                r'os\.remove\(\s*["\']([^"\']+)["\']',
                r'os\.unlink\(\s*["\']([^"\']+)["\']',
                r'os\.rmdir\(\s*["\']([^"\']+)["\']',
                r'os\.makedirs\(\s*["\']([^"\']+)["\']',
                r'os\.mkdir\(\s*["\']([^"\']+)["\']',
                r'os\.path\.join\([^)]*["\']([^"\']+)["\']',
                r'pathlib\.Path\(\s*["\']([^"\']+)["\']',
                r'tempfile\.mktemp\(\s*[^)]*["\']([^"\']+)["\']',
                r'tempfile\.NamedTemporaryFile\([^)]*dir\s*=\s*["\']([^"\']+)["\']',
                r'io\.open\(\s*["\']([^"\']+)["\']',
                r'codecs\.open\(\s*["\']([^"\']+)["\']',
                r'pickle\.load\(\s*["\']([^"\']+)["\']',
                r'pickle\.dump\([^,]*,\s*["\']([^"\']+)["\']',
                r'json\.load\(\s*["\']([^"\']+)["\']',
                r'json\.dump\([^,]*,\s*["\']([^"\']+)["\']',
                r'zipfile\.ZipFile\(\s*["\']([^"\']+)["\']',
                r'tarfile\.open\(\s*["\']([^"\']+)["\']',
                r'gzip\.open\(\s*["\']([^"\']+)["\']',
                r'bz2\.open\(\s*["\']([^"\']+)["\']'
            ],
            'bash': [
                r'touch\s+["\']?([^"\';\s]+)',
                r'cp\s+[^"\';\s]+\s+["\']?([^"\';\s]+)',
                r'mv\s+[^"\';\s]+\s+["\']?([^"\';\s]+)',
                r'echo.*>\s*["\']?([^"\';\s]+)'
            ],
            'c_cpp': [
                r'fopen\(\s*["\']([^"\']+)["\']',
                r'CreateFile[AW]?\([^,]*["\']([^"\']+)["\']',
                r'WriteFile.*["\']([^"\']+)["\']'
            ]
        }
        # Generic paths first, then the language-specific file operations.
        passes = (
            (file_patterns, re.IGNORECASE),
            (operation_patterns.get(language, ()), re.IGNORECASE | re.MULTILINE),
        )
        indicators = []
        for patterns, flags in passes:
            for _, file_path, context in self._iter_matches(content, patterns, flags):
                confidence = self._calculate_confidence(file_path, 'file', context)
                if confidence > 0.4:
                    indicators.append(SecurityIndicator(
                        type='file',
                        value=file_path,
                        confidence=confidence,
                        context=context,
                        attack_technique=AttackTechnique.FILE_CREATION
                    ))
        return indicators

    def _extract_network_indicators(self, content: str, language: str) -> List["SecurityIndicator"]:
        """Extract network communication indicators (hosts, URLs, ports, headers)."""
        indicators = []
        # Network patterns - enhanced with more comprehensive patterns
        network_patterns = [
            r'(?:http[s]?://)([^/\s"\']+)',  # URLs
            r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',  # IP addresses
            r':(\d{2,5})\b',  # Port numbers
            r'Host:\s*([^\s\r\n]+)',  # HTTP Host headers
            r'User-Agent:\s*([^\r\n]+)',  # User agents
            r'ftp://([^/\s"\']+)',  # FTP URLs
            # The capture group wraps the whole name; a repeated group on its
            # own would only keep the last label ("example." of www.example.com).
            r'((?:[a-zA-Z0-9-]+\.)+[a-zA-Z]{2,})',  # Domain names
            r'(?:GET|POST|PUT|DELETE)\s+([^\s]+)',  # HTTP methods with paths
            r'Content-Type:\s*([^\r\n]+)',  # Content types
            r'Authorization:\s*([^\r\n]+)',  # Auth headers
        ]
        # Language-specific network operations
        operation_patterns = {
            'powershell': [
                r'Invoke-WebRequest.*Uri.*["\']([^"\']+)["\']',
                r'New-Object.*WebClient.*DownloadString.*["\']([^"\']+)["\']',
                r'System\.Net\.Sockets\.TcpClient.*(\d+)',
                r'Connect.*(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}).*(\d+)'
            ],
            'python': [
                r'requests\.get\(\s*["\']([^"\']+)["\']',
                r'requests\.post\(\s*["\']([^"\']+)["\']',
                r'requests\.put\(\s*["\']([^"\']+)["\']',
                r'requests\.delete\(\s*["\']([^"\']+)["\']',
                r'requests\.session\(\)',  # Session creation
                r'requests\.Session\(\)',  # Session creation (capitalized)
                r'session\.get\(\s*["\']([^"\']+)["\']',  # Session-based requests
                r'session\.post\(\s*["\']([^"\']+)["\']',
                r'session\.put\(\s*["\']([^"\']+)["\']',
                r'session\.delete\(\s*["\']([^"\']+)["\']',
                r'session\.request\(\s*["\'][^"\']+["\'],\s*["\']([^"\']+)["\']',
                r'urllib\.request\.urlopen\(\s*["\']([^"\']+)["\']',
                r'urllib\.request\.Request\(\s*["\']([^"\']+)["\']',
                r'urllib2\.urlopen\(\s*["\']([^"\']+)["\']',
                r'urllib2\.Request\(\s*["\']([^"\']+)["\']',
                r'socket\.connect\(\s*\(["\']([^"\']+)["\'],\s*(\d+)',
                r'socket\.connect\(\s*\(([^,]+),\s*(\d+)',
                r'http\.client\.HTTPConnection\(\s*["\']([^"\']+)["\']',
                r'http\.client\.HTTPSConnection\(\s*["\']([^"\']+)["\']',
                r'httplib\.HTTPConnection\(\s*["\']([^"\']+)["\']',
                r'httplib\.HTTPSConnection\(\s*["\']([^"\']+)["\']'
            ],
            'bash': [
                r'wget\s+["\']?([^"\';\s]+)',
                r'curl\s+["\']?([^"\';\s]+)',
                r'nc\s+([^\s]+)\s+(\d+)',
                r'netcat\s+([^\s]+)\s+(\d+)'
            ],
            'c_cpp': [
                r'connect\([^,]*inet_addr\(["\']([^"\']+)["\']',
                r'gethostbyname\(["\']([^"\']+)["\']',
                r'socket\(.*SOCK_STREAM'
            ]
        }
        # Extract network indicators
        for _, network_indicator, context in self._iter_matches(
                content, network_patterns, re.IGNORECASE):
            confidence = self._calculate_confidence(network_indicator, 'network', context)
            if confidence > 0.3:
                indicators.append(SecurityIndicator(
                    type='network',
                    value=network_indicator,
                    confidence=confidence,
                    context=context,
                    attack_technique=AttackTechnique.NETWORK_CONNECTION
                ))
        # Extract language-specific network operations
        flags = re.IGNORECASE | re.MULTILINE
        for pattern in operation_patterns.get(language, ()):
            is_session_pattern = 'session' in pattern.lower()
            for match, network_indicator, context in self._iter_matches(
                    content, [pattern], flags):
                # Session patterns keep the captured URL when there is one;
                # bare session creation is reported under a fixed label.
                if is_session_pattern and not self._first_group(match):
                    network_indicator = "requests-session"
                confidence = self._calculate_confidence(network_indicator, 'network', context)
                if confidence > 0.3:
                    # Boost confidence for session-based attacks
                    if 'session' in context.lower():
                        confidence = min(confidence + 0.2, 1.0)
                    indicators.append(SecurityIndicator(
                        type='network',
                        value=network_indicator,
                        confidence=confidence,
                        context=context,
                        attack_technique=AttackTechnique.NETWORK_CONNECTION
                    ))
        return indicators

    def _extract_registry_indicators(self, content: str, language: str) -> List["SecurityIndicator"]:
        """Extract Windows registry indicators (literal keys and registry API calls)."""
        # Registry key patterns
        registry_patterns = [
            r'(HKEY_[A-Z_]+\\[^"\';\s\]]+)',
            r'(HKLM\\[^"\';\s\]]+)',
            r'(HKCU\\[^"\';\s\]]+)',
            r'(SOFTWARE\\[^"\';\s\]]+)',
            r'(SYSTEM\\[^"\';\s\]]+)'
        ]
        # Language-specific registry operations. Quantifiers are lazy so the
        # first quoted argument after the call (the key path) is captured
        # rather than the last quoted string on the line.
        operation_patterns = {
            'powershell': [
                r'New-ItemProperty.*?Path.*?["\']([^"\']+)["\']',
                r'Set-ItemProperty.*?Path.*?["\']([^"\']+)["\']',
                r'Get-ItemProperty.*?Path.*?["\']([^"\']+)["\']',
                r'Remove-ItemProperty.*?Path.*?["\']([^"\']+)["\']'
            ],
            'batch': [
                r'reg\s+add\s+["\']?([^"\';\s]+)',
                r'reg\s+query\s+["\']?([^"\';\s]+)',
                r'reg\s+delete\s+["\']?([^"\';\s]+)'
            ],
            'c_cpp': [
                r'RegCreateKey(?:Ex)?[AW]?.*?["\']([^"\']+)["\']',
                r'RegSetValue(?:Ex)?[AW]?.*?["\']([^"\']+)["\']',
                r'RegOpenKey(?:Ex)?[AW]?.*?["\']([^"\']+)["\']'
            ],
            'csharp': [
                r'Registry\.[^.]+\.OpenSubKey\(["\']([^"\']+)["\']',
                r'RegistryKey.*?["\']([^"\']+)["\']'
            ]
        }
        # Literal keys first, then the language-specific registry operations.
        passes = (
            (registry_patterns, re.IGNORECASE),
            (operation_patterns.get(language, ()), re.IGNORECASE | re.MULTILINE),
        )
        indicators = []
        for patterns, flags in passes:
            for _, reg_key, context in self._iter_matches(content, patterns, flags):
                confidence = self._calculate_confidence(reg_key, 'registry', context)
                if confidence > 0.4:
                    indicators.append(SecurityIndicator(
                        type='registry',
                        value=reg_key,
                        confidence=confidence,
                        context=context,
                        attack_technique=AttackTechnique.REGISTRY_MODIFICATION
                    ))
        return indicators

    def _extract_command_indicators(self, content: str, language: str) -> List["SecurityIndicator"]:
        """Extract command-line execution indicators.

        The patterns are language independent; ``language`` is accepted for
        signature parity with the other extractors.
        """
        # Command patterns - enhanced with Python-specific patterns
        command_patterns = [
            r'(?:cmd|powershell|bash|sh)\s+[/-]c\s+["\']?([^"\';\n]+)',
            r'(?:system|exec|shell_exec)\(\s*["\']([^"\']+)["\']',
            r'[`]([^`]+)[`]',  # Backticks
            r'\$\(([^)]+)\)',  # Command substitution
            # Python-specific command execution patterns
            r'subprocess\.call\(\s*\[([^\]]+)\]',  # subprocess.call with list
            r'subprocess\.Popen\(\s*\[([^\]]+)\]',  # subprocess.Popen with list
            r'subprocess\.run\(\s*\[([^\]]+)\]',  # subprocess.run with list
            r'os\.system\(\s*f["\']([^"\']+)["\']',  # f-string commands
            r'os\.system\(\s*["\']([^"\']+)["\']\.format\(',  # .format() commands
            r'os\.system\(\s*["\']([^"\']+)["\']\.%',  # % formatting
            r'subprocess\.call\(\s*f["\']([^"\']+)["\']',  # f-string subprocess
            r'subprocess\.Popen\(\s*f["\']([^"\']+)["\']',  # f-string Popen
            r'pexpect\.spawn\(\s*f["\']([^"\']+)["\']',  # f-string pexpect
            r'commands\.getoutput\(\s*f["\']([^"\']+)["\']',  # f-string commands
            r'eval\(\s*["\']([^"\']+)["\']',  # eval() calls
            r'exec\(\s*["\']([^"\']+)["\']',  # exec() calls
            r'compile\(\s*["\']([^"\']+)["\']',  # compile() calls
            r'__import__\(\s*["\']([^"\']+)["\']',  # dynamic imports
            r'importlib\.import_module\(\s*["\']([^"\']+)["\']',  # importlib
            # The remaining patterns have no capture group: the matched call
            # prefix itself becomes the indicator value.
            r'ctypes\.windll\.kernel32\.WinExec\(',  # WinExec via ctypes
            r'ctypes\.windll\.shell32\.ShellExecute[AW]?\(',  # ShellExecute
            r'win32process\.CreateProcess\(',  # pywin32 CreateProcess
            r'win32api\.ShellExecute\(',  # pywin32 ShellExecute
            r'win32api\.WinExec\(',  # pywin32 WinExec
        ]
        indicators = []
        flags = re.IGNORECASE | re.MULTILINE
        for _, command, context in self._iter_matches(content, command_patterns, flags):
            confidence = self._calculate_confidence(command, 'command', context)
            if confidence <= 0.4:
                continue
            # Determine attack technique based on command content
            command_lower = command.lower()
            technique = AttackTechnique.COMMAND_EXECUTION
            if 'powershell' in command_lower:
                technique = AttackTechnique.POWERSHELL
            elif any(word in command_lower for word in ['cmd', 'bat', 'com']):
                technique = AttackTechnique.COMMAND_LINE
            indicators.append(SecurityIndicator(
                type='command',
                value=command,
                confidence=confidence,
                context=context,
                attack_technique=technique
            ))
        return indicators

    def _extract_encoded_content(self, content: str) -> List[str]:
        """Extract and decode obfuscated/encoded content.

        Base64 and hex payloads are decoded to text. Other encodings are not
        decoded; a short ``"encoded_content: ..."`` marker is returned for
        them instead. The result keeps first-seen order without duplicates.
        """
        decoded_content: List[str] = []
        # Base64 patterns - enhanced with more Python patterns
        base64_patterns = [
            r'["\']([A-Za-z0-9+/]{20,}={0,2})["\']',  # Base64 strings
            r'FromBase64String\(["\']([^"\']+)["\']',  # PowerShell
            r'base64\.b64decode\(["\']([^"\']+)["\']',  # Python
            r'base64\.b64encode\(["\']([^"\']+)["\']',  # Python encode
            r'base64\.standard_b64decode\(["\']([^"\']+)["\']',  # Python standard
            r'base64\.urlsafe_b64decode\(["\']([^"\']+)["\']',  # Python URL-safe
            r'base64\.decodebytes\(["\']([^"\']+)["\']',  # Python 3
            r'base64\.encodebytes\(["\']([^"\']+)["\']',  # Python 3
            r'codecs\.decode\(["\']([^"\']+)["\'],\s*["\']base64["\']',  # codecs
            r'codecs\.encode\(["\']([^"\']+)["\'],\s*["\']base64["\']',  # codecs
            r'\.decode\(["\']base64["\']',  # .decode('base64')
            r'\.encode\(["\']base64["\']',  # .encode('base64')
        ]
        for pattern in base64_patterns:
            for match in re.finditer(pattern, content, re.IGNORECASE):
                encoded_str = self._first_group(match)
                # Group-less patterns carry no payload; only decode substantial content
                if encoded_str is None or len(encoded_str) <= 20:
                    continue
                try:
                    # Extra '=' makes up for stripped padding
                    raw = base64.b64decode(encoded_str + '===')
                except ValueError:  # includes binascii.Error: not valid base64
                    continue
                decoded = raw.decode('utf-8', errors='ignore')
                if len(decoded) > 10:
                    decoded_content.append(decoded)
        # Hex patterns - enhanced with Python-specific patterns
        hex_patterns = [
            r'0x([0-9a-fA-F]{20,})',
            r'["\']([0-9a-fA-F]{20,})["\']',
            r'bytes\.fromhex\(["\']([0-9a-fA-F]+)["\']',  # Python bytes.fromhex
            r'binascii\.hexlify\(["\']([^"\']+)["\']',  # Python binascii
            r'binascii\.unhexlify\(["\']([0-9a-fA-F]+)["\']',  # Python binascii
            r'codecs\.decode\(["\']([0-9a-fA-F]+)["\'],\s*["\']hex["\']',  # codecs hex
            r'codecs\.encode\(["\']([^"\']+)["\'],\s*["\']hex["\']',  # codecs hex
            r'\.decode\(["\']hex["\']',  # .decode('hex')
            r'\.encode\(["\']hex["\']',  # .encode('hex')
        ]
        for pattern in hex_patterns:
            for match in re.finditer(pattern, content):
                hex_str = self._first_group(match)
                if hex_str is None or len(hex_str) % 2 != 0 or len(hex_str) <= 20:
                    continue
                try:
                    raw = binascii.unhexlify(hex_str)
                except ValueError:  # includes binascii.Error: non-hex characters
                    continue
                decoded = raw.decode('utf-8', errors='ignore')
                if len(decoded) > 10:
                    decoded_content.append(decoded)
        # Additional Python encoding patterns
        other_encoding_patterns = [
            r'codecs\.decode\(["\']([^"\']+)["\'],\s*["\']rot13["\']',  # ROT13
            r'codecs\.encode\(["\']([^"\']+)["\'],\s*["\']rot13["\']',  # ROT13
            r'\.decode\(["\']utf-8["\']',  # UTF-8 decode
            r'\.encode\(["\']utf-8["\']',  # UTF-8 encode
            r'\.decode\(["\']ascii["\']',  # ASCII decode
            r'\.encode\(["\']ascii["\']',  # ASCII encode
            r'urllib\.parse\.quote\(["\']([^"\']+)["\']',  # URL encoding
            r'urllib\.parse\.unquote\(["\']([^"\']+)["\']',  # URL decoding
            r'urllib\.parse\.quote_plus\(["\']([^"\']+)["\']',  # URL encoding
            r'urllib\.parse\.unquote_plus\(["\']([^"\']+)["\']',  # URL decoding
            r'html\.escape\(["\']([^"\']+)["\']',  # HTML escape
            r'html\.unescape\(["\']([^"\']+)["\']',  # HTML unescape
            r'json\.dumps\(["\']([^"\']+)["\']',  # JSON encoding
            r'json\.loads\(["\']([^"\']+)["\']',  # JSON decoding
            r'pickle\.dumps\(["\']([^"\']+)["\']',  # Pickle serialization
            r'pickle\.loads\(["\']([^"\']+)["\']',  # Pickle deserialization
            r'zlib\.compress\(["\']([^"\']+)["\']',  # Zlib compression
            r'zlib\.decompress\(["\']([^"\']+)["\']',  # Zlib decompression
            r'gzip\.compress\(["\']([^"\']+)["\']',  # Gzip compression
            r'gzip\.decompress\(["\']([^"\']+)["\']',  # Gzip decompression
        ]
        # Process additional encoding patterns
        for pattern in other_encoding_patterns:
            for match in re.finditer(pattern, content, re.IGNORECASE):
                encoded_str = self._first_group(match)
                if encoded_str is not None and len(encoded_str) > 10:  # Only process substantial content
                    # For now, just add the pattern as an indicator
                    # Real decoding would depend on the specific encoding
                    decoded_content.append(f"encoded_content: {encoded_str[:50]}...")
        # The same payload is often hit by several patterns; report it once.
        return list(dict.fromkeys(decoded_content))

    def _calculate_confidence(self, indicator: str, indicator_type: str, context: str) -> float:
        """Calculate confidence score for an indicator.

        Starts at 0.5, adds bonuses for length, suspicious context keywords
        and type-specific traits, scales by 0.3 when a false-positive filter
        matches, and caps the result at 1.0.
        """
        confidence = 0.5  # Base confidence
        # Length and complexity scoring
        if len(indicator) > 5:
            confidence += 0.1
        if len(indicator) > 20:
            confidence += 0.1
        # Context-based scoring - enhanced with Python-specific keywords
        high_confidence_keywords = [
            'exploit', 'payload', 'shell', 'inject', 'execute', 'run',
            'attack', 'malware', 'backdoor', 'trojan', 'virus',
            # Python-specific exploit keywords
            'subprocess', 'popen', 'system', 'exec', 'eval', 'compile',
            'import', 'ctypes', 'win32api', 'win32process', 'pexpect',
            'base64', 'decode', 'encode', 'pickle', 'marshal',
            'requests', 'urllib', 'socket', 'connect', 'bind',
            'reverse', 'shell', 'backdoor', 'persistence', 'privilege',
            'escalation', 'bypass', 'evasion', 'obfuscation'
        ]
        context_lower = context.lower()
        # A single bonus no matter how many keywords appear
        if any(keyword in context_lower for keyword in high_confidence_keywords):
            confidence += 0.1

        value = indicator.lower()

        def mentions(*needles: str) -> bool:
            """True when any needle is a substring of the lower-cased indicator."""
            return any(needle in value for needle in needles)

        # Type-specific scoring - enhanced for Python
        if indicator_type == 'process':
            if indicator.endswith('.exe') or indicator.endswith('.dll'):
                confidence += 0.2
            if mentions('cmd', 'powershell', 'bash', 'sh'):
                confidence += 0.1
            # Python-specific process indicators
            if mentions('python', 'py', 'subprocess', 'popen'):
                confidence += 0.15
            if mentions('eval', 'exec', 'compile', 'import'):
                confidence += 0.2
        elif indicator_type == 'file':
            if mentions('.exe', '.dll', '.bat', '.ps1', '.sh'):
                confidence += 0.2
            if mentions('temp', 'tmp', 'appdata'):
                confidence += 0.1
            # Python-specific file indicators
            if mentions('.py', '.pyc', '.pyo', '.pyd'):
                confidence += 0.15
            if mentions('__pycache__', '.python', 'site-packages'):
                confidence += 0.1
        elif indicator_type == 'network':
            if re.match(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', indicator):
                confidence += 0.2
            if mentions('.com', '.net', '.org', '.ru', '.cn'):
                confidence += 0.1
            # Python-specific network indicators
            if mentions('requests', 'urllib', 'session', 'socket'):
                confidence += 0.15
            if 'session' in value:
                confidence += 0.1
        elif indicator_type == 'command':
            # Python-specific command indicators
            if mentions('python', 'py', 'subprocess', 'os.system'):
                confidence += 0.15
            if mentions('eval', 'exec', 'compile', 'import'):
                confidence += 0.2
            if mentions('base64', 'decode', 'encode', 'pickle'):
                confidence += 0.1
        # Apply false positive filters
        if self._is_false_positive(indicator, indicator_type):
            confidence *= 0.3
        return min(confidence, 1.0)

    def _is_false_positive(self, indicator: str, indicator_type: str) -> bool:
        """Check if indicator is likely a false positive.

        True when any filter pattern registered for ``indicator_type``
        matches (case-insensitively); types without filters never match.
        """
        fp_patterns = self.false_positive_filters.get(indicator_type, ())
        return any(re.search(pattern, indicator, re.IGNORECASE) for pattern in fp_patterns)

    def _get_context(self, content: str, start: int, end: int, window: int = 100) -> str:
        """Get context around a match: up to ``window`` chars either side, stripped."""
        context_start = max(0, start - window)
        context_end = min(len(content), end + window)
        return content[context_start:context_end].strip()

    def _rank_unique(self, indicators: List["SecurityIndicator"]) -> List["SecurityIndicator"]:
        """Return indicators sorted by confidence (highest first), one per value."""
        seen = set()
        unique_indicators = []
        for indicator in sorted(indicators, key=lambda x: x.confidence, reverse=True):
            if indicator.value not in seen:
                seen.add(indicator.value)
                unique_indicators.append(indicator)
        return unique_indicators

    def _deduplicate_and_rank(self, indicators: List["SecurityIndicator"]) -> List[Dict]:
        """Remove duplicates and rank indicators by confidence.

        Returns at most ten plain dicts (value, rounded confidence, context
        truncated to 200 chars, technique ID or None).
        """
        return [
            {
                'value': ind.value,
                'confidence': round(ind.confidence, 2),
                'context': ind.context[:200] + '...' if len(ind.context) > 200 else ind.context,
                'attack_technique': ind.attack_technique.value if ind.attack_technique else None
            }
            for ind in self._rank_unique(indicators)[:10]  # Top 10 indicators
        ]

    def _analyze_attack_behaviors(self, content: str, language: str) -> List[Dict]:
        """Analyze attack behaviors and patterns.

        Counts keyword-pattern hits per behavior on the lower-cased content
        and returns the behaviors with at least one hit, highest confidence
        first. ``language`` is currently not used.
        """
        behaviors = []
        behavior_patterns = {
            'persistence': [
                r'(?:startup|autorun|registry.*run)',
                r'(?:scheduled.*task|cron|at\s+\d)',
                r'(?:service.*create|sc.*create)'
            ],
            'defense_evasion': [
                r'(?:disable.*antivirus|kill.*av)',
                r'(?:encode|encrypt|obfuscat)',
                r'(?:hide|stealth|invisible)'
            ],
            'credential_access': [
                r'(?:password|credential|token)',
                r'(?:keylog|steal.*key)',
                r'(?:mimikatz|lsass)'
            ],
            'lateral_movement': [
                r'(?:psexec|wmi.*exec|remote.*exec)',
                r'(?:net\s+use|mount|smb)',
                r'(?:ssh|rdp|vnc)'
            ],
            'exfiltration': [
                r'(?:upload|ftp|http.*post)',
                r'(?:compress|zip|archive)',
                r'(?:steal|exfil|extract)'
            ]
        }
        content_lower = content.lower()
        for behavior, patterns in behavior_patterns.items():
            matches = []
            for pattern in patterns:
                matches.extend(re.findall(pattern, content_lower))
            if matches:
                behaviors.append({
                    'behavior': behavior,
                    # 0.2 per hit, saturating at five hits
                    'confidence': min(len(matches) * 0.2, 1.0),
                    'indicators': matches[:5]  # Top 5 matches
                })
        return sorted(behaviors, key=lambda x: x['confidence'], reverse=True)

    def _map_to_mitre_attack(self, indicators: List["SecurityIndicator"]) -> List[str]:
        """Map indicators to MITRE ATT&CK techniques (sorted unique technique IDs)."""
        return sorted({
            indicator.attack_technique.value
            for indicator in indicators
            if indicator.attack_technique
        })

    def _assess_analysis_quality(self, content: str) -> Dict[str, Any]:
        """Assess the quality and completeness of the analysis.

        Uses the size of ``content`` and the indicators currently stored in
        ``self.indicators``.
        """
        # Content metrics
        lines = len(content.split('\n'))
        chars = len(content)
        # Indicator density
        total_indicators = len(self.indicators)
        high_conf_indicators = len([i for i in self.indicators if i.confidence > 0.7])
        # Calculate quality score
        content_score = min(lines / 50, 1.0) * 0.3  # More lines = better
        indicator_score = min(total_indicators / 20, 1.0) * 0.4  # More indicators = better
        # max(..., 1) avoids dividing by zero when nothing was extracted
        confidence_score = (high_conf_indicators / max(total_indicators, 1)) * 0.3  # Higher confidence = better
        overall_score = content_score + indicator_score + confidence_score
        return {
            'overall_score': round(overall_score, 2),
            'content_lines': lines,
            'content_chars': chars,
            'total_indicators': total_indicators,
            'high_confidence_indicators': high_conf_indicators,
            'recommendation': self._get_quality_recommendation(overall_score)
        }

    def _get_quality_recommendation(self, score: float) -> str:
        """Get recommendation based on quality score."""
        if score >= 0.8:
            return "High quality PoC with excellent indicator extraction"
        elif score >= 0.6:
            return "Good quality PoC with adequate indicators"
        elif score >= 0.4:
            return "Moderate quality PoC, may need additional analysis"
        else:
            return "Low quality PoC, limited indicators extracted"

    def _initialize_language_patterns(self) -> Dict:
        """Initialize language-specific patterns (placeholder, currently empty)."""
        return {
            # Patterns for different languages will be expanded
        }

    def _initialize_attack_patterns(self) -> Dict:
        """Initialize attack pattern recognition (placeholder, currently empty)."""
        return {
            # Attack patterns will be expanded
        }

    def _initialize_fp_filters(self) -> Dict:
        """Initialize false positive filters, keyed by indicator type."""
        return {
            'process': [
                r'^(explorer|notepad|calc|windir|system32)\.exe$',
                r'^[a-z]$',  # Single characters
                r'^\d+$',  # Pure numbers
                # Python-specific false positives
                r'^(print|len|str|int|float|list|dict|tuple|set)$',  # Built-in functions
                r'^(import|from|def|class|if|else|elif|for|while|try|except)$',  # Keywords
                r'^(sys|os|re|json|time|datetime|random|math)$',  # Common modules
            ],
            'file': [
                r'^[a-z]$',
                r'^\d+$',
                r'^(con|aux|prn|nul)$',
                # Python-specific false positives
                r'^(sys|os|re|json|time|datetime|random|math)\.py$',  # Common modules
                r'^__init__\.py$',  # Python package files
                r'^setup\.py$',  # Python setup files
                r'^test.*\.py$',  # Test files
                r'^.*_test\.py$',  # Test files
            ],
            'network': [
                r'^(localhost|127\.0\.0\.1|0\.0\.0\.0)$',
                r'^\d{1,2}$',  # Port numbers without context
                r'^(example\.com|test\.com|localhost)$',
                # Python-specific false positives
                r'^(requests|urllib|socket|http)$',  # Module names without context
                r'^(session|connection|client|server)$',  # Generic terms
                r'^(get|post|put|delete|head|options)$',  # HTTP methods without context
            ],
            'command': [
                r'^[a-z]$',
                r'^\d+$',
                # Python-specific false positives
                r'^(print|len|str|int|float|list|dict|tuple|set)$',  # Built-in functions
                r'^(import|from|def|class|if|else|elif|for|while|try|except)$',  # Keywords
                r'^(help|dir|type|vars|globals|locals)$',  # Introspection functions
            ]
        }
# Example usage: run the analyzer on a small inline PoC and print the report.
if __name__ == "__main__":
    analyzer = PoCAnalyzer()
    # Example PoC content (kept verbatim; it is analysed as text, never executed)
    sample_poc = """
import subprocess
import base64
# CVE-2024-1234 exploit
payload = base64.b64decode("Y21kIC9jIGVjaG8gSGVsbG8gV29ybGQ=")
subprocess.call("powershell.exe -enc " + payload.decode(), shell=True)
# Create persistence
with open("C:\\temp\\malware.exe", "wb") as f:
f.write(malicious_bytes)
# Network connection
import socket
s = socket.socket()
s.connect(("192.168.1.100", 4444))
"""
    result = analyzer.analyze_poc(sample_poc, "CVE-2024-1234")
    print(f"Analysis result: {result}")