补充停牌的日K数据

补充停牌的日K数据

问题

从TuShare获取的数据,停牌日是没有数据的,这将会在回测时,不能直接参与账户的净值计算,导致账户的净值以及收益计算不准确。

停盘

股票由于某种消息或进行某种活动引起股价的连续上涨或下跌,由证券交易所暂停其在股票市场上进行交易。待情况澄清或企业恢复正常后,再复牌在交易所挂牌交易。

解决方法

1、 增加is_trading字段,用于区分停牌日还是交易日
2、补充停牌日的日k数据,更加当前数据现状,填充一个交易日的close、volume、high、low为停牌前最后一个交易日的close、volume为0,is_trading为false

填充指定时间段的is_trading

流程图

补充停牌的日K数据

代码实现

1、获取所有交易日列表
由于指数比如上证指数(000001)是不会停牌的,因此可通过指数来获得交易日期

def get_trading_date(begin_date= None,end_date=None):
    """
    获取指定日期范围的按照正序排列的交易日列表
    如果没有指定日期范围,则获取从当期日期向前365个自然日内的所有交易日

    :param begin_date: 开始日期
    :param end_date: 结束日期
    :return: 交易日期列表
    """
    #当前日期
    now = datetime.now()
    #开始日期,默认当前日期向前365个自然日
    if begin_date is None:
        #当前日期减去365天
        one_year_ago = now - timedelta(days=365)
        #转换成str类型
        begin_date = one_year_ago.strftime("%Y-%m-%d")
    #结束日期默认为今天
    if end_date is None:
        end_date = now.strftime("%Y-%m-%d")
    
    #用上证指数000001作为查询条件,因为指数是不会停盘的,所以可以查询到所有的交易日期
    daily_cursor = DB_CONN.daily.find(
        {"code":"000001",'date':{'$gte':begin_date,'$lte':end_date},'index':True},
        sort=[('date',ASCENDING)],
        projection={'date':True,'_id':False}
    )
    #转换日期列表
    dates = [x['date'] for x in daily_cursor]
    return dates

2、填充某个日行情数据的is_trading,并更新数据库(同样采用bulk_write将更新数据写入数据集)

def fill_is_trading_between(begin_date=None,end_date=None):
    """
    填充指定时间段内的is_trading字段
    :param begin_date :开始日期
    :param end_date :结束日期
    """
    #获取指定日期范围的左右交易日子列表,按日期正序排列
    all_dates = get_trading_date(begin_date,end_date)
    #循环填充所有交易日的is_trading字段
    for date in all_dates:
        #填充daily数据集
        fill_single_date_is_trading(date,'daily')
        #填充daily_hfq数据集
        fill_single_date_is_trading(date,'daily_hfq')

def fill_single_date_is_trading(date,collection_name):
    """
    填充某一个日行情的数据集的is_trading
    :param date: 日期
    :param collection_name: 集合名称
    """
    print('填充字段,字段名:is_trading,日期:%s,数据集:%s' %(date,collection_name))
    daily_cursor = DB_CONN[collection_name].find(
        {'date':date},
        projection={'code':True,'volume':True,'_id':False},
        batch_size = 1000
        })

    update_requests = []
    for daily in daily_cursor:
        #当日成交量大于0,则为交易状态
        is_trading = daily['volume']>0
        update_requests.append(
            UpdateOne(
                {'code':daily['code'],'date':date},
                {'$set':{'is_trading':is_trading}}
            )
        )
        if len(update_requests):
            update_result = DB_CONN[collection_name].bulk_write(update_requests,ordered=False)
            print("填充字段,字段名:is_trading,日期:%s,数据集:%s,更新:%4d"%
            (date,collection_name,update_result.modified_count),flush=True)

获取股票基本信息

用途

  • 获取每日的股票列表

主要字段

  • 股票代码
  • 股票名称
  • 股本(总股本、流通股本)
  • 上市日期
  • 日期

通过采用TuShare中get_stock_basics接口获取股票的基本信息
get_stock_basics()接口信息:

def get_stock_basics(date=None):
    """
        获取沪深上市公司基本情况
    Parameters
    date:日期YYYY-MM-DD,默认为上一个交易日,目前只能提供2016-08-09之后的历史数据

    Return
    --------
    DataFrame
               code,代码
               name,名称
               industry,细分行业
               area,地区
               pe,市盈率
               outstanding,流通股本
               totals,总股本(万)
               totalAssets,总资产(万)
               liquidAssets,流动资产
               fixedAssets,固定资产
               reserved,公积金
               reservedPerShare,每股公积金
               eps,每股收益
               bvps,每股净资
               pb,市净率
               timeToMarket,上市日期
    """

详细代码实现

from database import DB_CONN
from datetime import datetime,timedelta
import tushare as ts
from pymongo import UpdateOne
from stock_util import get_trading_date

"""
从tushare中获取股票的基础数据,保存在本地MongoDB中
"""
def crawl_basic(begin_date=None,end_date=None):
    """
    抓取指定时间范围的股票基础信息
    :param begin_date:开始时间
    :param end_date:结束时间
    """
    #如果没有指定日期,则默认为前一日
    if begin_date is None :
        begin_date = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
    if end_date is None:
        end_date = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")

    #获取指定日期范围的所有交易日列表
    all_dates = get_trading_date(begin_date,end_date)
    #按每个交易日抓取
    for date in all_dates:
        try:
            #抓取当日的基本信息
            crawl_basic_at_date(date)
        except:
            print("抓取股票基本信息出错,日期:%s" %date,flush=True)

