目录
前置:
准备
代码:数据库交互部分
代码:生成前复权 日、周、月、季、年数据
前置:
1 未复权日数据的获取方法请参考 https://blog.csdn.net/m0_37967652/article/details/146435589 ,数据库使用 PostgreSQL。如需将日数据更新到最新,请参考 https://blog.csdn.net/m0_37967652/article/details/146988667 。
2 权息数据,下载 t_exdividend.sql 文件
通过网盘分享的文件:t_exdividend.sql
链接: https://pan.baidu.com/s/17B1EiHcEYByfWSICqX1KNQ?pwd=4abg 提取码: 4abg
在命令行中进入 PostgreSQL 安装目录下的 bin 目录,执行:
psql -U postgres -h 127.0.0.1 -p 5432 -d db_stock -f E:/temp005/t_exdividend.sql
注意:请将 E:/temp005/t_exdividend.sql 改成你自己保存该文件的实际路径。
准备
1 从通达信中导出当前 A 股的股票代码,保存为 stock_ticker.txt(一行一个股票代码),放在运行脚本时的当前工作目录下(代码中以 ./stock_ticker.txt 读取)。
2 准备一个空目录,并在其中创建 day、week、month、quarter、year 五个子目录,分别存放日、周、月、季、年数据。代码中该目录为 E:/temp006/(即 pre_dir),请按实际路径修改。
3 安装依赖包(pandas 需 2.2 及以上版本,因为代码使用了 'ME'、'QE'、'YE' 这些重采样频率别名)
pip install pandas
pip install psycopg2
代码:数据库交互部分
这部分代码存储到 utils包目录下的 postgresql_utils01.py文件中
import psycopg2
import pandas as pd
def connect_db(database='db_stock', user='postgres', password='', host='127.0.0.1', port=5432):
    """Open a new psycopg2 connection to the stock database.

    The keyword defaults are the previously hard-coded settings, so existing
    ``connect_db()`` calls connect exactly as before.

    Returns:
        An open psycopg2 connection. The caller must close it.

    Raises:
        Whatever ``psycopg2.connect`` raises, after printing it. The error is
        re-raised rather than swallowed because every caller immediately calls
        ``conn.cursor()``. Returning ``None`` only turned a clear connection
        error into an unrelated ``AttributeError`` later on.
    """
    try:
        return psycopg2.connect(database=database, user=user, password=password, host=host, port=port)
    except Exception as e:
        print(f'connection failed。{e}')
        raise
def query_multi_stock_daily(ticker_list:list)->list:
    """Fetch the ``t_stock_daily`` rows for every ticker in ``ticker_list``.

    Each returned tuple has the column order
    (ticker, tradeDate, openPrice, highestPrice, lowestPrice, closePrice,
    turnoverVol, turnoverValue, dealAmount, turnoverRate, negMarketValue,
    marketValue, chgPct, PE, PE1, PB, isOpen, vwap).

    The tickers are sent as a bound array parameter rather than pasted into
    the SQL text, so a ticker containing a quote cannot break or inject into
    the statement. An empty ``ticker_list`` returns ``[]`` without opening a
    connection.
    """
    if not ticker_list:
        return []
    sql_str = ("select ticker,tradeDate,openPrice,highestPrice,lowestPrice,closePrice,turnoverVol,"
               "turnoverValue,dealAmount,turnoverRate,negMarketValue,marketValue,chgPct,PE,PE1,PB,"
               "isOpen,vwap from t_stock_daily where ticker = any(%s);")
    conn = connect_db()
    try:
        # The cursor context manager closes the cursor. The finally clause closes
        # the connection even if execute/fetchall raises.
        with conn.cursor() as cur:
            # psycopg2 adapts a Python list to a SQL ARRAY, so pass a list, not a tuple.
            cur.execute(sql_str, (list(ticker_list),))
            return cur.fetchall()
    finally:
        conn.close()
def query_multi_exdiv(ticker_list:list)->list:
    """Fetch the ``t_exdividend`` rows for every ticker in ``ticker_list``.

    Each returned tuple has the column order
    (ticker, exDate, perShareTransRadio, perCashDiv, allotmentRatio, allotmentPrice).

    The tickers are sent as a bound array parameter rather than pasted into
    the SQL text, so a ticker containing a quote cannot break or inject into
    the statement. An empty ``ticker_list`` returns ``[]`` without opening a
    connection.
    """
    if not ticker_list:
        return []
    sql_str = ("select ticker,exDate,perShareTransRadio,perCashDiv,allotmentRatio,allotmentPrice "
               "from t_exdividend where ticker = any(%s);")
    conn = connect_db()
    try:
        # The cursor context manager closes the cursor. The finally clause closes
        # the connection even if execute/fetchall raises.
        with conn.cursor() as cur:
            # psycopg2 adapts a Python list to a SQL ARRAY, so pass a list, not a tuple.
            cur.execute(sql_str, (list(ticker_list),))
            return cur.fetchall()
    finally:
        conn.close()
