import os
import pandas as pd
import requests
import numpy as np
import gradio as gr
from datetime import datetime, timedelta
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import yfinance as yf
# Configuration
class Config:
    """Static application settings shared by the news analyzer and the UI."""

    # SECURITY: an API token is hard-coded in source. It is kept only as a
    # fallback so existing setups keep working; set the FINNHUB_API_KEY
    # environment variable to override it, and rotate the committed token.
    FINNHUB_API_KEY = (
        os.environ.get("FINNHUB_API_KEY")
        or "cuj17q1r01qm7p9n307gcuj17q1r01qm7p9n3080"
    )
    DEFAULT_DAYS = 30  # Reduced from 365 to make it faster
    DATA_DIR = "data"  # Folder holding the per-symbol CSV caches

    @classmethod
    def initialize(cls):
        """Create the cache directory; a no-op when it already exists."""
        os.makedirs(cls.DATA_DIR, exist_ok=True)
# Runs at import time: make sure the cache directory exists before any CSV is written into it.
Config.initialize()
# Simple sentiment analyzer
class SentimentAnalyzer:
    """Thin wrapper around VADER's ``SentimentIntensityAnalyzer``."""

    def __init__(self):
        self.analyzer = SentimentIntensityAnalyzer()

    def analyze(self, text):
        """Return the VADER ``compound`` score of *text* as a float.

        Anything that is not a non-blank string (``None``, NaN, numbers,
        whitespace) scores ``0.0`` so the result can always be aggregated
        numerically without special-casing missing headlines.
        """
        if not isinstance(text, str) or not text.strip():
            # Float, not int, so both return paths yield the same type.
            return 0.0
        return self.analyzer.polarity_scores(text)['compound']
