diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..93a4a88 --- /dev/null +++ b/.env.example @@ -0,0 +1,12 @@ +# Twitter API Credentials +# Get these from https://developer.twitter.com/ +TWITTER_BEARER_TOKEN=your_bearer_token_here +TWITTER_API_KEY=your_api_key_here +TWITTER_API_SECRET=your_api_secret_here +TWITTER_ACCESS_TOKEN=your_access_token_here +TWITTER_ACCESS_TOKEN_SECRET=your_access_token_secret_here + +# Application Settings +MAX_TWEETS=100 +CACHE_DURATION=300 +OUTPUT_FORMAT=json \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e3bc5d3 --- /dev/null +++ b/.gitignore @@ -0,0 +1,42 @@ +# Environment files +.env +.env.local + +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Data and logs +data/ +*.log + +# Docker +docker-compose.override.yml + +# IDE +.vscode/ +.idea/ +*.swp +*.swo + +# OS +.DS_Store +Thumbs.db \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..295ed65 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,24 @@ +FROM python:3.11-slim + +WORKDIR /app + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + curl \ + jq \ + && rm -rf /var/lib/apt/lists/* + +# Copy requirements first for better caching +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# Copy application code +COPY . . 
+ +# Create data and config directories +RUN mkdir -p /app/data /app/config + +# Set executable permissions +RUN chmod +x main.py + +CMD ["python", "main.py"] \ No newline at end of file diff --git a/README.md b/README.md index e69de29..d117172 100644 --- a/README.md +++ b/README.md @@ -0,0 +1,102 @@ +# Security Twitter CLI + +A Docker Compose application that provides a Python-based CLI tool for security researchers to monitor Twitter for vulnerability data, threat intelligence, and security-related discussions. + +## Features + +- **Hashtag Search**: Monitor security-related hashtags like #cybersecurity, #malware, #vulnerability +- **Trending Analysis**: Get security-related trending topics +- **Vulnerability Detection**: Automatic CVE pattern detection and extraction +- **Security Scoring**: Intelligent relevance scoring for tweets +- **Multiple Formats**: Output in table, JSON, or CSV format +- **Caching**: Redis-based caching to manage API rate limits +- **Logging**: Comprehensive logging for monitoring and analysis + +## Quick Start + +1. **Get Twitter API Credentials** + - Apply for a Twitter Developer account at https://developer.twitter.com/ + - Create a new app and generate API keys + +2. **Configure Environment** + ```bash + cp .env.example .env + # Edit .env with your Twitter API credentials + ``` + +3. **Build and Run** + ```bash + docker-compose up --build + ``` + +4. 
**Search for Security Data** + ```bash + # Search for cybersecurity and malware tweets + docker-compose run twitter-cli python main.py search -h cybersecurity -h malware + + # Get trending security topics + docker-compose run twitter-cli python main.py trending + + # Analyze vulnerability mentions + docker-compose run twitter-cli python main.py analyze -h vulnerability -h cve + ``` + +## CLI Commands + +### Search Command +```bash +python main.py search [OPTIONS] +``` +- `-h, --hashtags`: Security hashtags to search (required, can use multiple) +- `-n, --max-results`: Maximum tweets per hashtag (default: 100) +- `-o, --output`: Output format: table/json/csv (default: table) +- `-s, --min-score`: Minimum security score threshold (default: 1.0) + +### Trending Command +```bash +python main.py trending [OPTIONS] +``` +- `-a, --analyze`: Show detailed analysis + +### Analyze Command +```bash +python main.py analyze [OPTIONS] +``` +- `-h, --hashtags`: Hashtags to analyze (required) +- `-n, --max-results`: Maximum tweets to analyze (default: 100) + +## Example Usage + +```bash +# Search for high-priority security alerts +docker-compose run twitter-cli python main.py search -h cybersecurity -h breach -s 3.0 + +# Export vulnerability data as JSON +docker-compose run twitter-cli python main.py search -h cve -h vulnerability -o json > vulns.json + +# Monitor multiple security topics +docker-compose run twitter-cli python main.py search -h malware -h ransomware -h phishing -n 50 +``` + +## Security Focus + +This tool is designed specifically for **defensive security research**: +- Vulnerability disclosure monitoring +- Threat intelligence gathering +- CVE tracking and analysis +- Security trend identification +- Incident response support + +## Data Storage + +- Logs: `./data/security_twitter.log` +- Cache: Redis container with persistent volume +- Config: `./config/` directory (mounted volume) + +## Requirements + +- Docker and Docker Compose +- Twitter Developer Account with 
API access +- Internet connection for API calls + +⚠️ **Twitter API Limitation**: The Twitter Free tier API only allows pulling 100 posts per month. Consider upgrading to a paid tier for production use or extensive monitoring. \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..1016ab6 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,36 @@ +version: '3.8' + +services: + twitter-cli: + build: . + container_name: security-twitter-cli + volumes: + - ./data:/app/data + - ./config:/app/config + environment: + - TWITTER_BEARER_TOKEN=${TWITTER_BEARER_TOKEN} + - TWITTER_API_KEY=${TWITTER_API_KEY} + - TWITTER_API_SECRET=${TWITTER_API_SECRET} + - TWITTER_ACCESS_TOKEN=${TWITTER_ACCESS_TOKEN} + - TWITTER_ACCESS_TOKEN_SECRET=${TWITTER_ACCESS_TOKEN_SECRET} + stdin_open: true + tty: true + command: ["python", "main.py"] + networks: + - security-net + + redis: + image: redis:7-alpine + container_name: security-redis + volumes: + - redis_data:/data + networks: + - security-net + command: redis-server --appendonly yes + +networks: + security-net: + driver: bridge + +volumes: + redis_data: \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..e7a99d9 --- /dev/null +++ b/main.py @@ -0,0 +1,323 @@ +#!/usr/bin/env python3 +""" +Security-focused Twitter CLI for threat monitoring and vulnerability research. 
class SecurityTwitterCLI:
    """Collect, cache and score security-related tweets.

    Wraps a tweepy v2 ``Client`` (hashtag search), a tweepy v1.1 ``API``
    handle (trends) and an optional Redis cache. The cache is strictly
    best-effort: if Redis is unreachable, or fails later on, the instance
    keeps working and simply talks to the Twitter API every time.

    Environment variables read:
        TWITTER_BEARER_TOKEN, TWITTER_API_KEY, TWITTER_API_SECRET,
        TWITTER_ACCESS_TOKEN, TWITTER_ACCESS_TOKEN_SECRET: API credentials.
        REDIS_HOST / REDIS_PORT: cache location (default ``redis:6379``,
        the service name used in docker-compose.yml).
    """

    # Twitter API v2 recent search rejects page sizes outside this range.
    _MIN_PAGE_SIZE = 10
    _MAX_PAGE_SIZE = 100

    # Keywords this short are only accepted when they are not embedded in a
    # longer alphabetic word (otherwise 'apt' matches "captain", "laptop").
    _SHORT_KEYWORD_LEN = 3

    # (weight, keywords) groups used by calculate_security_score, in the
    # order: high-value terms, medium-value terms, urgency indicators.
    _SCORE_WEIGHTS = (
        (2.0, ('cve', '0day', 'zeroday', 'exploit', 'malware', 'breach')),
        (1.0, ('vulnerability', 'security', 'threat', 'attack')),
        (1.5, ('urgent', 'critical', 'immediate', 'alert')),
    )

    def __init__(self):
        self.setup_twitter_api()
        self.setup_redis()
        self.security_keywords = [
            'vulnerability', 'exploit', 'malware', 'ransomware', 'phishing',
            'databreach', 'cybersecurity', 'infosec', 'threathunting', 'apt',
            'zeroday', '0day', 'cve', 'security', 'breach', 'attack'
        ]

    def setup_twitter_api(self):
        """Initialize the Twitter API v2 client from environment credentials.

        Logs the failure and terminates the process with exit status 1 when
        the bearer token is missing or the client cannot be constructed.
        """
        try:
            bearer_token = os.getenv('TWITTER_BEARER_TOKEN')
            if not bearer_token:
                raise ValueError("Twitter Bearer Token not found in environment")

            self.twitter_client = tweepy.Client(
                bearer_token=bearer_token,
                consumer_key=os.getenv('TWITTER_API_KEY'),
                consumer_secret=os.getenv('TWITTER_API_SECRET'),
                access_token=os.getenv('TWITTER_ACCESS_TOKEN'),
                access_token_secret=os.getenv('TWITTER_ACCESS_TOKEN_SECRET'),
                wait_on_rate_limit=True
            )
            logger.info("Twitter API client initialized successfully")
        except Exception as e:
            # CLI boundary: without a working client nothing else can run.
            logger.error("Failed to initialize Twitter API: %s", e)
            sys.exit(1)

    def setup_redis(self):
        """Connect to Redis for caching; disable caching if that fails.

        Sets ``self.redis_client`` to a connected client, or to ``None`` when
        the connection (or the REDIS_PORT value) is unusable.
        """
        try:
            self.redis_client = redis.Redis(
                host=os.getenv('REDIS_HOST', 'redis'),
                port=int(os.getenv('REDIS_PORT', '6379')),
                db=0,
                decode_responses=True,
                # Fail fast when run outside the compose network instead of
                # blocking the CLI on an unreachable cache.
                socket_connect_timeout=2,
                socket_timeout=2,
            )
            self.redis_client.ping()
            logger.info("Redis connection established")
        except Exception as e:
            # Deliberately broad: the cache is optional, never fatal.
            logger.warning(f"Redis connection failed: {e}. Caching disabled.")
            self.redis_client = None

    def cache_get(self, key: str) -> Optional[str]:
        """Return the cached string for *key*, or ``None``.

        ``None`` is returned when caching is disabled, the key is absent, or
        Redis raises an error while reading.
        """
        if self.redis_client is None:
            return None
        try:
            return self.redis_client.get(key)
        except redis.RedisError as e:
            logger.warning("Cache read failed for %s: %s", key, e)
            return None

    def cache_set(self, key: str, value: str, ttl: int = 300):
        """Store *value* under *key* for *ttl* seconds (no-op without Redis).

        Redis errors are logged and swallowed so a cache outage cannot
        discard results that were already fetched.
        """
        if self.redis_client is None:
            return
        try:
            self.redis_client.setex(key, ttl, value)
        except redis.RedisError as e:
            logger.warning("Cache write failed for %s: %s", key, e)

    def _load_cached_json(self, key: str) -> Optional[Any]:
        """Return the decoded JSON cached under *key*, or ``None`` on a miss.

        A value that is not valid JSON is treated as a miss.
        """
        raw = self.cache_get(key)
        if not raw:
            return None
        try:
            return json.loads(raw)
        except ValueError as e:
            logger.warning("Ignoring corrupt cache entry %s: %s", key, e)
            return None

    def search_security_hashtags(self, hashtags: List[str], max_results: int = 100) -> List[Dict]:
        """Search recent English, non-retweet tweets for each hashtag.

        Args:
            hashtags: Hashtag names, with or without a leading ``#``.
            max_results: Maximum number of tweets to return per hashtag.

        Returns:
            A flat list of dicts with the keys ``id``, ``text``,
            ``created_at`` (ISO 8601 string), ``author_id``,
            ``retweet_count``, ``like_count``, ``hashtag`` and
            ``security_score``. A hashtag whose search fails is logged and
            contributes no entries.
        """
        all_tweets: List[Dict] = []
        if max_results <= 0:
            return all_tweets

        # The API only accepts 10..100 per page; flatten(limit=...) below
        # still enforces the caller's exact max_results.
        page_size = max(self._MIN_PAGE_SIZE, min(max_results, self._MAX_PAGE_SIZE))

        for raw_hashtag in hashtags:
            # Accept "#cve" as well as "cve"; the query adds its own '#'.
            hashtag = raw_hashtag.strip().lstrip('#')
            if not hashtag:
                continue

            cache_key = f"hashtag:{hashtag}:{max_results}"
            cached_tweets = self._load_cached_json(cache_key)
            if cached_tweets is not None:
                logger.info(f"Using cached data for #{hashtag}")
                all_tweets.extend(cached_tweets)
                continue

            try:
                query = f"#{hashtag} -is:retweet lang:en"
                tweets = tweepy.Paginator(
                    self.twitter_client.search_recent_tweets,
                    query=query,
                    tweet_fields=['created_at', 'author_id', 'public_metrics', 'context_annotations'],
                    max_results=page_size
                ).flatten(limit=max_results)

                tweet_data = [
                    {
                        'id': tweet.id,
                        'text': tweet.text,
                        'created_at': tweet.created_at.isoformat(),
                        'author_id': tweet.author_id,
                        'retweet_count': tweet.public_metrics['retweet_count'],
                        'like_count': tweet.public_metrics['like_count'],
                        'hashtag': hashtag,
                        'security_score': self.calculate_security_score(tweet.text)
                    }
                    for tweet in tweets
                ]

                self.cache_set(cache_key, json.dumps(tweet_data))
                all_tweets.extend(tweet_data)
                logger.info(f"Found {len(tweet_data)} tweets for #{hashtag}")

            except Exception as e:
                # Best-effort per hashtag: one failure must not lose the rest.
                logger.error(f"Error searching #{hashtag}: {e}")

        return all_tweets

    def _is_security_trend(self, name: str) -> bool:
        """Return True when a trend name contains a security keyword.

        Longer keywords match as substrings, so "CyberSecurityNews" counts.
        Keywords of at most ``_SHORT_KEYWORD_LEN`` characters must not be
        surrounded by letters, so "APT29" counts but "Captain" does not.
        """
        import re

        lowered = name.lower()
        for keyword in self.security_keywords:
            if len(keyword) <= self._SHORT_KEYWORD_LEN:
                pattern = rf"(?<![a-z]){re.escape(keyword)}(?![a-z])"
                if re.search(pattern, lowered):
                    return True
            elif keyword in lowered:
                return True
        return False

    def get_trending_topics(self, woeid: int = 1) -> List[Dict]:
        """Return security-related trending topics for a location.

        Args:
            woeid: Where-On-Earth id passed to ``get_place_trends``
                (the default ``1`` is used for "worldwide").

        Returns:
            A list of dicts with ``name``, ``url``, ``tweet_volume`` and
            ``security_relevance``; an empty list if the request fails.
        """
        cache_key = f"trending:{woeid}"
        cached_trends = self._load_cached_json(cache_key)
        if cached_trends is not None:
            logger.info("Using cached trending data")
            return cached_trends

        try:
            # Note: This requires Twitter API v1.1 access
            auth = tweepy.OAuth1UserHandler(
                consumer_key=os.getenv('TWITTER_API_KEY'),
                consumer_secret=os.getenv('TWITTER_API_SECRET'),
                access_token=os.getenv('TWITTER_ACCESS_TOKEN'),
                access_token_secret=os.getenv('TWITTER_ACCESS_TOKEN_SECRET')
            )
            api = tweepy.API(auth)

            trends = api.get_place_trends(woeid)[0]['trends']

            security_trends = [
                {
                    'name': trend['name'],
                    'url': trend.get('url'),
                    'tweet_volume': trend.get('tweet_volume'),
                    'security_relevance': 'high'
                }
                for trend in trends
                if self._is_security_trend(trend['name'])
            ]

            self.cache_set(cache_key, json.dumps(security_trends), ttl=600)
            return security_trends

        except Exception as e:
            logger.error(f"Error getting trending topics: {e}")
            return []

    def calculate_security_score(self, text: str) -> float:
        """Score how security-relevant *text* is, from 0.0 to 10.0.

        Each keyword found (case-insensitive substring match) adds its
        group's weight once: 2.0 for high-value terms, 1.0 for medium-value
        terms and 1.5 for urgency indicators. The total is capped at 10.0.
        """
        text_lower = (text or '').lower()
        score = 0.0
        for weight, keywords in self._SCORE_WEIGHTS:
            for keyword in keywords:
                if keyword in text_lower:
                    score += weight
        return min(score, 10.0)  # Cap at 10.0

    def analyze_vulnerability_mentions(self, tweets: List[Dict]) -> Dict[str, Any]:
        """Summarise CVE mentions and keyword usage across *tweets*.

        Args:
            tweets: Dicts carrying at least ``text`` and ``security_score``.

        Returns:
            A dict with:
              ``total_tweets``: number of tweets received;
              ``high_priority``: tweets whose score is above 5.0;
              ``cve_mentions``: every CVE id found, upper-cased so the same
                id written in different case compares equal (duplicates are
                kept; callers de-duplicate);
              ``common_keywords``: occurrence count per security keyword;
              ``threat_actors`` / ``timeline_analysis``: placeholders that
                are always empty.
        """
        import re
        from collections import Counter

        cve_pattern = re.compile(r'CVE-\d{4}-\d{4,7}', re.IGNORECASE)
        # Tokenise on alphanumerics so "#malware" and "exploit," still count;
        # a plain str.split() leaves the '#' and punctuation attached.
        token_pattern = re.compile(r'[a-z0-9]+')
        known_keywords = set(self.security_keywords)

        high_priority = []
        cve_mentions = []
        keyword_counts = Counter()

        for tweet in tweets:
            text = tweet['text']

            # High priority tweets (score > 5)
            if tweet['security_score'] > 5.0:
                high_priority.append(tweet)

            cve_mentions.extend(cve.upper() for cve in cve_pattern.findall(text))

            keyword_counts.update(
                token for token in token_pattern.findall(text.lower())
                if token in known_keywords
            )

        return {
            'total_tweets': len(tweets),
            'high_priority': high_priority,
            'cve_mentions': cve_mentions,
            'threat_actors': [],
            'common_keywords': dict(keyword_counts),
            'timeline_analysis': {}
        }
