"""Search GitHub for public exploit code that references a CVE and extract
indicator strings (process names, file paths, registry keys, ...) from it."""

import os
import re
import sys
from typing import List, Optional

from github import Github

# Make the project root importable so that `config.settings` resolves when
# this module is loaded from inside its sub-package.
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from config.settings import settings


def _compile_all(patterns, flags):
    """Compile every regex in *patterns* with *flags* and return a tuple."""
    return tuple(re.compile(pattern, flags) for pattern in patterns)


# Flags used for every category except 'network'.
_CATEGORY_FLAGS = re.IGNORECASE | re.MULTILINE

# Category name -> precompiled patterns. Each pattern has exactly one
# capturing group; the captured text is the indicator.
_CATEGORY_PATTERNS = {
    'processes': _compile_all([
        r'CreateProcess[AW]?\s*\(\s*["\']([^"\']+)["\']',
        r'ShellExecute[AW]?\s*\([^,]*,\s*["\']([^"\']+)["\']',
        r'system\s*\(\s*["\']([^"\']+)["\']',
        r'exec\s*\(\s*["\']([^"\']+)["\']',
        r'subprocess\.(?:call|run|Popen)\s*\(\s*["\']([^"\']+)["\']',
    ], _CATEGORY_FLAGS),
    'files': _compile_all([
        r'(?:fopen|CreateFile|WriteFile|ReadFile)\s*\(\s*["\']([^"\']+\.[a-zA-Z0-9]+)["\']',
        r'(?:copy|move|del|rm)\s+["\']?([^\s"\']+\.[a-zA-Z0-9]+)["\']?',
        r'\\\\[^\\]+\\[^\\]+\\([^\\]+\.[a-zA-Z0-9]+)',
        r'[C-Z]:\\\\[^\\]+\\\\([^\\]+\.[a-zA-Z0-9]+)',
    ], _CATEGORY_FLAGS),
    'registry': _compile_all([
        r'(?:RegOpenKey|RegSetValue|RegCreateKey)\s*\([^,]*,\s*["\']([^"\']+)["\']',
        r'HKEY_[A-Z_]+\\\\([^"\'\\]+)',
        r'reg\s+add\s+["\']?([^"\'\\]+\\\\[^"\']+)["\']?',
    ], _CATEGORY_FLAGS),
    'powershell': _compile_all([
        r'(?:powershell|pwsh)\s+(?:-[a-zA-Z]+\s+)*["\']?([^"\']+)["\']?',
        r'Invoke-(?:Expression|Command|WebRequest|RestMethod)\s+["\']?([^"\']+)["\']?',
        r'Start-Process\s+["\']?([^"\']+)["\']?',
        r'Get-Process\s+["\']?([^"\']+)["\']?',
    ], _CATEGORY_FLAGS),
    'commands': _compile_all([
        r'(?:cmd|command)\s+(?:/[a-zA-Z]+\s+)*["\']?([^"\']+)["\']?',
        r'(?:ping|nslookup|netstat|tasklist|wmic)\s+([^\s"\']+)',
        r'(?:net|sc|schtasks)\s+[a-zA-Z]+\s+([^\s"\']+)',
    ], _CATEGORY_FLAGS),
}

# Network patterns are handled separately because some of them capture two
# groups (host and port) that are joined into a single "host:port" string.
_NETWORK_PATTERNS = _compile_all([
    r'(?:connect|bind|listen)\s*\([^,]*,\s*(\d+)',
    r'socket\.connect\s*\(\s*\(["\']?([^"\']+)["\']?,\s*(\d+)\)',
    r'(?:http|https|ftp)://([^\s"\'<>]+)',
    r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}):(\d+)',
], re.IGNORECASE)

# Output categories, in the order they appear in returned dictionaries.
# NOTE: nothing in this module ever adds to 'urls'; URL matches are
# recorded under 'network'.
_INDICATOR_CATEGORIES = (
    'processes', 'files', 'registry', 'network',
    'commands', 'powershell', 'urls',
)