# News fetcher and sentiment analyzer
class StockNewsAnalyzer:
    """Fetch company news for one ticker, score headlines, and pair them with prices."""

    def __init__(self, symbol):
        self.symbol = symbol
        self.sentiment_analyzer = SentimentAnalyzer()

    def get_file_path(self, file_type):
        """Return the cache CSV path for this symbol and *file_type*.

        The symbol originates from user input, so every character that is not
        alphanumeric or one of ``._-`` is replaced with ``_``. That keeps path
        separators out of the file name, so the cache file always stays
        directly inside ``Config.DATA_DIR``.
        """
        safe_symbol = "".join(
            ch if ch.isalnum() or ch in "._-" else "_" for ch in str(self.symbol)
        )
        return os.path.join(Config.DATA_DIR, f"{safe_symbol}_{file_type}.csv")

    def get_news(self, days=Config.DEFAULT_DAYS, force_refresh=False):
        """Return news articles for the symbol as a DataFrame.

        Cached rows are used when a cache file exists and *force_refresh* is
        false; they are trimmed to the last *days* days. If the cache is
        unreadable, or holds nothing inside that window, fresh data is
        requested from the Finnhub ``company-news`` endpoint and written back
        to the cache. A cache that was fetched with a shorter window than
        *days* is returned as-is; pass ``force_refresh=True`` to widen it.

        Returns an empty DataFrame when the request fails or the response is
        not a non-empty list containing a ``datetime`` field.
        """
        file_path = self.get_file_path("news")

        # Calculate date range (also used to trim cached rows)
        end_date = datetime.now()
        start_date = end_date - timedelta(days=days)

        if not force_refresh and os.path.exists(file_path):
            try:
                cached = pd.read_csv(file_path, parse_dates=['datetime'])
                window_start = pd.Timestamp(start_date).normalize()
                cached = cached[cached['datetime'] >= window_start]
                if not cached.empty:
                    return cached.reset_index(drop=True)
            except Exception as exc:
                # Best effort: a corrupted cache must not block a fresh fetch.
                print(f"Ignoring unreadable news cache {file_path}: {exc}")

        url = "https://finnhub.io/api/v1/company-news"
        params = {
            "symbol": self.symbol,
            "from": start_date.strftime('%Y-%m-%d'),
            "to": end_date.strftime('%Y-%m-%d'),
        }
        # The token goes in a header rather than the query string so it does
        # not appear in URLs embedded in printed request errors.
        headers = {"X-Finnhub-Token": Config.FINNHUB_API_KEY}

        try:
            response = requests.get(url, params=params, headers=headers, timeout=10)
            if response.status_code != 200:
                print(f"Error fetching news: HTTP {response.status_code}")
                return pd.DataFrame()
            data = response.json()
            if not data or not isinstance(data, list):
                return pd.DataFrame()
            df = pd.DataFrame(data)
            if 'datetime' not in df.columns:
                return pd.DataFrame()
            df['datetime'] = pd.to_datetime(df['datetime'], unit='s')
        except Exception as e:
            print(f"Error fetching news: {e}")
            return pd.DataFrame()

        try:
            df.to_csv(file_path, index=False)
        except OSError as exc:
            # Failing to cache should not throw away the data just fetched.
            print(f"Could not write news cache {file_path}: {exc}")
        return df

    def _get_stock_prices(self, start_date, end_date):
        """Return closing prices indexed by ``date``, or an empty frame on any failure."""
        try:
            raw = yf.download(self.symbol, start=start_date, end=end_date, progress=False)
            if raw.empty or 'Close' not in raw.columns:
                return pd.DataFrame()
            # Copy so the renames below never operate on a view of ``raw``.
            prices = raw[['Close']].copy()
            prices.columns = ['close']
            prices = prices.reset_index().rename(columns={'Date': 'date'})
            prices['date'] = pd.to_datetime(prices['date'].dt.date)
            return prices.set_index('date')
        except Exception as exc:
            # Prices are optional context for the chart; report and carry on.
            print(f"Error fetching stock prices: {exc}")
            return pd.DataFrame()

    def analyze_news_sentiment(self, days=Config.DEFAULT_DAYS, force_refresh=False):
        """Score headlines and aggregate them per day.

        Returns a tuple ``(daily_sentiment, stock_data, top_headlines)``:

        * ``daily_sentiment`` - one row per date with ``avg_sentiment``,
          ``article_count`` and positive/negative/neutral counts, using
          +/-0.05 as the neutral band.
        * ``stock_data`` - closing prices indexed by date (possibly empty).
        * ``top_headlines`` - up to five highest-scoring positive and five
          lowest-scoring negative articles.

        Returns ``(None, None, None)`` when there is no news or the news has
        no ``headline`` column.
        """
        news_df = self.get_news(days, force_refresh)
        if news_df.empty or 'headline' not in news_df.columns:
            return None, None, None

        news_df['sentiment_score'] = news_df['headline'].apply(self.sentiment_analyzer.analyze)

        # Midnight-normalised date column for daily aggregation
        news_df['date'] = pd.to_datetime(news_df['datetime'].dt.date)

        # Prices start a few days before the first article for context.
        stock_data = self._get_stock_prices(
            news_df['date'].min() - timedelta(days=5),
            news_df['date'].max() + timedelta(days=1),
        )

        daily_sentiment = news_df.groupby('date').agg(
            avg_sentiment=('sentiment_score', 'mean'),
            article_count=('sentiment_score', 'count'),
            positive_count=('sentiment_score', lambda x: (x > 0.05).sum()),
            negative_count=('sentiment_score', lambda x: (x < -0.05).sum()),
            neutral_count=('sentiment_score', lambda x: ((x >= -0.05) & (x <= 0.05)).sum()),
        ).reset_index()

        # Descending order: positives come from the head, negatives from the tail.
        ranked = news_df.sort_values('sentiment_score', ascending=False)
        top_positive = ranked[ranked['sentiment_score'] > 0].head(5)
        top_negative = ranked[ranked['sentiment_score'] < 0].tail(5)

        return daily_sentiment, stock_data, pd.concat([top_positive, top_negative])
