auto_sigma_rule_generator/cli/commands/stats_commands.py
bpmcdevitt e579c91b5e MAJOR: Transform web application to professional CLI-based SIGMA rule generator
🎉 **Architecture Transformation (v2.0)**
- Complete migration from web app to professional CLI tool
- File-based SIGMA rule management system
- Git-friendly directory structure organized by year/CVE-ID
- Multiple rule variants per CVE (template, LLM, hybrid)

 **New CLI System**
- Professional command-line interface with Click framework
- 8 command groups, including process, generate, search, stats, export, and migrate
- Modular command architecture for maintainability
- Comprehensive help system and configuration management

📁 **File-Based Storage Architecture**
- Individual CVE directories: cves/YEAR/CVE-ID/
- Multiple SIGMA rule variants per CVE
- JSON metadata with processing history and PoC data
- Native YAML files perfect for version control

🚀 **Core CLI Commands**
- process: CVE processing and bulk operations
- generate: SIGMA rule generation with multiple methods
- search: Advanced CVE and rule searching with filters
- stats: Comprehensive statistics and analytics
- export: Multiple output formats for different workflows
- migrate: Database-to-file migration tools

🔧 **Migration Support**
- Complete migration utilities from web database
- Data validation and integrity checking
- Backward compatibility with existing processors
- Legacy web interface maintained for transition

📊 **Enhanced Features**
- Advanced search with complex filtering (severity, PoC presence, etc.)
- Multi-format exports (YAML, JSON, CSV)
- Comprehensive statistics and coverage reports
- File-based rule versioning and management

🎯 **Production Benefits**
- No database dependency - runs anywhere
- Perfect for cybersecurity teams using git workflows
- Direct integration with SIGMA ecosystems
- Portable architecture for CI/CD pipelines
- Multiple rule variants for different detection scenarios

📝 **Documentation Updates**
- Complete README rewrite for CLI-first approach
- Updated CLAUDE.md with new architecture details
- Detailed CLI documentation with examples
- Migration guides and troubleshooting

**Perfect for security teams wanting production-ready SIGMA rules with version control! 🛡️**

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-07-21 13:11:03 -05:00

296 lines
No EOL
11 KiB
Python

"""
Statistics Commands
Commands for generating statistics and reports about CVEs and SIGMA rules.
"""
import json
from collections import defaultdict, Counter
from datetime import datetime, timezone
from typing import Dict, List, Optional

