🎉 **Architecture Transformation (v2.0)** - Complete migration from web app to professional CLI tool - File-based SIGMA rule management system - Git-friendly directory structure organized by year/CVE-ID - Multiple rule variants per CVE (template, LLM, hybrid) ✨ **New CLI System** - Professional command-line interface with Click framework - 8 command groups: process, generate, search, stats, export, migrate - Modular command architecture for maintainability - Comprehensive help system and configuration management 📁 **File-Based Storage Architecture** - Individual CVE directories: cves/YEAR/CVE-ID/ - Multiple SIGMA rule variants per CVE - JSON metadata with processing history and PoC data - Native YAML files perfect for version control 🚀 **Core CLI Commands** - process: CVE processing and bulk operations - generate: SIGMA rule generation with multiple methods - search: Advanced CVE and rule searching with filters - stats: Comprehensive statistics and analytics - export: Multiple output formats for different workflows - migrate: Database-to-file migration tools 🔧 **Migration Support** - Complete migration utilities from web database - Data validation and integrity checking - Backward compatibility with existing processors - Legacy web interface maintained for transition 📊 **Enhanced Features** - Advanced search with complex filtering (severity, PoC presence, etc.) 
- Multi-format exports (YAML, JSON, CSV) - Comprehensive statistics and coverage reports - File-based rule versioning and management 🎯 **Production Benefits** - No database dependency - runs anywhere - Perfect for cybersecurity teams using git workflows - Direct integration with SIGMA ecosystems - Portable architecture for CI/CD pipelines - Multiple rule variants for different detection scenarios 📝 **Documentation Updates** - Complete README rewrite for CLI-first approach - Updated CLAUDE.md with new architecture details - Detailed CLI documentation with examples - Migration guides and troubleshooting **Perfect for security teams wanting production-ready SIGMA rules with version control! 🛡️** 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
296 lines
No EOL
11 KiB
Python
296 lines
No EOL
11 KiB
Python
"""
|
|
Statistics Commands
|
|
|
|
Commands for generating statistics and reports about CVEs and SIGMA rules.
|
|
"""
|
|
|
|
import json
|
|
from datetime import datetime
|
|
from collections import defaultdict, Counter
|
|
from typing import Dict, List, Optional
|
|
from .base_command import BaseCommand
|
|
|
|
class StatsCommands(BaseCommand):
    """Commands for generating statistics about CVEs and SIGMA rules.

    The methods below call ``get_all_cves``, ``load_cve_metadata``,
    ``list_cve_rules``, ``print_table`` and the ``info`` / ``success`` /
    ``warning`` / ``error`` reporters on ``self``. None of them are defined
    in this class; presumably they are inherited from ``BaseCommand``.
    """

    # Fixed affixes of a rule name; what remains is the generation method.
    _RULE_PREFIX = 'rule_'
    _RULE_SUFFIX = '.sigma'

    # PoC source keys counted by ``poc_stats``, in reporting order.
    _POC_SOURCES = ('nomi_sec', 'github', 'exploitdb')

    @staticmethod
    def _percent(part, whole):
        """Return ``part`` as a percentage of ``whole``, or 0 when ``whole`` is not positive."""
        return (part / whole * 100) if whole > 0 else 0

    @classmethod
    def _rule_method(cls, rule_file: str) -> str:
        """Return the generation-method label encoded in a rule name.

        Only a leading ``rule_`` and a trailing ``.sigma`` are removed, so a
        method name that itself contains one of those substrings is kept
        intact (e.g. ``rule_llm_rule_based.sigma`` -> ``llm_rule_based``).
        """
        label = rule_file
        if label.startswith(cls._RULE_PREFIX):
            label = label[len(cls._RULE_PREFIX):]
        if label.endswith(cls._RULE_SUFFIX):
            label = label[:-len(cls._RULE_SUFFIX)]
        return label

    def _print_title(self, title: str) -> None:
        """Emit ``title`` preceded by a blank line and underlined with ``=``."""
        self.info(f"\n{title}")
        self.info("=" * len(title))

    async def overview(self, year: Optional[int], output: Optional[str]) -> None:
        """Display overview statistics and optionally save them as JSON.

        Args:
            year: Forwarded to the statistics collection and display helpers.
            output: Path of the JSON file to write; nothing is written when
                this is falsy.
        """
        self.info("Generating overview statistics...")

        stats = self._collect_overview_stats(year)
        self._display_overview_stats(stats, year)

        if not output:
            return

        try:
            # Explicit encoding so the file does not depend on the platform default.
            with open(output, 'w', encoding='utf-8') as f:
                # default=str stringifies any value json cannot encode natively.
                json.dump(stats, f, indent=2, default=str)
        except (OSError, TypeError, ValueError) as e:
            self.error(f"Failed to save statistics: {e}")
        else:
            self.success(f"Statistics saved to {output}")

    async def poc_stats(self, year: Optional[int]) -> None:
        """Display PoC coverage statistics.

        Reports overall coverage, PoC counts per source, a quality
        distribution derived from each CVE's ``poc_count``, and a
        per-severity coverage table.

        Args:
            year: Forwarded to ``get_all_cves``; appended to the report title
                when truthy.
        """
        self.info("Generating PoC coverage statistics...")

        cves = self.get_all_cves(year)
        if not cves:
            self.warning("No CVEs found")
            return

        total_cves = len(cves)
        cves_with_pocs = 0
        poc_sources = Counter()
        quality_distribution = Counter()
        severity_poc_breakdown = defaultdict(lambda: {'total': 0, 'with_poc': 0})

        for cve_id in cves:
            try:
                metadata = self.load_cve_metadata(cve_id)
                if not metadata:
                    continue

                # ``or`` fallbacks: a key that is present but null must behave
                # like a missing key. A None severity would otherwise end up
                # as a dict key and break the sorted() call further down.
                cve_info = metadata.get('cve_info') or {}
                poc_data = metadata.get('poc_data') or {}
                severity = cve_info.get('severity') or 'Unknown'

                severity_poc_breakdown[severity]['total'] += 1

                poc_count = poc_data.get('poc_count') or 0
                if not poc_count > 0:
                    continue

                cves_with_pocs += 1
                severity_poc_breakdown[severity]['with_poc'] += 1

                # Per-source entries live one level down, under a nested
                # 'poc_data' key.
                poc_info = poc_data.get('poc_data') or {}
                for source in self._POC_SOURCES:
                    entries = poc_info.get(source)
                    if entries:
                        poc_sources[source] += len(entries)

                # Quality assessment based on PoC count
                if poc_count >= 5:
                    quality_distribution['excellent'] += 1
                elif poc_count >= 3:
                    quality_distribution['good'] += 1
                elif poc_count >= 1:
                    quality_distribution['fair'] += 1

            except Exception as e:
                # Best effort: one unreadable CVE must not abort the report.
                self.error(f"Error processing {cve_id}: {e}")

        coverage_percent = self._percent(cves_with_pocs, total_cves)

        title = "PoC Coverage Statistics"
        if year:
            title += f" for {year}"

        self._print_title(title)
        self.info(f"Total CVEs: {total_cves}")
        self.info(f"CVEs with PoCs: {cves_with_pocs}")
        self.info(f"Coverage: {coverage_percent:.1f}%")

        if poc_sources:
            self.info("\nPoC Sources:")
            for source, count in poc_sources.most_common():
                self.info(f" {source}: {count}")

        if quality_distribution:
            self.info("\nQuality Distribution:")
            for quality, count in quality_distribution.most_common():
                self.info(f" {quality}: {count}")

        # Severity breakdown table
        if severity_poc_breakdown:
            headers = ["Severity", "Total CVEs", "With PoCs", "Coverage %"]
            rows = []

            for severity, data in sorted(severity_poc_breakdown.items()):
                coverage = self._percent(data['with_poc'], data['total'])
                rows.append([
                    severity,
                    str(data['total']),
                    str(data['with_poc']),
                    f"{coverage:.1f}%",
                ])

            self.print_table(headers, rows, "PoC Coverage by Severity")

    async def rule_stats(self, year: Optional[int], method: Optional[str]) -> None:
        """Display rule generation statistics.

        Args:
            year: Forwarded to ``get_all_cves``; appended to the report title
                when truthy.
            method: Optional case-insensitive substring; when given, only
                rules whose name contains it are counted.
        """
        self.info("Generating rule generation statistics...")

        cves = self.get_all_cves(year)
        if not cves:
            self.warning("No CVEs found")
            return

        total_cves = len(cves)
        cves_with_rules = 0
        method_counts = Counter()
        rules_per_cve = []

        # Lower-case the filter once instead of once per CVE.
        wanted = method.lower() if method else None

        for cve_id in cves:
            try:
                rules = self.list_cve_rules(cve_id) or []

                if wanted:
                    # Filter rules by method
                    rules = [r for r in rules if wanted in r.lower()]

                if not rules:
                    continue

                cves_with_rules += 1
                rules_per_cve.append(len(rules))

                for rule_file in rules:
                    method_counts[self._rule_method(rule_file)] += 1

            except Exception as e:
                # Best effort: one unreadable CVE must not abort the report.
                self.error(f"Error processing {cve_id}: {e}")

        rule_coverage = self._percent(cves_with_rules, total_cves)
        avg_rules_per_cve = sum(rules_per_cve) / len(rules_per_cve) if rules_per_cve else 0
        total_rules = sum(method_counts.values())

        title = "Rule Generation Statistics"
        if year:
            title += f" for {year}"
        if method:
            title += f" (method: {method})"

        self._print_title(title)
        self.info(f"Total CVEs: {total_cves}")
        self.info(f"CVEs with rules: {cves_with_rules}")
        self.info(f"Rule coverage: {rule_coverage:.1f}%")
        self.info(f"Total rules: {total_rules}")
        self.info(f"Average rules per CVE: {avg_rules_per_cve:.1f}")

        if method_counts:
            headers = ["Generation Method", "Rule Count", "% of Total"]
            rows = []

            for gen_method, count in method_counts.most_common():
                percentage = self._percent(count, total_rules)
                rows.append([
                    gen_method,
                    str(count),
                    f"{percentage:.1f}%",
                ])

            self.print_table(headers, rows, "Rules by Generation Method")

    def _collect_overview_stats(self, year: Optional[int]) -> Dict:
        """Collect overview statistics for the CVEs returned by ``get_all_cves(year)``.

        Returns:
            A dict with the keys ``generated_at``, ``filter_year``,
            ``total_cves``, ``severity_breakdown``, ``yearly_breakdown``,
            ``poc_stats`` and ``rule_stats``. The breakdowns and
            ``rule_stats['generation_methods']`` are ``Counter`` objects.
        """
        # Local import: the file-level import only brings in ``datetime``.
        from datetime import timezone

        cves = self.get_all_cves(year) or []

        stats = {
            # ``datetime.utcnow()`` is deprecated since Python 3.12. Dropping
            # tzinfo from an aware "now" yields the same naive-UTC ISO string
            # the previous implementation produced.
            'generated_at': datetime.now(timezone.utc).replace(tzinfo=None).isoformat(),
            'filter_year': year,
            'total_cves': len(cves),
            'severity_breakdown': Counter(),
            'yearly_breakdown': Counter(),
            'poc_stats': {
                'cves_with_pocs': 0,
                'total_poc_count': 0,
            },
            'rule_stats': {
                'cves_with_rules': 0,
                'total_rule_count': 0,
                'generation_methods': Counter(),
            },
        }

        for cve_id in cves:
            try:
                metadata = self.load_cve_metadata(cve_id)
                if not metadata:
                    continue

                # ``or`` fallbacks: null-valued keys behave like missing keys.
                cve_info = metadata.get('cve_info') or {}
                poc_data = metadata.get('poc_data') or {}

                # Year breakdown: second dash-separated field of the ID.
                cve_year = cve_id.split('-')[1]
                stats['yearly_breakdown'][cve_year] += 1

                # Severity breakdown
                severity = cve_info.get('severity') or 'Unknown'
                stats['severity_breakdown'][severity] += 1

                # PoC statistics
                poc_count = poc_data.get('poc_count') or 0
                if poc_count > 0:
                    stats['poc_stats']['cves_with_pocs'] += 1
                    stats['poc_stats']['total_poc_count'] += poc_count

                # Rule statistics
                rules = self.list_cve_rules(cve_id)
                if rules:
                    stats['rule_stats']['cves_with_rules'] += 1
                    stats['rule_stats']['total_rule_count'] += len(rules)

                    for rule_file in rules:
                        rule_method = self._rule_method(rule_file)
                        stats['rule_stats']['generation_methods'][rule_method] += 1

            except Exception as e:
                # Best effort: one unreadable CVE must not abort collection.
                self.error(f"Error collecting stats for {cve_id}: {e}")

        return stats

    def _display_overview_stats(self, stats: Dict, year: Optional[int]) -> None:
        """Display the statistics produced by ``_collect_overview_stats``.

        Args:
            stats: Statistics dict; ``severity_breakdown`` must provide
                ``most_common()`` (i.e. be a ``Counter``).
            year: Appended to the title when truthy; the per-year table is
                shown only when this is falsy.
        """
        title = "CVE-SIGMA Overview Statistics"
        if year:
            title += f" for {year}"

        self._print_title(title)
        self.info(f"Generated at: {stats['generated_at']}")
        self.info(f"Total CVEs: {stats['total_cves']}")

        total_cves = stats['total_cves']

        # PoC coverage
        poc_stats = stats['poc_stats']
        poc_coverage = self._percent(poc_stats['cves_with_pocs'], total_cves)
        self.info(f"PoC coverage: {poc_coverage:.1f}% ({poc_stats['cves_with_pocs']} CVEs)")

        # Rule coverage
        rule_stats = stats['rule_stats']
        rule_coverage = self._percent(rule_stats['cves_with_rules'], total_cves)
        self.info(f"Rule coverage: {rule_coverage:.1f}% ({rule_stats['cves_with_rules']} CVEs)")
        self.info(f"Total rules: {rule_stats['total_rule_count']}")

        # Severity breakdown
        if stats['severity_breakdown']:
            headers = ["Severity", "Count", "Percentage"]
            rows = []

            for severity, count in stats['severity_breakdown'].most_common():
                percentage = self._percent(count, total_cves)
                rows.append([severity, str(count), f"{percentage:.1f}%"])

            self.print_table(headers, rows, "CVEs by Severity")

        # Yearly breakdown (if not filtered by year)
        if not year and stats['yearly_breakdown']:
            headers = ["Year", "CVE Count"]
            rows = [
                [cve_year, str(count)]
                for cve_year, count in sorted(stats['yearly_breakdown'].items())
            ]

            self.print_table(headers, rows, "CVEs by Year")