diff --git a/backend/enhanced_sigma_generator.py b/backend/enhanced_sigma_generator.py index 19393d5..f763a4d 100644 --- a/backend/enhanced_sigma_generator.py +++ b/backend/enhanced_sigma_generator.py @@ -1,6 +1,8 @@ """ Enhanced SIGMA Rule Generator -Generates improved SIGMA rules using nomi-sec PoC data and traditional indicators +Generates improved SIGMA rules using a hybrid approach: +1. Generate YAML metadata with application code +2. Use LLM to create logsource and detection sections based on PoC analysis """ import json @@ -10,6 +12,8 @@ from typing import Dict, List, Optional, Tuple from sqlalchemy.orm import Session import re from llm_client import LLMClient +from enhanced_llm_client import EnhancedLLMClient +from yaml_metadata_generator import YAMLMetadataGenerator from cve2capec_client import CVE2CAPECClient from poc_analyzer import PoCAnalyzer @@ -22,11 +26,13 @@ class EnhancedSigmaGenerator: def __init__(self, db_session: Session, llm_provider: str = None, llm_model: str = None): self.db_session = db_session - self.llm_client = LLMClient(provider=llm_provider, model=llm_model) + self.llm_client = LLMClient(provider=llm_provider, model=llm_model) # Keep for backward compatibility + self.enhanced_llm_client = EnhancedLLMClient(provider=llm_provider, model=llm_model) + self.yaml_generator = YAMLMetadataGenerator(db_session) self.cve2capec_client = CVE2CAPECClient() self.poc_analyzer = PoCAnalyzer() - async def generate_enhanced_rule(self, cve, use_llm: bool = True) -> dict: + async def generate_enhanced_rule(self, cve, use_llm: bool = True, use_hybrid: bool = True) -> dict: """Generate enhanced SIGMA rule for a CVE using PoC data""" from main import SigmaRule, RuleTemplate @@ -39,12 +45,24 @@ class EnhancedSigmaGenerator: if poc_data: best_poc = max(poc_data, key=lambda x: x.get('quality_analysis', {}).get('quality_score', 0)) - # Try LLM-enhanced generation first if enabled and available + # Try hybrid approach first if enabled and available rule_content = None generation_method = "template" template = None - if use_llm and self.llm_client.is_available() and best_poc: + if use_hybrid and self.enhanced_llm_client.is_available() and best_poc: + logger.info(f"Attempting hybrid rule generation for {cve.cve_id} using {self.enhanced_llm_client.provider}") + rule_content = await self._generate_hybrid_rule(cve, best_poc, poc_data) + if rule_content: + generation_method = f"hybrid_{self.enhanced_llm_client.provider}" + # Create a dummy template object for hybrid-generated rules + class HybridTemplate: + def __init__(self, provider_name): + self.template_name = f"Hybrid Generated ({provider_name})" + template = HybridTemplate(self.enhanced_llm_client.provider) + + # Fallback to original LLM-enhanced generation + elif use_llm and self.llm_client.is_available() and best_poc: logger.info(f"Attempting LLM-enhanced rule generation for {cve.cve_id} using {self.llm_client.provider}") rule_content = await self._generate_llm_enhanced_rule(cve, best_poc, poc_data) if rule_content: @@ -127,6 +145,49 @@ class EnhancedSigmaGenerator: logger.error(f"Error generating enhanced rule for {cve.cve_id}: {e}") return {'success': False, 'error': str(e)} + async def _generate_hybrid_rule(self, cve, best_poc: dict, poc_data: list) -> Optional[str]: + """Generate SIGMA rule using hybrid approach: metadata + LLM detection.""" + try: + # Step 1: Generate YAML metadata using application code + logger.info(f"Generating YAML metadata for {cve.cve_id}") + yaml_metadata = self.yaml_generator.generate_metadata(cve, poc_data) + + # Step 2: Analyze PoC content with PoCAnalyzer + logger.info(f"Analyzing PoC content for {cve.cve_id}") + poc_content = await self._extract_poc_content(best_poc) + if not poc_content: + logger.warning(f"No PoC content available for {cve.cve_id}") + return None + + poc_analysis = self.poc_analyzer.analyze_poc(poc_content, cve.cve_id) + + # Step 3: Generate detection sections using LLM + logger.info(f"Generating detection sections for {cve.cve_id}") + detection_sections = await self.enhanced_llm_client.generate_detection_sections( + yaml_metadata, poc_analysis, cve.cve_id + ) + + if not detection_sections: + logger.warning(f"Failed to generate detection sections for {cve.cve_id}") + return None + + # Step 4: Combine metadata with detection sections + logger.info(f"Combining YAML sections for {cve.cve_id}") + complete_rule = self.enhanced_llm_client.combine_yaml_sections( + yaml_metadata, detection_sections + ) + + if complete_rule: + logger.info(f"Successfully generated hybrid rule for {cve.cve_id}") + return complete_rule + else: + logger.warning(f"Failed to combine YAML sections for {cve.cve_id}") + return None + + except Exception as e: + logger.error(f"Error generating hybrid rule for {cve.cve_id}: {e}") + return None + async def _generate_llm_enhanced_rule(self, cve, best_poc: dict, poc_data: list) -> Optional[str]: """Generate SIGMA rule using LLM API with PoC analysis""" try: diff --git a/backend/poc_analyzer.py b/backend/poc_analyzer.py index d6b7a47..2cdb949 100755 --- a/backend/poc_analyzer.py +++ b/backend/poc_analyzer.py @@ -166,33 +166,63 @@ class PoCAnalyzer: r'Start-Process\s+["\']?([^"\';\s]+)', r'Invoke-Expression\s+["\']?([^"\';\s]+)', r'&\s+["\']?([^"\';\s]+\.exe)', - r'\.\s+["\']?([^"\';\s]+\.exe)' + r'\.\s+["\']?([^"\';\s]+\.exe)', + r'Invoke-Command\s+[^}]*ScriptBlock\s*=\s*["\']([^"\']+)', + r'powershell\.exe\s+[^"\']*["\']([^"\']+)' ], 'python': [ r'subprocess\.call\(\s*["\']([^"\']+)', r'subprocess\.Popen\(\s*["\']([^"\']+)', + r'subprocess\.run\(\s*["\']([^"\']+)', + r'subprocess\.check_output\(\s*["\']([^"\']+)', + r'subprocess\.check_call\(\s*["\']([^"\']+)', + r'subprocess\.getoutput\(\s*["\']([^"\']+)', + r'subprocess\.getstatusoutput\(\s*["\']([^"\']+)', r'os\.system\(\s*["\']([^"\']+)', - r'os\.exec[vl]?p?\(\s*["\']([^"\']+)' + r'os\.exec[vl]?p?\(\s*["\']([^"\']+)', + r'os\.spawn[vl]?p?\(\s*[^,]*,\s*["\']([^"\']+)', + r'os\.popen\(\s*["\']([^"\']+)', + r'commands\.getoutput\(\s*["\']([^"\']+)', + r'commands\.getstatusoutput\(\s*["\']([^"\']+)', + r'pexpect\.spawn\(\s*["\']([^"\']+)', + r'pexpect\.run\(\s*["\']([^"\']+)', + r'multiprocessing\.Process\([^)]*target[^,]*,\s*["\']([^"\']+)', + r'threading\.Thread\([^)]*target[^,]*,\s*["\']([^"\']+)', + r'eval\(\s*["\']([^"\']+)', + r'exec\(\s*["\']([^"\']+)', + r'compile\(\s*["\']([^"\']+)', + r'__import__\(\s*["\']([^"\']+)', + r'importlib\.import_module\(\s*["\']([^"\']+)', + r'ctypes\.windll\.', + r'ctypes\.cdll\.', + r'win32api\.', + r'win32process\.CreateProcess' ], 'bash': [ r'exec\s+([^;\s&|]+)', r'/bin/sh\s+-c\s+["\']([^"\']+)', - r'system\(\s*["\']([^"\']+)' + r'system\(\s*["\']([^"\']+)', + r'bash\s+-c\s+["\']([^"\']+)', + r'\$\(([^)]+)\)' # Command substitution ], 'batch': [ r'start\s+["\']?([^"\';\s]+)', r'cmd\s*/c\s+["\']?([^"\']+)', - r'call\s+["\']?([^"\';\s]+)' + r'call\s+["\']?([^"\';\s]+)', + r'%COMSPEC%\s+[^"\']*["\']([^"\']+)' ], 'c_cpp': [ r'system\(\s*["\']([^"\']+)', r'execve?\(\s*["\']([^"\']+)', - r'CreateProcess[AW]?\([^,]*["\']([^"\']+)' + r'CreateProcess[AW]?\([^,]*["\']([^"\']+)', + r'WinExec\(\s*["\']([^"\']+)', + r'ShellExecute[AW]?\([^,]*["\']([^"\']+)' ], 'csharp': [ r'Process\.Start\(\s*["\']([^"\']+)', r'ProcessStartInfo.*FileName\s*=\s*["\']([^"\']+)', - r'new\s+Process.*["\']([^"\']+)' + r'new\s+Process.*["\']([^"\']+)', + r'Process\.Start\(\s*new\s+ProcessStartInfo[^}]*FileName\s*=\s*["\']([^"\']+)' ] } @@ -239,7 +269,30 @@ class PoCAnalyzer: 'python': [ r'open\(\s*["\']([^"\']+)["\']', r'with\s+open\(\s*["\']([^"\']+)["\']', - r'shutil\.copy.*["\']([^"\']+)["\']' + r'shutil\.copy.*["\']([^"\']+)["\']', + r'shutil\.copyfile\(\s*[^,]*,\s*["\']([^"\']+)["\']', + r'shutil\.move\(\s*[^,]*,\s*["\']([^"\']+)["\']', + r'shutil\.copytree\(\s*[^,]*,\s*["\']([^"\']+)["\']', + r'os\.rename\(\s*[^,]*,\s*["\']([^"\']+)["\']', + r'os\.remove\(\s*["\']([^"\']+)["\']', + r'os\.unlink\(\s*["\']([^"\']+)["\']', + r'os\.rmdir\(\s*["\']([^"\']+)["\']', + r'os\.makedirs\(\s*["\']([^"\']+)["\']', + r'os\.mkdir\(\s*["\']([^"\']+)["\']', + r'os\.path\.join\([^)]*["\']([^"\']+)["\']', + r'pathlib\.Path\(\s*["\']([^"\']+)["\']', + r'tempfile\.mktemp\(\s*[^)]*["\']([^"\']+)["\']', + r'tempfile\.NamedTemporaryFile\([^)]*dir\s*=\s*["\']([^"\']+)["\']', + r'io\.open\(\s*["\']([^"\']+)["\']', + r'codecs\.open\(\s*["\']([^"\']+)["\']', + r'pickle\.load\(\s*["\']([^"\']+)["\']', + r'pickle\.dump\([^,]*,\s*["\']([^"\']+)["\']', + r'json\.load\(\s*["\']([^"\']+)["\']', + r'json\.dump\([^,]*,\s*["\']([^"\']+)["\']', + r'zipfile\.ZipFile\(\s*["\']([^"\']+)["\']', + r'tarfile\.open\(\s*["\']([^"\']+)["\']', + r'gzip\.open\(\s*["\']([^"\']+)["\']', + r'bz2\.open\(\s*["\']([^"\']+)["\']' ], 'bash': [ r'touch\s+["\']?([^"\';\s]+)', @@ -295,13 +348,18 @@ class PoCAnalyzer: """Extract network communication indicators.""" indicators = [] - # Network patterns + # Network patterns - enhanced with more comprehensive patterns network_patterns = [ r'(?:http[s]?://)([^/\s"\']+)', # URLs r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', # IP addresses r':(\d{2,5})\b', # Port numbers r'Host:\s*([^\s\r\n]+)', # HTTP Host headers r'User-Agent:\s*([^\r\n]+)', # User agents + r'ftp://([^/\s"\']+)', # FTP URLs + r'([a-zA-Z0-9-]+\.)+[a-zA-Z]{2,}', # Domain names + r'(?:GET|POST|PUT|DELETE)\s+([^\s]+)', # HTTP methods with paths + r'Content-Type:\s*([^\r\n]+)', # Content types + r'Authorization:\s*([^\r\n]+)', # Auth headers ] # Language-specific network operations @@ -314,9 +372,26 @@ class PoCAnalyzer: ], 'python': [ r'requests\.get\(\s*["\']([^"\']+)["\']', + r'requests\.post\(\s*["\']([^"\']+)["\']', + r'requests\.put\(\s*["\']([^"\']+)["\']', + r'requests\.delete\(\s*["\']([^"\']+)["\']', + r'requests\.session\(\)', # Session creation + r'requests\.Session\(\)', # Session creation (capitalized) + r'session\.get\(\s*["\']([^"\']+)["\']', # Session-based requests + r'session\.post\(\s*["\']([^"\']+)["\']', + r'session\.put\(\s*["\']([^"\']+)["\']', + r'session\.delete\(\s*["\']([^"\']+)["\']', + r'session\.request\(\s*["\'][^"\']+["\'],\s*["\']([^"\']+)["\']', r'urllib\.request\.urlopen\(\s*["\']([^"\']+)["\']', + r'urllib\.request\.Request\(\s*["\']([^"\']+)["\']', + r'urllib2\.urlopen\(\s*["\']([^"\']+)["\']', + r'urllib2\.Request\(\s*["\']([^"\']+)["\']', r'socket\.connect\(\s*\(["\']([^"\']+)["\'],\s*(\d+)', - r'http\.client\.HTTPConnection\(\s*["\']([^"\']+)["\']' + r'socket\.connect\(\s*\(([^,]+),\s*(\d+)', + r'http\.client\.HTTPConnection\(\s*["\']([^"\']+)["\']', + r'http\.client\.HTTPSConnection\(\s*["\']([^"\']+)["\']', + r'httplib\.HTTPConnection\(\s*["\']([^"\']+)["\']', + r'httplib\.HTTPSConnection\(\s*["\']([^"\']+)["\']' ], 'bash': [ r'wget\s+["\']?([^"\';\s]+)', @@ -348,6 +423,45 @@ class PoCAnalyzer: attack_technique=AttackTechnique.NETWORK_CONNECTION )) + # Extract language-specific network operations + if language in operation_patterns: + for pattern in operation_patterns[language]: + matches = re.finditer(pattern, content, re.IGNORECASE | re.MULTILINE) + for match in matches: + # Handle different match group scenarios + if len(match.groups()) > 0: + network_indicator = match.group(1) if match.group(1) else match.group(0) + else: + network_indicator = match.group(0) + + context = self._get_context(content, match.start(), match.end()) + + # Special handling for session-based patterns + if 'session' in pattern.lower(): + # For session patterns, we want to capture the session usage + if 'session.post' in match.group(0).lower() or 'session.get' in match.group(0).lower(): + # Extract URL from session call if available + if len(match.groups()) > 0 and match.group(1): + network_indicator = match.group(1) + else: + network_indicator = "session-based-request" + else: + network_indicator = "requests-session" + + confidence = self._calculate_confidence(network_indicator, 'network', context) + if confidence > 0.3: + # Boost confidence for session-based attacks + if 'session' in context.lower(): + confidence = min(confidence + 0.2, 1.0) + + indicators.append(SecurityIndicator( + type='network', + value=network_indicator, + confidence=confidence, + context=context, + attack_technique=AttackTechnique.NETWORK_CONNECTION + )) + return indicators def _extract_registry_indicators(self, content: str, language: str) -> List[SecurityIndicator]: @@ -410,12 +524,33 @@ class PoCAnalyzer: """Extract command-line execution indicators.""" indicators = [] - # Command patterns + # Command patterns - enhanced with Python-specific patterns command_patterns = [ r'(?:cmd|powershell|bash|sh)\s+[/-]c\s+["\']?([^"\';\n]+)', r'(?:system|exec|shell_exec)\(\s*["\']([^"\']+)["\']', r'[`]([^`]+)[`]', # Backticks r'\$\(([^)]+)\)', # Command substitution + # Python-specific command execution patterns + r'subprocess\.call\(\s*\[([^\]]+)\]', # subprocess.call with list + r'subprocess\.Popen\(\s*\[([^\]]+)\]', # subprocess.Popen with list + r'subprocess\.run\(\s*\[([^\]]+)\]', # subprocess.run with list + r'os\.system\(\s*f["\']([^"\']+)["\']', # f-string commands + r'os\.system\(\s*["\']([^"\']+)["\']\.format\(', # .format() commands + r'os\.system\(\s*["\']([^"\']+)["\']\.%', # % formatting + r'subprocess\.call\(\s*f["\']([^"\']+)["\']', # f-string subprocess + r'subprocess\.Popen\(\s*f["\']([^"\']+)["\']', # f-string Popen + r'pexpect\.spawn\(\s*f["\']([^"\']+)["\']', # f-string pexpect + r'commands\.getoutput\(\s*f["\']([^"\']+)["\']', # f-string commands + r'eval\(\s*["\']([^"\']+)["\']', # eval() calls + r'exec\(\s*["\']([^"\']+)["\']', # exec() calls + r'compile\(\s*["\']([^"\']+)["\']', # compile() calls + r'__import__\(\s*["\']([^"\']+)["\']', # dynamic imports + r'importlib\.import_module\(\s*["\']([^"\']+)["\']', # importlib + r'ctypes\.windll\.kernel32\.WinExec\(', # WinExec via ctypes + r'ctypes\.windll\.shell32\.ShellExecute[AW]?\(', # ShellExecute + r'win32process\.CreateProcess\(', # pywin32 CreateProcess + r'win32api\.ShellExecute\(', # pywin32 ShellExecute + r'win32api\.WinExec\(', # pywin32 WinExec ] for pattern in command_patterns: @@ -447,11 +582,20 @@ class PoCAnalyzer: """Extract and decode obfuscated/encoded content.""" decoded_content = [] - # Base64 patterns + # Base64 patterns - enhanced with more Python patterns base64_patterns = [ r'["\']([A-Za-z0-9+/]{20,}={0,2})["\']', # Base64 strings r'FromBase64String\(["\']([^"\']+)["\']', # PowerShell r'base64\.b64decode\(["\']([^"\']+)["\']', # Python + r'base64\.b64encode\(["\']([^"\']+)["\']', # Python encode + r'base64\.standard_b64decode\(["\']([^"\']+)["\']', # Python standard + r'base64\.urlsafe_b64decode\(["\']([^"\']+)["\']', # Python URL-safe + r'base64\.decodebytes\(["\']([^"\']+)["\']', # Python 3 + r'base64\.encodebytes\(["\']([^"\']+)["\']', # Python 3 + r'codecs\.decode\(["\']([^"\']+)["\'],\s*["\']base64["\']', # codecs + r'codecs\.encode\(["\']([^"\']+)["\'],\s*["\']base64["\']', # codecs + r'\.decode\(["\']base64["\']', # .decode('base64') + r'\.encode\(["\']base64["\']', # .encode('base64') ] for pattern in base64_patterns: @@ -466,10 +610,41 @@ class PoCAnalyzer: except: continue - # Hex patterns + # Hex patterns - enhanced with Python-specific patterns hex_patterns = [ r'0x([0-9a-fA-F]{20,})', - r'["\']([0-9a-fA-F]{20,})["\']' + r'["\']([0-9a-fA-F]{20,})["\']', + r'bytes\.fromhex\(["\']([0-9a-fA-F]+)["\']', # Python bytes.fromhex + r'binascii\.hexlify\(["\']([^"\']+)["\']', # Python binascii + r'binascii\.unhexlify\(["\']([0-9a-fA-F]+)["\']', # Python binascii + r'codecs\.decode\(["\']([0-9a-fA-F]+)["\'],\s*["\']hex["\']', # codecs hex + r'codecs\.encode\(["\']([^"\']+)["\'],\s*["\']hex["\']', # codecs hex + r'\.decode\(["\']hex["\']', # .decode('hex') + r'\.encode\(["\']hex["\']', # .encode('hex') + ] + + # Additional Python encoding patterns + other_encoding_patterns = [ + r'codecs\.decode\(["\']([^"\']+)["\'],\s*["\']rot13["\']', # ROT13 + r'codecs\.encode\(["\']([^"\']+)["\'],\s*["\']rot13["\']', # ROT13 + r'\.decode\(["\']utf-8["\']', # UTF-8 decode + r'\.encode\(["\']utf-8["\']', # UTF-8 encode + r'\.decode\(["\']ascii["\']', # ASCII decode + r'\.encode\(["\']ascii["\']', # ASCII encode + r'urllib\.parse\.quote\(["\']([^"\']+)["\']', # URL encoding + r'urllib\.parse\.unquote\(["\']([^"\']+)["\']', # URL decoding + r'urllib\.parse\.quote_plus\(["\']([^"\']+)["\']', # URL encoding + r'urllib\.parse\.unquote_plus\(["\']([^"\']+)["\']', # URL decoding + r'html\.escape\(["\']([^"\']+)["\']', # HTML escape + r'html\.unescape\(["\']([^"\']+)["\']', # HTML unescape + r'json\.dumps\(["\']([^"\']+)["\']', # JSON encoding + r'json\.loads\(["\']([^"\']+)["\']', # JSON decoding + r'pickle\.dumps\(["\']([^"\']+)["\']', # Pickle serialization + r'pickle\.loads\(["\']([^"\']+)["\']', # Pickle deserialization + r'zlib\.compress\(["\']([^"\']+)["\']', # Zlib compression + r'zlib\.decompress\(["\']([^"\']+)["\']', # Zlib decompression + r'gzip\.compress\(["\']([^"\']+)["\']', # Gzip compression + r'gzip\.decompress\(["\']([^"\']+)["\']', # Gzip decompression ] for pattern in hex_patterns: @@ -484,6 +659,20 @@ class PoCAnalyzer: except: continue + # Process additional encoding patterns + for pattern in other_encoding_patterns: + matches = re.finditer(pattern, content, re.IGNORECASE) + for match in matches: + try: + if len(match.groups()) > 0: + encoded_str = match.group(1) + if len(encoded_str) > 10: # Only process substantial content + # For now, just add the pattern as an indicator + # Real decoding would depend on the specific encoding + decoded_content.append(f"encoded_content: {encoded_str[:50]}...") + except: + continue + return decoded_content def _calculate_confidence(self, indicator: str, indicator_type: str, context: str) -> float: @@ -496,10 +685,17 @@ class PoCAnalyzer: if len(indicator) > 20: confidence += 0.1 - # Context-based scoring + # Context-based scoring - enhanced with Python-specific keywords high_confidence_keywords = [ 'exploit', 'payload', 'shell', 'inject', 'execute', 'run', - 'attack', 'malware', 'backdoor', 'trojan', 'virus' + 'attack', 'malware', 'backdoor', 'trojan', 'virus', + # Python-specific exploit keywords + 'subprocess', 'popen', 'system', 'exec', 'eval', 'compile', + 'import', 'ctypes', 'win32api', 'win32process', 'pexpect', + 'base64', 'decode', 'encode', 'pickle', 'marshal', + 'requests', 'urllib', 'socket', 'connect', 'bind', + 'reverse', 'shell', 'backdoor', 'persistence', 'privilege', + 'escalation', 'bypass', 'evasion', 'obfuscation' ] context_lower = context.lower() @@ -508,24 +704,48 @@ class PoCAnalyzer: confidence += 0.1 break - # Type-specific scoring + # Type-specific scoring - enhanced for Python if indicator_type == 'process': if indicator.endswith('.exe') or indicator.endswith('.dll'): confidence += 0.2 if any(word in indicator.lower() for word in ['cmd', 'powershell', 'bash', 'sh']): confidence += 0.1 + # Python-specific process indicators + if any(word in indicator.lower() for word in ['python', 'py', 'subprocess', 'popen']): + confidence += 0.15 + if any(word in indicator.lower() for word in ['eval', 'exec', 'compile', 'import']): + confidence += 0.2 elif indicator_type == 'file': if any(ext in indicator.lower() for ext in ['.exe', '.dll', '.bat', '.ps1', '.sh']): confidence += 0.2 if any(path in indicator.lower() for path in ['temp', 'tmp', 'appdata']): confidence += 0.1 + # Python-specific file indicators + if any(ext in indicator.lower() for ext in ['.py', '.pyc', '.pyo', '.pyd']): + confidence += 0.15 + if any(path in indicator.lower() for path in ['__pycache__', '.python', 'site-packages']): + confidence += 0.1 elif indicator_type == 'network': if re.match(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', indicator): confidence += 0.2 if any(tld in indicator.lower() for tld in ['.com', '.net', '.org', '.ru', '.cn']): confidence += 0.1 + # Python-specific network indicators + if any(word in indicator.lower() for word in ['requests', 'urllib', 'session', 'socket']): + confidence += 0.15 + if 'session' in indicator.lower(): + confidence += 0.1 + + elif indicator_type == 'command': + # Python-specific command indicators + if any(word in indicator.lower() for word in ['python', 'py', 'subprocess', 'os.system']): + confidence += 0.15 + if any(word in indicator.lower() for word in ['eval', 'exec', 'compile', 'import']): + confidence += 0.2 + if any(word in indicator.lower() for word in ['base64', 'decode', 'encode', 'pickle']): + confidence += 0.1 # Apply false positive filters if self._is_false_positive(indicator, indicator_type): @@ -692,17 +912,39 @@ class PoCAnalyzer: 'process': [ r'^(explorer|notepad|calc|windir|system32)\.exe$', r'^[a-z]$', # Single characters - r'^\d+$' # Pure numbers + r'^\d+$', # Pure numbers + # Python-specific false positives + r'^(print|len|str|int|float|list|dict|tuple|set)$', # Built-in functions + r'^(import|from|def|class|if|else|elif|for|while|try|except)$', # Keywords + r'^(sys|os|re|json|time|datetime|random|math)$', # Common modules ], 'file': [ r'^[a-z]$', r'^\d+$', - r'^(con|aux|prn|nul)$' + r'^(con|aux|prn|nul)$', + # Python-specific false positives + r'^(sys|os|re|json|time|datetime|random|math)\.py$', # Common modules + r'^__init__\.py$', # Python package files + r'^setup\.py$', # Python setup files + r'^test.*\.py$', # Test files + r'^.*_test\.py$', # Test files ], 'network': [ r'^(localhost|127\.0\.0\.1|0\.0\.0\.0)$', r'^\d{1,2}$', # Port numbers without context - r'^(example\.com|test\.com|localhost)$' + r'^(example\.com|test\.com|localhost)$', + # Python-specific false positives + r'^(requests|urllib|socket|http)$', # Module names without context + r'^(session|connection|client|server)$', # Generic terms + r'^(get|post|put|delete|head|options)$', # HTTP methods without context + ], + 'command': [ + r'^[a-z]$', + r'^\d+$', + # Python-specific false positives + r'^(print|len|str|int|float|list|dict|tuple|set)$', # Built-in functions + r'^(import|from|def|class|if|else|elif|for|while|try|except)$', # Keywords + r'^(help|dir|type|vars|globals|locals)$', # Introspection functions ] }