""" ExploitDB Local Filesystem Integration Client Interfaces with the local ExploitDB submodule at exploit-db-mirror/ """ import os import re import json import logging from datetime import datetime from typing import Dict, List, Optional, Tuple from sqlalchemy.orm import Session from pathlib import Path # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class ExploitDBLocalClient: """Client for interfacing with local ExploitDB mirror filesystem""" def __init__(self, db_session: Session): self.db_session = db_session # Path to the local exploit-db-mirror submodule (in container: /app/exploit-db-mirror) self.exploitdb_path = Path("/app/exploit-db-mirror") self.exploits_path = self.exploitdb_path / "exploits" # ExploitDB URL pattern for mapping self.exploit_url_pattern = re.compile(r'https?://(?:www\.)?exploit-db\.com/exploits/(\d+)') # Cache for file searches self.file_cache = {} # Build file index on initialization self._build_file_index() def _build_file_index(self): """Build an index of exploit ID to file path for fast lookups""" logger.info("Building ExploitDB file index...") self.file_index = {} if not self.exploits_path.exists(): logger.error(f"ExploitDB path not found: {self.exploits_path}") return # Walk through all exploit files for root, dirs, files in os.walk(self.exploits_path): for file in files: # Extract exploit ID from filename (e.g., "12345.py" -> "12345") match = re.match(r'^(\d+)\.(\w+)$', file) if match: exploit_id = match.group(1) file_extension = match.group(2) file_path = Path(root) / file # Store in index self.file_index[exploit_id] = { 'path': file_path, 'filename': file, 'extension': file_extension, 'category': self._extract_category_from_path(file_path), 'subcategory': self._extract_subcategory_from_path(file_path) } logger.info(f"Built index with {len(self.file_index)} exploits") def _extract_category_from_path(self, file_path: Path) -> str: """Extract category from file path (e.g., linux, windows, etc.)""" parts = file_path.parts exploits_index = None for i, part in enumerate(parts): if part == "exploits": exploits_index = i break if exploits_index and exploits_index + 1 < len(parts): return parts[exploits_index + 1] return "unknown" def _extract_subcategory_from_path(self, file_path: Path) -> str: """Extract subcategory from file path (e.g., local, remote, webapps, etc.)""" parts = file_path.parts exploits_index = None for i, part in enumerate(parts): if part == "exploits": exploits_index = i break if exploits_index and exploits_index + 2 < len(parts): return parts[exploits_index + 2] return "unknown" def extract_exploit_id_from_url(self, url: str) -> Optional[str]: """Extract exploit ID from ExploitDB URL""" match = self.exploit_url_pattern.search(url) if match: return match.group(1) return None def get_exploit_details(self, exploit_id: str) -> Optional[dict]: """Get exploit details from local filesystem""" if exploit_id not in self.file_index: logger.debug(f"Exploit {exploit_id} not found in local index") return None file_info = self.file_index[exploit_id] file_path = file_info['path'] try: # Read file content with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: content = f.read() # Get file stats stat = file_path.stat() return { 'id': exploit_id, 'filename': file_info['filename'], 'path': str(file_path.relative_to(self.exploitdb_path)), 'full_path': str(file_path), 'category': file_info['category'], 'subcategory': file_info['subcategory'], 'extension': file_info['extension'], 'content': content, 
                'size': stat.st_size,
                'modified_time': datetime.fromtimestamp(stat.st_mtime),
                'local_url': f"file://{file_path}"
            }

        except Exception as e:
            logger.error(f"Error reading exploit file {file_path}: {e}")
            return None

    def analyze_exploit_content(self, exploit_data: dict) -> dict:
        """Analyze exploit content to extract indicators"""
        if not exploit_data or not exploit_data.get('content'):
            return {}

        content = exploit_data['content']
        indicators = {
            'processes': [],
            'files': [],
            'network': [],
            'registry': [],
            'commands': [],
            'urls': [],
            'techniques': [],
            'languages': [],
            'platforms': [],
            'syscalls': [],
            'functions': []
        }

        # Determine programming language from extension
        extension = exploit_data.get('extension', '').lower()
        language_map = {
            'py': 'python',
            'rb': 'ruby',
            'pl': 'perl',
            'c': 'c',
            'cpp': 'cpp',
            'cc': 'cpp',
            'cxx': 'cpp',
            'sh': 'bash',
            'ps1': 'powershell',
            'java': 'java',
            'js': 'javascript',
            'php': 'php',
            'asp': 'asp',
            'aspx': 'aspx',
            'jsp': 'jsp',
            'go': 'go',
            'rs': 'rust',
            'asm': 'assembly',
            's': 'assembly',
            'nasm': 'assembly'
        }
        if extension in language_map:
            indicators['languages'].append(language_map[extension])

        # Extract platform from path
        category = exploit_data.get('category', '').lower()
        if category in ['linux', 'windows', 'osx', 'macos', 'android', 'freebsd', 'solaris']:
            indicators['platforms'].append(category)

        # Extract indicators from content
        content_lower = content.lower()

        # Process patterns - enhanced for different languages
        process_patterns = [
            r'\b(cmd\.exe|powershell\.exe|bash|sh|python|ruby|perl|java)\b',
            r'\b(system|exec|popen|subprocess|shell_exec|eval|execve|execl|execlp)\b',
            r'\b(createprocess|shellexecute|winexec|createthread)\b',
            r'\b(mshta|rundll32|regsvr32|wscript|cscript|certutil|bitsadmin)\b',
            r'\b(/bin/sh|/bin/bash|/usr/bin/python|/usr/bin/perl)\b'
        ]

        for pattern in process_patterns:
            matches = re.findall(pattern, content, re.IGNORECASE)
            indicators['processes'].extend(matches)

        # File patterns - enhanced
        file_patterns = [
            r'\b([a-zA-Z]:\\[^\\\s"\']+\\[^\\\s"\']+\.[a-zA-Z0-9]+)\b',  # Windows paths
            r'\b(/[^/\s"\']+/[^/\s"\']+\.[a-zA-Z0-9]+)\b',  # Unix paths
            r'\b(\w+\.(exe|dll|so|dylib|bat|ps1|py|sh|jar|war|php|jsp|asp|aspx|txt|log|conf))\b',
            r'\b(/tmp/[^\s"\']+)\b',  # Temp files
            r'\b(/etc/[^\s"\']+)\b',  # Config files
            r'\b(/var/[^\s"\']+)\b',  # Var files
            r'\b(/proc/[^\s"\']+)\b'  # Proc files
        ]

        for pattern in file_patterns:
            matches = re.findall(pattern, content, re.IGNORECASE)
            if matches and isinstance(matches[0], tuple):
                indicators['files'].extend([m[0] for m in matches])
            else:
                indicators['files'].extend(matches)

        # Network patterns - enhanced
        network_patterns = [
            r'\b(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\b',  # IP addresses
            r'\b(https?://[^\s<>"\']+)\b',  # URLs
            r'\b([a-zA-Z0-9-]+\.[a-zA-Z]{2,})\b',  # Domain names
            r'\b(bind|connect|listen|socket|recv|send|accept)\b',  # Network functions
            r'\b(AF_INET|SOCK_STREAM|SOCK_DGRAM)\b',  # Socket constants
            r'\b(nc|netcat|ncat|telnet|ssh|ftp|wget|curl)\b'  # Network tools
        ]

        for pattern in network_patterns:
            matches = re.findall(pattern, content, re.IGNORECASE)
            if 'http' in pattern:
                indicators['urls'].extend(matches)
            else:
                indicators['network'].extend(matches)

        # Command patterns - enhanced
        command_patterns = [
            r'\b(curl|wget|nc|netcat|telnet|ssh|ftp)\b',
            r'\b(whoami|id|uname|systeminfo|ipconfig|ifconfig|netstat|ps|top|lsof)\b',
            r'\b(cat|type|dir|ls|find|grep|awk|sed|sort|uniq)\b',
            r'\b(echo|printf|print)\b',
            r'\b(base64|decode|encode|openssl|gpg)\b',
            r'\b(sudo|su|chmod|chown|mount|umount)\b',
            r'\b(service|systemctl|chkconfig|update-rc\.d)\b'
        ]

        for pattern in command_patterns:
            matches = re.findall(pattern, content, re.IGNORECASE)
            indicators['commands'].extend(matches)

        # Registry patterns (Windows-specific)
        registry_patterns = [
            r'\b(HKEY_[A-Z_]+)\b',
            r'\b(HKLM|HKCU|HKCR|HKU|HKCC)\b',
            r'\b(reg\s+add|reg\s+query|reg\s+delete|regedit)\b',
            r'\b(SOFTWARE\\\\[^\\\s"\']+)\b',
            r'\b(SYSTEM\\\\[^\\\s"\']+)\b',
            r'\b(CurrentVersion\\\\[^\\\s"\']+)\b'
        ]

        for pattern in registry_patterns:
            matches = re.findall(pattern, content, re.IGNORECASE)
            indicators['registry'].extend(matches)

        # System call patterns (Linux/Unix)
        syscall_patterns = [
            r'\b(open|close|read|write|lseek|stat|fstat|lstat)\b',
            r'\b(fork|vfork|clone|execve|wait|waitpid)\b',
            r'\b(socket|bind|listen|accept|connect|send|recv)\b',
            r'\b(mmap|munmap|mprotect|brk|sbrk)\b',
            r'\b(ptrace|kill|signal|alarm)\b'
        ]

        for pattern in syscall_patterns:
            matches = re.findall(pattern, content, re.IGNORECASE)
            indicators['syscalls'].extend(matches)

        # Function patterns
        function_patterns = [
            r'\b(main|printf|scanf|malloc|free|strcpy|strcat|strlen)\b',
            r'\b(gets|puts|fgets|fputs|fopen|fclose|fread|fwrite)\b',
            r'\b(sprintf|snprintf|memcpy|memset|strcmp|strncmp)\b'
        ]

        for pattern in function_patterns:
            matches = re.findall(pattern, content, re.IGNORECASE)
            indicators['functions'].extend(matches)

        # Clean up and deduplicate
        for key in indicators:
            # Remove empty strings and duplicates
            indicators[key] = list(set([item.strip() for item in indicators[key] if item and len(item.strip()) > 1]))
            # Limit to reasonable number of indicators
            indicators[key] = indicators[key][:25]

        return indicators

    def calculate_exploit_quality_score(self, exploit_data: dict) -> dict:
        """Calculate quality score for an exploit"""
        quality_score = 0
        factors = {}

        # File size factor (0-25 points)
        file_size = exploit_data.get('size', 0)
        if file_size > 0:
            if file_size > 10000:  # Large files (10KB+)
                size_score = 25
            elif file_size > 5000:  # Medium files (5KB+)
                size_score = 20
            elif file_size > 1000:  # Small files (1KB+)
                size_score = 15
            elif file_size > 500:  # Very small files (500B+)
                size_score = 10
            else:  # Tiny files
                size_score = 5
            quality_score += size_score
            factors['size_score'] = size_score

        # Content analysis factor (0-30 points)
        content = exploit_data.get('content', '')
        if content:
            content_score = 0
            lines = content.split('\n')

            # Check for comments and documentation
            comment_lines = 0
            for line in lines:
                stripped = line.strip()
                if (stripped.startswith('#') or stripped.startswith('//') or
                        stripped.startswith('/*') or stripped.startswith('*') or
                        stripped.startswith('"""') or stripped.startswith("'''")):
                    comment_lines += 1

            comment_score = min(comment_lines, 10)  # Up to 10 points
            content_score += comment_score

            # Check for function definitions
            function_patterns = [
                r'\bdef\s+\w+',  # Python
                r'\bfunction\s+\w+',  # JavaScript
                r'\bvoid\s+\w+',  # C/C++
                r'\bint\s+\w+',  # C/C++
                r'\bchar\s+\w+',  # C/C++
                r'\bsub\s+\w+',  # Perl
                r'^\w+\s*\(',  # Generic function calls
            ]

            function_count = 0
            for pattern in function_patterns:
                matches = re.findall(pattern, content, re.IGNORECASE | re.MULTILINE)
                function_count += len(matches)

            function_score = min(function_count, 15)  # Up to 15 points
            content_score += function_score

            # Check for include/import statements
            include_patterns = [
                r'#include\s*[<"]',  # C/C++
                r'import\s+\w+',  # Python/Java
                r'require\s+\w+',  # Ruby/Perl
                r'use\s+\w+',  # Perl
            ]

            include_count = 0
            for pattern in include_patterns:
                matches = re.findall(pattern, content, re.IGNORECASE)
                include_count += len(matches)
            include_score = min(include_count, 5)  # Up to 5 points
            content_score += include_score

            quality_score += content_score
            factors['content_score'] = content_score

        # Platform/category factor (0-20 points)
        category = exploit_data.get('category', '').lower()
        subcategory = exploit_data.get('subcategory', '').lower()

        platform_score = 0
        if category in ['linux', 'windows', 'osx', 'macos']:
            platform_score += 10
        elif category in ['android', 'freebsd', 'solaris']:
            platform_score += 8
        elif category in ['multiple', 'unix']:
            platform_score += 6

        if subcategory in ['local', 'remote']:
            platform_score += 10
        elif subcategory in ['webapps', 'dos']:
            platform_score += 8
        elif subcategory in ['shellcode']:
            platform_score += 6

        quality_score += platform_score
        factors['platform_score'] = platform_score

        # Language factor (0-15 points)
        extension = exploit_data.get('extension', '').lower()
        lang_score = 0
        if extension in ['c', 'cpp', 'cc', 'cxx']:  # Compiled languages
            lang_score = 15
        elif extension in ['py', 'rb', 'pl', 'java']:  # High-level languages
            lang_score = 12
        elif extension in ['sh', 'ps1', 'bat']:  # Scripting languages
            lang_score = 8
        elif extension in ['asm', 's', 'nasm']:  # Assembly
            lang_score = 10
        elif extension in ['php', 'asp', 'aspx', 'jsp']:  # Web languages
            lang_score = 6
        elif extension in ['txt', 'html']:  # Text/docs
            lang_score = 3

        quality_score += lang_score
        factors['language_score'] = lang_score

        # File age factor (0-10 points) - newer exploits might be more relevant
        modified_time = exploit_data.get('modified_time')
        if modified_time:
            days_old = (datetime.now() - modified_time).days
            if days_old < 365:  # Less than 1 year
                age_score = 10
            elif days_old < 365 * 3:  # Less than 3 years
                age_score = 8
            elif days_old < 365 * 5:  # Less than 5 years
                age_score = 6
            elif days_old < 365 * 10:  # Less than 10 years
                age_score = 4
            else:  # Very old
                age_score = 2
            quality_score += age_score
            factors['age_score'] = age_score

        # Determine quality tier
        quality_tier = self._get_exploit_quality_tier(quality_score)

        return {
            'quality_score': quality_score,
            'factors': factors,
            'quality_tier': quality_tier
        }

    def _get_exploit_quality_tier(self, score: int) -> str:
        """Get quality tier based on score"""
        if score >= 80:
            return 'excellent'
        elif score >= 65:
            return 'good'
        elif score >= 45:
            return 'fair'
        elif score >= 25:
            return 'poor'
        else:
            return 'very_poor'

    async def sync_cve_exploits(self, cve_id: str) -> dict:
        """Synchronize ExploitDB data for a specific CVE using local filesystem"""
        from main import CVE, SigmaRule

        # Get existing CVE
        cve = self.db_session.query(CVE).filter(CVE.cve_id == cve_id).first()
        if not cve:
            logger.warning(f"CVE {cve_id} not found in database")
            return {"error": "CVE not found"}

        # Extract ExploitDB URLs from reference URLs
        exploit_urls = []
        if cve.reference_urls:
            for url in cve.reference_urls:
                exploit_id = self.extract_exploit_id_from_url(url)
                if exploit_id:
                    exploit_urls.append((url, exploit_id))

        if not exploit_urls:
            logger.info(f"No ExploitDB URLs found for {cve_id}")
            return {"cve_id": cve_id, "exploits_found": 0}

        # Fetch exploit details from local filesystem
        exploit_data = []
        total_quality_score = 0

        for url, exploit_id in exploit_urls:
            try:
                details = self.get_exploit_details(exploit_id)
                if details:
                    # Analyze exploit content
                    indicators = self.analyze_exploit_content(details)
                    quality_analysis = self.calculate_exploit_quality_score(details)

                    exploit_entry = {
                        'id': exploit_id,
                        'url': url,
                        'filename': details.get('filename'),
                        'path': details.get('path'),
                        'category': details.get('category'),
                        'subcategory': details.get('subcategory'),
                        'extension': details.get('extension'),
                        'size': details.get('size'),
                        'modified_time': details.get('modified_time').isoformat() if details.get('modified_time') else None,
                        'local_url': details.get('local_url'),
                        'indicators': indicators,
                        'quality_analysis': quality_analysis
                    }

                    exploit_data.append(exploit_entry)
                    total_quality_score += quality_analysis['quality_score']
                    logger.info(f"Successfully processed exploit {exploit_id} from local filesystem")

            except Exception as e:
                logger.error(f"Error processing exploit {exploit_id}: {e}")

        # Update CVE with ExploitDB data
        if exploit_data:
            # Store in existing poc_data field
            if not cve.poc_data:
                cve.poc_data = {}

            cve.poc_data['exploitdb'] = {
                'exploits': exploit_data,
                'total_exploits': len(exploit_data),
                'average_quality': total_quality_score // len(exploit_data) if exploit_data else 0,
                'synced_at': datetime.utcnow().isoformat(),
                'source': 'local_filesystem'
            }

            cve.updated_at = datetime.utcnow()

            # Update SIGMA rule with ExploitDB data
            sigma_rule = self.db_session.query(SigmaRule).filter(
                SigmaRule.cve_id == cve_id
            ).first()

            if sigma_rule:
                # Combine indicators from all exploits
                combined_indicators = {}
                for exploit in exploit_data:
                    for key, values in exploit['indicators'].items():
                        if key not in combined_indicators:
                            combined_indicators[key] = []
                        combined_indicators[key].extend(values)

                # Deduplicate
                for key in combined_indicators:
                    combined_indicators[key] = list(set(combined_indicators[key]))

                # Update rule with ExploitDB data
                if not sigma_rule.nomi_sec_data:
                    sigma_rule.nomi_sec_data = {}

                sigma_rule.nomi_sec_data['exploitdb'] = {
                    'total_exploits': len(exploit_data),
                    'average_quality': total_quality_score // len(exploit_data) if exploit_data else 0,
                    'best_exploit': max(exploit_data, key=lambda x: x['quality_analysis']['quality_score']) if exploit_data else None,
                    'indicators': combined_indicators,
                    'source': 'local_filesystem'
                }

                # Update exploit indicators
                existing_indicators = json.loads(sigma_rule.exploit_indicators) if sigma_rule.exploit_indicators else {}

                for key, values in combined_indicators.items():
                    if key not in existing_indicators:
                        existing_indicators[key] = []
                    existing_indicators[key].extend(values)
                    existing_indicators[key] = list(set(existing_indicators[key]))

                sigma_rule.exploit_indicators = json.dumps(existing_indicators)
                sigma_rule.updated_at = datetime.utcnow()

            self.db_session.commit()

            logger.info(f"Synchronized {len(exploit_data)} ExploitDB exploits for {cve_id} from local filesystem")

        return {
            "cve_id": cve_id,
            "exploits_found": len(exploit_data),
            "total_quality_score": total_quality_score,
            "average_quality": total_quality_score // len(exploit_data) if exploit_data else 0,
            "exploit_urls": [e['url'] for e in exploit_data],
            "source": "local_filesystem"
        }

    async def bulk_sync_exploitdb(self, batch_size: int = 50, cancellation_flag: Optional[callable] = None) -> dict:
        """Synchronize ExploitDB data for all CVEs with ExploitDB references using local filesystem"""
        from main import CVE, BulkProcessingJob
        from sqlalchemy import text

        # Create bulk processing job
        job = BulkProcessingJob(
            job_type='exploitdb_sync_local',
            status='running',
            started_at=datetime.utcnow(),
            job_metadata={'batch_size': batch_size, 'source': 'local_filesystem'}
        )
        self.db_session.add(job)
        self.db_session.commit()

        total_processed = 0
        total_found = 0
        results = []

        try:
            # Get all CVEs with ExploitDB references using text search
            cves = self.db_session.query(CVE).filter(
                text("reference_urls::text LIKE '%exploit-db%'")
            ).all()

            job.total_items = len(cves)
            self.db_session.commit()

            logger.info(f"Found {len(cves)} CVEs with ExploitDB references for local sync")

            # Process in batches
            for i in range(0, len(cves), batch_size):
                # Check for cancellation
                if cancellation_flag and cancellation_flag():
                    logger.info("ExploitDB local sync cancelled by user")
                    job.status = 'cancelled'
                    job.cancelled_at = datetime.utcnow()
                    job.error_message = "Job cancelled by user"
                    break

                batch = cves[i:i + batch_size]

                for cve in batch:
                    # Check for cancellation
                    if cancellation_flag and cancellation_flag():
                        logger.info("ExploitDB local sync cancelled by user")
                        job.status = 'cancelled'
                        job.cancelled_at = datetime.utcnow()
                        job.error_message = "Job cancelled by user"
                        break

                    try:
                        result = await self.sync_cve_exploits(cve.cve_id)
                        total_processed += 1

                        if result.get("exploits_found", 0) > 0:
                            total_found += result["exploits_found"]
                            results.append(result)

                        job.processed_items += 1

                        # Very small delay for responsiveness
                        # No need for long delays with local filesystem

                    except Exception as e:
                        logger.error(f"Error syncing ExploitDB for {cve.cve_id}: {e}")
                        job.failed_items += 1

                # Break out of outer loop if cancelled
                if job.status == 'cancelled':
                    break

                # Commit after each batch
                self.db_session.commit()
                logger.info(f"Processed ExploitDB local batch {i//batch_size + 1}/{(len(cves) + batch_size - 1)//batch_size}")

            # Update job status
            if job.status != 'cancelled':
                job.status = 'completed'
                job.completed_at = datetime.utcnow()
                job.job_metadata.update({
                    'total_processed': total_processed,
                    'total_exploits_found': total_found,
                    'cves_with_exploits': len(results),
                    'source': 'local_filesystem'
                })

        except Exception as e:
            job.status = 'failed'
            job.error_message = str(e)
            job.completed_at = datetime.utcnow()
            logger.error(f"Bulk ExploitDB local sync job failed: {e}")

        finally:
            self.db_session.commit()

        return {
            'job_id': str(job.id),
            'status': job.status,
            'total_processed': total_processed,
            'total_exploits_found': total_found,
            'cves_with_exploits': len(results),
            'source': 'local_filesystem'
        }

    async def get_exploitdb_sync_status(self) -> dict:
        """Get ExploitDB synchronization status for local filesystem"""
        from main import CVE
        from sqlalchemy import text

        # Count CVEs with ExploitDB references
        total_cves = self.db_session.query(CVE).count()

        # Count CVEs with ExploitDB data
        result = self.db_session.execute(
            text("SELECT COUNT(*) FROM cves WHERE poc_data::text LIKE '%\"exploitdb\"%'")
        )
        cves_with_exploitdb = result.scalar()

        # Count CVEs with ExploitDB URLs in references
        result2 = self.db_session.execute(
            text("SELECT COUNT(*) FROM cves WHERE reference_urls::text LIKE '%exploit-db%'")
        )
        cves_with_exploitdb_refs = result2.scalar()

        return {
            'total_cves': total_cves,
            'cves_with_exploitdb_refs': cves_with_exploitdb_refs,
            'cves_with_exploitdb_data': cves_with_exploitdb,
            'exploitdb_coverage': (cves_with_exploitdb / cves_with_exploitdb_refs * 100) if cves_with_exploitdb_refs > 0 else 0,
            'exploitdb_sync_status': 'active' if cves_with_exploitdb > 0 else 'pending',
            'exploitdb_local_index_size': len(self.file_index),
            'source': 'local_filesystem'
        }
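

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the client API). A minimal
# example of driving the client from the command line, assuming a DATABASE_URL
# environment variable pointing at the application database and the
# exploit-db-mirror submodule checked out at /app/exploit-db-mirror. The
# exploit URL and CVE ID below are placeholders.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import asyncio
    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker

    database_url = os.environ.get("DATABASE_URL")
    if not database_url:
        raise SystemExit("Set DATABASE_URL to run this example")

    engine = create_engine(database_url)
    SessionLocal = sessionmaker(bind=engine)

    async def _demo():
        session = SessionLocal()
        try:
            # Building the client also builds the local exploit file index
            client = ExploitDBLocalClient(session)

            # Map an ExploitDB reference URL to a local file and score it
            exploit_id = client.extract_exploit_id_from_url(
                "https://www.exploit-db.com/exploits/12345"  # placeholder URL
            )
            if exploit_id:
                details = client.get_exploit_details(exploit_id)
                if details:
                    quality = client.calculate_exploit_quality_score(details)
                    print(f"Exploit {exploit_id}: tier={quality['quality_tier']}")

            # Per-CVE sync (the CVE must already exist in the database)
            result = await client.sync_cve_exploits("CVE-2021-44228")  # placeholder CVE ID
            print(result)
        finally:
            session.close()

    asyncio.run(_demo())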