""" Advanced PoC (Proof of Concept) analyzer for extracting security indicators from exploit code across multiple programming languages and attack vectors. """ import re import base64 import binascii from typing import Dict, List, Set, Optional, Tuple from dataclasses import dataclass from enum import Enum import logging logger = logging.getLogger(__name__) class AttackTechnique(Enum): PROCESS_INJECTION = "T1055" COMMAND_EXECUTION = "T1059" POWERSHELL = "T1059.001" COMMAND_LINE = "T1059.003" FILE_CREATION = "T1105" REGISTRY_MODIFICATION = "T1112" NETWORK_CONNECTION = "T1071" PRIVILEGE_ESCALATION = "T1068" DLL_INJECTION = "T1055.001" PROCESS_HOLLOWING = "T1055.012" SERVICE_CREATION = "T1543.003" @dataclass class SecurityIndicator: """Represents a security indicator extracted from PoC code.""" type: str # process, file, network, registry, command value: str confidence: float # 0.0 to 1.0 context: str # surrounding code context attack_technique: Optional[AttackTechnique] = None metadata: Dict = None class PoCAnalyzer: """Advanced analyzer for extracting security indicators from PoC code.""" def __init__(self): self.indicators: List[SecurityIndicator] = [] self.language_patterns = self._initialize_language_patterns() self.attack_patterns = self._initialize_attack_patterns() self.false_positive_filters = self._initialize_fp_filters() def analyze_poc(self, poc_content: str, cve_id: str = None) -> Dict[str, any]: """ Main analysis function that extracts all security indicators. Args: poc_content: The PoC source code cve_id: Optional CVE identifier for context Returns: Dictionary containing categorized indicators and analysis """ self.indicators = [] # Detect programming language language = self._detect_language(poc_content) # Extract indicators by category processes = self._extract_process_indicators(poc_content, language) files = self._extract_file_indicators(poc_content, language) network = self._extract_network_indicators(poc_content, language) registry = self._extract_registry_indicators(poc_content, language) commands = self._extract_command_indicators(poc_content, language) # Extract encoded/obfuscated content decoded_content = self._extract_encoded_content(poc_content) if decoded_content: # Recursively analyze decoded content for content in decoded_content: sub_analysis = self.analyze_poc(content) processes.extend(sub_analysis['processes']) files.extend(sub_analysis['files']) network.extend(sub_analysis['network']) registry.extend(sub_analysis['registry']) commands.extend(sub_analysis['commands']) # Behavioral analysis behaviors = self._analyze_attack_behaviors(poc_content, language) # MITRE ATT&CK technique mapping techniques = self._map_to_mitre_attack( processes + files + network + registry + commands ) # Quality assessment analysis_quality = self._assess_analysis_quality(poc_content) return { 'language': language, 'processes': self._deduplicate_and_rank(processes), 'files': self._deduplicate_and_rank(files), 'network': self._deduplicate_and_rank(network), 'registry': self._deduplicate_and_rank(registry), 'commands': self._deduplicate_and_rank(commands), 'behaviors': behaviors, 'mitre_techniques': techniques, 'quality_score': analysis_quality, 'total_indicators': len(self.indicators), 'high_confidence_indicators': len([i for i in self.indicators if i.confidence > 0.7]) } def _detect_language(self, content: str) -> str: """Detect the primary programming language of the PoC.""" language_indicators = { 'powershell': [ r'\$[a-zA-Z_][a-zA-Z0-9_]*', r'Get-\w+', r'Set-\w+', r'New-\w+', r'Invoke-\w+', r'Add-Type', r'\[System\.\w+\]' ], 'python': [ r'import\s+\w+', r'from\s+\w+\s+import', r'def\s+\w+\(', r'subprocess\.', r'os\.system', r'__name__\s*==\s*["\']__main__["\']' ], 'bash': [ r'#!/bin/bash', r'#!/bin/sh', r'\$\{[^}]+\}', r'chmod\s+\+x', r'wget\s+', r'curl\s+', r'echo\s+.*\|' ], 'batch': [ r'@echo\s+off', r'%[^%]+%', r'goto\s+\w+', r'if\s+exist', r'cmd\s*/c', r'start\s+' ], 'c_cpp': [ r'#include\s*<[^>]+>', r'int\s+main\s*\(', r'printf\s*\(', r'malloc\s*\(', r'free\s*\(', r'system\s*\(' ], 'csharp': [ r'using\s+System', r'namespace\s+\w+', r'class\s+\w+', r'Process\.Start', r'Registry\.', r'new\s+ProcessStartInfo' ], 'javascript': [ r'function\s+\w+\s*\(', r'var\s+\w+\s*=', r'console\.log', r'require\s*\(', r'=>', r'new\s+XMLHttpRequest' ], 'php': [ r'<\?php', r'\$[a-zA-Z_][a-zA-Z0-9_]*', r'echo\s+', r'exec\s*\(', r'system\s*\(', r'shell_exec' ] } scores = {} content_lower = content.lower() for lang, patterns in language_indicators.items(): score = 0 for pattern in patterns: matches = len(re.findall(pattern, content, re.IGNORECASE | re.MULTILINE)) score += matches scores[lang] = score if not scores or max(scores.values()) == 0: return 'unknown' return max(scores, key=scores.get) def _extract_process_indicators(self, content: str, language: str) -> List[SecurityIndicator]: """Extract process execution indicators.""" indicators = [] patterns = { 'powershell': [ r'Start-Process\s+["\']?([^"\';\s]+)', r'Invoke-Expression\s+["\']?([^"\';\s]+)', r'&\s+["\']?([^"\';\s]+\.exe)', r'\.\s+["\']?([^"\';\s]+\.exe)', r'Invoke-Command\s+[^}]*ScriptBlock\s*=\s*["\']([^"\']+)', r'powershell\.exe\s+[^"\']*["\']([^"\']+)' ], 'python': [ r'subprocess\.call\(\s*["\']([^"\']+)', r'subprocess\.Popen\(\s*["\']([^"\']+)', r'subprocess\.run\(\s*["\']([^"\']+)', r'subprocess\.check_output\(\s*["\']([^"\']+)', r'subprocess\.check_call\(\s*["\']([^"\']+)', r'subprocess\.getoutput\(\s*["\']([^"\']+)', r'subprocess\.getstatusoutput\(\s*["\']([^"\']+)', r'os\.system\(\s*["\']([^"\']+)', r'os\.exec[vl]?p?\(\s*["\']([^"\']+)', r'os\.spawn[vl]?p?\(\s*[^,]*,\s*["\']([^"\']+)', r'os\.popen\(\s*["\']([^"\']+)', r'commands\.getoutput\(\s*["\']([^"\']+)', r'commands\.getstatusoutput\(\s*["\']([^"\']+)', r'pexpect\.spawn\(\s*["\']([^"\']+)', r'pexpect\.run\(\s*["\']([^"\']+)', r'multiprocessing\.Process\([^)]*target[^,]*,\s*["\']([^"\']+)', r'threading\.Thread\([^)]*target[^,]*,\s*["\']([^"\']+)', r'eval\(\s*["\']([^"\']+)', r'exec\(\s*["\']([^"\']+)', r'compile\(\s*["\']([^"\']+)', r'__import__\(\s*["\']([^"\']+)', r'importlib\.import_module\(\s*["\']([^"\']+)', r'ctypes\.windll\.', r'ctypes\.cdll\.', r'win32api\.', r'win32process\.CreateProcess' ], 'bash': [ r'exec\s+([^;\s&|]+)', r'/bin/sh\s+-c\s+["\']([^"\']+)', r'system\(\s*["\']([^"\']+)', r'bash\s+-c\s+["\']([^"\']+)', r'\$\(([^)]+)\)' # Command substitution ], 'batch': [ r'start\s+["\']?([^"\';\s]+)', r'cmd\s*/c\s+["\']?([^"\']+)', r'call\s+["\']?([^"\';\s]+)', r'%COMSPEC%\s+[^"\']*["\']([^"\']+)' ], 'c_cpp': [ r'system\(\s*["\']([^"\']+)', r'execve?\(\s*["\']([^"\']+)', r'CreateProcess[AW]?\([^,]*["\']([^"\']+)', r'WinExec\(\s*["\']([^"\']+)', r'ShellExecute[AW]?\([^,]*["\']([^"\']+)' ], 'csharp': [ r'Process\.Start\(\s*["\']([^"\']+)', r'ProcessStartInfo.*FileName\s*=\s*["\']([^"\']+)', r'new\s+Process.*["\']([^"\']+)', r'Process\.Start\(\s*new\s+ProcessStartInfo[^}]*FileName\s*=\s*["\']([^"\']+)' ] } if language in patterns: for pattern in patterns[language]: matches = re.finditer(pattern, content, re.IGNORECASE | re.MULTILINE) for match in matches: process_name = match.group(1) context = self._get_context(content, match.start(), match.end()) confidence = self._calculate_confidence(process_name, 'process', context) if confidence > 0.3: # Filter low confidence matches indicators.append(SecurityIndicator( type='process', value=process_name, confidence=confidence, context=context, attack_technique=AttackTechnique.PROCESS_INJECTION if 'inject' in context.lower() else AttackTechnique.COMMAND_EXECUTION )) return indicators def _extract_file_indicators(self, content: str, language: str) -> List[SecurityIndicator]: """Extract file system indicators.""" indicators = [] # File path patterns file_patterns = [ r'["\']([a-zA-Z]:\\[^"\'<>|*?]+\.[a-zA-Z0-9]+)["\']', # Windows paths r'["\']([/][^"\'<>|*?\s]+\.[a-zA-Z0-9]+)["\']', # Unix paths r'["\'](\./[^"\'<>|*?\s]+\.[a-zA-Z0-9]+)["\']', # Relative paths r'%TEMP%\\([^"\'<>|*?\s]+\.[a-zA-Z0-9]+)', # Windows temp r'/tmp/([^"\'<>|*?\s]+\.[a-zA-Z0-9]+)', # Unix temp ] # Language-specific file operations operation_patterns = { 'powershell': [ r'New-Item.*Path.*["\']([^"\']+)["\']', r'Out-File.*["\']([^"\']+)["\']', r'Set-Content.*["\']([^"\']+)["\']', r'\|\s*Out-File\s+["\']([^"\']+)["\']' ], 'python': [ r'open\(\s*["\']([^"\']+)["\']', r'with\s+open\(\s*["\']([^"\']+)["\']', r'shutil\.copy.*["\']([^"\']+)["\']', r'shutil\.copyfile\(\s*[^,]*,\s*["\']([^"\']+)["\']', r'shutil\.move\(\s*[^,]*,\s*["\']([^"\']+)["\']', r'shutil\.copytree\(\s*[^,]*,\s*["\']([^"\']+)["\']', r'os\.rename\(\s*[^,]*,\s*["\']([^"\']+)["\']', r'os\.remove\(\s*["\']([^"\']+)["\']', r'os\.unlink\(\s*["\']([^"\']+)["\']', r'os\.rmdir\(\s*["\']([^"\']+)["\']', r'os\.makedirs\(\s*["\']([^"\']+)["\']', r'os\.mkdir\(\s*["\']([^"\']+)["\']', r'os\.path\.join\([^)]*["\']([^"\']+)["\']', r'pathlib\.Path\(\s*["\']([^"\']+)["\']', r'tempfile\.mktemp\(\s*[^)]*["\']([^"\']+)["\']', r'tempfile\.NamedTemporaryFile\([^)]*dir\s*=\s*["\']([^"\']+)["\']', r'io\.open\(\s*["\']([^"\']+)["\']', r'codecs\.open\(\s*["\']([^"\']+)["\']', r'pickle\.load\(\s*["\']([^"\']+)["\']', r'pickle\.dump\([^,]*,\s*["\']([^"\']+)["\']', r'json\.load\(\s*["\']([^"\']+)["\']', r'json\.dump\([^,]*,\s*["\']([^"\']+)["\']', r'zipfile\.ZipFile\(\s*["\']([^"\']+)["\']', r'tarfile\.open\(\s*["\']([^"\']+)["\']', r'gzip\.open\(\s*["\']([^"\']+)["\']', r'bz2\.open\(\s*["\']([^"\']+)["\']' ], 'bash': [ r'touch\s+["\']?([^"\';\s]+)', r'cp\s+[^"\';\s]+\s+["\']?([^"\';\s]+)', r'mv\s+[^"\';\s]+\s+["\']?([^"\';\s]+)', r'echo.*>\s*["\']?([^"\';\s]+)' ], 'c_cpp': [ r'fopen\(\s*["\']([^"\']+)["\']', r'CreateFile[AW]?\([^,]*["\']([^"\']+)["\']', r'WriteFile.*["\']([^"\']+)["\']' ] } # Extract file paths for pattern in file_patterns: matches = re.finditer(pattern, content, re.IGNORECASE) for match in matches: file_path = match.group(1) context = self._get_context(content, match.start(), match.end()) confidence = self._calculate_confidence(file_path, 'file', context) if confidence > 0.4: indicators.append(SecurityIndicator( type='file', value=file_path, confidence=confidence, context=context, attack_technique=AttackTechnique.FILE_CREATION )) # Extract file operations if language in operation_patterns: for pattern in operation_patterns[language]: matches = re.finditer(pattern, content, re.IGNORECASE | re.MULTILINE) for match in matches: file_path = match.group(1) context = self._get_context(content, match.start(), match.end()) confidence = self._calculate_confidence(file_path, 'file', context) if confidence > 0.4: indicators.append(SecurityIndicator( type='file', value=file_path, confidence=confidence, context=context, attack_technique=AttackTechnique.FILE_CREATION )) return indicators def _extract_network_indicators(self, content: str, language: str) -> List[SecurityIndicator]: """Extract network communication indicators.""" indicators = [] # Network patterns - enhanced with more comprehensive patterns network_patterns = [ r'(?:http[s]?://)([^/\s"\']+)', # URLs r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', # IP addresses r':(\d{2,5})\b', # Port numbers r'Host:\s*([^\s\r\n]+)', # HTTP Host headers r'User-Agent:\s*([^\r\n]+)', # User agents r'ftp://([^/\s"\']+)', # FTP URLs r'([a-zA-Z0-9-]+\.)+[a-zA-Z]{2,}', # Domain names r'(?:GET|POST|PUT|DELETE)\s+([^\s]+)', # HTTP methods with paths r'Content-Type:\s*([^\r\n]+)', # Content types r'Authorization:\s*([^\r\n]+)', # Auth headers ] # Language-specific network operations operation_patterns = { 'powershell': [ r'Invoke-WebRequest.*Uri.*["\']([^"\']+)["\']', r'New-Object.*WebClient.*DownloadString.*["\']([^"\']+)["\']', r'System\.Net\.Sockets\.TcpClient.*(\d+)', r'Connect.*(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}).*(\d+)' ], 'python': [ r'requests\.get\(\s*["\']([^"\']+)["\']', r'requests\.post\(\s*["\']([^"\']+)["\']', r'requests\.put\(\s*["\']([^"\']+)["\']', r'requests\.delete\(\s*["\']([^"\']+)["\']', r'requests\.session\(\)', # Session creation r'requests\.Session\(\)', # Session creation (capitalized) r'session\.get\(\s*["\']([^"\']+)["\']', # Session-based requests r'session\.post\(\s*["\']([^"\']+)["\']', r'session\.put\(\s*["\']([^"\']+)["\']', r'session\.delete\(\s*["\']([^"\']+)["\']', r'session\.request\(\s*["\'][^"\']+["\'],\s*["\']([^"\']+)["\']', r'urllib\.request\.urlopen\(\s*["\']([^"\']+)["\']', r'urllib\.request\.Request\(\s*["\']([^"\']+)["\']', r'urllib2\.urlopen\(\s*["\']([^"\']+)["\']', r'urllib2\.Request\(\s*["\']([^"\']+)["\']', r'socket\.connect\(\s*\(["\']([^"\']+)["\'],\s*(\d+)', r'socket\.connect\(\s*\(([^,]+),\s*(\d+)', r'http\.client\.HTTPConnection\(\s*["\']([^"\']+)["\']', r'http\.client\.HTTPSConnection\(\s*["\']([^"\']+)["\']', r'httplib\.HTTPConnection\(\s*["\']([^"\']+)["\']', r'httplib\.HTTPSConnection\(\s*["\']([^"\']+)["\']' ], 'bash': [ r'wget\s+["\']?([^"\';\s]+)', r'curl\s+["\']?([^"\';\s]+)', r'nc\s+([^\s]+)\s+(\d+)', r'netcat\s+([^\s]+)\s+(\d+)' ], 'c_cpp': [ r'connect\([^,]*inet_addr\(["\']([^"\']+)["\']', r'gethostbyname\(["\']([^"\']+)["\']', r'socket\(.*SOCK_STREAM' ] } # Extract network indicators for pattern in network_patterns: matches = re.finditer(pattern, content, re.IGNORECASE) for match in matches: network_indicator = match.group(1) if len(match.groups()) > 0 else match.group(0) context = self._get_context(content, match.start(), match.end()) confidence = self._calculate_confidence(network_indicator, 'network', context) if confidence > 0.3: indicators.append(SecurityIndicator( type='network', value=network_indicator, confidence=confidence, context=context, attack_technique=AttackTechnique.NETWORK_CONNECTION )) # Extract language-specific network operations if language in operation_patterns: for pattern in operation_patterns[language]: matches = re.finditer(pattern, content, re.IGNORECASE | re.MULTILINE) for match in matches: # Handle different match group scenarios if len(match.groups()) > 0: network_indicator = match.group(1) if match.group(1) else match.group(0) else: network_indicator = match.group(0) context = self._get_context(content, match.start(), match.end()) # Special handling for session-based patterns if 'session' in pattern.lower(): # For session patterns, we want to capture the session usage if 'session.post' in match.group(0).lower() or 'session.get' in match.group(0).lower(): # Extract URL from session call if available if len(match.groups()) > 0 and match.group(1): network_indicator = match.group(1) else: network_indicator = "session-based-request" else: network_indicator = "requests-session" confidence = self._calculate_confidence(network_indicator, 'network', context) if confidence > 0.3: # Boost confidence for session-based attacks if 'session' in context.lower(): confidence = min(confidence + 0.2, 1.0) indicators.append(SecurityIndicator( type='network', value=network_indicator, confidence=confidence, context=context, attack_technique=AttackTechnique.NETWORK_CONNECTION )) return indicators def _extract_registry_indicators(self, content: str, language: str) -> List[SecurityIndicator]: """Extract Windows registry indicators.""" indicators = [] # Registry key patterns registry_patterns = [ r'(HKEY_[A-Z_]+\\[^"\';\s\]]+)', r'(HKLM\\[^"\';\s\]]+)', r'(HKCU\\[^"\';\s\]]+)', r'(SOFTWARE\\[^"\';\s\]]+)', r'(SYSTEM\\[^"\';\s\]]+)' ] # Language-specific registry operations operation_patterns = { 'powershell': [ r'New-ItemProperty.*Path.*["\']([^"\']+)["\']', r'Set-ItemProperty.*Path.*["\']([^"\']+)["\']', r'Get-ItemProperty.*Path.*["\']([^"\']+)["\']', r'Remove-ItemProperty.*Path.*["\']([^"\']+)["\']' ], 'batch': [ r'reg\s+add\s+["\']?([^"\';\s]+)', r'reg\s+query\s+["\']?([^"\';\s]+)', r'reg\s+delete\s+["\']?([^"\';\s]+)' ], 'c_cpp': [ r'RegCreateKey[Ex]?[AW]?.*["\']([^"\']+)["\']', r'RegSetValue[Ex]?[AW]?.*["\']([^"\']+)["\']', r'RegOpenKey[Ex]?[AW]?.*["\']([^"\']+)["\']' ], 'csharp': [ r'Registry\.[^.]+\.OpenSubKey\(["\']([^"\']+)["\']', r'RegistryKey.*["\']([^"\']+)["\']' ] } # Extract registry keys for pattern in registry_patterns: matches = re.finditer(pattern, content, re.IGNORECASE) for match in matches: reg_key = match.group(1) context = self._get_context(content, match.start(), match.end()) confidence = self._calculate_confidence(reg_key, 'registry', context) if confidence > 0.4: indicators.append(SecurityIndicator( type='registry', value=reg_key, confidence=confidence, context=context, attack_technique=AttackTechnique.REGISTRY_MODIFICATION )) return indicators def _extract_command_indicators(self, content: str, language: str) -> List[SecurityIndicator]: """Extract command-line execution indicators.""" indicators = [] # Command patterns - enhanced with Python-specific patterns command_patterns = [ r'(?:cmd|powershell|bash|sh)\s+[/-]c\s+["\']?([^"\';\n]+)', r'(?:system|exec|shell_exec)\(\s*["\']([^"\']+)["\']', r'[`]([^`]+)[`]', # Backticks r'\$\(([^)]+)\)', # Command substitution # Python-specific command execution patterns r'subprocess\.call\(\s*\[([^\]]+)\]', # subprocess.call with list r'subprocess\.Popen\(\s*\[([^\]]+)\]', # subprocess.Popen with list r'subprocess\.run\(\s*\[([^\]]+)\]', # subprocess.run with list r'os\.system\(\s*f["\']([^"\']+)["\']', # f-string commands r'os\.system\(\s*["\']([^"\']+)["\']\.format\(', # .format() commands r'os\.system\(\s*["\']([^"\']+)["\']\.%', # % formatting r'subprocess\.call\(\s*f["\']([^"\']+)["\']', # f-string subprocess r'subprocess\.Popen\(\s*f["\']([^"\']+)["\']', # f-string Popen r'pexpect\.spawn\(\s*f["\']([^"\']+)["\']', # f-string pexpect r'commands\.getoutput\(\s*f["\']([^"\']+)["\']', # f-string commands r'eval\(\s*["\']([^"\']+)["\']', # eval() calls r'exec\(\s*["\']([^"\']+)["\']', # exec() calls r'compile\(\s*["\']([^"\']+)["\']', # compile() calls r'__import__\(\s*["\']([^"\']+)["\']', # dynamic imports r'importlib\.import_module\(\s*["\']([^"\']+)["\']', # importlib r'ctypes\.windll\.kernel32\.WinExec\(', # WinExec via ctypes r'ctypes\.windll\.shell32\.ShellExecute[AW]?\(', # ShellExecute r'win32process\.CreateProcess\(', # pywin32 CreateProcess r'win32api\.ShellExecute\(', # pywin32 ShellExecute r'win32api\.WinExec\(', # pywin32 WinExec ] for pattern in command_patterns: matches = re.finditer(pattern, content, re.IGNORECASE | re.MULTILINE) for match in matches: command = match.group(1) context = self._get_context(content, match.start(), match.end()) confidence = self._calculate_confidence(command, 'command', context) if confidence > 0.4: # Determine attack technique based on command content technique = AttackTechnique.COMMAND_EXECUTION if 'powershell' in command.lower(): technique = AttackTechnique.POWERSHELL elif any(word in command.lower() for word in ['cmd', 'bat', 'com']): technique = AttackTechnique.COMMAND_LINE indicators.append(SecurityIndicator( type='command', value=command, confidence=confidence, context=context, attack_technique=technique )) return indicators def _extract_encoded_content(self, content: str) -> List[str]: """Extract and decode obfuscated/encoded content.""" decoded_content = [] # Base64 patterns - enhanced with more Python patterns base64_patterns = [ r'["\']([A-Za-z0-9+/]{20,}={0,2})["\']', # Base64 strings r'FromBase64String\(["\']([^"\']+)["\']', # PowerShell r'base64\.b64decode\(["\']([^"\']+)["\']', # Python r'base64\.b64encode\(["\']([^"\']+)["\']', # Python encode r'base64\.standard_b64decode\(["\']([^"\']+)["\']', # Python standard r'base64\.urlsafe_b64decode\(["\']([^"\']+)["\']', # Python URL-safe r'base64\.decodebytes\(["\']([^"\']+)["\']', # Python 3 r'base64\.encodebytes\(["\']([^"\']+)["\']', # Python 3 r'codecs\.decode\(["\']([^"\']+)["\'],\s*["\']base64["\']', # codecs r'codecs\.encode\(["\']([^"\']+)["\'],\s*["\']base64["\']', # codecs r'\.decode\(["\']base64["\']', # .decode('base64') r'\.encode\(["\']base64["\']', # .encode('base64') ] for pattern in base64_patterns: matches = re.finditer(pattern, content, re.IGNORECASE) for match in matches: try: encoded_str = match.group(1) if len(encoded_str) > 20: # Only decode substantial content decoded = base64.b64decode(encoded_str + '===').decode('utf-8', errors='ignore') if decoded and len(decoded) > 10: decoded_content.append(decoded) except: continue # Hex patterns - enhanced with Python-specific patterns hex_patterns = [ r'0x([0-9a-fA-F]{20,})', r'["\']([0-9a-fA-F]{20,})["\']', r'bytes\.fromhex\(["\']([0-9a-fA-F]+)["\']', # Python bytes.fromhex r'binascii\.hexlify\(["\']([^"\']+)["\']', # Python binascii r'binascii\.unhexlify\(["\']([0-9a-fA-F]+)["\']', # Python binascii r'codecs\.decode\(["\']([0-9a-fA-F]+)["\'],\s*["\']hex["\']', # codecs hex r'codecs\.encode\(["\']([^"\']+)["\'],\s*["\']hex["\']', # codecs hex r'\.decode\(["\']hex["\']', # .decode('hex') r'\.encode\(["\']hex["\']', # .encode('hex') ] # Additional Python encoding patterns other_encoding_patterns = [ r'codecs\.decode\(["\']([^"\']+)["\'],\s*["\']rot13["\']', # ROT13 r'codecs\.encode\(["\']([^"\']+)["\'],\s*["\']rot13["\']', # ROT13 r'\.decode\(["\']utf-8["\']', # UTF-8 decode r'\.encode\(["\']utf-8["\']', # UTF-8 encode r'\.decode\(["\']ascii["\']', # ASCII decode r'\.encode\(["\']ascii["\']', # ASCII encode r'urllib\.parse\.quote\(["\']([^"\']+)["\']', # URL encoding r'urllib\.parse\.unquote\(["\']([^"\']+)["\']', # URL decoding r'urllib\.parse\.quote_plus\(["\']([^"\']+)["\']', # URL encoding r'urllib\.parse\.unquote_plus\(["\']([^"\']+)["\']', # URL decoding r'html\.escape\(["\']([^"\']+)["\']', # HTML escape r'html\.unescape\(["\']([^"\']+)["\']', # HTML unescape r'json\.dumps\(["\']([^"\']+)["\']', # JSON encoding r'json\.loads\(["\']([^"\']+)["\']', # JSON decoding r'pickle\.dumps\(["\']([^"\']+)["\']', # Pickle serialization r'pickle\.loads\(["\']([^"\']+)["\']', # Pickle deserialization r'zlib\.compress\(["\']([^"\']+)["\']', # Zlib compression r'zlib\.decompress\(["\']([^"\']+)["\']', # Zlib decompression r'gzip\.compress\(["\']([^"\']+)["\']', # Gzip compression r'gzip\.decompress\(["\']([^"\']+)["\']', # Gzip decompression ] for pattern in hex_patterns: matches = re.finditer(pattern, content) for match in matches: try: hex_str = match.group(1) if len(hex_str) % 2 == 0 and len(hex_str) > 20: decoded = binascii.unhexlify(hex_str).decode('utf-8', errors='ignore') if decoded and len(decoded) > 10: decoded_content.append(decoded) except: continue # Process additional encoding patterns for pattern in other_encoding_patterns: matches = re.finditer(pattern, content, re.IGNORECASE) for match in matches: try: if len(match.groups()) > 0: encoded_str = match.group(1) if len(encoded_str) > 10: # Only process substantial content # For now, just add the pattern as an indicator # Real decoding would depend on the specific encoding decoded_content.append(f"encoded_content: {encoded_str[:50]}...") except: continue return decoded_content def _calculate_confidence(self, indicator: str, indicator_type: str, context: str) -> float: """Calculate confidence score for an indicator.""" confidence = 0.5 # Base confidence # Length and complexity scoring if len(indicator) > 5: confidence += 0.1 if len(indicator) > 20: confidence += 0.1 # Context-based scoring - enhanced with Python-specific keywords high_confidence_keywords = [ 'exploit', 'payload', 'shell', 'inject', 'execute', 'run', 'attack', 'malware', 'backdoor', 'trojan', 'virus', # Python-specific exploit keywords 'subprocess', 'popen', 'system', 'exec', 'eval', 'compile', 'import', 'ctypes', 'win32api', 'win32process', 'pexpect', 'base64', 'decode', 'encode', 'pickle', 'marshal', 'requests', 'urllib', 'socket', 'connect', 'bind', 'reverse', 'shell', 'backdoor', 'persistence', 'privilege', 'escalation', 'bypass', 'evasion', 'obfuscation' ] context_lower = context.lower() for keyword in high_confidence_keywords: if keyword in context_lower: confidence += 0.1 break # Type-specific scoring - enhanced for Python if indicator_type == 'process': if indicator.endswith('.exe') or indicator.endswith('.dll'): confidence += 0.2 if any(word in indicator.lower() for word in ['cmd', 'powershell', 'bash', 'sh']): confidence += 0.1 # Python-specific process indicators if any(word in indicator.lower() for word in ['python', 'py', 'subprocess', 'popen']): confidence += 0.15 if any(word in indicator.lower() for word in ['eval', 'exec', 'compile', 'import']): confidence += 0.2 elif indicator_type == 'file': if any(ext in indicator.lower() for ext in ['.exe', '.dll', '.bat', '.ps1', '.sh']): confidence += 0.2 if any(path in indicator.lower() for path in ['temp', 'tmp', 'appdata']): confidence += 0.1 # Python-specific file indicators if any(ext in indicator.lower() for ext in ['.py', '.pyc', '.pyo', '.pyd']): confidence += 0.15 if any(path in indicator.lower() for path in ['__pycache__', '.python', 'site-packages']): confidence += 0.1 elif indicator_type == 'network': if re.match(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', indicator): confidence += 0.2 if any(tld in indicator.lower() for tld in ['.com', '.net', '.org', '.ru', '.cn']): confidence += 0.1 # Python-specific network indicators if any(word in indicator.lower() for word in ['requests', 'urllib', 'session', 'socket']): confidence += 0.15 if 'session' in indicator.lower(): confidence += 0.1 elif indicator_type == 'command': # Python-specific command indicators if any(word in indicator.lower() for word in ['python', 'py', 'subprocess', 'os.system']): confidence += 0.15 if any(word in indicator.lower() for word in ['eval', 'exec', 'compile', 'import']): confidence += 0.2 if any(word in indicator.lower() for word in ['base64', 'decode', 'encode', 'pickle']): confidence += 0.1 # Apply false positive filters if self._is_false_positive(indicator, indicator_type): confidence *= 0.3 return min(confidence, 1.0) def _is_false_positive(self, indicator: str, indicator_type: str) -> bool: """Check if indicator is likely a false positive.""" if indicator_type in self.false_positive_filters: fp_patterns = self.false_positive_filters[indicator_type] for pattern in fp_patterns: if re.search(pattern, indicator, re.IGNORECASE): return True return False def _get_context(self, content: str, start: int, end: int, window: int = 100) -> str: """Get context around a match.""" context_start = max(0, start - window) context_end = min(len(content), end + window) return content[context_start:context_end].strip() def _deduplicate_and_rank(self, indicators: List[SecurityIndicator]) -> List[Dict]: """Remove duplicates and rank indicators by confidence.""" # Deduplicate by value seen = set() unique_indicators = [] for indicator in sorted(indicators, key=lambda x: x.confidence, reverse=True): if indicator.value not in seen: seen.add(indicator.value) unique_indicators.append(indicator) # Convert to dict format and return top indicators return [ { 'value': ind.value, 'confidence': round(ind.confidence, 2), 'context': ind.context[:200] + '...' if len(ind.context) > 200 else ind.context, 'attack_technique': ind.attack_technique.value if ind.attack_technique else None } for ind in unique_indicators[:10] # Top 10 indicators ] def _analyze_attack_behaviors(self, content: str, language: str) -> List[Dict]: """Analyze attack behaviors and patterns.""" behaviors = [] behavior_patterns = { 'persistence': [ r'(?:startup|autorun|registry.*run)', r'(?:scheduled.*task|cron|at\s+\d)', r'(?:service.*create|sc.*create)' ], 'defense_evasion': [ r'(?:disable.*antivirus|kill.*av)', r'(?:encode|encrypt|obfuscat)', r'(?:hide|stealth|invisible)' ], 'credential_access': [ r'(?:password|credential|token)', r'(?:keylog|steal.*key)', r'(?:mimikatz|lsass)' ], 'lateral_movement': [ r'(?:psexec|wmi.*exec|remote.*exec)', r'(?:net\s+use|mount|smb)', r'(?:ssh|rdp|vnc)' ], 'exfiltration': [ r'(?:upload|ftp|http.*post)', r'(?:compress|zip|archive)', r'(?:steal|exfil|extract)' ] } content_lower = content.lower() for behavior, patterns in behavior_patterns.items(): score = 0 matches = [] for pattern in patterns: pattern_matches = re.findall(pattern, content_lower) if pattern_matches: score += len(pattern_matches) matches.extend(pattern_matches) if score > 0: behaviors.append({ 'behavior': behavior, 'confidence': min(score * 0.2, 1.0), 'indicators': matches[:5] # Top 5 matches }) return sorted(behaviors, key=lambda x: x['confidence'], reverse=True) def _map_to_mitre_attack(self, indicators: List[SecurityIndicator]) -> List[str]: """Map indicators to MITRE ATT&CK techniques.""" techniques = set() for indicator in indicators: if indicator.attack_technique: techniques.add(indicator.attack_technique.value) return sorted(list(techniques)) def _assess_analysis_quality(self, content: str) -> Dict[str, any]: """Assess the quality and completeness of the analysis.""" # Content metrics lines = len(content.split('\n')) chars = len(content) # Indicator density total_indicators = len(self.indicators) high_conf_indicators = len([i for i in self.indicators if i.confidence > 0.7]) # Calculate quality score content_score = min(lines / 50, 1.0) * 0.3 # More lines = better indicator_score = min(total_indicators / 20, 1.0) * 0.4 # More indicators = better confidence_score = (high_conf_indicators / max(total_indicators, 1)) * 0.3 # Higher confidence = better overall_score = content_score + indicator_score + confidence_score return { 'overall_score': round(overall_score, 2), 'content_lines': lines, 'content_chars': chars, 'total_indicators': total_indicators, 'high_confidence_indicators': high_conf_indicators, 'recommendation': self._get_quality_recommendation(overall_score) } def _get_quality_recommendation(self, score: float) -> str: """Get recommendation based on quality score.""" if score >= 0.8: return "High quality PoC with excellent indicator extraction" elif score >= 0.6: return "Good quality PoC with adequate indicators" elif score >= 0.4: return "Moderate quality PoC, may need additional analysis" else: return "Low quality PoC, limited indicators extracted" def _initialize_language_patterns(self) -> Dict: """Initialize language-specific patterns.""" return { # Patterns for different languages will be expanded } def _initialize_attack_patterns(self) -> Dict: """Initialize attack pattern recognition.""" return { # Attack patterns will be expanded } def _initialize_fp_filters(self) -> Dict: """Initialize false positive filters.""" return { 'process': [ r'^(explorer|notepad|calc|windir|system32)\.exe$', r'^[a-z]$', # Single characters r'^\d+$', # Pure numbers # Python-specific false positives r'^(print|len|str|int|float|list|dict|tuple|set)$', # Built-in functions r'^(import|from|def|class|if|else|elif|for|while|try|except)$', # Keywords r'^(sys|os|re|json|time|datetime|random|math)$', # Common modules ], 'file': [ r'^[a-z]$', r'^\d+$', r'^(con|aux|prn|nul)$', # Python-specific false positives r'^(sys|os|re|json|time|datetime|random|math)\.py$', # Common modules r'^__init__\.py$', # Python package files r'^setup\.py$', # Python setup files r'^test.*\.py$', # Test files r'^.*_test\.py$', # Test files ], 'network': [ r'^(localhost|127\.0\.0\.1|0\.0\.0\.0)$', r'^\d{1,2}$', # Port numbers without context r'^(example\.com|test\.com|localhost)$', # Python-specific false positives r'^(requests|urllib|socket|http)$', # Module names without context r'^(session|connection|client|server)$', # Generic terms r'^(get|post|put|delete|head|options)$', # HTTP methods without context ], 'command': [ r'^[a-z]$', r'^\d+$', # Python-specific false positives r'^(print|len|str|int|float|list|dict|tuple|set)$', # Built-in functions r'^(import|from|def|class|if|else|elif|for|while|try|except)$', # Keywords r'^(help|dir|type|vars|globals|locals)$', # Introspection functions ] } # Example usage if __name__ == "__main__": analyzer = PoCAnalyzer() # Example PoC content sample_poc = """ import subprocess import base64 # CVE-2024-1234 exploit payload = base64.b64decode("Y21kIC9jIGVjaG8gSGVsbG8gV29ybGQ=") subprocess.call("powershell.exe -enc " + payload.decode(), shell=True) # Create persistence with open("C:\\temp\\malware.exe", "wb") as f: f.write(malicious_bytes) # Network connection import socket s = socket.socket() s.connect(("192.168.1.100", 4444)) """ result = analyzer.analyze_poc(sample_poc, "CVE-2024-1234") print(f"Analysis result: {result}")