twitter_cli/main.py

323 lines
13 KiB
Python

#!/usr/bin/env python3
"""
Security-focused Twitter CLI for threat monitoring and vulnerability research.
"""
import os
import sys
import json
import logging
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional
import click
import tweepy
import redis
import pandas as pd
from colorama import init, Fore, Style
from tabulate import tabulate
from dotenv import load_dotenv
# Initialize colorama
init()
# Load environment variables
load_dotenv()
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('data/security_twitter.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
class SecurityTwitterCLI:
def __init__(self):
self.setup_twitter_api()
self.setup_redis()
self.security_keywords = [
'vulnerability', 'exploit', 'malware', 'ransomware', 'phishing',
'databreach', 'cybersecurity', 'infosec', 'threathunting', 'apt',
'zeroday', '0day', 'cve', 'security', 'breach', 'attack'
]
def setup_twitter_api(self):
"""Initialize Twitter API client."""
try:
bearer_token = os.getenv('TWITTER_BEARER_TOKEN')
if not bearer_token:
raise ValueError("Twitter Bearer Token not found in environment")
self.twitter_client = tweepy.Client(
bearer_token=bearer_token,
consumer_key=os.getenv('TWITTER_API_KEY'),
consumer_secret=os.getenv('TWITTER_API_SECRET'),
access_token=os.getenv('TWITTER_ACCESS_TOKEN'),
access_token_secret=os.getenv('TWITTER_ACCESS_TOKEN_SECRET'),
wait_on_rate_limit=True
)
logger.info("Twitter API client initialized successfully")
except Exception as e:
logger.error(f"Failed to initialize Twitter API: {e}")
sys.exit(1)
def setup_redis(self):
"""Initialize Redis connection for caching."""
try:
self.redis_client = redis.Redis(host='redis', port=6379, db=0, decode_responses=True)
self.redis_client.ping()
logger.info("Redis connection established")
except Exception as e:
logger.warning(f"Redis connection failed: {e}. Caching disabled.")
self.redis_client = None
def cache_get(self, key: str) -> Optional[str]:
"""Get data from cache."""
if self.redis_client:
return self.redis_client.get(key)
return None
def cache_set(self, key: str, value: str, ttl: int = 300):
"""Set data in cache with TTL."""
if self.redis_client:
self.redis_client.setex(key, ttl, value)
def search_security_hashtags(self, hashtags: List[str], max_results: int = 100) -> List[Dict]:
"""Search for tweets containing security-related hashtags."""
all_tweets = []
for hashtag in hashtags:
cache_key = f"hashtag:{hashtag}:{max_results}"
cached_data = self.cache_get(cache_key)
if cached_data:
logger.info(f"Using cached data for #{hashtag}")
all_tweets.extend(json.loads(cached_data))
continue
try:
query = f"#{hashtag} -is:retweet lang:en"
tweets = tweepy.Paginator(
self.twitter_client.search_recent_tweets,
query=query,
tweet_fields=['created_at', 'author_id', 'public_metrics', 'context_annotations'],
max_results=min(max_results, 100)
).flatten(limit=max_results)
tweet_data = []
for tweet in tweets:
tweet_info = {
'id': tweet.id,
'text': tweet.text,
'created_at': tweet.created_at.isoformat(),
'author_id': tweet.author_id,
'retweet_count': tweet.public_metrics['retweet_count'],
'like_count': tweet.public_metrics['like_count'],
'hashtag': hashtag,
'security_score': self.calculate_security_score(tweet.text)
}
tweet_data.append(tweet_info)
self.cache_set(cache_key, json.dumps(tweet_data))
all_tweets.extend(tweet_data)
logger.info(f"Found {len(tweet_data)} tweets for #{hashtag}")
except Exception as e:
logger.error(f"Error searching #{hashtag}: {e}")
return all_tweets
def get_trending_topics(self, woeid: int = 1) -> List[Dict]:
"""Get trending topics (worldwide by default)."""
cache_key = f"trending:{woeid}"
cached_data = self.cache_get(cache_key)
if cached_data:
logger.info("Using cached trending data")
return json.loads(cached_data)
try:
# Note: This requires Twitter API v1.1 access
auth = tweepy.OAuth1UserHandler(
consumer_key=os.getenv('TWITTER_API_KEY'),
consumer_secret=os.getenv('TWITTER_API_SECRET'),
access_token=os.getenv('TWITTER_ACCESS_TOKEN'),
access_token_secret=os.getenv('TWITTER_ACCESS_TOKEN_SECRET')
)
api = tweepy.API(auth)
trends = api.get_place_trends(woeid)[0]['trends']
# Filter for security-related trends
security_trends = []
for trend in trends:
name = trend['name'].lower()
if any(keyword in name for keyword in self.security_keywords):
security_trends.append({
'name': trend['name'],
'url': trend['url'],
'tweet_volume': trend['tweet_volume'],
'security_relevance': 'high'
})
self.cache_set(cache_key, json.dumps(security_trends), ttl=600)
return security_trends
except Exception as e:
logger.error(f"Error getting trending topics: {e}")
return []
def calculate_security_score(self, text: str) -> float:
"""Calculate a security relevance score for tweet text."""
text_lower = text.lower()
score = 0.0
# High-value security keywords
high_value_keywords = ['cve', '0day', 'zeroday', 'exploit', 'malware', 'breach']
for keyword in high_value_keywords:
if keyword in text_lower:
score += 2.0
# Medium-value security keywords
medium_value_keywords = ['vulnerability', 'security', 'threat', 'attack']
for keyword in medium_value_keywords:
if keyword in text_lower:
score += 1.0
# Urgency indicators
urgency_keywords = ['urgent', 'critical', 'immediate', 'alert']
for keyword in urgency_keywords:
if keyword in text_lower:
score += 1.5
return min(score, 10.0) # Cap at 10.0
def analyze_vulnerability_mentions(self, tweets: List[Dict]) -> Dict[str, Any]:
"""Analyze tweets for vulnerability mentions and patterns."""
analysis = {
'total_tweets': len(tweets),
'high_priority': [],
'cve_mentions': [],
'threat_actors': [],
'common_keywords': {},
'timeline_analysis': {}
}
for tweet in tweets:
# High priority tweets (score > 5)
if tweet['security_score'] > 5.0:
analysis['high_priority'].append(tweet)
# CVE pattern matching
import re
cve_pattern = r'CVE-\d{4}-\d{4,7}'
cves = re.findall(cve_pattern, tweet['text'], re.IGNORECASE)
if cves:
analysis['cve_mentions'].extend(cves)
# Keyword frequency
words = tweet['text'].lower().split()
for word in words:
if word in self.security_keywords:
analysis['common_keywords'][word] = analysis['common_keywords'].get(word, 0) + 1
return analysis
@click.group()
def cli():
"""Security-focused Twitter CLI for threat monitoring and vulnerability research."""
pass
@cli.command()
@click.option('--hashtags', '-h', multiple=True, required=True,
help='Security hashtags to search for (e.g., -h cybersecurity -h malware)')
@click.option('--max-results', '-n', default=100,
help='Maximum number of tweets to retrieve per hashtag')
@click.option('--output', '-o', type=click.Choice(['table', 'json', 'csv']), default='table',
help='Output format')
@click.option('--min-score', '-s', default=1.0,
help='Minimum security score threshold')
def search(hashtags, max_results, output, min_score):
"""Search for security-related tweets by hashtags."""
click.echo(f"{Fore.CYAN}🔍 Searching for security tweets...{Style.RESET_ALL}")
twitter_cli = SecurityTwitterCLI()
tweets = twitter_cli.search_security_hashtags(list(hashtags), max_results)
# Filter by security score
filtered_tweets = [t for t in tweets if t['security_score'] >= min_score]
if output == 'json':
click.echo(json.dumps(filtered_tweets, indent=2))
elif output == 'csv':
df = pd.DataFrame(filtered_tweets)
click.echo(df.to_csv(index=False))
else: # table
if filtered_tweets:
table_data = []
for tweet in filtered_tweets[:20]: # Show top 20
table_data.append([
tweet['created_at'][:10],
tweet['hashtag'],
tweet['text'][:80] + '...' if len(tweet['text']) > 80 else tweet['text'],
f"{tweet['security_score']:.1f}",
tweet['like_count']
])
headers = ['Date', 'Hashtag', 'Tweet', 'Score', 'Likes']
click.echo(tabulate(table_data, headers=headers, tablefmt='grid'))
click.echo(f"{Fore.GREEN}Found {len(filtered_tweets)} relevant tweets{Style.RESET_ALL}")
else:
click.echo(f"{Fore.YELLOW}No tweets found matching criteria{Style.RESET_ALL}")
@cli.command()
@click.option('--analyze', '-a', is_flag=True, help='Show detailed analysis')
def trending(analyze):
"""Get security-related trending topics."""
click.echo(f"{Fore.CYAN}📈 Getting security-related trending topics...{Style.RESET_ALL}")
twitter_cli = SecurityTwitterCLI()
trends = twitter_cli.get_trending_topics()
if trends:
for trend in trends:
volume = trend['tweet_volume'] if trend['tweet_volume'] else 'N/A'
click.echo(f"{Fore.GREEN}{trend['name']}{Style.RESET_ALL} (Volume: {volume})")
else:
click.echo(f"{Fore.YELLOW}No security-related trending topics found{Style.RESET_ALL}")
@cli.command()
@click.option('--hashtags', '-h', multiple=True, required=True)
@click.option('--max-results', '-n', default=100)
def analyze(hashtags, max_results):
"""Analyze vulnerability mentions and threat patterns."""
click.echo(f"{Fore.CYAN}🔬 Analyzing security threats...{Style.RESET_ALL}")
twitter_cli = SecurityTwitterCLI()
tweets = twitter_cli.search_security_hashtags(list(hashtags), max_results)
analysis = twitter_cli.analyze_vulnerability_mentions(tweets)
click.echo(f"\n{Fore.GREEN}📊 Analysis Results:{Style.RESET_ALL}")
click.echo(f"Total tweets analyzed: {analysis['total_tweets']}")
click.echo(f"High priority alerts: {len(analysis['high_priority'])}")
click.echo(f"CVE mentions found: {len(set(analysis['cve_mentions']))}")
if analysis['cve_mentions']:
click.echo(f"\n{Fore.YELLOW}🚨 CVEs mentioned:{Style.RESET_ALL}")
for cve in set(analysis['cve_mentions']):
click.echo(f"{cve}")
if analysis['common_keywords']:
click.echo(f"\n{Fore.BLUE}🔑 Top security keywords:{Style.RESET_ALL}")
sorted_keywords = sorted(analysis['common_keywords'].items(), key=lambda x: x[1], reverse=True)
for keyword, count in sorted_keywords[:10]:
click.echo(f"{keyword}: {count}")
if __name__ == '__main__':
# Create data directory if it doesn't exist
os.makedirs('data', exist_ok=True)
cli()