代码:生成前复权 日、周、月、季、年数据
from concurrent.futures import ThreadPoolExecutor
import os
import pandas as pd
from utils import postgresql_utils01
'''
股票日数据使用
'''
# Column order of each row returned by postgresql_utils01.query_multi_stock_daily,
# excluding the leading ticker column.
_DAILY_COLUMNS = [
    'tradeDate', 'openPrice', 'highestPrice', 'lowestPrice', 'closePrice',
    'turnoverVol', 'turnoverValue', 'dealAmount', 'turnoverRate',
    'negMarketValue', 'marketValue', 'chgPct', 'PE', 'PE1', 'PB', 'isOpen', 'vwap',
]
# Column order of each row returned by postgresql_utils01.query_multi_exdiv,
# excluding the leading ticker column.
_EXDIV_COLUMNS = ['exDate', 'perShareTransRadio', 'perCashDiv', 'allotmentRatio', 'allotmentPrice']
# Ex-dividend fields where a missing value means "no such event" and is treated as 0.
_EXDIV_AMOUNT_COLUMNS = ['perShareTransRadio', 'perCashDiv', 'allotmentRatio', 'allotmentPrice']
# Price columns rewritten by the forward adjustment.
_PRICE_COLUMNS = ['closePrice', 'openPrice', 'highestPrice', 'lowestPrice']
# Activity columns that are summed when rolling daily bars up into longer periods.
_SUM_COLUMNS = ['turnoverVol', 'turnoverValue', 'dealAmount', 'turnoverRate']
# Columns written to the week/month/quarter/year CSV files.
_PERIOD_OUTPUT_COLUMNS = ['tradeDate', 'openPrice', 'highestPrice', 'lowestPrice', 'closePrice',
                          'turnoverVol', 'turnoverValue', 'dealAmount', 'turnoverRate',
                          'negMarketValue', 'marketValue']
# Output sub-directory -> pandas resample rule, in the order the files are written.
# Weeks end on Friday. 'ME'/'QE'/'YE' require pandas >= 2.2.
_PERIOD_RULES = {'week': 'W-FRI', 'month': 'ME', 'quarter': 'QE', 'year': 'YE'}


def _rows_to_frames(rows, columns):
    """Turn query rows ``(ticker, value_1, ..., value_k)`` into ``{ticker: DataFrame}``.

    ``columns`` names value_1..value_k in order. pandas only builds a frame from
    this dict if the values are list-like (an all-scalar dict raises
    ValueError). Each row therefore presumably carries one array per column for
    a whole ticker -- TODO confirm against the table schema. If a ticker
    repeats, its last row wins.
    """
    return {row[0]: pd.DataFrame(data=dict(zip(columns, row[1:]))) for row in rows}


def _forward_adjust(daily, ex):
    """Return the trading days of ``daily`` with forward-adjusted (前复权) prices.

    Rows whose ``isOpen`` is not 1 are dropped, an ``o_date`` datetime column
    is added, and rows are sorted by it. For each ex-dividend event, prices
    strictly before the ex-date become
    ``(P + allotmentRatio * allotmentPrice - perCashDiv) / (1 + perShareTransRadio + allotmentRatio)``.
    Events are applied oldest first, so older prices accumulate the
    adjustments of every later event.

    Args:
        daily: one ticker's daily frame with the ``_DAILY_COLUMNS`` columns.
        ex: that ticker's ex-dividend frame, or None if it has no records.
    """
    daily = daily.loc[daily['isOpen'] == 1].copy()
    daily['o_date'] = pd.to_datetime(daily['tradeDate'])
    daily.sort_values(by='o_date', ascending=True, inplace=True)
    if ex is None or ex.empty:
        return daily
    ex = ex.copy()
    # A missing ratio/price/dividend means that part of the event did not happen.
    # Left as NaN, it would turn every earlier price into NaN.
    ex[_EXDIV_AMOUNT_COLUMNS] = ex[_EXDIV_AMOUNT_COLUMNS].fillna(0)
    denom = 1 + ex['perShareTransRadio'] + ex['allotmentRatio']
    ex['a'] = 1 / denom
    ex['b'] = (ex['allotmentRatio'] * ex['allotmentPrice'] - ex['perCashDiv']) / denom
    ex['o_date'] = pd.to_datetime(ex['exDate'])
    ex.sort_values(by='o_date', ascending=True, inplace=True)
    for ex_date, a, b in zip(ex['o_date'], ex['a'], ex['b']):
        # Compare against the parsed ex-date. Comparing the datetime64 column with
        # the raw exDate value (e.g. a datetime.date) can raise TypeError in recent pandas.
        before = daily['o_date'] < ex_date
        daily.loc[before, _PRICE_COLUMNS] = daily.loc[before, _PRICE_COLUMNS] * a + b
    return daily


