diff --git a/miningwood/technical-analysis/technical_analysis_script.py b/miningwood/technical-analysis/technical_analysis_script.py new file mode 100644 index 0000000..8cde38c --- /dev/null +++ b/miningwood/technical-analysis/technical_analysis_script.py @@ -0,0 +1,327 @@ +import warnings +from datetime import datetime, timedelta +import pandas as pd +import numpy as np +warnings.filterwarnings('ignore') + + +class TechnicalAnalyzer: + def __init__(self, data): + """ + Initialize with price data DataFrame + Expected columns: ['date', 'open', 'high', 'low', 'close', 'volume'] + """ + self.data = data.copy() + self.signals = pd.DataFrame() + + def calculate_sma(self, period): + """Simple Moving Average""" + return self.data['close'].rolling(window=period).mean() + + def calculate_ema(self, period): + """Exponential Moving Average""" + return self.data['close'].ewm(span=period).mean() + + def calculate_rsi(self, period=14): + """Relative Strength Index""" + delta = self.data['close'].diff() + gain = (delta.where(delta > 0, 0)).rolling(window=period).mean() + loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean() + rs = gain / loss + rsi = 100 - (100 / (1 + rs)) + return rsi + + def calculate_macd(self, fast=12, slow=26, signal=9): + """MACD Indicator""" + ema_fast = self.calculate_ema(fast) + ema_slow = self.calculate_ema(slow) + macd_line = ema_fast - ema_slow + signal_line = macd_line.ewm(span=signal).mean() + histogram = macd_line - signal_line + return macd_line, signal_line, histogram + + def calculate_bollinger_bands(self, period=20, std_dev=2): + """Bollinger Bands""" + sma = self.calculate_sma(period) + std = self.data['close'].rolling(window=period).std() + upper_band = sma + (std * std_dev) + lower_band = sma - (std * std_dev) + return upper_band, sma, lower_band + + def calculate_atr(self, period=14): + """Average True Range""" + high_low = self.data['high'] - self.data['low'] + high_close = np.abs(self.data['high'] - self.data['close'].shift()) + low_close = np.abs(self.data['low'] - self.data['close'].shift()) + ranges = pd.concat([high_low, high_close, low_close], axis=1) + true_range = np.max(ranges, axis=1) + atr = true_range.rolling(window=period).mean() + return atr + + def calculate_volume_indicators(self): + """Volume-based indicators""" + # Volume Moving Average + vol_sma_20 = self.data['volume'].rolling(window=20).mean() + vol_ratio = self.data['volume'] / vol_sma_20 + + # On Balance Volume (OBV) + obv = (np.sign(self.data['close'].diff()) * + self.data['volume']).fillna(0).cumsum() + + return vol_ratio, obv + + def generate_all_indicators(self): + """Calculate all technical indicators""" + # Moving Averages + self.data['sma_20'] = self.calculate_sma(20) + self.data['sma_50'] = self.calculate_sma(50) + self.data['ema_12'] = self.calculate_ema(12) + self.data['ema_26'] = self.calculate_ema(26) + + # RSI + self.data['rsi'] = self.calculate_rsi() + + # MACD + macd, signal, histogram = self.calculate_macd() + self.data['macd'] = macd + self.data['macd_signal'] = signal + self.data['macd_histogram'] = histogram + + # Bollinger Bands + bb_upper, bb_middle, bb_lower = self.calculate_bollinger_bands() + self.data['bb_upper'] = bb_upper + self.data['bb_middle'] = bb_middle + self.data['bb_lower'] = bb_lower + + # ATR + self.data['atr'] = self.calculate_atr() + + # Volume indicators + vol_ratio, obv = self.calculate_volume_indicators() + self.data['vol_ratio'] = vol_ratio + self.data['obv'] = obv + + return self.data + + def identify_entry_signals(self): + """Identify potential entry points""" + signals = [] + + for i in range(1, len(self.data)): + entry_score = 0 + reasons = [] + + current = self.data.iloc[i] + previous = self.data.iloc[i-1] + + # Moving Average Crossover (Golden Cross) + if (current['sma_20'] > current['sma_50'] and + previous['sma_20'] <= previous['sma_50']): + entry_score += 2 + reasons.append("SMA Golden Cross") + + # Price above both MAs + if current['close'] > current['sma_20'] > current['sma_50']: + entry_score += 1 + reasons.append("Price above MAs") + + # RSI oversold recovery + if previous['rsi'] < 30 and current['rsi'] > 30: + entry_score += 2 + reasons.append("RSI oversold recovery") + + # MACD bullish crossover + if (current['macd'] > current['macd_signal'] and + previous['macd'] <= previous['macd_signal']): + entry_score += 2 + reasons.append("MACD bullish crossover") + + # Bollinger Band bounce + if previous['close'] <= previous['bb_lower'] and current['close'] > previous['bb_lower']: + entry_score += 1 + reasons.append("BB lower band bounce") + + # Volume confirmation + if current['vol_ratio'] > 1.5: # 50% above average + entry_score += 1 + reasons.append("High volume") + + # Strong overall conditions + if (current['rsi'] > 40 and current['rsi'] < 70 and + current['macd'] > 0): + entry_score += 1 + reasons.append("Favorable momentum") + + if entry_score >= 3: # Minimum threshold for entry + signals.append({ + 'date': current['date'], + 'type': 'ENTRY', + 'price': current['close'], + 'score': entry_score, + 'reasons': reasons + }) + + return signals + + def identify_exit_signals(self): + """Identify potential exit points""" + signals = [] + + for i in range(1, len(self.data)): + exit_score = 0 + reasons = [] + + current = self.data.iloc[i] + previous = self.data.iloc[i-1] + + # Moving Average bearish cross + if (current['sma_20'] < current['sma_50'] and + previous['sma_20'] >= previous['sma_50']): + exit_score += 2 + reasons.append("SMA Death Cross") + + # Price below key MA + if current['close'] < current['sma_20']: + exit_score += 1 + reasons.append("Price below SMA20") + + # RSI overbought + if current['rsi'] > 70: + exit_score += 1 + reasons.append("RSI overbought") + + # RSI bearish divergence (simplified) + if previous['rsi'] > 70 and current['rsi'] < 70: + exit_score += 2 + reasons.append("RSI overbought exit") + + # MACD bearish crossover + if (current['macd'] < current['macd_signal'] and + previous['macd'] >= previous['macd_signal']): + exit_score += 2 + reasons.append("MACD bearish crossover") + + # Bollinger Band upper touch + if current['close'] >= current['bb_upper']: + exit_score += 1 + reasons.append("BB upper band resistance") + + # Volume spike (could indicate distribution) + if current['vol_ratio'] > 3.0: + exit_score += 1 + reasons.append("Extreme volume spike") + + if exit_score >= 3: # Minimum threshold for exit + signals.append({ + 'date': current['date'], + 'type': 'EXIT', + 'price': current['close'], + 'score': exit_score, + 'reasons': reasons + }) + + return signals + + def analyze_stock(self): + """Complete analysis workflow""" + # Generate all indicators + self.generate_all_indicators() + + # Get entry and exit signals + entry_signals = self.identify_entry_signals() + exit_signals = self.identify_exit_signals() + + # Combine all signals + all_signals = entry_signals + exit_signals + all_signals = sorted(all_signals, key=lambda x: x['date']) + + return all_signals, self.data + +# Example usage and demo data generation + + +def generate_sample_data(days=252): + """Generate sample stock data for demonstration""" + np.random.seed(42) # For reproducible results + + start_date = datetime.now() - timedelta(days=days) + dates = [start_date + timedelta(days=i) for i in range(days)] + + # Generate realistic price movement + returns = np.random.normal(0.001, 0.02, days) # Daily returns + price = 100 # Starting price + prices = [price] + + for ret in returns[1:]: + price *= (1 + ret) + prices.append(price) + + # Generate OHLC data + data = [] + for i, (date, close) in enumerate(zip(dates, prices)): + high = close * (1 + abs(np.random.normal(0, 0.015))) + low = close * (1 - abs(np.random.normal(0, 0.015))) + open_price = low + (high - low) * np.random.random() + volume = int(np.random.normal(1000000, 300000)) + + data.append({ + 'date': date, + 'open': open_price, + 'high': high, + 'low': low, + 'close': close, + 'volume': max(volume, 100000) # Ensure positive volume + }) + + return pd.DataFrame(data) + + +# Demo execution +if __name__ == "__main__": + # Generate sample data + print("Generating sample stock data...") + sample_data = generate_sample_data(180) # 6 months of data + + # Initialize analyzer + analyzer = TechnicalAnalyzer(sample_data) + + # Run complete analysis + print("Analyzing technical indicators...") + signals, enhanced_data = analyzer.analyze_stock() + + # Display results + print("\n=== TECHNICAL ANALYSIS RESULTS ===") + print(f"Analysis period: {len(sample_data)} days") + print(f"Total signals found: {len(signals)}") + + # Show recent indicators + print("\n=== LATEST INDICATOR VALUES ===") + latest = enhanced_data.iloc[-1] + print(f"Price: ${latest['close']:.2f}") + print(f"RSI: {latest['rsi']:.2f}") + print(f"MACD: {latest['macd']:.4f}") + print(f"Volume Ratio: {latest['vol_ratio']:.2f}x") + print(f"20-day SMA: ${latest['sma_20']:.2f}") + print(f"50-day SMA: ${latest['sma_50']:.2f}") + + # Show recent signals + print("\n=== RECENT SIGNALS ===") + recent_signals = [s for s in signals if s['date'] + >= (datetime.now() - timedelta(days=30))] + + if recent_signals: + for signal in recent_signals[-5:]: # Last 5 signals + print(f"\n{signal['type']} Signal:") + print(f" Date: {signal['date'].strftime('%Y-%m-%d')}") + print(f" Price: ${signal['price']:.2f}") + print(f" Score: {signal['score']}") + print(f" Reasons: {', '.join(signal['reasons'])}") + else: + print("No recent signals found.") + + print("\n=== USAGE NOTES ===") + print("1. Replace sample data with real market data from your preferred source") + print("2. Adjust indicator parameters based on your trading style") + print("3. Modify signal thresholds based on backtesting results") + print("4. Always combine with risk management and position sizing") + print("5. Consider market conditions and fundamental analysis")