def crawl_basic_at_date(date):
    """
    从Tushare抓取指定日期的股票基本信息
    :param date: 日期
    """
    #从tushare获取基本信息,index是股票代码列表
    df_basics = ts.get_stock_basics(date)
    #如果当日没有基本信息,不做操作
    if df_basics is None:
        return
    
    #初始化更新列表
    update_requests=[]
    codes = set(df_basics.index)
    #按照股票代码提取所有数据
    for code in codes:
        #获取一只股票的数据
        doc = df_basics.loc[code]
        try:
            #将上市日期,19971113转换成199-11-13
            time_to_market = datetime.strptime(str(doc['timeToMarket']),"%Y%m%d").strftime('%Y-%m-%d')
            #将总股本和流通股本转为数字
            totals = float(doc['totals'])
            outstanding = float(doc['outstanding'])

            #组合基本信息文档
            doc.update(
                {
                    #股票代码
                    'code':code,
                    #日期
                    'date':date,
                    #上市时间
                    'timeToMarket':time_to_market,
                    #流通股本
                    'outstanding':outstanding,
                    #总股本
                    'totals':totals
                }
            )
            #生成更新请求,按照code、date创建索引
            update_requests.append(
                UpdateOne(
                {'code':code,'date':date},
                {'$set':doc},
                upsert=True
                )
            )
        except:
            print('发生异常,股票代码:%s,日期:%s' %(code,date),flush=True)
            print(doc,flush=True)
        if len(update_requests)>0:
            update_result = DB_CONN['basic'].bulk_write(update_requests,ordered=False)
            print("抓取股票基本信息,日期:%s,插入:%4d条,更新:%4d条" %(date,update_result.upserted_count,update_result.modified_count),flush=True)

执行上述程序后,会将股票基本信息(包括,股票代码,日期,上市时间,流通股本和总股本)保存在MongoDB中名为basic的集合(表)中

填充停牌日的行情数据

补充停牌的日K数据

代码实现:

ef fill_daily_k_at_suspension_days(begin_date=None,end_date=None):
    """
    填充指定日期范围内,股票停牌日的行情数据
    填充时,停牌的开盘价、最高价、最低价和收盘价都为最近一个交易日的收盘价,成交量为0
    is_trading为False
    """
    #当前日期的前一天
    before = datetime.now() - timedelta(days=1)
    #找到据当前最近一个交易日的所有股票的基本信息
    basics =[]
    while 1:
        #转化成str
        last_trading_date = before.strftime("%Y-%m-%d")
        #因为Tushare的基本信息从2016-08-09开始,如果早于这个时间就结束查找
        if last_trading_date < '2016-08-09':
            break
        #找当日的基本信息
        basic_cursor = DB_CONN['basic'].find(
            {'date':last_trading_date},
            #填充时需要用到两个子段:股票代码code和上市日期timeToMarket,上市日期用来判断是否上市
            projection={'code':True,'timeToMarket':True,'_id':False},
            #一次性返回5000条数据,可以降低网络IO开销,提高速度
            batch_size=5000
        )
        #将数据放到basics列表中
        basics = [basic for basic in basic_cursor]
        #如果查到数据,跳出循环
        if len(basics)>0:
            break
        #没有找到数据,则继续向前一天
        before = before-timedelta(days=1)
    
    #获取指定日期范围内所有交易日列表
    all_dates = get_trading_date(begin_date,end_date)

    #填充daily数据集中的停牌日数据
    fill_daily_k_at_suspension_days_at_date_at_one_collections(basics,all_dates,'daily')
    #填充daily_hfq数据集中的停牌日数据
    fill_daily_k_at_suspension_days_at_date_at_one_collections(basics,all_dates,'daily_hfq')

def fill_daily_k_at_suspension_days_at_date_at_one_collections(basics,all_dates,collection):
    """
    更新单个数据集的单个日期的数据
    :param basic:基本信息
    :param all_dates:日期列表
    :param collection:集合名
    """
    code_last_trading_daily_dict = dict()
    for date in all_dates:
        update_requests = []
        last_daily_code_set = set(code_last_trading_daily_dict.keys())
        for basic in basics:
            code = basic['code']
            #如果循环日期小于上市日期
            if date < basic['timeToMarket']:
                print('日期:%s,%s还没上市,上市日期为%s'%(date,code,basic['timeToMarket']),flush=True)
            else:
                #找到当日数据
                daily = DB_CONN[collection].find(
                    {'code':code,'date':date}
                )
                if daily is not None:
                    code_last_trading_daily_dict[code] = daily
                    last_daily_code_set.add(code)
                else:
                    if code in last_daily_code_set:
                        last_trading_daily = code_last_trading_daily_dict[code]
                        suspension_daily_doc = {
                            'code':code,
                            'date':date,
                            'close':last_trading_daily['close'],
                            'open':last_trading_daily['close'],
                            'high':last_trading_daily['close'],
                            'low':last_trading_daily['close'],
                            'volume':0,
                            'is_trading':False
                        }
                        update_requests.append(
                            UpdateOne(
                                {'code':code,'date':date},
                                {'$set':suspension_daily_doc},
                                upsert=True
                            )
                        )
        if len(update_requests)>0:
            update_result = DB_CONN[collection].bulk_write(update_requests,ordered=False)
            print('填充停牌数据,日期:%s,数据集:%s,插入:%4d条,更新:%4d条'%(date,collection,update_result.upserted_count,update_result.modified_count),flush=True)