文章目录
- 1. 考夫曼自适应移动平均 (KAMA)算法推导及Python实现
- 2. 对 (KAMA)算法参数进行优化及实现
自适应移动平均(Adaptive Moving Average, AMA)由Perry Kaufman在其著作《Trading Systems and Methods》中提出,它通过动态调整平滑系数来适应不同的市场状况:
在趋势明显时,AMA更像快速移动平均线,紧跟价格变化
在震荡市场时,AMA更像慢速移动平均线,过滤噪音。
是一种动态调整平滑系数的移动平均方法,适用于非线性、非平稳的【时间序列数据预测场景】(如负荷受温度、节假日等因素影响),其核心是通过波动率或趋势指标自动调整权重,提高预测灵敏度。
在接触过程中记录一下。
1. 考夫曼自适应移动平均 (KAMA)算法推导及Python实现
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
def ama(close_prices, n=10, fast_sc=2, slow_sc=30):
"""
计算自适应移动平均线(AMA)
参数:
close_prices: 收盘价序列 (list, np.array或pd.Series)
n: 计算效率比率(ER)的周期 (默认10)
fast_sc: 快速平滑常数周期 (默认2)
slow_sc: 慢速平滑常数周期 (默认30)
返回:
AMA值 (np.array)
"""
close = np.array(close_prices)
length = len(close)
ama_values = np.zeros(length)
sc = np.zeros(length)
er = np.zeros(length)
fast_alpha = 2 / (fast_sc + 1)
slow_alpha = 2 / (slow_sc + 1)
# 初始AMA值为第一个收盘价
ama_values[0] = close[0]
for i in range(1, length):
# 计算方向变化和波动总和
direction = abs(close[i] - close[i - n if i >= n else 0])
volatility = sum(abs(close[j] - close[j-1]) for j in range(max(1, i-n+1), i+1))
# 计算效率比率(ER)
er[i] = direction / volatility if volatility != 0 else 0
# 计算平滑系数(SC)
temp_sc = er[i] * (fast_alpha - slow_alpha) + slow_alpha
sc[i] = temp_sc * temp_sc # 平方使变化更平滑
# 计算AMA
ama_values[i] = ama_values[i-1] + sc[i] * (close[i] - ama_values[i-1])
return ama_values
# 示例使用
if __name__ == "__main__":
# 生成示例数据(正弦波+随机噪声)
np.random.seed(42)
x = np.linspace(0, 10, 200)
prices = np.sin(x) * 10 + np.random.normal(0, 1, 200) + 20
# 计算AMA
ama_values = ama(prices, n=2, fast_sc=2, slow_sc=30)
# 绘图
plt.figure(figsize=(12, 6))
plt.plot(prices, label='Data', alpha=0.5)
plt.plot(ama_values, label='AMA(2,2,30)', color='red', linewidth=2)
plt.title("Adaptive Moving Average (AMA)")
plt.legend()
plt.grid()
plt.show()
2. 对 (KAMA)算法参数进行优化及实现
## 基于上面方法的对参数进行优化
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.metrics import mean_squared_error
def ama(close_prices, n=10, fast_sc=2, slow_sc=30):
"""
计算自适应移动平均线(AMA)
参数:
close_prices: 收盘价序列 (list, np.array或pd.Series)
n: 计算效率比率(ER)的周期 (默认10)
fast_sc: 快速平滑常数周期 (默认2)
slow_sc: 慢速平滑常数周期 (默认30)
返回:
AMA值 (np.array)
"""
close = np.array(close_prices)
length = len(close)
ama_values = np.zeros(length)
sc = np.zeros(length)
er = np.zeros(length)
fast_alpha = 2 / (fast_sc + 1)
slow_alpha = 2 / (slow_sc + 1)
# 初始AMA值为第一个收盘价
ama_values[0] = close[0]
for i in range(1, length):
# 计算方向变化和波动总和
direction = abs(close[i] - close[i - n if i >= n else 0])
volatility = sum(abs(close[j] - close[j-1]) for j in range(max(1, i-n+1), i+1))
# 计算效率比率(ER)
er[i] = direction / volatility if volatility != 0 else 0
# 计算平滑系数(SC)
temp_sc = er[i] * (fast_alpha - slow_alpha) + slow_alpha
sc[i] = temp_sc * temp_sc # 平方使变化更平滑
# 计算AMA
ama_values[i] = ama_values[i-1] + sc[i] * (close[i] - ama_values[i-1])
return ama_values
def optimize_ama(close_prices, n_range, fast_range, slow_range):
best_params = {}
min_mse = float('inf')
for n in n_range:
for fast in fast_range:
for slow in slow_range:
ama_vals = ama(close_prices, n, fast, slow)
mse = mean_squared_error(close_prices[n:], ama_vals[n:])
if mse < min_mse:
min_mse = mse
best_params = {'n': n, 'fast': fast, 'slow': slow}
return best_params
params = optimize_ama(prices,
n_range=range(2, 21, 3),
fast_range=range(2, 6),
slow_range=range(10, 31, 3))
print("最佳参数:", params)
# 示例使用
if __name__ == "__main__":
# 生成示例数据(正弦波+随机噪声)
np.random.seed(42)
x = np.linspace(0, 10, 200)
prices = np.sin(x) * 10 + np.random.normal(0, 1, 200) + 20
# 计算AMA
n = params['n']
fast_sc = params['fast']
slow_sc = params['slow']
ama_values = ama(prices, n= n, fast_sc =fast_sc , slow_sc=slow_sc)
# 绘图
plt.figure(figsize=(12, 6))
plt.plot(prices, label='Data', alpha=0.5)
plt.plot(ama_values, label=f'AMA({n},{fast_sc},{slow_sc})', color='red', linewidth=2)
plt.title("Adaptive Moving Average (AMA)")
plt.legend()
plt.grid()
plt.show()
## 方法2
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import ParameterGrid
from sklearn.metrics import mean_absolute_error, mean_squared_error
import logging
import warnings
warnings.filterwarnings("ignore")
plt.rcParams['font.sans-serif'] = ['SimHei'] # 用来正常显示中文标签
plt.rcParams['axes.unicode_minus'] = False # 用来正常显示负号
# 配置日志
logging.getLogger('prophet').setLevel(logging.WARNING)
logging.getLogger('cmdstanpy').setLevel(logging.WARNING)
# =================================== 1. 模拟数据生成(若已有真实数据,可跳过此步) ====================
def data_row_column(hourly_loads): #将数据1*96转换成96*1
# 计算时间戳(每15分钟一个点)
hourly_loads["point_idx"] = hourly_loads["point"].str.extract("(\d+)").astype(int)
hourly_loads["hour"] = (hourly_loads["point_idx"] - 1) // 4 # 计算小时(0-23)
hourly_loads["minute"] = ((hourly_loads["point_idx"] - 1) % 4) * 15 # 计算分钟(0, 15, 30, 45)
hourly_loads["timestamp"] = hourly_loads["amt_ym"] + pd.to_timedelta(hourly_loads["hour"], unit="h") + pd.to_timedelta(hourly_loads["minute"], unit="m")
# 按时间戳排序
hourly_loads = hourly_loads.sort_values("timestamp")
ts_df = hourly_loads[["timestamp", "load"]].set_index("timestamp")
return ts_df
RAWcus_df = pd.read_excel("D:\\data_example.xlsx",engine='openpyxl')
cus_df = RAWcus_df.copy()
cus_df['年月'] = cus_df['amt_ym'].dt.strftime('%Y-%m')
load_series1 = data_row_column(cus_df).squeeze()
train_data = load_series1[:-96] # 训练集(排除最后96点)
test_data = load_series1[-96:] # 测试集(最后96点)
# =================================== 2. AMA模型实现 ====================
def ama_forecast(data, forecast_steps=96, window=24*4, alpha_min=0.05, alpha_max=0.95, initial_window=100):
"""
AMA模型预测(带动态平滑系数调整)
:param data: 历史负荷序列(Series)
:param forecast_steps: 预测步长
:param window: 计算效率比率(ER)的窗口大小 =n
:param alpha_min: 平滑系数下限 =s
:param alpha_max: 平滑系数上限 =f
:param initial_window: 初始化AMA的窗口( warm-up 阶段)
:return: 预测结果数组(numpy.ndarray)
"""
history = data.values.copy()
n = len(history)
ama = np.zeros(n + forecast_steps)
ama[:n] = history
# Warm-up阶段:用简单移动平均初始化AMA
if initial_window > 0:
ama[:initial_window] = np.convolve(history[:initial_window], np.ones(5)/5, mode='same')
for t in range(n, n + forecast_steps):
# 计算效率比率(ER)
start_idx = max(0, t - window)
direction = abs(ama[t-1] - ama[start_idx])
volatility = sum(abs(ama[i] - ama[i-1]) for i in range(start_idx + 1, t))
er = direction / (volatility + 1e-6) # 避免除零
# 动态平滑系数
alpha = er * (alpha_max - alpha_min) + alpha_min
alpha = np.clip(alpha, alpha_min, alpha_max) # 限制在[min, max]范围内
# 更新AMA(假设未来变化与最近变化相同)
recent_change = ama[t-1] - ama[t-2] if t >= 2 else 0
ama[t] = ama[t-1] + alpha * recent_change
return ama[n:]
# =================================== 3. 参数调优(网格搜索) ====================
def grid_search_ama(train_data, test_data, param_grid):
"""网格搜索最优AMA参数"""
best_params = None
best_mae = float('inf')
results = []
for params in ParameterGrid(param_grid):
# 滚动预测验证
forecasts = []
for i in range(len(test_data)):
history = pd.concat([train_data, test_data[:i]])
pred = ama_forecast(history, forecast_steps=1, **params)
forecasts.append(pred[0])
# 计算指标
mae = mean_absolute_error(test_data, forecasts)
results.append({'params': params, 'MAE': mae})
if mae < best_mae:
best_mae = mae
best_params = params
return best_params, best_mae, pd.DataFrame(results)
# 定义参数搜索空间
param_grid = {
'window': [10,24*1, 24*2, 24*4,24*10,24*15], # 等效窗口大小 n: 1天、2天、4天窗口
'alpha_min': [0.01, 0.05, 0.1,0.5], # s
'alpha_max': [0.8, 0.9, 0.95], # f
'initial_window': [50, 100, 200]
}
# 执行网格搜索(耗时操作,实际使用时建议缩小参数范围)
best_params, best_mae, search_results = grid_search_ama(train_data, test_data, param_grid)
print(f"最佳参数: {best_params}, 最优MAE: {best_mae:.2f}")
# =================================== 4. 使用最优参数预测 ====================
final_forecast = ama_forecast(train_data, forecast_steps=96, **best_params)
# =================================== 5. 评估与可视化 ====================
# 计算指标
mae = mean_absolute_error(test_data, final_forecast)
rmse = np.sqrt(mean_squared_error(test_data, final_forecast))
print(f"测试集 MAE: {mae:.2f}, RMSE: {rmse:.2f}")
# 绘图
plt.figure(figsize=(14, 6))
plt.plot(train_data.index[-200:], train_data.values[-200:], label="历史负荷", color="blue", alpha=0.7)
plt.plot(test_data.index, test_data.values, label="真实值", color="green", marker='o', markersize=3)
plt.plot(test_data.index, final_forecast, label="AMA预测", linestyle="--", color="red", linewidth=2)
plt.fill_between(test_data.index,
final_forecast - rmse,
final_forecast + rmse,
color="red", alpha=0.1, label="±RMSE误差带")
plt.title(f"负荷预测 (AMA模型)\n最优参数: {best_params}, MAE: {mae:.2f}, RMSE: {rmse:.2f}")
plt.xlabel("时间")
plt.ylabel("负荷")
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()
# 保存预测结果
forecast_df = pd.DataFrame({
"时间": test_data.index,
"真实值": test_data.values,
"预测值": final_forecast,
"绝对误差": np.abs(test_data.values - final_forecast),
"误差百分比":np.abs(test_data.values - final_forecast)/test_data.values*100
})
# forecast_df.to_csv("ama_load_forecast_results.csv", index=False)
预测值与真实对比