Files
docker/miningwood/technical-analysis/technical_analysis_script.py

328 lines
11 KiB
Python

import warnings
from datetime import datetime, timedelta
import pandas as pd
import numpy as np
warnings.filterwarnings('ignore')
class TechnicalAnalyzer:
def __init__(self, data):
"""
Initialize with price data DataFrame
Expected columns: ['date', 'open', 'high', 'low', 'close', 'volume']
"""
self.data = data.copy()
self.signals = pd.DataFrame()
def calculate_sma(self, period):
"""Simple Moving Average"""
return self.data['close'].rolling(window=period).mean()
def calculate_ema(self, period):
"""Exponential Moving Average"""
return self.data['close'].ewm(span=period).mean()
def calculate_rsi(self, period=14):
"""Relative Strength Index"""
delta = self.data['close'].diff()
gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
rs = gain / loss
rsi = 100 - (100 / (1 + rs))
return rsi
def calculate_macd(self, fast=12, slow=26, signal=9):
"""MACD Indicator"""
ema_fast = self.calculate_ema(fast)
ema_slow = self.calculate_ema(slow)
macd_line = ema_fast - ema_slow
signal_line = macd_line.ewm(span=signal).mean()
histogram = macd_line - signal_line
return macd_line, signal_line, histogram
def calculate_bollinger_bands(self, period=20, std_dev=2):
"""Bollinger Bands"""
sma = self.calculate_sma(period)
std = self.data['close'].rolling(window=period).std()
upper_band = sma + (std * std_dev)
lower_band = sma - (std * std_dev)
return upper_band, sma, lower_band
def calculate_atr(self, period=14):
"""Average True Range"""
high_low = self.data['high'] - self.data['low']
high_close = np.abs(self.data['high'] - self.data['close'].shift())
low_close = np.abs(self.data['low'] - self.data['close'].shift())
ranges = pd.concat([high_low, high_close, low_close], axis=1)
true_range = np.max(ranges, axis=1)
atr = true_range.rolling(window=period).mean()
return atr
def calculate_volume_indicators(self):
"""Volume-based indicators"""
# Volume Moving Average
vol_sma_20 = self.data['volume'].rolling(window=20).mean()
vol_ratio = self.data['volume'] / vol_sma_20
# On Balance Volume (OBV)
obv = (np.sign(self.data['close'].diff()) *
self.data['volume']).fillna(0).cumsum()
return vol_ratio, obv
def generate_all_indicators(self):
"""Calculate all technical indicators"""
# Moving Averages
self.data['sma_20'] = self.calculate_sma(20)
self.data['sma_50'] = self.calculate_sma(50)
self.data['ema_12'] = self.calculate_ema(12)
self.data['ema_26'] = self.calculate_ema(26)
# RSI
self.data['rsi'] = self.calculate_rsi()
# MACD
macd, signal, histogram = self.calculate_macd()
self.data['macd'] = macd
self.data['macd_signal'] = signal
self.data['macd_histogram'] = histogram
# Bollinger Bands
bb_upper, bb_middle, bb_lower = self.calculate_bollinger_bands()
self.data['bb_upper'] = bb_upper
self.data['bb_middle'] = bb_middle
self.data['bb_lower'] = bb_lower
# ATR
self.data['atr'] = self.calculate_atr()
# Volume indicators
vol_ratio, obv = self.calculate_volume_indicators()
self.data['vol_ratio'] = vol_ratio
self.data['obv'] = obv
return self.data
def identify_entry_signals(self):
"""Identify potential entry points"""
signals = []
for i in range(1, len(self.data)):
entry_score = 0
reasons = []
current = self.data.iloc[i]
previous = self.data.iloc[i-1]
# Moving Average Crossover (Golden Cross)
if (current['sma_20'] > current['sma_50'] and
previous['sma_20'] <= previous['sma_50']):
entry_score += 2
reasons.append("SMA Golden Cross")
# Price above both MAs
if current['close'] > current['sma_20'] > current['sma_50']:
entry_score += 1
reasons.append("Price above MAs")
# RSI oversold recovery
if previous['rsi'] < 30 and current['rsi'] > 30:
entry_score += 2
reasons.append("RSI oversold recovery")
# MACD bullish crossover
if (current['macd'] > current['macd_signal'] and
previous['macd'] <= previous['macd_signal']):
entry_score += 2
reasons.append("MACD bullish crossover")
# Bollinger Band bounce
if previous['close'] <= previous['bb_lower'] and current['close'] > previous['bb_lower']:
entry_score += 1
reasons.append("BB lower band bounce")
# Volume confirmation
if current['vol_ratio'] > 1.5: # 50% above average
entry_score += 1
reasons.append("High volume")
# Strong overall conditions
if (current['rsi'] > 40 and current['rsi'] < 70 and
current['macd'] > 0):
entry_score += 1
reasons.append("Favorable momentum")
if entry_score >= 3: # Minimum threshold for entry
signals.append({
'date': current['date'],
'type': 'ENTRY',
'price': current['close'],
'score': entry_score,
'reasons': reasons
})
return signals
def identify_exit_signals(self):
"""Identify potential exit points"""
signals = []
for i in range(1, len(self.data)):
exit_score = 0
reasons = []
current = self.data.iloc[i]
previous = self.data.iloc[i-1]
# Moving Average bearish cross
if (current['sma_20'] < current['sma_50'] and
previous['sma_20'] >= previous['sma_50']):
exit_score += 2
reasons.append("SMA Death Cross")
# Price below key MA
if current['close'] < current['sma_20']:
exit_score += 1
reasons.append("Price below SMA20")
# RSI overbought
if current['rsi'] > 70:
exit_score += 1
reasons.append("RSI overbought")
# RSI bearish divergence (simplified)
if previous['rsi'] > 70 and current['rsi'] < 70:
exit_score += 2
reasons.append("RSI overbought exit")
# MACD bearish crossover
if (current['macd'] < current['macd_signal'] and
previous['macd'] >= previous['macd_signal']):
exit_score += 2
reasons.append("MACD bearish crossover")
# Bollinger Band upper touch
if current['close'] >= current['bb_upper']:
exit_score += 1
reasons.append("BB upper band resistance")
# Volume spike (could indicate distribution)
if current['vol_ratio'] > 3.0:
exit_score += 1
reasons.append("Extreme volume spike")
if exit_score >= 3: # Minimum threshold for exit
signals.append({
'date': current['date'],
'type': 'EXIT',
'price': current['close'],
'score': exit_score,
'reasons': reasons
})
return signals
def analyze_stock(self):
"""Complete analysis workflow"""
# Generate all indicators
self.generate_all_indicators()
# Get entry and exit signals
entry_signals = self.identify_entry_signals()
exit_signals = self.identify_exit_signals()
# Combine all signals
all_signals = entry_signals + exit_signals
all_signals = sorted(all_signals, key=lambda x: x['date'])
return all_signals, self.data
# Example usage and demo data generation
def generate_sample_data(days=252):
"""Generate sample stock data for demonstration"""
np.random.seed(42) # For reproducible results
start_date = datetime.now() - timedelta(days=days)
dates = [start_date + timedelta(days=i) for i in range(days)]
# Generate realistic price movement
returns = np.random.normal(0.001, 0.02, days) # Daily returns
price = 100 # Starting price
prices = [price]
for ret in returns[1:]:
price *= (1 + ret)
prices.append(price)
# Generate OHLC data
data = []
for i, (date, close) in enumerate(zip(dates, prices)):
high = close * (1 + abs(np.random.normal(0, 0.015)))
low = close * (1 - abs(np.random.normal(0, 0.015)))
open_price = low + (high - low) * np.random.random()
volume = int(np.random.normal(1000000, 300000))
data.append({
'date': date,
'open': open_price,
'high': high,
'low': low,
'close': close,
'volume': max(volume, 100000) # Ensure positive volume
})
return pd.DataFrame(data)
# Demo execution
if __name__ == "__main__":
# Generate sample data
print("Generating sample stock data...")
sample_data = generate_sample_data(180) # 6 months of data
# Initialize analyzer
analyzer = TechnicalAnalyzer(sample_data)
# Run complete analysis
print("Analyzing technical indicators...")
signals, enhanced_data = analyzer.analyze_stock()
# Display results
print("\n=== TECHNICAL ANALYSIS RESULTS ===")
print(f"Analysis period: {len(sample_data)} days")
print(f"Total signals found: {len(signals)}")
# Show recent indicators
print("\n=== LATEST INDICATOR VALUES ===")
latest = enhanced_data.iloc[-1]
print(f"Price: ${latest['close']:.2f}")
print(f"RSI: {latest['rsi']:.2f}")
print(f"MACD: {latest['macd']:.4f}")
print(f"Volume Ratio: {latest['vol_ratio']:.2f}x")
print(f"20-day SMA: ${latest['sma_20']:.2f}")
print(f"50-day SMA: ${latest['sma_50']:.2f}")
# Show recent signals
print("\n=== RECENT SIGNALS ===")
recent_signals = [s for s in signals if s['date']
>= (datetime.now() - timedelta(days=30))]
if recent_signals:
for signal in recent_signals[-5:]: # Last 5 signals
print(f"\n{signal['type']} Signal:")
print(f" Date: {signal['date'].strftime('%Y-%m-%d')}")
print(f" Price: ${signal['price']:.2f}")
print(f" Score: {signal['score']}")
print(f" Reasons: {', '.join(signal['reasons'])}")
else:
print("No recent signals found.")
print("\n=== USAGE NOTES ===")
print("1. Replace sample data with real market data from your preferred source")
print("2. Adjust indicator parameters based on your trading style")
print("3. Modify signal thresholds based on backtesting results")
print("4. Always combine with risk management and position sizing")
print("5. Consider market conditions and fundamental analysis")