用Python和Pandas手把手教你计算股票技术指标(MA、MACD、KDJ、RSI、OBV保姆级代码)
用Python和Pandas实现股票技术指标全解析从数据清洗到策略回测在量化投资领域技术指标分析是识别市场趋势、判断买卖时机的重要工具。对于刚接触Python数据分析的投资者来说如何将教科书上的指标公式转化为可执行的代码往往是个挑战。本文将带你用Pandas库完整实现MA、MACD、KDJ、RSI、OBV五大核心指标并分享实际应用中的数据处理技巧和可视化方法。1. 环境准备与数据清洗在开始计算指标前我们需要搭建合适的工作环境并处理好原始数据。假设我们已经从数据源获取了包含以下字段的股票交易数据股票代码(Stkcd)、交易日期(Trddt)、开盘价、最高价、最低价、收盘价、成交量、成交金额。首先安装必要的Python库pip install pandas numpy matplotlib ta-lib提示虽然TA-Lib是专业的技术指标计算库但本文选择用原生Pandas实现以帮助理解底层计算逻辑。数据清洗是量化分析的关键第一步常见的处理包括import pandas as pd # 读取Excel数据 df pd.read_excel(trd_data.xlsx) # 基础清洗步骤 def clean_data(df, stock_code, start_date, end_date): # 筛选特定股票和时间范围 data df[(df[Stkcd] stock_code) (df[Trddt].between(start_date, end_date))] # 按日期排序 data data.sort_values(Trddt).reset_index(dropTrue) # 处理缺失值 - 这里采用前后填充法 data[[开盘价,最高价,最低价,收盘价]] data[[开盘价,最高价,最低价,收盘价]].fillna(methodffill).fillna(methodbfill) # 验证数据连续性 date_range pd.date_range(startstart_date, endend_date) missing_dates date_range.difference(data[Trddt]) if not missing_dates.empty: print(f警告缺失{len(missing_dates)}个交易日数据) return data # 示例调用 cleaned_data clean_data(df, 601668, 2017-01-01, 2018-01-01)常见的数据质量问题及处理方法问题类型检测方法处理方案缺失值isna().sum()前向填充/线性插值异常值3σ原则或IQRWinsorize处理或剔除日期不连续日期序列比对补充交易日或调整计算窗口价格异常涨跌幅限制检查根据市场规则修正2. 移动平均线(MA)实现与优化移动平均线是最基础的趋势指标通过平滑价格数据来识别市场方向。我们将实现三种不同周期的MA并探讨计算效率优化方案。2.1 基础MA计算Pandas的rolling方法可以方便地计算移动平均def calculate_ma(data, windows[5, 10, 20]): ma_df pd.DataFrame() close data[收盘价] for window in windows: ma_df[fMA{window}] close.rolling(windowwindow).mean() # 处理初期NaN值 ma_df ma_df.dropna() return ma_df ma_results calculate_ma(cleaned_data)2.2 计算优化技巧当处理大数据量时标准的rolling方法可能效率不高。我们可以采用以下优化策略并行计算使用swifter库加速pandas操作指数加权移动平均(EWMA)对近期数据赋予更高权重缓存机制避免重复计算相同窗口的MA优化后的EWMA实现示例def enhanced_ma(data, windows[5, 10, 20], alpha0.3): ma_df pd.DataFrame() close data[收盘价] for window in windows: # 组合简单MA和指数加权MA simple_ma close.rolling(window).mean() ewma close.ewm(spanwindow, alphaalpha).mean() ma_df[fMA{window}] (simple_ma ewma) / 2 return ma_df.dropna()2.3 MA交叉策略实现MA交叉是常见的交易信号生成方法def ma_cross_signal(ma_df): signals pd.DataFrame(indexma_df.index) signals[price] cleaned_data.loc[ma_df.index, 收盘价] # 生成金叉/死叉信号 signals[5_10_cross] np.where(ma_df[MA5] ma_df[MA10], 1, -1) signals[signal] signals[5_10_cross].diff() # 1表示金叉买入信号-1表示死叉卖出信号 signals[trade_signal] np.where(signals[signal] 0, 1, np.where(signals[signal] 0, -1, 0)) return signals3. MACD指标深度解析与实现MACD(Moving Average Convergence Divergence)是结合趋势和动量的经典指标由DIF、DEA和MACD柱三部分组成。3.1 标准MACD实现def calculate_macd(data, fast12, slow26, signal9): close data[收盘价] # 计算长短EMA ema_fast close.ewm(spanfast, adjustFalse).mean() ema_slow close.ewm(spanslow, adjustFalse).mean() # 计算DIF和DEA dif ema_fast - ema_slow dea dif.ewm(spansignal, adjustFalse).mean() # MACD柱状图 macd_hist 2 * (dif - dea) return pd.DataFrame({ DIF: dif, DEA: dea, MACD: macd_hist }).dropna() macd_results calculate_macd(cleaned_data)3.2 MACD参数优化不同市场环境下标准参数可能不是最优选择。我们可以通过网格搜索寻找最佳参数组合from sklearn.metrics import sharpe_ratio def optimize_macd(data, fast_range(8,20), slow_range(20,35), signal_range(5,15)): best_sharpe -np.inf best_params {} for fast in range(*fast_range): for slow in range(*slow_range): if slow fast: continue for signal in range(*signal_range): macd calculate_macd(data, fast, slow, signal) # 这里简化计算实际应基于交易信号回测 returns macd[MACD].pct_change().dropna() sharpe returns.mean() / returns.std() if sharpe best_sharpe: best_sharpe sharpe best_params {fast:fast, slow:slow, signal:signal} return best_params, best_sharpe3.3 MACD背离检测价格与MACD的背离是重要的反转信号def detect_divergence(data, macd_df, lookback30): close data[收盘价] macd macd_df[MACD] # 寻找价格高点 price_highs close.rolling(lookback).max() macd_highs macd.rolling(lookback).max() # 顶背离价格新高但MACD未新高 top_divergence (close price_highs) (macd macd_highs) # 底背离价格新低但MACD未新低 price_lows close.rolling(lookback).min() macd_lows macd.rolling(lookback).min() bottom_divergence (close price_lows) (macd macd_lows) return pd.DataFrame({ top_divergence: top_divergence, bottom_divergence: bottom_divergence }).dropna()4. KDJ指标实现与交易策略KDJ指标是随机振荡器的一种特别适合震荡市中的超买超卖判断。4.1 标准KDJ计算def calculate_kdj(data, n9, m13, m23): high data[最高价] low data[最低价] close data[收盘价] # 计算RSV值 lowest_low low.rolling(n).min() highest_high high.rolling(n).max() rsv (close - lowest_low) / (highest_high - lowest_low) * 100 # 初始化KDJ值 k pd.Series(50, indexrsv.index) d pd.Series(50, indexrsv.index) j pd.Series(0, indexrsv.index) # 递归计算KDJ for i in range(1, len(rsv)): k.iloc[i] (m1-1)/m1 * k.iloc[i-1] 1/m1 * rsv.iloc[i] d.iloc[i] (m2-1)/m2 * d.iloc[i-1] 1/m2 * k.iloc[i] j.iloc[i] 3 * k.iloc[i] - 2 * d.iloc[i] return pd.DataFrame({ K: k, D: d, J: j }).dropna() kdj_results calculate_kdj(cleaned_data)4.2 KDJ交易信号生成基于KDJ的常见交易规则超买区域(K80/D70/J90)考虑卖出超卖区域(K20/D30/J10)考虑买入K线从下向上穿越D线买入信号K线从上向下穿越D线卖出信号实现代码def kdj_trading_signals(kdj_df): signals pd.DataFrame(indexkdj_df.index) # 超买超卖信号 signals[overbought] (kdj_df[K] 80) | (kdj_df[D] 70) | (kdj_df[J] 90) signals[oversold] (kdj_df[K] 20) | (kdj_df[D] 30) | (kdj_df[J] 10) # 金叉死叉信号 signals[K_D_cross] np.where(kdj_df[K] kdj_df[D], 1, -1) signals[cross_signal] signals[K_D_cross].diff() # 综合信号 signals[buy] signals[oversold] (signals[cross_signal] 0) signals[sell] signals[overbought] (signals[cross_signal] 0) return signals.dropna()4.3 KDJ参数敏感性分析不同参数对KDJ信号的影响参数组合敏感度适用场景(9,3,3)中等通用设置(14,3,3)较低趋势市场(5,3,3)较高震荡市场(9,5,5)平滑减少假信号测试不同参数表现的代码框架def test_kdj_parameters(data, param_sets): results [] for params in param_sets: kdj calculate_kdj(data, *params) signals kdj_trading_signals(kdj) # 这里应添加回测逻辑 performance {} # 存储收益率、胜率等指标 results.append((params, performance)) return pd.DataFrame(results, columns[params, performance])5. RSI与OBV指标实现及应用相对强弱指数(RSI)和能量潮(OBV)分别从动量和成交量角度提供市场洞察。5.1 多周期RSI计算def calculate_rsi(data, periods[6, 12, 24]): close data[收盘价] delta close.diff() rsi_df pd.DataFrame() for period in periods: # 计算涨跌幅 gain delta.where(delta 0, 0) loss -delta.where(delta 0, 0) # 计算平均增益和平均损失 avg_gain gain.rolling(period).mean() avg_loss loss.rolling(period).mean() # 计算RS和RSI rs avg_gain / avg_loss rsi 100 - (100 / (1 rs)) rsi_df[fRSI{period}] rsi return rsi_df.dropna() rsi_results calculate_rsi(cleaned_data)5.2 RSI背离检测def detect_rsi_divergence(data, rsi_df, period14, lookback30): close data[收盘价] rsi rsi_df[fRSI{period}] # 寻找价格和RSI极值点 price_highs close.rolling(lookback).max() rsi_highs rsi.rolling(lookback).max() price_lows close.rolling(lookback).min() rsi_lows rsi.rolling(lookback).min() # 检测背离 top_divergence (close price_highs) (rsi rsi_highs) bottom_divergence (close price_lows) (rsi rsi_lows) return pd.DataFrame({ top_divergence: top_divergence, bottom_divergence: bottom_divergence }).dropna()5.3 OBV指标实现能量潮指标通过累积成交量反映资金流向def calculate_obv(data): close data[收盘价] volume data[成交量] obv pd.Series(0, indexclose.index) for i in range(1, len(close)): if close.iloc[i] close.iloc[i-1]: obv.iloc[i] obv.iloc[i-1] volume.iloc[i] elif close.iloc[i] close.iloc[i-1]: obv.iloc[i] obv.iloc[i-1] - volume.iloc[i] else: obv.iloc[i] obv.iloc[i-1] # 平滑处理 obv_smoothed obv.ewm(span20).mean() return pd.DataFrame({ OBV_raw: obv, OBV_smooth: obv_smoothed }).dropna() obv_results calculate_obv(cleaned_data)5.4 OBV突破策略def obv_breakout_signal(obv_df, window20): obv obv_df[OBV_smooth] # 计算通道 upper_band obv.rolling(window).mean() obv.rolling(window).std() lower_band obv.rolling(window).mean() - obv.rolling(window).std() # 生成信号 signals pd.DataFrame(indexobv.index) signals[upper_break] obv upper_band signals[lower_break] obv lower_band return signals.dropna()6. 技术指标可视化与分析将计算出的指标可视化能更直观地理解市场行为。我们使用Matplotlib创建专业级图表。6.1 多指标综合图表import matplotlib.pyplot as plt from matplotlib.gridspec import GridSpec def plot_technical_analysis(data, ma_df, macd_df, kdj_df, rsi_df): plt.figure(figsize(16, 12)) gs GridSpec(4, 1, height_ratios[3, 1, 1, 1]) # 价格和MA ax1 plt.subplot(gs[0]) ax1.plot(data[收盘价], labelClose, colorblack, linewidth1) for col in ma_df.columns: ax1.plot(ma_df[col], labelcol, linestyle--) ax1.set_title(Price with Moving Averages) ax1.legend() # MACD ax2 plt.subplot(gs[1]) ax2.plot(macd_df[DIF], labelDIF, colorblue) ax2.plot(macd_df[DEA], labelDEA, colororange) ax2.bar(macd_df.index, macd_df[MACD], colornp.where(macd_df[MACD]0, green,red), labelMACD) ax2.set_title(MACD) ax2.legend() # KDJ ax3 plt.subplot(gs[2]) ax3.plot(kdj_df[K], labelK, colorblue) ax3.plot(kdj_df[D], labelD, colororange) ax3.plot(kdj_df[J], labelJ, colorpurple) ax3.axhline(80, linestyle--, colorred) ax3.axhline(20, linestyle--, colorgreen) ax3.set_title(KDJ) ax3.legend() # RSI ax4 plt.subplot(gs[3]) for col in rsi_df.columns: ax4.plot(rsi_df[col], labelcol) ax4.axhline(70, linestyle--, colorred) ax4.axhline(30, linestyle--, colorgreen) ax4.set_title(RSI) ax4.legend() plt.tight_layout() plt.show()6.2 交互式可视化对于更高级的分析可以使用Plotly创建交互式图表import plotly.graph_objects as go from plotly.subplots import make_subplots def interactive_ta_plot(data, indicators): fig make_subplots(rows4, cols1, shared_xaxesTrue, vertical_spacing0.05, row_heights[0.5, 0.2, 0.2, 0.2]) # 价格图表 fig.add_trace(go.Candlestick(xdata.index, opendata[开盘价], highdata[最高价], lowdata[最低价], closedata[收盘价], nameK线), row1, col1) # 添加MA线 for col in indicators[ma].columns: fig.add_trace(go.Scatter(xindicators[ma].index, yindicators[ma][col], namecol, linedict(width1)), row1, col1) # MACD图表 fig.add_trace(go.Bar(xmacd_df.index, ymacd_df[MACD], nameMACD柱, marker_colornp.where(macd_df[MACD]0, green, red)), row2, col1) fig.add_trace(go.Scatter(xmacd_df.index, ymacd_df[DIF], nameDIF, linedict(colorblue, width1)), row2, col1) fig.add_trace(go.Scatter(xmacd_df.index, ymacd_df[DEA], nameDEA, linedict(colororange, width1)), row2, col1) # KDJ图表 for col in [K, D, J]: fig.add_trace(go.Scatter(xkdj_df.index, ykdj_df[col], namecol, linedict(width1)), row3, col1) # RSI图表 for col in rsi_df.columns: fig.add_trace(go.Scatter(xrsi_df.index, yrsi_df[col], namecol, linedict(width1)), row4, col1) fig.update_layout(height900, title_text技术指标综合分析) fig.show()7. 策略回测与绩效评估计算出技术指标后我们需要验证其实际效果。下面实现一个简单的回测框架。7.1 基础回测框架def backtest_strategy(data, signals, initial_capital100000): positions pd.DataFrame(indexsignals.index) positions[signal] signals[position] # 计算持仓 positions[holdings] initial_capital * positions[signal] # 计算现金和总资产 portfolio pd.DataFrame(indexpositions.index) portfolio[holdings] positions[holdings] portfolio[cash] initial_capital - (positions[signal] * data[收盘价]).cumsum() portfolio[total] portfolio[cash] portfolio[holdings] portfolio[returns] portfolio[total].pct_change() return portfolio7.2 绩效指标计算def calculate_performance(portfolio): returns portfolio[returns] performance { total_return: portfolio[total].iloc[-1] / portfolio[total].iloc[0] - 1, annualized_return: (1 returns.mean()) ** 252 - 1, volatility: returns.std() * np.sqrt(252), sharpe_ratio: returns.mean() / returns.std() * np.sqrt(252), max_drawdown: (portfolio[total].cummax() - portfolio[total]).max() / portfolio[total].cummax().max(), win_rate: len(returns[returns 0]) / len(returns[returns ! 0]) } return pd.DataFrame.from_dict(performance, orientindex, columns[Value])7.3 多策略比较我们可以比较不同技术指标组合的策略表现def compare_strategies(data, strategies): results {} for name, strategy in strategies.items(): signals strategy.generate_signals(data) portfolio backtest_strategy(data, signals) performance calculate_performance(portfolio) results[name] performance return pd.concat(results, axis1)示例策略类结构class MAStrategy: def __init__(self, short_window5, long_window20): self.short_window short_window self.long_window long_window def generate_signals(self, data): signals pd.DataFrame(indexdata.index) signals[short_ma] data[收盘价].rolling(self.short_window).mean() signals[long_ma] data[收盘价].rolling(self.long_window).mean() signals[position] np.where(signals[short_ma] signals[long_ma], 1, -1) return signals8. 进阶应用与实战技巧在实际应用中单纯使用技术指标往往不够需要结合其他技巧提升策略效果。8.1 多时间框架分析结合不同时间周期的指标信号def multi_timeframe_analysis(daily_data, weekly_data): # 日线信号 daily_signals generate_daily_signals(daily_data) # 周线信号 weekly_signals generate_weekly_signals(weekly_data) # 综合信号 - 需要周线和日线信号一致时才交易 combined daily_signals.join(weekly_signals, howleft, rsuffix_weekly) combined[final_signal] np.where( combined[daily_signal] combined[weekly_signal], combined[daily_signal], 0 # 方向不一致时不交易 ) return combined8.2 动态仓位管理根据指标强度调整仓位大小def dynamic_position_sizing(rsi_values, max_position1): # RSI在30-70之间线性调整仓位 position np.zeros_like(rsi_values) mask (rsi_values 30) (rsi_values 70) position[mask] (70 - rsi_values[mask]) / 40 * max_position position[rsi_values 30] max_position position[rsi_values 70] 0 return position8.3 指标组合策略将多个指标信号组合起来def combined_strategy(data, ma_params, macd_params, rsi_params): # 计算各指标 ma calculate_ma(data, **ma_params) macd calculate_macd(data, **macd_params) rsi calculate_rsi(data, **rsi_params) # 生成各指标信号 ma_signal ma_cross_signal(ma) macd_signal macd_divergence_signal(macd) rsi_signal rsi_overbought_oversold(rsi) # 综合信号 - 至少两个指标同向才交易 signals pd.DataFrame(indexdata.index) signals[ma_dir] ma_signal[trade_signal] signals[macd_dir] macd_signal[divergence_signal] signals[rsi_dir] rsi_signal[trade_signal] signals[final_signal] signals.apply( lambda x: x.mode()[0] if len(set(x)) 3 else 0, axis1 ) return signals8.4 实时指标计算对于实时交易系统我们需要优化计算效率class StreamingTA: def __init__(self, window_sizes): self.buffers { price: deque(maxlenmax(window_sizes)), volume: deque(maxlenmax(window_sizes)) } self.window_sizes window_sizes def update(self, new_price, new_volume): # 更新数据缓冲区 self.buffers[price].append(new_price) self.buffers[volume].append(new_volume) # 计算指标 indicators {} if len(self.buffers[price]) min(self.window_sizes): price_array np.array(self.buffers[price]) # 计算移动平均 for window in [w for w in self.window_sizes if w len(price_array)]: indicators[fma_{window}] np.mean(price_array[-window:]) # 计算RSI if len(price_array) 14: # RSI常用周期 deltas np.diff(price_array) gains deltas[deltas 0] losses -deltas[deltas 0] avg_gain np.mean(gains[-14:]) if len(gains) 0 else 0 avg_loss np.mean(losses[-14:]) if len(losses) 0 else 1 rs avg_gain / avg_loss indicators[rsi_14] 100 - (100 / (1 rs)) return indicators
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2458945.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!