def _aggregate_period(df, rule):
    """Roll adjusted daily bars up into one bar per ``rule`` period (a pandas resample rule)."""
    grouped = df.resample(rule, on='o_date')
    # Start from the period's last trading day (tradeDate, closePrice, market values, ...).
    out = grouped.last()
    # Reduce only the column each statistic applies to, not the whole frame.
    out['openPrice'] = grouped['openPrice'].first()
    out['lowestPrice'] = grouped['lowestPrice'].min()
    out['highestPrice'] = grouped['highestPrice'].max()
    for col in _SUM_COLUMNS:
        out[col] = grouped[col].sum()
    # resample also emits periods with no trading day at all (holiday weeks,
    # suspensions), with NaN prices and zero volume. Keep only real bars.
    out = out.loc[grouped.size() > 0]
    return out.loc[:, _PERIOD_OUTPUT_COLUMNS].copy()


def _write_outputs(ticker, df, pre_dir):
    """Write the daily CSV and the week/month/quarter/year CSVs for one ticker under ``pre_dir``."""
    df.to_csv(os.path.join(pre_dir, 'day', ticker + '.csv'), encoding='utf-8', index=False)
    for sub_dir, rule in _PERIOD_RULES.items():
        period_df = _aggregate_period(df, rule)
        period_df.to_csv(os.path.join(pre_dir, sub_dir, ticker + '.csv'), encoding='utf-8')


def _process_batch(ticker_batch, pre_dir):
    """Query, forward-adjust and export every ticker in one batch."""
    daily_frames = _rows_to_frames(postgresql_utils01.query_multi_stock_daily(ticker_batch), _DAILY_COLUMNS)
    exdiv_frames = _rows_to_frames(postgresql_utils01.query_multi_exdiv(ticker_batch), _EXDIV_COLUMNS)
    for ticker, daily in daily_frames.items():
        _write_outputs(ticker, _forward_adjust(daily, exdiv_frames.get(ticker)), pre_dir)


def output_daiy_caculate(thread_num:int, stock_ticker_list:list, pre_dir:str=r'E:/temp006/', batch_size:int=10):
    """Worker: build forward-adjusted day/week/month/quarter/year CSVs for ``stock_ticker_list``.

    Tickers are processed in batches of ``batch_size``, with one pair of
    database queries per batch. Files go to
    ``<pre_dir>/<day|week|month|quarter|year>/<ticker>.csv``; ``to_csv`` does
    not create directories, so those sub-directories must already exist.

    Args:
        thread_num: label used only in progress and error messages.
        stock_ticker_list: tickers to process.
        pre_dir: output root directory (previously hard-coded).
        batch_size: tickers per database round trip (previously hard-coded to 10);
            values below 1 are treated as 1.
    """
    print(f'thread {thread_num}, {len(stock_ticker_list)}')
    batch_size = max(1, batch_size)
    # Each batch is isolated: a failure is reported and the remaining batches
    # still run, instead of one bad batch ending this thread's work.
    for start in range(0, len(stock_ticker_list), batch_size):
        node_ticker_list = stock_ticker_list[start:start + batch_size]
        try:
            _process_batch(node_ticker_list, pre_dir)
        except Exception as e:
            print(f"{thread_num} error {e}")
    print(f"{thread_num} finished")
    print(f'{thread_num} ending...')
def start_execute(ticker_file='./stock_ticker.txt', thread_count=5):
    """Split the tickers in ``ticker_file`` across worker threads and wait for them.

    ``ticker_file`` holds one ticker per line. Surrounding whitespace and blank
    lines (including the trailing newline) are ignored, so no empty ticker is
    sent to the database.

    The list is cut into ``thread_count`` contiguous slices, and the last slice
    takes the remainder. With fewer tickers than threads, a single thread gets
    everything. Blocks until every worker has finished.
    """
    with open(ticker_file, mode='r', encoding='utf-8') as fr:
        stock_ticker_list = [line.strip() for line in fr if line.strip()]
    print(len(stock_ticker_list))
    thread_count = max(1, thread_count)
    interval = len(stock_ticker_list) // thread_count
    if interval == 0:
        thread_count = 1
    thread_num_list = list(range(thread_count))
    params_list = [stock_ticker_list[i * interval:(i + 1) * interval] for i in range(thread_count - 1)]
    params_list.append(stock_ticker_list[(thread_count - 1) * interval:])
    with ThreadPoolExecutor(max_workers=thread_count) as executor:
        # Consume the results so an exception escaping a worker is re-raised
        # here instead of being silently dropped.
        list(executor.map(output_daiy_caculate, thread_num_list, params_list))
    print('线程池任务分配完毕')
if __name__ == '__main__':
    # Run the batch export only when executed as a script, not when imported.
    start_execute()