mirror of
https://github.com/acedanger/docker.git
synced 2025-12-06 01:10:11 -08:00
add technical analysis script with multiple indicators and signal identification
This commit is contained in:
327
miningwood/technical-analysis/technical_analysis_script.py
Normal file
327
miningwood/technical-analysis/technical_analysis_script.py
Normal file
@@ -0,0 +1,327 @@
|
||||
import warnings
|
||||
from datetime import datetime, timedelta
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
warnings.filterwarnings('ignore')
|
||||
|
||||
|
||||
class TechnicalAnalyzer:
|
||||
def __init__(self, data):
|
||||
"""
|
||||
Initialize with price data DataFrame
|
||||
Expected columns: ['date', 'open', 'high', 'low', 'close', 'volume']
|
||||
"""
|
||||
self.data = data.copy()
|
||||
self.signals = pd.DataFrame()
|
||||
|
||||
def calculate_sma(self, period):
|
||||
"""Simple Moving Average"""
|
||||
return self.data['close'].rolling(window=period).mean()
|
||||
|
||||
def calculate_ema(self, period):
|
||||
"""Exponential Moving Average"""
|
||||
return self.data['close'].ewm(span=period).mean()
|
||||
|
||||
def calculate_rsi(self, period=14):
|
||||
"""Relative Strength Index"""
|
||||
delta = self.data['close'].diff()
|
||||
gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
|
||||
loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
|
||||
rs = gain / loss
|
||||
rsi = 100 - (100 / (1 + rs))
|
||||
return rsi
|
||||
|
||||
def calculate_macd(self, fast=12, slow=26, signal=9):
|
||||
"""MACD Indicator"""
|
||||
ema_fast = self.calculate_ema(fast)
|
||||
ema_slow = self.calculate_ema(slow)
|
||||
macd_line = ema_fast - ema_slow
|
||||
signal_line = macd_line.ewm(span=signal).mean()
|
||||
histogram = macd_line - signal_line
|
||||
return macd_line, signal_line, histogram
|
||||
|
||||
def calculate_bollinger_bands(self, period=20, std_dev=2):
|
||||
"""Bollinger Bands"""
|
||||
sma = self.calculate_sma(period)
|
||||
std = self.data['close'].rolling(window=period).std()
|
||||
upper_band = sma + (std * std_dev)
|
||||
lower_band = sma - (std * std_dev)
|
||||
return upper_band, sma, lower_band
|
||||
|
||||
def calculate_atr(self, period=14):
|
||||
"""Average True Range"""
|
||||
high_low = self.data['high'] - self.data['low']
|
||||
high_close = np.abs(self.data['high'] - self.data['close'].shift())
|
||||
low_close = np.abs(self.data['low'] - self.data['close'].shift())
|
||||
ranges = pd.concat([high_low, high_close, low_close], axis=1)
|
||||
true_range = np.max(ranges, axis=1)
|
||||
atr = true_range.rolling(window=period).mean()
|
||||
return atr
|
||||
|
||||
def calculate_volume_indicators(self):
|
||||
"""Volume-based indicators"""
|
||||
# Volume Moving Average
|
||||
vol_sma_20 = self.data['volume'].rolling(window=20).mean()
|
||||
vol_ratio = self.data['volume'] / vol_sma_20
|
||||
|
||||
# On Balance Volume (OBV)
|
||||
obv = (np.sign(self.data['close'].diff()) *
|
||||
self.data['volume']).fillna(0).cumsum()
|
||||
|
||||
return vol_ratio, obv
|
||||
|
||||
def generate_all_indicators(self):
|
||||
"""Calculate all technical indicators"""
|
||||
# Moving Averages
|
||||
self.data['sma_20'] = self.calculate_sma(20)
|
||||
self.data['sma_50'] = self.calculate_sma(50)
|
||||
self.data['ema_12'] = self.calculate_ema(12)
|
||||
self.data['ema_26'] = self.calculate_ema(26)
|
||||
|
||||
# RSI
|
||||
self.data['rsi'] = self.calculate_rsi()
|
||||
|
||||
# MACD
|
||||
macd, signal, histogram = self.calculate_macd()
|
||||
self.data['macd'] = macd
|
||||
self.data['macd_signal'] = signal
|
||||
self.data['macd_histogram'] = histogram
|
||||
|
||||
# Bollinger Bands
|
||||
bb_upper, bb_middle, bb_lower = self.calculate_bollinger_bands()
|
||||
self.data['bb_upper'] = bb_upper
|
||||
self.data['bb_middle'] = bb_middle
|
||||
self.data['bb_lower'] = bb_lower
|
||||
|
||||
# ATR
|
||||
self.data['atr'] = self.calculate_atr()
|
||||
|
||||
# Volume indicators
|
||||
vol_ratio, obv = self.calculate_volume_indicators()
|
||||
self.data['vol_ratio'] = vol_ratio
|
||||
self.data['obv'] = obv
|
||||
|
||||
return self.data
|
||||
|
||||
def identify_entry_signals(self):
|
||||
"""Identify potential entry points"""
|
||||
signals = []
|
||||
|
||||
for i in range(1, len(self.data)):
|
||||
entry_score = 0
|
||||
reasons = []
|
||||
|
||||
current = self.data.iloc[i]
|
||||
previous = self.data.iloc[i-1]
|
||||
|
||||
# Moving Average Crossover (Golden Cross)
|
||||
if (current['sma_20'] > current['sma_50'] and
|
||||
previous['sma_20'] <= previous['sma_50']):
|
||||
entry_score += 2
|
||||
reasons.append("SMA Golden Cross")
|
||||
|
||||
# Price above both MAs
|
||||
if current['close'] > current['sma_20'] > current['sma_50']:
|
||||
entry_score += 1
|
||||
reasons.append("Price above MAs")
|
||||
|
||||
# RSI oversold recovery
|
||||
if previous['rsi'] < 30 and current['rsi'] > 30:
|
||||
entry_score += 2
|
||||
reasons.append("RSI oversold recovery")
|
||||
|
||||
# MACD bullish crossover
|
||||
if (current['macd'] > current['macd_signal'] and
|
||||
previous['macd'] <= previous['macd_signal']):
|
||||
entry_score += 2
|
||||
reasons.append("MACD bullish crossover")
|
||||
|
||||
# Bollinger Band bounce
|
||||
if previous['close'] <= previous['bb_lower'] and current['close'] > previous['bb_lower']:
|
||||
entry_score += 1
|
||||
reasons.append("BB lower band bounce")
|
||||
|
||||
# Volume confirmation
|
||||
if current['vol_ratio'] > 1.5: # 50% above average
|
||||
entry_score += 1
|
||||
reasons.append("High volume")
|
||||
|
||||
# Strong overall conditions
|
||||
if (current['rsi'] > 40 and current['rsi'] < 70 and
|
||||
current['macd'] > 0):
|
||||
entry_score += 1
|
||||
reasons.append("Favorable momentum")
|
||||
|
||||
if entry_score >= 3: # Minimum threshold for entry
|
||||
signals.append({
|
||||
'date': current['date'],
|
||||
'type': 'ENTRY',
|
||||
'price': current['close'],
|
||||
'score': entry_score,
|
||||
'reasons': reasons
|
||||
})
|
||||
|
||||
return signals
|
||||
|
||||
def identify_exit_signals(self):
|
||||
"""Identify potential exit points"""
|
||||
signals = []
|
||||
|
||||
for i in range(1, len(self.data)):
|
||||
exit_score = 0
|
||||
reasons = []
|
||||
|
||||
current = self.data.iloc[i]
|
||||
previous = self.data.iloc[i-1]
|
||||
|
||||
# Moving Average bearish cross
|
||||
if (current['sma_20'] < current['sma_50'] and
|
||||
previous['sma_20'] >= previous['sma_50']):
|
||||
exit_score += 2
|
||||
reasons.append("SMA Death Cross")
|
||||
|
||||
# Price below key MA
|
||||
if current['close'] < current['sma_20']:
|
||||
exit_score += 1
|
||||
reasons.append("Price below SMA20")
|
||||
|
||||
# RSI overbought
|
||||
if current['rsi'] > 70:
|
||||
exit_score += 1
|
||||
reasons.append("RSI overbought")
|
||||
|
||||
# RSI bearish divergence (simplified)
|
||||
if previous['rsi'] > 70 and current['rsi'] < 70:
|
||||
exit_score += 2
|
||||
reasons.append("RSI overbought exit")
|
||||
|
||||
# MACD bearish crossover
|
||||
if (current['macd'] < current['macd_signal'] and
|
||||
previous['macd'] >= previous['macd_signal']):
|
||||
exit_score += 2
|
||||
reasons.append("MACD bearish crossover")
|
||||
|
||||
# Bollinger Band upper touch
|
||||
if current['close'] >= current['bb_upper']:
|
||||
exit_score += 1
|
||||
reasons.append("BB upper band resistance")
|
||||
|
||||
# Volume spike (could indicate distribution)
|
||||
if current['vol_ratio'] > 3.0:
|
||||
exit_score += 1
|
||||
reasons.append("Extreme volume spike")
|
||||
|
||||
if exit_score >= 3: # Minimum threshold for exit
|
||||
signals.append({
|
||||
'date': current['date'],
|
||||
'type': 'EXIT',
|
||||
'price': current['close'],
|
||||
'score': exit_score,
|
||||
'reasons': reasons
|
||||
})
|
||||
|
||||
return signals
|
||||
|
||||
def analyze_stock(self):
|
||||
"""Complete analysis workflow"""
|
||||
# Generate all indicators
|
||||
self.generate_all_indicators()
|
||||
|
||||
# Get entry and exit signals
|
||||
entry_signals = self.identify_entry_signals()
|
||||
exit_signals = self.identify_exit_signals()
|
||||
|
||||
# Combine all signals
|
||||
all_signals = entry_signals + exit_signals
|
||||
all_signals = sorted(all_signals, key=lambda x: x['date'])
|
||||
|
||||
return all_signals, self.data
|
||||
|
||||
# Example usage and demo data generation
|
||||
|
||||
|
||||
def generate_sample_data(days=252):
|
||||
"""Generate sample stock data for demonstration"""
|
||||
np.random.seed(42) # For reproducible results
|
||||
|
||||
start_date = datetime.now() - timedelta(days=days)
|
||||
dates = [start_date + timedelta(days=i) for i in range(days)]
|
||||
|
||||
# Generate realistic price movement
|
||||
returns = np.random.normal(0.001, 0.02, days) # Daily returns
|
||||
price = 100 # Starting price
|
||||
prices = [price]
|
||||
|
||||
for ret in returns[1:]:
|
||||
price *= (1 + ret)
|
||||
prices.append(price)
|
||||
|
||||
# Generate OHLC data
|
||||
data = []
|
||||
for i, (date, close) in enumerate(zip(dates, prices)):
|
||||
high = close * (1 + abs(np.random.normal(0, 0.015)))
|
||||
low = close * (1 - abs(np.random.normal(0, 0.015)))
|
||||
open_price = low + (high - low) * np.random.random()
|
||||
volume = int(np.random.normal(1000000, 300000))
|
||||
|
||||
data.append({
|
||||
'date': date,
|
||||
'open': open_price,
|
||||
'high': high,
|
||||
'low': low,
|
||||
'close': close,
|
||||
'volume': max(volume, 100000) # Ensure positive volume
|
||||
})
|
||||
|
||||
return pd.DataFrame(data)
|
||||
|
||||
|
||||
# Demo execution
|
||||
if __name__ == "__main__":
|
||||
# Generate sample data
|
||||
print("Generating sample stock data...")
|
||||
sample_data = generate_sample_data(180) # 6 months of data
|
||||
|
||||
# Initialize analyzer
|
||||
analyzer = TechnicalAnalyzer(sample_data)
|
||||
|
||||
# Run complete analysis
|
||||
print("Analyzing technical indicators...")
|
||||
signals, enhanced_data = analyzer.analyze_stock()
|
||||
|
||||
# Display results
|
||||
print("\n=== TECHNICAL ANALYSIS RESULTS ===")
|
||||
print(f"Analysis period: {len(sample_data)} days")
|
||||
print(f"Total signals found: {len(signals)}")
|
||||
|
||||
# Show recent indicators
|
||||
print("\n=== LATEST INDICATOR VALUES ===")
|
||||
latest = enhanced_data.iloc[-1]
|
||||
print(f"Price: ${latest['close']:.2f}")
|
||||
print(f"RSI: {latest['rsi']:.2f}")
|
||||
print(f"MACD: {latest['macd']:.4f}")
|
||||
print(f"Volume Ratio: {latest['vol_ratio']:.2f}x")
|
||||
print(f"20-day SMA: ${latest['sma_20']:.2f}")
|
||||
print(f"50-day SMA: ${latest['sma_50']:.2f}")
|
||||
|
||||
# Show recent signals
|
||||
print("\n=== RECENT SIGNALS ===")
|
||||
recent_signals = [s for s in signals if s['date']
|
||||
>= (datetime.now() - timedelta(days=30))]
|
||||
|
||||
if recent_signals:
|
||||
for signal in recent_signals[-5:]: # Last 5 signals
|
||||
print(f"\n{signal['type']} Signal:")
|
||||
print(f" Date: {signal['date'].strftime('%Y-%m-%d')}")
|
||||
print(f" Price: ${signal['price']:.2f}")
|
||||
print(f" Score: {signal['score']}")
|
||||
print(f" Reasons: {', '.join(signal['reasons'])}")
|
||||
else:
|
||||
print("No recent signals found.")
|
||||
|
||||
print("\n=== USAGE NOTES ===")
|
||||
print("1. Replace sample data with real market data from your preferred source")
|
||||
print("2. Adjust indicator parameters based on your trading style")
|
||||
print("3. Modify signal thresholds based on backtesting results")
|
||||
print("4. Always combine with risk management and position sizing")
|
||||
print("5. Consider market conditions and fundamental analysis")
|
||||
Reference in New Issue
Block a user