# Visualization Functions
def create_sentiment_overview(daily_sentiment, stock_data, top_headlines, symbol):
    """Build the two-row Plotly figure for the sentiment dashboard.

    Row 1 overlays the closing price (left axis) and the daily average
    sentiment (right axis). Row 2 shows the positive/negative/neutral article
    counts as stacked bars, with the total article count drawn as a line.

    Args:
        daily_sentiment: per-day frame with ``date``, ``avg_sentiment``,
            ``article_count``, ``positive_count``, ``negative_count`` and
            ``neutral_count`` columns.
        stock_data: frame indexed by date with a ``close`` column; may be
            ``None`` or empty, in which case the price trace is omitted.
        top_headlines: accepted for interface compatibility; not used here.
        symbol: ticker shown in the figure title.

    Returns:
        The figure, or ``None`` when *daily_sentiment* is ``None`` or empty.
    """
    if daily_sentiment is None or daily_sentiment.empty:
        return None

    dates = daily_sentiment['date']

    # Create figure with secondary y-axis on the first row
    fig = make_subplots(
        rows=2,
        cols=1,
        specs=[[{"secondary_y": True}], [{}]],
        row_heights=[0.7, 0.3],
        vertical_spacing=0.1,
    )

    # Add stock price if available
    if stock_data is not None and not stock_data.empty:
        fig.add_trace(
            go.Scatter(
                x=stock_data.index,
                y=stock_data['close'],
                name='Stock Price',
                line=dict(color='#1f77b4', width=2),
            ),
            row=1, col=1, secondary_y=False,
        )

    # Add daily sentiment score
    fig.add_trace(
        go.Scatter(
            x=dates,
            y=daily_sentiment['avg_sentiment'],
            name='Sentiment Score',
            line=dict(color='#ff7f0e', width=2),
        ),
        row=1, col=1, secondary_y=True,
    )

    # The total is drawn as a line, not a bar: with barmode='stack' a total
    # bar would be stacked on top of its own breakdown and double the height.
    fig.add_trace(
        go.Scatter(
            x=dates,
            y=daily_sentiment['article_count'],
            name='Article Count',
            mode='lines+markers',
            line=dict(color='rgba(135, 206, 235, 0.9)', width=2),
        ),
        row=2, col=1,
    )

    # Sentiment breakdown bars (these stack up to the article count)
    breakdown = (
        ('positive_count', 'Positive', 'rgba(0, 128, 0, 0.7)'),
        ('negative_count', 'Negative', 'rgba(255, 0, 0, 0.7)'),
        ('neutral_count', 'Neutral', 'rgba(128, 128, 128, 0.7)'),
    )
    for column, label, colour in breakdown:
        fig.add_trace(
            go.Bar(x=dates, y=daily_sentiment[column], name=label, marker_color=colour),
            row=2, col=1,
        )

    # Update layout
    fig.update_layout(
        title=f"{symbol} News Sentiment Analysis",
        template='plotly_white',
        hovermode='x unified',
        barmode='stack',
        legend=dict(orientation='h', yanchor='bottom', y=1.02, xanchor='right', x=1),
        height=700,
        margin=dict(l=20, r=20, t=80, b=20),
    )

    # Update y-axis titles
    fig.update_yaxes(title_text="Stock Price", row=1, col=1, secondary_y=False)
    fig.update_yaxes(title_text="Sentiment Score", row=1, col=1, secondary_y=True)
    fig.update_yaxes(title_text="Article Count", row=2, col=1)

    return fig
def _render_headline_rows(rows):
    """Render each row as one Markdown bullet: date | headline link | score."""
    bullets = []
    for _, row in rows.iterrows():
        date = row['datetime'].strftime('%Y-%m-%d')
        headline = row['headline']
        url = row.get('url')
        # A missing URL (absent column, or NaN after a CSV round-trip) would
        # otherwise render as a link to "nan"; show the bare headline instead.
        if isinstance(url, str) and url.strip():
            title = f"[{headline}]({url})"
        else:
            title = f"{headline}"
        bullets.append(f"- **{date}** | {title} | *{row['sentiment_score']:.2f}*\n\n")
    return "".join(bullets)


def format_headlines(headlines_df):
    """Format the top headlines as Markdown for display.

    Expects ``datetime``, ``headline`` and ``sentiment_score`` columns, plus
    an optional ``url`` column. Produces a "Top Positive Headlines" section
    (up to five, highest score first) followed by a "Top Negative Headlines"
    section (up to five, lowest score first). Returns a fixed placeholder
    message when *headlines_df* is ``None`` or empty.
    """
    if headlines_df is None or headlines_df.empty:
        return "No headlines available."

    # Sort by sentiment score (most positive first)
    ranked = headlines_df.sort_values('sentiment_score', ascending=False)
    positive = ranked[ranked['sentiment_score'] > 0].head(5)
    negative = ranked[ranked['sentiment_score'] < 0].sort_values('sentiment_score').head(5)

    return "".join([
        "## Top Positive Headlines\n\n",
        _render_headline_rows(positive),
        "## Top Negative Headlines\n\n",
        _render_headline_rows(negative),
    ])
