#!/usr/bin/env python3
"""
Security-focused Twitter CLI for threat monitoring and vulnerability research.
"""

import json
import logging
import os
import re
import sys
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional

import click
import pandas as pd
import redis
import tweepy
from colorama import init, Fore, Style
from dotenv import load_dotenv
from tabulate import tabulate

# Initialize colorama
|
|
init()
|
|
|
|
# Load environment variables
|
|
load_dotenv()
|
|
|
|
# Configure logging
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(asctime)s - %(levelname)s - %(message)s',
|
|
handlers=[
|
|
logging.FileHandler('data/security_twitter.log'),
|
|
logging.StreamHandler()
|
|
]
|
|
)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class SecurityTwitterCLI:
|
|
def __init__(self):
|
|
self.setup_twitter_api()
|
|
self.setup_redis()
|
|
self.security_keywords = [
|
|
'vulnerability', 'exploit', 'malware', 'ransomware', 'phishing',
|
|
'databreach', 'cybersecurity', 'infosec', 'threathunting', 'apt',
|
|
'zeroday', '0day', 'cve', 'security', 'breach', 'attack'
|
|
]
|
|
|
|
def setup_twitter_api(self):
|
|
"""Initialize Twitter API client."""
|
|
try:
|
|
bearer_token = os.getenv('TWITTER_BEARER_TOKEN')
|
|
if not bearer_token:
|
|
raise ValueError("Twitter Bearer Token not found in environment")
|
|
|
|
self.twitter_client = tweepy.Client(
|
|
bearer_token=bearer_token,
|
|
consumer_key=os.getenv('TWITTER_API_KEY'),
|
|
consumer_secret=os.getenv('TWITTER_API_SECRET'),
|
|
access_token=os.getenv('TWITTER_ACCESS_TOKEN'),
|
|
access_token_secret=os.getenv('TWITTER_ACCESS_TOKEN_SECRET'),
|
|
wait_on_rate_limit=True
|
|
)
|
|
logger.info("Twitter API client initialized successfully")
|
|
except Exception as e:
|
|
logger.error(f"Failed to initialize Twitter API: {e}")
|
|
sys.exit(1)
|
|
|
|
def setup_redis(self):
|
|
"""Initialize Redis connection for caching."""
|
|
try:
|
|
self.redis_client = redis.Redis(host='redis', port=6379, db=0, decode_responses=True)
|
|
self.redis_client.ping()
|
|
logger.info("Redis connection established")
|
|
except Exception as e:
|
|
logger.warning(f"Redis connection failed: {e}. Caching disabled.")
|
|
self.redis_client = None
|
|
|
|
def cache_get(self, key: str) -> Optional[str]:
|
|
"""Get data from cache."""
|
|
if self.redis_client:
|
|
return self.redis_client.get(key)
|
|
return None
|
|
|
|
def cache_set(self, key: str, value: str, ttl: int = 300):
|
|
"""Set data in cache with TTL."""
|
|
if self.redis_client:
|
|
self.redis_client.setex(key, ttl, value)
|
|
|
|
def search_security_hashtags(self, hashtags: List[str], max_results: int = 100) -> List[Dict]:
|
|
"""Search for tweets containing security-related hashtags."""
|
|
all_tweets = []
|
|
|
|
for hashtag in hashtags:
|
|
cache_key = f"hashtag:{hashtag}:{max_results}"
|
|
cached_data = self.cache_get(cache_key)
|
|
|
|
if cached_data:
|
|
logger.info(f"Using cached data for #{hashtag}")
|
|
all_tweets.extend(json.loads(cached_data))
|
|
continue
|
|
|
|
try:
|
|
query = f"#{hashtag} -is:retweet lang:en"
|
|
tweets = tweepy.Paginator(
|
|
self.twitter_client.search_recent_tweets,
|
|
query=query,
|
|
tweet_fields=['created_at', 'author_id', 'public_metrics', 'context_annotations'],
|
|
max_results=min(max_results, 100)
|
|
).flatten(limit=max_results)
|
|
|
|
tweet_data = []
|
|
for tweet in tweets:
|
|
tweet_info = {
|
|
'id': tweet.id,
|
|
'text': tweet.text,
|
|
'created_at': tweet.created_at.isoformat(),
|
|
'author_id': tweet.author_id,
|
|
'retweet_count': tweet.public_metrics['retweet_count'],
|
|
'like_count': tweet.public_metrics['like_count'],
|
|
'hashtag': hashtag,
|
|
'security_score': self.calculate_security_score(tweet.text)
|
|
}
|
|
tweet_data.append(tweet_info)
|
|
|
|
self.cache_set(cache_key, json.dumps(tweet_data))
|
|
all_tweets.extend(tweet_data)
|
|
logger.info(f"Found {len(tweet_data)} tweets for #{hashtag}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error searching #{hashtag}: {e}")
|
|
|
|
return all_tweets
|
|
|
|
def get_trending_topics(self, woeid: int = 1) -> List[Dict]:
|
|
"""Get trending topics (worldwide by default)."""
|
|
cache_key = f"trending:{woeid}"
|
|
cached_data = self.cache_get(cache_key)
|
|
|
|
if cached_data:
|
|
logger.info("Using cached trending data")
|
|
return json.loads(cached_data)
|
|
|
|
try:
|
|
# Note: This requires Twitter API v1.1 access
|
|
auth = tweepy.OAuth1UserHandler(
|
|
consumer_key=os.getenv('TWITTER_API_KEY'),
|
|
consumer_secret=os.getenv('TWITTER_API_SECRET'),
|
|
access_token=os.getenv('TWITTER_ACCESS_TOKEN'),
|
|
access_token_secret=os.getenv('TWITTER_ACCESS_TOKEN_SECRET')
|
|
)
|
|
api = tweepy.API(auth)
|
|
|
|
trends = api.get_place_trends(woeid)[0]['trends']
|
|
|
|
# Filter for security-related trends
|
|
security_trends = []
|
|
for trend in trends:
|
|
name = trend['name'].lower()
|
|
if any(keyword in name for keyword in self.security_keywords):
|
|
security_trends.append({
|
|
'name': trend['name'],
|
|
'url': trend['url'],
|
|
'tweet_volume': trend['tweet_volume'],
|
|
'security_relevance': 'high'
|
|
})
|
|
|
|
self.cache_set(cache_key, json.dumps(security_trends), ttl=600)
|
|
return security_trends
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting trending topics: {e}")
|
|
return []
|
|
|
|
def calculate_security_score(self, text: str) -> float:
|
|
"""Calculate a security relevance score for tweet text."""
|
|
text_lower = text.lower()
|
|
score = 0.0
|
|
|
|
# High-value security keywords
|
|
high_value_keywords = ['cve', '0day', 'zeroday', 'exploit', 'malware', 'breach']
|
|
for keyword in high_value_keywords:
|
|
if keyword in text_lower:
|
|
score += 2.0
|
|
|
|
# Medium-value security keywords
|
|
medium_value_keywords = ['vulnerability', 'security', 'threat', 'attack']
|
|
for keyword in medium_value_keywords:
|
|
if keyword in text_lower:
|
|
score += 1.0
|
|
|
|
# Urgency indicators
|
|
urgency_keywords = ['urgent', 'critical', 'immediate', 'alert']
|
|
for keyword in urgency_keywords:
|
|
if keyword in text_lower:
|
|
score += 1.5
|
|
|
|
return min(score, 10.0) # Cap at 10.0
|
|
|
|
def analyze_vulnerability_mentions(self, tweets: List[Dict]) -> Dict[str, Any]:
|
|
"""Analyze tweets for vulnerability mentions and patterns."""
|
|
analysis = {
|
|
'total_tweets': len(tweets),
|
|
'high_priority': [],
|
|
'cve_mentions': [],
|
|
'threat_actors': [],
|
|
'common_keywords': {},
|
|
'timeline_analysis': {}
|
|
}
|
|
|
|
for tweet in tweets:
|
|
# High priority tweets (score > 5)
|
|
if tweet['security_score'] > 5.0:
|
|
analysis['high_priority'].append(tweet)
|
|
|
|
# CVE pattern matching
|
|
import re
|
|
cve_pattern = r'CVE-\d{4}-\d{4,7}'
|
|
cves = re.findall(cve_pattern, tweet['text'], re.IGNORECASE)
|
|
if cves:
|
|
analysis['cve_mentions'].extend(cves)
|
|
|
|
# Keyword frequency
|
|
words = tweet['text'].lower().split()
|
|
for word in words:
|
|
if word in self.security_keywords:
|
|
analysis['common_keywords'][word] = analysis['common_keywords'].get(word, 0) + 1
|
|
|
|
return analysis
|
|
|
|
@click.group()
|
|
def cli():
|
|
"""Security-focused Twitter CLI for threat monitoring and vulnerability research."""
|
|
pass
|
|
|
|
@cli.command()
|
|
@click.option('--hashtags', '-h', multiple=True, required=True,
|
|
help='Security hashtags to search for (e.g., -h cybersecurity -h malware)')
|
|
@click.option('--max-results', '-n', default=100,
|
|
help='Maximum number of tweets to retrieve per hashtag')
|
|
@click.option('--output', '-o', type=click.Choice(['table', 'json', 'csv']), default='table',
|
|
help='Output format')
|
|
@click.option('--min-score', '-s', default=1.0,
|
|
help='Minimum security score threshold')
|
|
def search(hashtags, max_results, output, min_score):
|
|
"""Search for security-related tweets by hashtags."""
|
|
click.echo(f"{Fore.CYAN}🔍 Searching for security tweets...{Style.RESET_ALL}")
|
|
|
|
twitter_cli = SecurityTwitterCLI()
|
|
tweets = twitter_cli.search_security_hashtags(list(hashtags), max_results)
|
|
|
|
# Filter by security score
|
|
filtered_tweets = [t for t in tweets if t['security_score'] >= min_score]
|
|
|
|
if output == 'json':
|
|
click.echo(json.dumps(filtered_tweets, indent=2))
|
|
elif output == 'csv':
|
|
df = pd.DataFrame(filtered_tweets)
|
|
click.echo(df.to_csv(index=False))
|
|
else: # table
|
|
if filtered_tweets:
|
|
table_data = []
|
|
for tweet in filtered_tweets[:20]: # Show top 20
|
|
table_data.append([
|
|
tweet['created_at'][:10],
|
|
tweet['hashtag'],
|
|
tweet['text'][:80] + '...' if len(tweet['text']) > 80 else tweet['text'],
|
|
f"{tweet['security_score']:.1f}",
|
|
tweet['like_count']
|
|
])
|
|
|
|
headers = ['Date', 'Hashtag', 'Tweet', 'Score', 'Likes']
|
|
click.echo(tabulate(table_data, headers=headers, tablefmt='grid'))
|
|
click.echo(f"{Fore.GREEN}Found {len(filtered_tweets)} relevant tweets{Style.RESET_ALL}")
|
|
else:
|
|
click.echo(f"{Fore.YELLOW}No tweets found matching criteria{Style.RESET_ALL}")
|
|
|
|
@cli.command()
|
|
@click.option('--analyze', '-a', is_flag=True, help='Show detailed analysis')
|
|
def trending(analyze):
|
|
"""Get security-related trending topics."""
|
|
click.echo(f"{Fore.CYAN}📈 Getting security-related trending topics...{Style.RESET_ALL}")
|
|
|
|
twitter_cli = SecurityTwitterCLI()
|
|
trends = twitter_cli.get_trending_topics()
|
|
|
|
if trends:
|
|
for trend in trends:
|
|
volume = trend['tweet_volume'] if trend['tweet_volume'] else 'N/A'
|
|
click.echo(f"{Fore.GREEN}• {trend['name']}{Style.RESET_ALL} (Volume: {volume})")
|
|
else:
|
|
click.echo(f"{Fore.YELLOW}No security-related trending topics found{Style.RESET_ALL}")
|
|
|
|
@cli.command()
|
|
@click.option('--hashtags', '-h', multiple=True, required=True)
|
|
@click.option('--max-results', '-n', default=100)
|
|
def analyze(hashtags, max_results):
|
|
"""Analyze vulnerability mentions and threat patterns."""
|
|
click.echo(f"{Fore.CYAN}🔬 Analyzing security threats...{Style.RESET_ALL}")
|
|
|
|
twitter_cli = SecurityTwitterCLI()
|
|
tweets = twitter_cli.search_security_hashtags(list(hashtags), max_results)
|
|
analysis = twitter_cli.analyze_vulnerability_mentions(tweets)
|
|
|
|
click.echo(f"\n{Fore.GREEN}📊 Analysis Results:{Style.RESET_ALL}")
|
|
click.echo(f"Total tweets analyzed: {analysis['total_tweets']}")
|
|
click.echo(f"High priority alerts: {len(analysis['high_priority'])}")
|
|
click.echo(f"CVE mentions found: {len(set(analysis['cve_mentions']))}")
|
|
|
|
if analysis['cve_mentions']:
|
|
click.echo(f"\n{Fore.YELLOW}🚨 CVEs mentioned:{Style.RESET_ALL}")
|
|
for cve in set(analysis['cve_mentions']):
|
|
click.echo(f" • {cve}")
|
|
|
|
if analysis['common_keywords']:
|
|
click.echo(f"\n{Fore.BLUE}🔑 Top security keywords:{Style.RESET_ALL}")
|
|
sorted_keywords = sorted(analysis['common_keywords'].items(), key=lambda x: x[1], reverse=True)
|
|
for keyword, count in sorted_keywords[:10]:
|
|
click.echo(f" • {keyword}: {count}")
|
|
|
|
if __name__ == '__main__':
|
|
# Create data directory if it doesn't exist
|
|
os.makedirs('data', exist_ok=True)
|
|
cli()
|