""" Claude API client for enhanced SIGMA rule generation. """ import os import logging from typing import Optional, Dict, Any from anthropic import Anthropic logger = logging.getLogger(__name__) class ClaudeClient: """Client for interacting with Claude API for SIGMA rule generation.""" def __init__(self, api_key: Optional[str] = None): """Initialize Claude client with API key.""" self.api_key = api_key or os.getenv('CLAUDE_API_KEY') self.client = None if self.api_key: try: self.client = Anthropic(api_key=self.api_key) logger.info("Claude API client initialized successfully") except Exception as e: logger.error(f"Failed to initialize Claude API client: {e}") self.client = None else: logger.warning("No Claude API key provided. Claude-enhanced rule generation disabled.") def is_available(self) -> bool: """Check if Claude API client is available.""" return self.client is not None async def generate_sigma_rule(self, cve_id: str, poc_content: str, cve_description: str, existing_rule: Optional[str] = None) -> Optional[str]: """ Generate or enhance a SIGMA rule using Claude API. Args: cve_id: CVE identifier poc_content: Proof-of-concept code content from GitHub cve_description: CVE description from NVD existing_rule: Optional existing SIGMA rule to enhance Returns: Generated SIGMA rule YAML content or None if failed """ if not self.is_available(): logger.warning("Claude API client not available") return None try: # Construct the prompt for Claude prompt = self._build_sigma_generation_prompt( cve_id, poc_content, cve_description, existing_rule ) # Make API call to Claude response = self.client.messages.create( model="claude-3-5-sonnet-20241022", max_tokens=2000, temperature=0.1, messages=[ { "role": "user", "content": prompt } ] ) # Extract the SIGMA rule from response sigma_rule = self._extract_sigma_rule(response.content[0].text) logger.info(f"Successfully generated SIGMA rule for {cve_id} using Claude") return sigma_rule except Exception as e: logger.error(f"Failed to generate SIGMA rule for {cve_id} using Claude: {e}") return None def _build_sigma_generation_prompt(self, cve_id: str, poc_content: str, cve_description: str, existing_rule: Optional[str] = None) -> str: """Build the prompt for Claude to generate SIGMA rules.""" base_prompt = f"""You are a cybersecurity expert specializing in SIGMA rule creation for threat detection. Your goal is to analyze exploit code from GitHub PoC repositories and create syntactically correct SIGMA rules. **CVE Information:** - CVE ID: {cve_id} - Description: {cve_description} **Proof-of-Concept Code:** ``` {poc_content[:4000]} # Truncate if too long ``` **Your Task:** 1. Analyze the exploit code to identify: - Process execution patterns - File system activities - Network connections - Registry modifications - Command line arguments - Suspicious behaviors 2. Create a SIGMA rule that: - Follows proper SIGMA syntax (YAML format) - Includes appropriate detection logic - Has relevant metadata (title, description, author, date, references) - Uses correct field names for the target log source - Includes proper condition logic - Maps to relevant MITRE ATT&CK techniques when applicable 3. Focus on detection patterns that would catch this specific exploit in action **Important Requirements:** - Output ONLY the SIGMA rule in valid YAML format - Do not include explanations or comments outside the YAML - Use proper SIGMA rule structure with title, id, status, description, references, author, date, logsource, detection, and condition - Make the rule specific enough to detect the exploit but not too narrow to miss variants - Include relevant tags and MITRE ATT&CK technique mappings""" if existing_rule: base_prompt += f""" **Existing SIGMA Rule (to enhance):** ```yaml {existing_rule} ``` Please enhance the existing rule with insights from the PoC code analysis.""" return base_prompt def _extract_sigma_rule(self, response_text: str) -> str: """Extract SIGMA rule YAML from Claude's response.""" # Look for YAML content in the response lines = response_text.split('\n') yaml_lines = [] in_yaml = False for line in lines: if line.strip().startswith('```yaml') or line.strip().startswith('```'): in_yaml = True continue elif line.strip() == '```' and in_yaml: break elif in_yaml or line.strip().startswith('title:'): yaml_lines.append(line) in_yaml = True if not yaml_lines: # If no YAML block found, return the whole response return response_text.strip() return '\n'.join(yaml_lines).strip() async def enhance_existing_rule(self, existing_rule: str, poc_content: str, cve_id: str) -> Optional[str]: """ Enhance an existing SIGMA rule with PoC analysis. Args: existing_rule: Existing SIGMA rule YAML poc_content: PoC code content cve_id: CVE identifier Returns: Enhanced SIGMA rule or None if failed """ if not self.is_available(): return None try: prompt = f"""You are a SIGMA rule enhancement expert. Analyze the following PoC code and enhance the existing SIGMA rule with more specific detection patterns. **CVE ID:** {cve_id} **PoC Code:** ``` {poc_content[:3000]} ``` **Existing SIGMA Rule:** ```yaml {existing_rule} ``` **Task:** Enhance the existing rule by: 1. Adding more specific detection patterns found in the PoC 2. Improving the condition logic 3. Adding relevant tags or MITRE ATT&CK mappings 4. Keeping the rule structure intact but making it more effective Output ONLY the enhanced SIGMA rule in valid YAML format.""" response = self.client.messages.create( model="claude-3-5-sonnet-20241022", max_tokens=2000, temperature=0.1, messages=[ { "role": "user", "content": prompt } ] ) enhanced_rule = self._extract_sigma_rule(response.content[0].text) logger.info(f"Successfully enhanced SIGMA rule for {cve_id}") return enhanced_rule except Exception as e: logger.error(f"Failed to enhance SIGMA rule for {cve_id}: {e}") return None