def create_summary(daily_sentiment, symbol):
    """Create a Markdown summary of the sentiment analysis.

    *daily_sentiment* is the per-day frame with ``date``, ``avg_sentiment``,
    ``article_count``, ``positive_count``, ``negative_count`` and
    ``neutral_count`` columns. The overall score is the unweighted mean of
    the daily averages, labelled positive above 0.05, negative below -0.05
    and neutral otherwise. Returns a short placeholder message when the
    frame is ``None`` or empty.
    """
    if daily_sentiment is None or daily_sentiment.empty:
        return f"No sentiment data available for {symbol}."

    # Calculate overall sentiment statistics
    avg_sentiment = daily_sentiment['avg_sentiment'].mean()
    total_articles = daily_sentiment['article_count'].sum()
    total_positive = daily_sentiment['positive_count'].sum()
    total_negative = daily_sentiment['negative_count'].sum()
    total_neutral = daily_sentiment['neutral_count'].sum()

    def share(count):
        # Guard the division so a zero total reports 0.0% instead of nan%.
        return count / total_articles * 100 if total_articles else 0.0

    # Determine sentiment trend
    if avg_sentiment > 0.05:
        sentiment_trend = "positive"
    elif avg_sentiment < -0.05:
        sentiment_trend = "negative"
    else:
        sentiment_trend = "neutral"

    first_day = daily_sentiment['date'].min().strftime('%Y-%m-%d')
    last_day = daily_sentiment['date'].max().strftime('%Y-%m-%d')

    # The leading and trailing empty entries reproduce the newline that
    # opens and closes the summary text.
    lines = [
        "",
        f"## {symbol} Sentiment Summary",
        "### Overview",
        f"- **Overall Sentiment**: {sentiment_trend.title()} (Score: {avg_sentiment:.2f})",
        f"- **Total Articles**: {total_articles}",
        f"- **Date Range**: {first_day} to {last_day}",
        "### Sentiment Breakdown",
        f"- **Positive Articles**: {total_positive} ({share(total_positive):.1f}%)",
        f"- **Negative Articles**: {total_negative} ({share(total_negative):.1f}%)",
        f"- **Neutral Articles**: {total_neutral} ({share(total_neutral):.1f}%)",
        "",
    ]
    return "\n".join(lines)
# Gradio Interface
def analyze_stock_sentiment(symbol, days, refresh_data):
    """Run the full analysis for the Gradio click handler.

    Args:
        symbol: ticker text as typed by the user.
        days: how many days of history to analyze.
        refresh_data: when true, bypass the cached news.

    Returns:
        ``(summary_markdown, plotly_figure_or_None, headlines_markdown)``.
        The first element carries a user-facing message when the symbol is
        blank or no news is available.
    """
    # Normalise before validating so a whitespace-only entry is rejected
    # here instead of being sent on as an empty symbol.
    symbol = (symbol or "").strip().upper()
    if not symbol:
        return "Please enter a valid stock symbol.", None, "No headlines available."

    analyzer = StockNewsAnalyzer(symbol)
    daily_sentiment, stock_data, top_headlines = analyzer.analyze_news_sentiment(days, refresh_data)

    if daily_sentiment is None or daily_sentiment.empty:
        return f"No news data available for {symbol}. Try another symbol or increase the time range.", None, "No headlines available."

    sentiment_plot = create_sentiment_overview(daily_sentiment, stock_data, top_headlines, symbol)
    summary = create_summary(daily_sentiment, symbol)
    headlines = format_headlines(top_headlines)

    return summary, sentiment_plot, headlines
# Build Gradio interface
def build_interface():
    """Assemble and return the Gradio Blocks app (not yet launched).

    The app has a symbol textbox, a days slider, a refresh checkbox and an
    analyze button; clicking the button calls ``analyze_stock_sentiment`` and
    fills the summary, plot and headlines outputs.
    """
    with gr.Blocks(title="Stock Sentiment Analysis", theme=gr.themes.Soft()) as app:
        gr.Markdown("# Stock News Sentiment Analysis")
        gr.Markdown("Analyze the sentiment of news articles for any stock symbol")

        # NOTE(review): widget nesting is reconstructed (inputs inside the
        # row/column, outputs below it) - confirm against the intended layout.
        with gr.Row():
            with gr.Column(scale=1):
                # Inputs
                symbol_box = gr.Textbox(label="Stock Symbol", value="BABA", placeholder="e.g., AAPL, MSFT, GOOGL")
                days_slider = gr.Slider(label="Days of History", minimum=7, maximum=90, value=90, step=1)
                refresh_box = gr.Checkbox(label="Refresh Data", value=False)
                run_button = gr.Button("Analyze Sentiment", variant="primary")

        # Outputs, in the order analyze_stock_sentiment returns them
        summary_md = gr.Markdown()
        plot_area = gr.Plot()
        headlines_md = gr.Markdown()

        # Wire the button to the analysis function
        controls = [symbol_box, days_slider, refresh_box]
        displays = [summary_md, plot_area, headlines_md]
        run_button.click(fn=analyze_stock_sentiment, inputs=controls, outputs=displays)

    return app
# Main function
def main():
    """Build the Gradio app and start serving it."""
    build_interface().launch()
# Launch the app only when this file is run as a script, not when it is imported.
if __name__ == "__main__":
    main()