class GitHubExploitAnalyzer:
    """Service for analyzing GitHub repositories for exploit code"""

    # Only the first N search queries are issued, to avoid rate limits.
    MAX_QUERIES = 2
    # Number of top search results inspected per query.
    REPOS_PER_QUERY = 5
    # Number of root-directory entries inspected per repository.
    MAX_FILES_PER_REPO = 20
    # Files larger than this many bytes are skipped.
    MAX_FILE_SIZE = 100000
    # Maximum indicators kept per category for a single file.
    MAX_INDICATORS_PER_CATEGORY = 10

    _EXPLOIT_EXTENSIONS = ('.py', '.ps1', '.sh', '.c', '.cpp', '.js', '.rb', '.pl', '.php', '.java')
    _EXPLOIT_NAME_TERMS = ('exploit', 'poc', 'payload', 'shell', 'reverse', 'bind', 'attack')

    def __init__(self):
        """Create a GitHub client from ``settings.GITHUB_TOKEN``.

        When no token is configured ``self.github`` is ``None`` and searches
        return an empty list.
        """
        self.github_token = settings.GITHUB_TOKEN
        self.github = Github(self.github_token) if self.github_token else None

    @staticmethod
    def _mentions_cve(text_lower: str, cve_id: str) -> bool:
        """Return True if *text_lower* contains the CVE id.

        Both the dashed form (``cve-2021-1234``) and the underscored form
        (``cve_2021_1234``) are accepted. *text_lower* must already be
        lowercased by the caller.
        """
        cve_lower = cve_id.lower()
        return cve_lower in text_lower or cve_lower.replace('-', '_') in text_lower

    async def search_exploits_for_cve(self, cve_id: str) -> List[dict]:
        """Search GitHub for exploit code related to a CVE.

        Args:
            cve_id: CVE identifier, e.g. ``"CVE-2021-44228"``.

        Returns:
            A list of repository summaries as produced by
            ``_analyze_repository``. Empty when no token is configured, when
            nothing matches, or when the search fails.
        """
        if not self.github:
            print(f"No GitHub token configured, skipping exploit search for {cve_id}")
            return []

        try:
            print(f"Searching GitHub for exploits for {cve_id}")

            # Candidate queries; only the first MAX_QUERIES are issued.
            search_queries = [
                f"{cve_id} exploit",
                f"{cve_id} poc",
                f"{cve_id} vulnerability",
                f'"{cve_id}" exploit code',
                f"{cve_id.replace('-', '_')} exploit",
            ]

            exploits = []
            seen_repos = set()

            for query in search_queries[:self.MAX_QUERIES]:
                try:
                    repos = self.github.search_repositories(
                        query=query,
                        sort="updated",
                        order="desc",
                    )

                    for repo in repos[:self.REPOS_PER_QUERY]:
                        # The same repository may match several queries.
                        if repo.full_name in seen_repos:
                            continue
                        seen_repos.add(repo.full_name)

                        exploit_info = await self._analyze_repository(repo, cve_id)
                        if exploit_info:
                            exploits.append(exploit_info)
                            if len(exploits) >= settings.MAX_GITHUB_RESULTS:
                                break

                    if len(exploits) >= settings.MAX_GITHUB_RESULTS:
                        break

                except Exception as e:
                    # Best effort: a failing query must not abort the others.
                    print(f"Error searching GitHub with query '{query}': {e}")
                    continue

            print(f"Found {len(exploits)} potential exploits for {cve_id}")
            return exploits

        except Exception as e:
            print(f"Error searching GitHub for {cve_id}: {e}")
            return []

    async def _analyze_repository(self, repo, cve_id: str) -> Optional[dict]:
        """Analyze a GitHub repository for exploit code.

        The repository is considered only if its name or description mentions
        the CVE. Up to ``MAX_FILES_PER_REPO`` entries of the root directory
        are inspected.

        Returns:
            A summary dict (``repo_name``, ``repo_url``, ``description``,
            ``language``, ``stars``, ``updated``, ``files``, ``indicators``)
            or ``None`` when the repository is not relevant, contains no
            matching files, or an error occurs.
        """
        try:
            repo_text = f"{repo.name} {repo.description or ''}".lower()
            if not self._mentions_cve(repo_text, cve_id):
                return None

            exploit_files = []
            indicators = {category: set() for category in _INDICATOR_CATEGORIES}

            try:
                contents = repo.get_contents("")
                for content in contents[:self.MAX_FILES_PER_REPO]:
                    if content.type != "file" or not self._is_exploit_file(content.name):
                        continue

                    file_analysis = await self._analyze_file_content(repo, content, cve_id)
                    if not file_analysis:
                        continue

                    exploit_files.append(file_analysis)
                    # Merge this file's indicators into the repo-wide sets.
                    for key, values in file_analysis.get('indicators', {}).items():
                        if key in indicators:
                            indicators[key].update(values)

            except Exception as e:
                # Keep whatever was collected before the failure.
                print(f"Error analyzing repo contents for {repo.full_name}: {e}")

            if not exploit_files:
                return None

            return {
                'repo_name': repo.full_name,
                'repo_url': repo.html_url,
                'description': repo.description,
                'language': repo.language,
                'stars': repo.stargazers_count,
                'updated': repo.updated_at.isoformat(),
                'files': exploit_files,
                # Sorted so the output is stable across runs; plain set
                # iteration order varies with string hash randomization.
                'indicators': {k: sorted(v) for k, v in indicators.items()},
            }

        except Exception as e:
            print(f"Error analyzing repository {repo.full_name}: {e}")
            return None

    def _is_exploit_file(self, filename: str) -> bool:
        """Check if a file is likely to contain exploit code.

        A file qualifies when it has one of the known source/script
        extensions AND its name contains an exploit-related term or ``cve``.
        The comparison is case-insensitive.
        """
        filename_lower = filename.lower()

        if not filename_lower.endswith(self._EXPLOIT_EXTENSIONS):
            return False

        return (
            any(term in filename_lower for term in self._EXPLOIT_NAME_TERMS)
            or 'cve' in filename_lower
        )

    async def _analyze_file_content(self, repo, file_content, cve_id: str) -> Optional[dict]:
        """Analyze individual file content for exploit indicators.

        Args:
            repo: The repository the file belongs to (currently unused).
            file_content: Object exposing ``size``, ``decoded_content``,
                ``name`` and ``path``.
            cve_id: CVE identifier the file body must mention.

        Returns:
            ``{'filename', 'path', 'size', 'indicators'}`` or ``None`` when
            the file is too large, does not mention the CVE, yields no
            indicators, or an error occurs.
        """
        try:
            if file_content.size > self.MAX_FILE_SIZE:
                return None

            content = file_content.decoded_content.decode('utf-8', errors='ignore')

            # Lowercase once; the body can be up to MAX_FILE_SIZE bytes.
            if not self._mentions_cve(content.lower(), cve_id):
                return None

            indicators = self._extract_indicators_from_code(content, file_content.name)

            if not any(indicators.values()):
                return None

            return {
                'filename': file_content.name,
                'path': file_content.path,
                'size': file_content.size,
                'indicators': indicators,
            }

        except Exception as e:
            print(f"Error analyzing file {file_content.name}: {e}")
            return None

    def _extract_indicators_from_code(self, content: str, filename: str) -> dict:
        """Extract security indicators from exploit code.

        Args:
            content: Decoded text of the file.
            filename: Name of the file (currently unused).

        Returns:
            A dict mapping category name to a sorted list of at most
            ``MAX_INDICATORS_PER_CATEGORY`` indicator strings. Categories
            with no surviving indicators are omitted.
        """
        indicators = {category: set() for category in _INDICATOR_CATEGORIES}

        for category, compiled_patterns in _CATEGORY_PATTERNS.items():
            for pattern in compiled_patterns:
                for match in pattern.findall(content):
                    # findall yields a tuple only for multi-group patterns.
                    if isinstance(match, tuple):
                        indicators[category].add(match[0])
                    else:
                        indicators[category].add(match)

        for pattern in _NETWORK_PATTERNS:
            for match in pattern.findall(content):
                if isinstance(match, tuple):
                    if len(match) >= 2:
                        # (host, port) -> "host:port"
                        indicators['network'].add(f"{match[0]}:{match[1]}")
                    else:
                        indicators['network'].add(match[0])
                else:
                    indicators['network'].add(match)

        # Drop empty, very short and very long matches, then keep a bounded,
        # deterministically ordered selection per category.
        cleaned_indicators = {}
        for key, values in indicators.items():
            cleaned_values = [
                v for v in values
                if v and len(v.strip()) > 2 and len(v) < 200
            ]
            if cleaned_values:
                cleaned_indicators[key] = sorted(cleaned_values)[:self.MAX_INDICATORS_PER_CATEGORY]

        return cleaned_indicators