"""
Search Commands

Commands for searching CVEs and SIGMA rules in the file-based system.
"""

import re
from typing import Dict, List, Optional, Tuple

from .base_command import BaseCommand

# Extractors for the fields shown in the rule results table; compiled once
# instead of on every rule inspected.
_TITLE_RE = re.compile(r'^title:\s*(.+)$', re.MULTILINE)
_CATEGORY_RE = re.compile(r'category:\s*(\w+)')


def _truncate(text: str, width: int) -> str:
    """Return *text* cut to *width* characters, with '...' appended if it was cut."""
    return text[:width] + '...' if len(text) > width else text


def _rule_method(rule_file: str) -> str:
    """Derive the generation-method label from a rule file name.

    Strips the 'rule_' and '.sigma' substrings, e.g. 'rule_llm.sigma' -> 'llm'.
    """
    return rule_file.replace('rule_', '').replace('.sigma', '')


class SearchCommands(BaseCommand):
    """Commands for searching CVEs and rules"""

    async def search_cves(self, pattern: str, year: Optional[int],
                          severity: Optional[str], has_poc: bool,
                          has_rules: bool, limit: int):
        """Search for CVEs by pattern and print the matches as a table.

        The pattern is a case-insensitive regular expression tested against
        the CVE ID, the CVE description and each affected product.

        Args:
            pattern: Regular expression to search for.
            year: If truthy, passed to ``get_all_cves`` to restrict the search.
            severity: If truthy, keep only CVEs whose severity equals this
                value (case-insensitive).
            has_poc: If true, skip CVEs whose ``poc_count`` is zero.
            has_rules: If true, skip CVEs for which ``list_cve_rules`` returns
                nothing.
            limit: Stop collecting once this many matches have been found.

        Returns:
            None. Results and diagnostics are emitted through ``print_table``,
            ``info``, ``warning`` and ``error``.
        """
        self.info(f"Searching CVEs with pattern: '{pattern}'")
        if year:
            self.info(f"Filtering by year: {year}")
        if severity:
            self.info(f"Filtering by severity: {severity}")
        if has_poc:
            self.info("Only showing CVEs with PoC data")
        if has_rules:
            self.info("Only showing CVEs with generated rules")

        # The pattern is user input: report a malformed regex instead of
        # letting re.error escape as a traceback.
        try:
            pattern_regex = re.compile(pattern, re.IGNORECASE)
        except re.error as e:
            self.error(f"Invalid search pattern '{pattern}': {e}")
            return

        # Get CVEs to search
        cves_to_search = self.get_all_cves(year)
        if not cves_to_search:
            self.warning("No CVEs found to search")
            return

        wanted_severity = severity.lower() if severity else ''
        matches = []

        for cve_id in cves_to_search:
            try:
                metadata = self.load_cve_metadata(cve_id)
                if not metadata:
                    continue

                # `or` fallbacks also cover keys that are present but null,
                # which dict.get's default argument does not.
                cve_info = metadata.get('cve_info') or {}
                poc_data = metadata.get('poc_data') or {}
                cve_severity = cve_info.get('severity')
                poc_count = poc_data.get('poc_count') or 0

                # Apply filters
                if wanted_severity and (cve_severity or '').lower() != wanted_severity:
                    continue
                if has_poc and poc_count == 0:
                    continue

                rules = None
                if has_rules:
                    rules = self.list_cve_rules(cve_id)
                    if not rules:
                        continue

                # Check pattern match: CVE ID, then description, then products
                description = cve_info.get('description') or ''
                products = cve_info.get('affected_products') or []
                matched = bool(
                    pattern_regex.search(cve_id)
                    or (description and pattern_regex.search(description))
                    or any(pattern_regex.search(product) for product in products)
                )
                if not matched:
                    continue

                # Reuse the rule list fetched for the has_rules filter rather
                # than listing the rules a second time.
                if rules is None:
                    rules = self.list_cve_rules(cve_id) or []

                matches.append({
                    'cve_id': cve_id,
                    'severity': 'Unknown' if cve_severity is None else cve_severity,
                    'cvss_score': cve_info.get('cvss_score', 'N/A'),
                    'poc_count': poc_count,
                    'rule_count': len(rules),
                    'description': _truncate(description, 100),
                })

                if len(matches) >= limit:
                    break

            except Exception as e:
                # Best effort: one unreadable CVE must not abort the search.
                self.error(f"Error searching {cve_id}: {e}")

        # Display results
        if matches:
            headers = ["CVE ID", "Severity", "CVSS", "PoCs", "Rules", "Description"]
            rows = [
                [
                    match['cve_id'],
                    match['severity'],
                    str(match['cvss_score']),
                    str(match['poc_count']),
                    str(match['rule_count']),
                    match['description'],
                ]
                for match in matches
            ]
            self.print_table(headers, rows, f"CVE Search Results ({len(matches)} matches)")
        else:
            self.warning("No matching CVEs found")

    async def search_rules(self, pattern: str, rule_type: Optional[str],
                           method: Optional[str], limit: int):
        """Search for SIGMA rules by pattern and print the matches as a table.

        Every rule file of every CVE is loaded and its text is tested against
        the case-insensitive regular expression *pattern*.

        Args:
            pattern: Regular expression to search for in the rule content.
            rule_type: If truthy, keep only rules whose content contains
                ``category: <rule_type>`` or ``product: <rule_type>``
                (case-insensitive).
            method: If truthy, keep only rule files whose method label (file
                name without 'rule_' and '.sigma') contains this value
                (case-insensitive).
            limit: Stop collecting once this many matches have been found.

        Returns:
            None. Results and diagnostics are emitted through ``print_table``,
            ``info``, ``warning`` and ``error``.
        """
        self.info(f"Searching SIGMA rules with pattern: '{pattern}'")
        if rule_type:
            self.info(f"Filtering by rule type: {rule_type}")
        if method:
            self.info(f"Filtering by generation method: {method}")

        # The pattern is user input: report a malformed regex instead of
        # letting re.error escape as a traceback.
        try:
            pattern_regex = re.compile(pattern, re.IGNORECASE)
        except re.error as e:
            self.error(f"Invalid search pattern '{pattern}': {e}")
            return

        # Both filters are compared against lower-cased text, so they must be
        # lower-cased too; otherwise a mixed-case rule_type can never match.
        method_filter = method.lower() if method else ''
        type_filter = rule_type.lower() if rule_type else ''

        matches = []

        # Search through all CVEs and their rules
        for cve_id in self.get_all_cves() or []:
            try:
                for rule_file in self.list_cve_rules(cve_id):
                    rule_method = _rule_method(rule_file)

                    # Apply method filter
                    if method_filter and method_filter not in rule_method.lower():
                        continue

                    # Load and search rule content
                    rule_content = self.load_sigma_rule(cve_id, rule_file)
                    if not rule_content:
                        continue

                    # Apply rule type filter (search in logsource)
                    if type_filter:
                        content_lower = rule_content.lower()
                        if (f'category: {type_filter}' not in content_lower
                                and f'product: {type_filter}' not in content_lower):
                            continue

                    # Check pattern match in rule content
                    if not pattern_regex.search(rule_content):
                        continue

                    # Extract rule title; strip so a trailing '\r' from CRLF
                    # files does not end up in the table.
                    title_match = _TITLE_RE.search(rule_content)
                    title = title_match.group(1).strip() if title_match else 'Unknown'

                    # Extract detection type from logsource
                    category_match = _CATEGORY_RE.search(rule_content)
                    detection_type = category_match.group(1) if category_match else 'Unknown'

                    matches.append({
                        'cve_id': cve_id,
                        'rule_file': rule_file,
                        'title': title,
                        'detection_type': detection_type,
                        'method': rule_method,
                    })

                    if len(matches) >= limit:
                        break

                # Propagate the limit out of the per-CVE loop as well.
                if len(matches) >= limit:
                    break

            except Exception as e:
                # Best effort: one unreadable CVE must not abort the search.
                self.error(f"Error searching rules for {cve_id}: {e}")

        # Display results
        if matches:
            headers = ["CVE ID", "Rule File", "Title", "Type", "Method"]
            rows = [
                [
                    match['cve_id'],
                    match['rule_file'],
                    _truncate(match['title'], 50),
                    match['detection_type'],
                    match['method'],
                ]
                for match in matches
            ]
            self.print_table(headers, rows, f"SIGMA Rule Search Results ({len(matches)} matches)")
        else:
            self.warning("No matching rules found")