@cli.command()
@click.option('--analyze', '-a', is_flag=True, help='Show detailed analysis')
def trending(analyze):
    """Get security-related trending topics.

    With --analyze, each topic is followed by its URL and relevance label,
    and a summary line reports the combined tweet volume.
    """
    click.echo(f"{Fore.CYAN}📈 Getting security-related trending topics...{Style.RESET_ALL}")

    twitter_cli = SecurityTwitterCLI()
    trends = twitter_cli.get_trending_topics()

    if not trends:
        click.echo(f"{Fore.YELLOW}No security-related trending topics found{Style.RESET_ALL}")
        return

    for trend in trends:
        # tweet_volume may be missing or null; show a placeholder instead.
        volume = trend.get('tweet_volume') or 'N/A'
        click.echo(f"{Fore.GREEN}• {trend['name']}{Style.RESET_ALL} (Volume: {volume})")
        if analyze:
            click.echo(f"    URL: {trend.get('url') or 'N/A'}")
            click.echo(f"    Relevance: {trend.get('security_relevance') or 'N/A'}")

    if analyze:
        # Only trends that report a volume contribute to the total.
        known_volume = sum(trend.get('tweet_volume') or 0 for trend in trends)
        click.echo(
            f"\n{Fore.CYAN}{len(trends)} security-related trends, "
            f"{known_volume} tweets in total (where volume is reported){Style.RESET_ALL}"
        )
if __name__ == '__main__':
    # Script entry point: make sure ./data exists relative to the current
    # working directory, then dispatch to the click command group.
    # NOTE(review): the module-level logging.basicConfig(...) call already
    # opens 'data/security_twitter.log' when the module is loaded, i.e.
    # before this line runs. Confirm the directory is provided some other
    # way (the Dockerfile creates /app/data) for a fresh local checkout.
    os.makedirs(name='data', exist_ok=True)
    cli()