#!/usr/bin/env python3
"""
Security-focused Twitter CLI for threat monitoring and vulnerability research.
"""

import os
import sys
import json
import re
import logging
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional

import click
import tweepy
import redis
import pandas as pd
from colorama import init, Fore, Style
from tabulate import tabulate
from dotenv import load_dotenv

# Initialize colorama (enables ANSI colour handling on Windows consoles too)
init()

# Load environment variables from a local .env file, if present
load_dotenv()

# Ensure the log directory exists BEFORE the FileHandler below is constructed.
# Previously this only happened under `__main__`, so merely importing the
# module crashed when `data/` was missing.
os.makedirs('data', exist_ok=True)

# Configure logging to both a file and stderr
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('data/security_twitter.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# CVE identifiers look like "CVE-2024-12345" (4-digit year, 4-7 digit id).
# Compiled once at module level instead of per-tweet inside the analysis loop.
CVE_PATTERN = re.compile(r'CVE-\d{4}-\d{4,7}', re.IGNORECASE)


class SecurityTwitterCLI:
    """Wraps the Twitter API and an optional Redis cache for security queries.

    Construction exits the process (``sys.exit(1)``) if the Twitter client
    cannot be initialized; a failed Redis connection merely disables caching.
    """

    def __init__(self):
        self.setup_twitter_api()
        self.setup_redis()
        # Keyword list used both for filtering trends and for the keyword
        # frequency counts in analyze_vulnerability_mentions().
        self.security_keywords = [
            'vulnerability', 'exploit', 'malware', 'ransomware', 'phishing',
            'databreach', 'cybersecurity', 'infosec', 'threathunting', 'apt',
            'zeroday', '0day', 'cve', 'security', 'breach', 'attack'
        ]

    def setup_twitter_api(self):
        """Initialize the Twitter API v2 client from environment variables.

        Requires ``TWITTER_BEARER_TOKEN``; the consumer/access credentials
        are optional for app-only endpoints. Exits the process on failure.
        """
        try:
            bearer_token = os.getenv('TWITTER_BEARER_TOKEN')
            if not bearer_token:
                raise ValueError("Twitter Bearer Token not found in environment")
            self.twitter_client = tweepy.Client(
                bearer_token=bearer_token,
                consumer_key=os.getenv('TWITTER_API_KEY'),
                consumer_secret=os.getenv('TWITTER_API_SECRET'),
                access_token=os.getenv('TWITTER_ACCESS_TOKEN'),
                access_token_secret=os.getenv('TWITTER_ACCESS_TOKEN_SECRET'),
                wait_on_rate_limit=True  # block instead of raising on 429s
            )
            logger.info("Twitter API client initialized successfully")
        except Exception as e:
            logger.error("Failed to initialize Twitter API: %s", e)
            sys.exit(1)

    def setup_redis(self):
        """Initialize the Redis connection used for response caching.

        Host/port default to the previous hard-coded values (``redis:6379``)
        but can now be overridden via ``REDIS_HOST`` / ``REDIS_PORT``.
        If the connection fails, caching is silently disabled.
        """
        try:
            self.redis_client = redis.Redis(
                host=os.getenv('REDIS_HOST', 'redis'),
                port=int(os.getenv('REDIS_PORT', '6379')),
                db=0,
                decode_responses=True
            )
            self.redis_client.ping()  # fail fast if the server is unreachable
            logger.info("Redis connection established")
        except Exception as e:
            logger.warning("Redis connection failed: %s. Caching disabled.", e)
            self.redis_client = None

    def cache_get(self, key: str) -> Optional[str]:
        """Return the cached value for *key*, or ``None`` if caching is off/missing."""
        if self.redis_client:
            return self.redis_client.get(key)
        return None

    def cache_set(self, key: str, value: str, ttl: int = 300):
        """Store *value* under *key* with a TTL in seconds (no-op when cache is off)."""
        if self.redis_client:
            self.redis_client.setex(key, ttl, value)

    def search_security_hashtags(self, hashtags: List[str], max_results: int = 100) -> List[Dict]:
        """Search recent English tweets (retweets excluded) for each hashtag.

        Results are cached per (hashtag, max_results) pair. Each returned dict
        carries the tweet id/text/metrics plus a computed ``security_score``.
        Errors for an individual hashtag are logged and skipped.
        """
        all_tweets = []
        for hashtag in hashtags:
            cache_key = f"hashtag:{hashtag}:{max_results}"
            cached_data = self.cache_get(cache_key)
            if cached_data:
                logger.info("Using cached data for #%s", hashtag)
                all_tweets.extend(json.loads(cached_data))
                continue
            try:
                query = f"#{hashtag} -is:retweet lang:en"
                tweets = tweepy.Paginator(
                    self.twitter_client.search_recent_tweets,
                    query=query,
                    tweet_fields=['created_at', 'author_id', 'public_metrics',
                                  'context_annotations'],
                    # API caps a single page at 100 results
                    max_results=min(max_results, 100)
                ).flatten(limit=max_results)

                tweet_data = []
                for tweet in tweets:
                    tweet_info = {
                        'id': tweet.id,
                        'text': tweet.text,
                        'created_at': tweet.created_at.isoformat(),
                        'author_id': tweet.author_id,
                        'retweet_count': tweet.public_metrics['retweet_count'],
                        'like_count': tweet.public_metrics['like_count'],
                        'hashtag': hashtag,
                        'security_score': self.calculate_security_score(tweet.text)
                    }
                    tweet_data.append(tweet_info)

                self.cache_set(cache_key, json.dumps(tweet_data))
                all_tweets.extend(tweet_data)
                logger.info("Found %d tweets for #%s", len(tweet_data), hashtag)
            except Exception as e:
                logger.error("Error searching #%s: %s", hashtag, e)
        return all_tweets

    def get_trending_topics(self, woeid: int = 1) -> List[Dict]:
        """Return security-related trending topics for *woeid* (1 = worldwide).

        Uses the Twitter v1.1 trends endpoint (requires OAuth1 credentials).
        Trends are kept only if their lowercased name contains one of
        ``self.security_keywords``. Returns ``[]`` on any API error.
        """
        cache_key = f"trending:{woeid}"
        cached_data = self.cache_get(cache_key)
        if cached_data:
            logger.info("Using cached trending data")
            return json.loads(cached_data)
        try:
            # Note: this endpoint requires Twitter API v1.1 access
            auth = tweepy.OAuth1UserHandler(
                consumer_key=os.getenv('TWITTER_API_KEY'),
                consumer_secret=os.getenv('TWITTER_API_SECRET'),
                access_token=os.getenv('TWITTER_ACCESS_TOKEN'),
                access_token_secret=os.getenv('TWITTER_ACCESS_TOKEN_SECRET')
            )
            api = tweepy.API(auth)
            trends = api.get_place_trends(woeid)[0]['trends']

            security_trends = []
            for trend in trends:
                name = trend['name'].lower()
                if any(keyword in name for keyword in self.security_keywords):
                    security_trends.append({
                        'name': trend['name'],
                        'url': trend['url'],
                        'tweet_volume': trend['tweet_volume'],
                        'security_relevance': 'high'
                    })

            # Trends change slowly; cache a bit longer than tweet searches
            self.cache_set(cache_key, json.dumps(security_trends), ttl=600)
            return security_trends
        except Exception as e:
            logger.error("Error getting trending topics: %s", e)
            return []

    def calculate_security_score(self, text: str) -> float:
        """Score *text* for security relevance on a 0.0-10.0 scale.

        High-value keywords count 2.0 each, medium-value 1.0, urgency
        indicators 1.5; the total is capped at 10.0. Matching is a simple
        case-insensitive substring test.
        """
        text_lower = text.lower()
        score = 0.0

        high_value_keywords = ['cve', '0day', 'zeroday', 'exploit', 'malware', 'breach']
        for keyword in high_value_keywords:
            if keyword in text_lower:
                score += 2.0

        medium_value_keywords = ['vulnerability', 'security', 'threat', 'attack']
        for keyword in medium_value_keywords:
            if keyword in text_lower:
                score += 1.0

        urgency_keywords = ['urgent', 'critical', 'immediate', 'alert']
        for keyword in urgency_keywords:
            if keyword in text_lower:
                score += 1.5

        return min(score, 10.0)  # Cap at 10.0

    def analyze_vulnerability_mentions(self, tweets: List[Dict]) -> Dict[str, Any]:
        """Aggregate CVE mentions, high-priority tweets, and keyword counts.

        Expects dicts produced by :meth:`search_security_hashtags`. Returns a
        summary dict; ``threat_actors`` and ``timeline_analysis`` are reserved
        keys that are currently never populated.
        """
        analysis = {
            'total_tweets': len(tweets),
            'high_priority': [],
            'cve_mentions': [],
            'threat_actors': [],
            'common_keywords': {},
            'timeline_analysis': {}
        }

        for tweet in tweets:
            # High priority tweets (score > 5)
            if tweet['security_score'] > 5.0:
                analysis['high_priority'].append(tweet)

            # CVE pattern matching (regex precompiled at module level)
            cves = CVE_PATTERN.findall(tweet['text'])
            if cves:
                analysis['cve_mentions'].extend(cves)

            # Keyword frequency (exact whole-word match against keyword list)
            words = tweet['text'].lower().split()
            for word in words:
                if word in self.security_keywords:
                    analysis['common_keywords'][word] = \
                        analysis['common_keywords'].get(word, 0) + 1

        return analysis


@click.group()
def cli():
    """Security-focused Twitter CLI for threat monitoring and vulnerability research."""
    pass


@cli.command()
@click.option('--hashtags', '-h', multiple=True, required=True,
              help='Security hashtags to search for (e.g., -h cybersecurity -h malware)')
@click.option('--max-results', '-n', default=100,
              help='Maximum number of tweets to retrieve per hashtag')
@click.option('--output', '-o', type=click.Choice(['table', 'json', 'csv']),
              default='table', help='Output format')
@click.option('--min-score', '-s', default=1.0,
              help='Minimum security score threshold')
def search(hashtags, max_results, output, min_score):
    """Search for security-related tweets by hashtags."""
    click.echo(f"{Fore.CYAN}🔍 Searching for security tweets...{Style.RESET_ALL}")

    twitter_cli = SecurityTwitterCLI()
    tweets = twitter_cli.search_security_hashtags(list(hashtags), max_results)

    # Filter by security score
    filtered_tweets = [t for t in tweets if t['security_score'] >= min_score]

    if output == 'json':
        click.echo(json.dumps(filtered_tweets, indent=2))
    elif output == 'csv':
        df = pd.DataFrame(filtered_tweets)
        click.echo(df.to_csv(index=False))
    else:  # table
        if filtered_tweets:
            table_data = []
            for tweet in filtered_tweets[:20]:  # Show top 20
                table_data.append([
                    tweet['created_at'][:10],
                    tweet['hashtag'],
                    tweet['text'][:80] + '...' if len(tweet['text']) > 80 else tweet['text'],
                    f"{tweet['security_score']:.1f}",
                    tweet['like_count']
                ])
            headers = ['Date', 'Hashtag', 'Tweet', 'Score', 'Likes']
            click.echo(tabulate(table_data, headers=headers, tablefmt='grid'))
            click.echo(f"{Fore.GREEN}Found {len(filtered_tweets)} relevant tweets{Style.RESET_ALL}")
        else:
            click.echo(f"{Fore.YELLOW}No tweets found matching criteria{Style.RESET_ALL}")


@cli.command()
@click.option('--analyze', '-a', is_flag=True, help='Show detailed analysis')
def trending(analyze):
    """Get security-related trending topics."""
    click.echo(f"{Fore.CYAN}📈 Getting security-related trending topics...{Style.RESET_ALL}")

    twitter_cli = SecurityTwitterCLI()
    trends = twitter_cli.get_trending_topics()

    if trends:
        for trend in trends:
            volume = trend['tweet_volume'] if trend['tweet_volume'] else 'N/A'
            click.echo(f"{Fore.GREEN}• {trend['name']}{Style.RESET_ALL} (Volume: {volume})")
            # The --analyze flag was previously accepted but ignored; it now
            # prints the extra per-trend details the API already returned.
            if analyze:
                click.echo(f"    URL: {trend['url']}")
                click.echo(f"    Relevance: {trend['security_relevance']}")
    else:
        click.echo(f"{Fore.YELLOW}No security-related trending topics found{Style.RESET_ALL}")


@cli.command()
@click.option('--hashtags', '-h', multiple=True, required=True)
@click.option('--max-results', '-n', default=100)
def analyze(hashtags, max_results):
    """Analyze vulnerability mentions and threat patterns."""
    click.echo(f"{Fore.CYAN}🔬 Analyzing security threats...{Style.RESET_ALL}")

    twitter_cli = SecurityTwitterCLI()
    tweets = twitter_cli.search_security_hashtags(list(hashtags), max_results)
    analysis = twitter_cli.analyze_vulnerability_mentions(tweets)

    click.echo(f"\n{Fore.GREEN}📊 Analysis Results:{Style.RESET_ALL}")
    click.echo(f"Total tweets analyzed: {analysis['total_tweets']}")
    click.echo(f"High priority alerts: {len(analysis['high_priority'])}")
    click.echo(f"CVE mentions found: {len(set(analysis['cve_mentions']))}")

    if analysis['cve_mentions']:
        click.echo(f"\n{Fore.YELLOW}🚨 CVEs mentioned:{Style.RESET_ALL}")
        for cve in set(analysis['cve_mentions']):
            click.echo(f"  • {cve}")

    if analysis['common_keywords']:
        click.echo(f"\n{Fore.BLUE}🔑 Top security keywords:{Style.RESET_ALL}")
        sorted_keywords = sorted(analysis['common_keywords'].items(),
                                 key=lambda x: x[1], reverse=True)
        for keyword, count in sorted_keywords[:10]:
            click.echo(f"  • {keyword}: {count}")


if __name__ == '__main__':
    # data/ is already created at import time (needed for the log file),
    # so this is just a harmless safeguard before dispatching the CLI.
    os.makedirs('data', exist_ok=True)
    cli()