""" Statistics Commands Commands for generating statistics and reports about CVEs and SIGMA rules. """ import json from datetime import datetime from collections import defaultdict, Counter from typing import Dict, List, Optional from .base_command import BaseCommand class StatsCommands(BaseCommand): """Commands for generating statistics""" async def overview(self, year: Optional[int], output: Optional[str]): """Generate overview statistics""" self.info("Generating overview statistics...") # Collect statistics stats = self._collect_overview_stats(year) # Display overview self._display_overview_stats(stats, year) # Save to file if requested if output: try: with open(output, 'w') as f: json.dump(stats, f, indent=2, default=str) self.success(f"Statistics saved to {output}") except Exception as e: self.error(f"Failed to save statistics: {e}") async def poc_stats(self, year: Optional[int]): """Generate PoC coverage statistics""" self.info("Generating PoC coverage statistics...") cves = self.get_all_cves(year) if not cves: self.warning("No CVEs found") return # Collect PoC statistics total_cves = len(cves) cves_with_pocs = 0 poc_sources = Counter() quality_distribution = Counter() severity_poc_breakdown = defaultdict(lambda: {'total': 0, 'with_poc': 0}) for cve_id in cves: try: metadata = self.load_cve_metadata(cve_id) if not metadata: continue cve_info = metadata.get('cve_info', {}) poc_data = metadata.get('poc_data', {}) severity = cve_info.get('severity', 'Unknown') severity_poc_breakdown[severity]['total'] += 1 poc_count = poc_data.get('poc_count', 0) if poc_count > 0: cves_with_pocs += 1 severity_poc_breakdown[severity]['with_poc'] += 1 # Count PoC sources if 'poc_data' in poc_data: poc_info = poc_data['poc_data'] if 'nomi_sec' in poc_info and poc_info['nomi_sec']: poc_sources['nomi_sec'] += len(poc_info['nomi_sec']) if 'github' in poc_info and poc_info['github']: poc_sources['github'] += len(poc_info['github']) if 'exploitdb' in poc_info and poc_info['exploitdb']: poc_sources['exploitdb'] += len(poc_info['exploitdb']) # Quality assessment based on PoC count if poc_count >= 5: quality_distribution['excellent'] += 1 elif poc_count >= 3: quality_distribution['good'] += 1 elif poc_count >= 1: quality_distribution['fair'] += 1 except Exception as e: self.error(f"Error processing {cve_id}: {e}") # Display PoC statistics coverage_percent = (cves_with_pocs / total_cves * 100) if total_cves > 0 else 0 title = f"PoC Coverage Statistics" if year: title += f" for {year}" self.info(f"\n{title}") self.info("=" * len(title)) self.info(f"Total CVEs: {total_cves}") self.info(f"CVEs with PoCs: {cves_with_pocs}") self.info(f"Coverage: {coverage_percent:.1f}%") if poc_sources: self.info(f"\nPoC Sources:") for source, count in poc_sources.most_common(): self.info(f" {source}: {count}") if quality_distribution: self.info(f"\nQuality Distribution:") for quality, count in quality_distribution.most_common(): self.info(f" {quality}: {count}") # Severity breakdown table if severity_poc_breakdown: headers = ["Severity", "Total CVEs", "With PoCs", "Coverage %"] rows = [] for severity, data in sorted(severity_poc_breakdown.items()): coverage = (data['with_poc'] / data['total'] * 100) if data['total'] > 0 else 0 rows.append([ severity, str(data['total']), str(data['with_poc']), f"{coverage:.1f}%" ]) self.print_table(headers, rows, "PoC Coverage by Severity") async def rule_stats(self, year: Optional[int], method: Optional[str]): """Generate rule generation statistics""" self.info("Generating rule generation statistics...") cves = self.get_all_cves(year) if not cves: self.warning("No CVEs found") return # Collect rule statistics total_cves = len(cves) cves_with_rules = 0 method_counts = Counter() rules_per_cve = [] for cve_id in cves: try: rules = self.list_cve_rules(cve_id) if method: # Filter rules by method rules = [r for r in rules if method.lower() in r.lower()] if rules: cves_with_rules += 1 rules_per_cve.append(len(rules)) for rule_file in rules: rule_method = rule_file.replace('rule_', '').replace('.sigma', '') method_counts[rule_method] += 1 except Exception as e: self.error(f"Error processing {cve_id}: {e}") # Calculate statistics rule_coverage = (cves_with_rules / total_cves * 100) if total_cves > 0 else 0 avg_rules_per_cve = sum(rules_per_cve) / len(rules_per_cve) if rules_per_cve else 0 total_rules = sum(method_counts.values()) # Display rule statistics title = f"Rule Generation Statistics" if year: title += f" for {year}" if method: title += f" (method: {method})" self.info(f"\n{title}") self.info("=" * len(title)) self.info(f"Total CVEs: {total_cves}") self.info(f"CVEs with rules: {cves_with_rules}") self.info(f"Rule coverage: {rule_coverage:.1f}%") self.info(f"Total rules: {total_rules}") self.info(f"Average rules per CVE: {avg_rules_per_cve:.1f}") if method_counts: headers = ["Generation Method", "Rule Count", "% of Total"] rows = [] for gen_method, count in method_counts.most_common(): percentage = (count / total_rules * 100) if total_rules > 0 else 0 rows.append([ gen_method, str(count), f"{percentage:.1f}%" ]) self.print_table(headers, rows, "Rules by Generation Method") def _collect_overview_stats(self, year: Optional[int]) -> Dict: """Collect comprehensive overview statistics""" cves = self.get_all_cves(year) stats = { 'generated_at': datetime.utcnow().isoformat(), 'filter_year': year, 'total_cves': len(cves), 'severity_breakdown': Counter(), 'yearly_breakdown': Counter(), 'poc_stats': { 'cves_with_pocs': 0, 'total_poc_count': 0 }, 'rule_stats': { 'cves_with_rules': 0, 'total_rule_count': 0, 'generation_methods': Counter() } } for cve_id in cves: try: metadata = self.load_cve_metadata(cve_id) if not metadata: continue cve_info = metadata.get('cve_info', {}) poc_data = metadata.get('poc_data', {}) # Year breakdown cve_year = cve_id.split('-')[1] stats['yearly_breakdown'][cve_year] += 1 # Severity breakdown severity = cve_info.get('severity', 'Unknown') stats['severity_breakdown'][severity] += 1 # PoC statistics poc_count = poc_data.get('poc_count', 0) if poc_count > 0: stats['poc_stats']['cves_with_pocs'] += 1 stats['poc_stats']['total_poc_count'] += poc_count # Rule statistics rules = self.list_cve_rules(cve_id) if rules: stats['rule_stats']['cves_with_rules'] += 1 stats['rule_stats']['total_rule_count'] += len(rules) for rule_file in rules: method = rule_file.replace('rule_', '').replace('.sigma', '') stats['rule_stats']['generation_methods'][method] += 1 except Exception as e: self.error(f"Error collecting stats for {cve_id}: {e}") return stats def _display_overview_stats(self, stats: Dict, year: Optional[int]): """Display overview statistics""" title = f"CVE-SIGMA Overview Statistics" if year: title += f" for {year}" self.info(f"\n{title}") self.info("=" * len(title)) self.info(f"Generated at: {stats['generated_at']}") self.info(f"Total CVEs: {stats['total_cves']}") # PoC coverage poc_stats = stats['poc_stats'] poc_coverage = (poc_stats['cves_with_pocs'] / stats['total_cves'] * 100) if stats['total_cves'] > 0 else 0 self.info(f"PoC coverage: {poc_coverage:.1f}% ({poc_stats['cves_with_pocs']} CVEs)") # Rule coverage rule_stats = stats['rule_stats'] rule_coverage = (rule_stats['cves_with_rules'] / stats['total_cves'] * 100) if stats['total_cves'] > 0 else 0 self.info(f"Rule coverage: {rule_coverage:.1f}% ({rule_stats['cves_with_rules']} CVEs)") self.info(f"Total rules: {rule_stats['total_rule_count']}") # Severity breakdown if stats['severity_breakdown']: headers = ["Severity", "Count", "Percentage"] rows = [] for severity, count in stats['severity_breakdown'].most_common(): percentage = (count / stats['total_cves'] * 100) if stats['total_cves'] > 0 else 0 rows.append([severity, str(count), f"{percentage:.1f}%"]) self.print_table(headers, rows, "CVEs by Severity") # Yearly breakdown (if not filtered by year) if not year and stats['yearly_breakdown']: headers = ["Year", "CVE Count"] rows = [] for cve_year, count in sorted(stats['yearly_breakdown'].items()): rows.append([cve_year, str(count)]) self.print_table(headers, rows, "CVEs by Year")