from .base_command import BaseCommand
class StatsCommands(BaseCommand):
    """Commands for generating statistics about CVEs and their SIGMA rules.

    The data-access and output helpers used here (``get_all_cves``,
    ``load_cve_metadata``, ``list_cve_rules``, ``print_table``, ``info``,
    ``success``, ``warning`` and ``error``) are not defined in this class;
    presumably they are inherited from ``BaseCommand``.
    """

    # Keys looked up under metadata['poc_data']['poc_data'], in report order.
    _POC_SOURCES = ('nomi_sec', 'github', 'exploitdb')

    @staticmethod
    def _percent(part, whole):
        """Return ``part`` as a percentage of ``whole`` (0 if ``whole`` <= 0)."""
        return (part / whole * 100) if whole > 0 else 0

    @staticmethod
    def _rule_method(rule_file: str) -> str:
        """Return the generation-method label embedded in a rule file name.

        Strips the ``rule_`` and ``.sigma`` markers, so
        ``rule_template.sigma`` becomes ``template``.
        """
        return rule_file.replace('rule_', '').replace('.sigma', '')

    async def overview(self, year: Optional[int], output: Optional[str]):
        """Print overview statistics and optionally save them as JSON.

        Args:
            year: Restrict the statistics to this CVE year; a falsy value
                covers every year.
            output: Path of a JSON file to write the statistics to. Nothing
                is written when this is empty or ``None``.
        """
        self.info("Generating overview statistics...")

        stats = self._collect_overview_stats(year)
        self._display_overview_stats(stats, year)

        if not output:
            return

        try:
            # Explicit encoding so the report does not depend on the locale.
            with open(output, 'w', encoding='utf-8') as f:
                json.dump(stats, f, indent=2, default=str)
        except Exception as e:
            # Saving is best-effort: report the failure instead of aborting
            # after the statistics have already been displayed.
            self.error(f"Failed to save statistics: {e}")
        else:
            self.success(f"Statistics saved to {output}")

    async def poc_stats(self, year: Optional[int]):
        """Print proof-of-concept (PoC) coverage statistics.

        Reports overall coverage, the number of PoCs per source, a quality
        distribution (``excellent`` for 5+ PoCs, ``good`` for 3+, ``fair``
        for 1+) and a per-severity coverage table.

        Args:
            year: Restrict the statistics to this CVE year; a falsy value
                covers every year.
        """
        self.info("Generating PoC coverage statistics...")

        cves = self.get_all_cves(year)
        if not cves:
            self.warning("No CVEs found")
            return

        total_cves = len(cves)
        cves_with_pocs = 0
        poc_sources = Counter()
        quality_distribution = Counter()
        severity_poc_breakdown = defaultdict(lambda: {'total': 0, 'with_poc': 0})

        for cve_id in cves:
            # Best-effort per CVE: one unreadable record must not abort the
            # whole report.
            try:
                metadata = self.load_cve_metadata(cve_id)
                if not metadata:
                    continue

                # ``or`` also covers keys that are present but null.
                cve_info = metadata.get('cve_info') or {}
                poc_data = metadata.get('poc_data') or {}

                # A null severity must not become a ``None`` key: the table
                # below sorts the keys, and None does not order against str.
                severity = cve_info.get('severity') or 'Unknown'
                severity_poc_breakdown[severity]['total'] += 1

                poc_count = poc_data.get('poc_count') or 0
                if poc_count <= 0:
                    continue

                cves_with_pocs += 1
                severity_poc_breakdown[severity]['with_poc'] += 1

                # NOTE(review): source and quality tallies are taken only for
                # CVEs with a positive poc_count -- confirm this matches the
                # intended nesting.
                poc_info = poc_data.get('poc_data') or {}
                for source in self._POC_SOURCES:
                    entries = poc_info.get(source)
                    if entries:
                        poc_sources[source] += len(entries)

                # Quality assessment based on PoC count
                if poc_count >= 5:
                    quality_distribution['excellent'] += 1
                elif poc_count >= 3:
                    quality_distribution['good'] += 1
                elif poc_count >= 1:
                    quality_distribution['fair'] += 1
            except Exception as e:
                self.error(f"Error processing {cve_id}: {e}")

        coverage_percent = self._percent(cves_with_pocs, total_cves)

        title = "PoC Coverage Statistics"
        if year:
            title += f" for {year}"

        self.info(f"\n{title}")
        self.info("=" * len(title))
        self.info(f"Total CVEs: {total_cves}")
        self.info(f"CVEs with PoCs: {cves_with_pocs}")
        self.info(f"Coverage: {coverage_percent:.1f}%")

        if poc_sources:
            self.info("\nPoC Sources:")
            for source, count in poc_sources.most_common():
                self.info(f" {source}: {count}")

        if quality_distribution:
            self.info("\nQuality Distribution:")
            for quality, count in quality_distribution.most_common():
                self.info(f" {quality}: {count}")

        # Severity breakdown table
        if severity_poc_breakdown:
            headers = ["Severity", "Total CVEs", "With PoCs", "Coverage %"]
            rows = []
            for severity, data in sorted(severity_poc_breakdown.items()):
                coverage = self._percent(data['with_poc'], data['total'])
                rows.append([
                    severity,
                    str(data['total']),
                    str(data['with_poc']),
                    f"{coverage:.1f}%",
                ])
            self.print_table(headers, rows, "PoC Coverage by Severity")

    async def rule_stats(self, year: Optional[int], method: Optional[str]):
        """Print rule-generation statistics.

        Reports rule coverage, total and average rule counts, and a table
        of rule counts per generation method.

        Args:
            year: Restrict the statistics to this CVE year; a falsy value
                covers every year.
            method: Case-insensitive substring that a rule's generation
                method must contain to be counted; a falsy value counts
                every rule.
        """
        self.info("Generating rule generation statistics...")

        cves = self.get_all_cves(year)
        if not cves:
            self.warning("No CVEs found")
            return

        total_cves = len(cves)
        cves_with_rules = 0
        method_counts = Counter()
        rules_per_cve = []
        wanted = method.lower() if method else None

        for cve_id in cves:
            # Best-effort per CVE: one failing lookup must not abort the
            # whole report.
            try:
                rule_methods = [
                    self._rule_method(rule_file)
                    for rule_file in (self.list_cve_rules(cve_id) or [])
                ]
                if wanted:
                    # Match against the method label rather than the whole
                    # file name, so that filters such as "rule" or "sigma"
                    # do not match every file via its prefix or extension.
                    rule_methods = [
                        m for m in rule_methods if wanted in m.lower()
                    ]

                if rule_methods:
                    cves_with_rules += 1
                    rules_per_cve.append(len(rule_methods))
                    method_counts.update(rule_methods)
            except Exception as e:
                self.error(f"Error processing {cve_id}: {e}")

        rule_coverage = self._percent(cves_with_rules, total_cves)
        avg_rules_per_cve = (
            sum(rules_per_cve) / len(rules_per_cve) if rules_per_cve else 0
        )
        total_rules = sum(method_counts.values())

        title = "Rule Generation Statistics"
        if year:
            title += f" for {year}"
        if method:
            title += f" (method: {method})"

        self.info(f"\n{title}")
        self.info("=" * len(title))
        self.info(f"Total CVEs: {total_cves}")
        self.info(f"CVEs with rules: {cves_with_rules}")
        self.info(f"Rule coverage: {rule_coverage:.1f}%")
        self.info(f"Total rules: {total_rules}")
        self.info(f"Average rules per CVE: {avg_rules_per_cve:.1f}")

        if method_counts:
            headers = ["Generation Method", "Rule Count", "% of Total"]
            rows = []
            for gen_method, count in method_counts.most_common():
                percentage = self._percent(count, total_rules)
                rows.append([
                    gen_method,
                    str(count),
                    f"{percentage:.1f}%",
                ])
            self.print_table(headers, rows, "Rules by Generation Method")

    def _collect_overview_stats(self, year: Optional[int]) -> Dict:
        """Collect the overview statistics as a JSON-serialisable dict.

        Args:
            year: Restrict the statistics to this CVE year; a falsy value
                covers every year.

        Returns:
            A dict with the keys ``generated_at`` (ISO-8601 UTC timestamp),
            ``filter_year``, ``total_cves``, ``severity_breakdown``,
            ``yearly_breakdown``, ``poc_stats`` and ``rule_stats``. CVEs
            whose metadata cannot be loaded still count towards
            ``total_cves`` but contribute to no breakdown.
        """
        cves = self.get_all_cves(year) or []

        stats = {
            # Timezone-aware replacement for the deprecated utcnow().
            'generated_at': datetime.now(timezone.utc).isoformat(),
            'filter_year': year,
            'total_cves': len(cves),
            'severity_breakdown': Counter(),
            'yearly_breakdown': Counter(),
            'poc_stats': {
                'cves_with_pocs': 0,
                'total_poc_count': 0,
            },
            'rule_stats': {
                'cves_with_rules': 0,
                'total_rule_count': 0,
                'generation_methods': Counter(),
            },
        }
        poc_totals = stats['poc_stats']
        rule_totals = stats['rule_stats']

        for cve_id in cves:
            # Best-effort per CVE: one unreadable record must not abort the
            # whole collection.
            try:
                metadata = self.load_cve_metadata(cve_id)
                if not metadata:
                    continue

                # ``or`` also covers keys that are present but null.
                cve_info = metadata.get('cve_info') or {}
                poc_data = metadata.get('poc_data') or {}

                # Year breakdown: second dash-separated field of the CVE id.
                cve_year = cve_id.split('-')[1]
                stats['yearly_breakdown'][cve_year] += 1

                # Severity breakdown
                severity = cve_info.get('severity') or 'Unknown'
                stats['severity_breakdown'][severity] += 1

                # PoC statistics
                poc_count = poc_data.get('poc_count') or 0
                if poc_count > 0:
                    poc_totals['cves_with_pocs'] += 1
                    poc_totals['total_poc_count'] += poc_count

                # Rule statistics
                rules = self.list_cve_rules(cve_id)
                if rules:
                    rule_totals['cves_with_rules'] += 1
                    rule_totals['total_rule_count'] += len(rules)
                    rule_totals['generation_methods'].update(
                        self._rule_method(rule_file) for rule_file in rules
                    )
            except Exception as e:
                self.error(f"Error collecting stats for {cve_id}: {e}")

        return stats

    def _display_overview_stats(self, stats: Dict, year: Optional[int]):
        """Print the statistics produced by ``_collect_overview_stats``.

        Args:
            stats: Statistics dict; must provide ``generated_at``,
                ``total_cves``, ``poc_stats``, ``rule_stats``,
                ``severity_breakdown`` and ``yearly_breakdown``.
            year: Year filter that was applied. When set it is appended to
                the title and the per-year table is omitted.
        """
        title = "CVE-SIGMA Overview Statistics"
        if year:
            title += f" for {year}"

        total_cves = stats['total_cves']

        self.info(f"\n{title}")
        self.info("=" * len(title))
        self.info(f"Generated at: {stats['generated_at']}")
        self.info(f"Total CVEs: {total_cves}")

        # PoC coverage
        poc_stats = stats['poc_stats']
        poc_coverage = self._percent(poc_stats['cves_with_pocs'], total_cves)
        self.info(f"PoC coverage: {poc_coverage:.1f}% ({poc_stats['cves_with_pocs']} CVEs)")

        # Rule coverage
        rule_stats = stats['rule_stats']
        rule_coverage = self._percent(rule_stats['cves_with_rules'], total_cves)
        self.info(f"Rule coverage: {rule_coverage:.1f}% ({rule_stats['cves_with_rules']} CVEs)")
        self.info(f"Total rules: {rule_stats['total_rule_count']}")

        # Severity breakdown
        if stats['severity_breakdown']:
            headers = ["Severity", "Count", "Percentage"]
            rows = []
            for severity, count in stats['severity_breakdown'].most_common():
                percentage = self._percent(count, total_cves)
                rows.append([severity, str(count), f"{percentage:.1f}%"])
            self.print_table(headers, rows, "CVEs by Severity")

        # Yearly breakdown (if not filtered by year)
        if not year and stats['yearly_breakdown']:
            headers = ["Year", "CVE Count"]
            rows = [
                [cve_year, str(count)]
                for cve_year, count in sorted(stats['yearly_breakdown'].items())
            ]
            self.print_table(headers, rows, "CVEs by Year")