仓位管理与风险控制

完整的主要包含以下几点:
1、股票池或是期货池,股票池的作用就是一个相对比较优秀的交易标的集合,或是删除比较差的交易交易集合而留下啦的集合
2、择时,何时进何时出,如均线(金叉死叉)、MACD等技术指标
3、仓位管理和风险控制,当择时信号发生,发出买入信号,需要买多少,买多了风险大,买少了收益太少,这就需要进行仓位管理;风险控制一般指止盈止损,尤其是止损

四种止盈方法

  • 目标盈利法
  • 回撤止盈法
  • 大福波动止盈
  • 技术信号止盈

四种止损方法

  • 最大亏损发
  • 波动率倍数止损
  • 波动率标准差止损
  • 技术信号止损

风险参考指标--波动率

  • 标准差
  • 平均真实波幅ATR

ATR:近N天真实波幅TR的算术平均值
TR :以下指标中的最大值

今日最高价减去今日最低价
今日最高价减去昨日收盘价的绝对值
今日最低价减去昨日收盘价的绝对值

举例:ATR的计算

[Day1(高低开收)=(16.3,15.93,15.97,16.23) ]

[Day2(高低开收)=(16.66,16.13,16.17,16.47) ]

仓位管理与风险控制

[Day2 ; TR = Max(0.53,0.43,0.1) = 0.53 ]

[Day2 ; ATR = 观察窗口内每天TR的平均值 ]
目标盈利法和最大亏损法

仓位管理与风险控制

大幅波动止盈、波动率倍数止损

仓位管理与风险控制
仓位管理与风险控制

回撤止盈(浮动/跟踪止盈)

仓位管理与风险控制

技术信号止盈/止损

以双均线交叉信号为例:

仓位管理与风险控制

交易系统中与盈利相关的要素

仓位管理与风险控制

仓位管理

仓位管理与风险控制

不同阶段:建仓、加仓/减仓、清仓
资金分配:头寸规模的确定核分配

确定头寸规模的四种模型

  • 每一固定金额交易一个单位
  • 等价值交易单位
  • 百分比风险模型
  • 百分比波动幅度模型
模型1:每一固定金额交易一个单位

资金池:

仓位管理与风险控制

把资金等分为相同金额的若干份,再出现买入信号的情况下,每份只允许交易一个单位的投资标的(比如1手股票,或是1份期货合约)
举例:
总资金:一百万
平均分为10份,每份金额:10万
交易实例一:

(待买入标的:工商银行qquad 价格:5元 qquad 一个交易单位=100股)
(买入量 = 100股 qquad 买入金额=5 * 100 = 500元)

交易实例二:

(待买入标的:贵州茅台 qquad 价格:750元 qquad 一个交易单位=100股)
(买入量 = 100股 qquad 买入金额=750 * 100 = 7.5万元)

模型2:等价值交易单位

资金池

仓位管理与风险控制

把资金等分为相同金额的若干份,在出现买入信号的情况下,按该金额计算出每份允许交易的次投资标的单位个数(如M手股票,或N份期货合约)
举例:
总资金:一百万
平均分为10份,每份金额:10万
交易实例一:

(待买入标的:工商银行qquad 价格:5元)
(买入量 = 100000/5=20000股 qquad 买入金额=10万元)

交易实例二:

$待买入标的:贵州茅台 qquad 价格:750元 ( )买入量 = 100000/750= 100股 qquad 买入金额=7.5万元$

模型3:百分比风险模型

资金池:

仓位管理与风险控制

根据每次交易允许承担的最大风险占总资金的比例,以及每个投资标的可接受的最大损失(即初始止损额度R),折算出可建立头寸的单位个数
CPR计算公式:

[P(头寸规模) = C(现金) / R(每股风险) ]

举例:
总资金:一百万
总风险:1%
单个交易标的风险:5%
交易实例:

(待买入标的:海康威视qquad 价格:40元)
(买入量 = (100000*1%)/(40 * 5%)=5000股 qquad 买入金额=40* 5000 = 20万元)

百分比波动幅度模型

资金池

仓位管理与风险控制

根据每次交易允许承担的最大风险占总资金的比例,以及每个投资标的在一段时间内的价格波动幅度(即有可能有利或不利的价格变动范围V),折算出可建立头寸的单位个数
举例:
总资金:一百万
总风险:1%
单个交易标的风险:3倍ATR
交易实例:

(待买入标的:海康威视qquad 价格:40元 ATR=2%)
(买入量 = (100000*1%)/(40 * 2% * 3)=4100股 qquad 买入金额=40* 4100 = 16.4万元)

常见的加仓方法

仓位管理与风险控制

交易系统的核心要素

  • 市场:买卖什么
  • 头寸规模:买卖多少
  • 入市:什么时候买进
  • 止损:什么时候放弃一个亏损的头寸
  • 退出:什么时候退出一个盈利的头寸
  • 战术:怎么买卖

交易系统的实现

股票池候选:上证50成分股,不调仓
双均线策略:

  • 日K级别:MA10 vs MA30
  • 开盘前检查信号,金叉买,死叉卖
    头寸规模确定
    *均仓方案(等额资金分配)

借助聚宽平台,对该模型进行编码回测:
代码如下:

# 导入函数库
from jqdata import *
# 均线
MA_WIN_1 = 10
MA_WIN_2 = 30

# 初始化函数,设定基准等等
def initialize(context):
    set_benchmark('000300.XSHG')
    set_option('use_real_price', True)
    # log.set_level('order', 'error')
    
    # 股票类每笔交易时的手续费是:买入时佣金万分之三,卖出时佣金万分之三加千分之一印花税, 每笔交易佣金最低扣5块钱
    set_order_cost(OrderCost(close_tax=0.001, open_commission=0.0003, close_commission=0.0003, min_commission=5), type='stock')

    # 定时运行函数
    run_daily(before_market_open, time='before_open', reference_security='000300.XSHG') 
    run_daily(market_open, time='every_bar', reference_security='000300.XSHG')
    run_daily(after_market_close, time='after_close', reference_security='000300.XSHG')
    # 股票池 - 上证50
    g.stock_pool = get_index_stocks("000016.XSHG", date=context.current_dt)
    g.init_cash = context.portfolio.starting_cash  # 启动资金

# 开盘前运行函数
def before_market_open(context):
    look_ahead_n = max(MA_WIN_1, MA_WIN_2) + 1
    g.up_cross_signaled = set()
    g.down_cross_signaled = set()
    for code in g.stock_pool:
        df = attribute_history(code, look_ahead_n, "1d", ["close"], skip_paused=True)  # 该函数返回结果不包括当天数据
        if len(df) != look_ahead_n:
            continue
        close = df["close"]
        ma_short = close.rolling(MA_WIN_1).mean()  # 短时均线
        ma_long = close.rolling(MA_WIN_2).mean()  # 长时均线
        uc_flags = (ma_short.shift(1) <= ma_long.shift(1)) & (ma_short > ma_long)  # 上穿标志
        dc_flags = (ma_short.shift(1) >= ma_long.shift(1)) & (ma_short < ma_long)  # 下穿标志
        if uc_flags.iloc[-1]:
            g.up_cross_signaled.add(code)
        if dc_flags.iloc[-1]:
            g.down_cross_signaled.add(code)

# 开盘时运行函数
def market_open(context):
    cur_dt = context.current_dt.date()  # 当前日期
    p = context.portfolio  # 资金账户
    current_data = get_current_data()

    each_cash = g.init_cash / len(g.stock_pool)  # 每只股票分配的资金
    
    # 卖出均线死叉信号的持仓股
    for code, pos in p.positions.items():
        if code in g.down_cross_signaled:
            order_target(code, 0)

    # 买入均线金叉信号的持仓股
    for code in g.up_cross_signaled:
        if code not in p.positions:
            if current_data[code].paused:
                continue
            open_price = current_data[code].day_open
            num_to_buy = each_cash / open_price // 100 * 100
            order(code, num_to_buy)

# 收盘后运行函数  
def after_market_close(context):
    p = context.portfolio
    pos_level = p.positions_value / p.total_value
    record(pos_level=pos_level)

回测结果:

仓位管理与风险控制

2、按盈利比例均匀加仓

  • 记录每只持仓股最后一次的买入价
  • 如以当日开盘价相对于前一次买入价的盈利比例超过某个阈值,则等额加一次仓
    注:前提是当日没有任何止损或其他买入信号的发生

代码实现:

# 导入函数库
from jqdata import *
import pandas as pd
# 均线
MA_WIN_1 = 10
MA_WIN_2 = 30

# #加仓判断阈值
INC_POS_PF_RATE = 0.05


# 初始化函数,设定基准等等
def initialize(context):
    # 设定沪深300作为基准
    set_benchmark('000300.XSHG')
    # 开启动态复权模式(真实价格)
    set_option('use_real_price', True)
    # 输出内容到日志 log.info()
    log.info('初始函数开始运行且全局只运行一次')
    # 过滤掉order系列API产生的比error级别低的log
    # log.set_level('order', 'error')

    ### 股票相关设定 ###
    # 股票类每笔交易时的手续费是:买入时佣金万分之三,卖出时佣金万分之三加千分之一印花税, 每笔交易佣金最低扣5块钱
    set_order_cost(OrderCost(close_tax=0.001, open_commission=0.0003, close_commission=0.0003, min_commission=5), type='stock')

    ## 运行函数(reference_security为运行时间的参考标的;传入的标的只做种类区分,因此传入'000300.XSHG'或'510300.XSHG'是一样的)
      # 开盘前运行
    run_daily(before_market_open, time='before_open', reference_security='000300.XSHG')
      # 每分钟运行
    run_daily(market_open, time='every_bar', reference_security='000300.XSHG')
      # 收盘后运行
    run_daily(after_market_close, time='after_close', reference_security='000300.XSHG')
    #股票池-上证50
    g.stock_pool = get_index_stocks('000016.XSHG',date=context.current_dt)
    g.init_cash = context.portfolio.starting_cash#启动资金
    g.last_entry_prices = {code:None for code in g.stock_pool}
    
## 开盘前运行函数
def before_market_open(context):
    look_ahead_n = max(MA_WIN_1,MA_WIN_2)+1
    g.up_cross_signaled = set()
    g.down_cross_signaled = set()
    for code in g.stock_pool:
        df = attribute_history(code,look_ahead_n,'1d',fields=['close'],skip_paused=True)
        if len(df) != look_ahead_n:
            continue
        close = df['close']
        ma_short = close.rolling(MA_WIN_1).mean()#短时均线
        ma_long = close.rolling(MA_WIN_2).mean()#长时均线
        uc_flags = (ma_short.shift(1) <= ma_long.shift(1)) & (ma_short > ma_long)#上穿标志
        dc_flags = (ma_short.shift(1) >= ma_long.shift(1)) & (ma_short < ma_long)#下穿标志
        if uc_flags.iloc[-1]:
            g.up_cross_signaled.add(code)
        if dc_flags.iloc[-1]:
            g.down_cross_signaled.add(code)
        print(g.up_cross_signaled)  

## 开盘时运行函数
def market_open(context):
    log.info('函数运行时间(market_open):'+str(context.current_dt.time()))
    p = context.portfolio#资金账户
    current_data = get_current_data()
    
    #每只股票分配的资金
    each_cash = g.init_cash/len(g.stock_pool)
    #卖出均线、死叉信号的持仓股
    for code ,pos in p.positions.items():
        if code in g.down_cross_signaled:
            order_target(code,0)
            g.last_entry_prices[code]= None#更完美的实现还要判断成交状态,后面同理
            
    #买入均线金叉信号的持仓股
    for code in g.up_cross_signaled:
        if code not in p.positions:
            if current_data[code].paused:
                continue
            open_price = current_data[code].day_open
            num_to_buy = each_cash / open_price // 100 *100
            order(code,num_to_buy)
            g.last_entry_prices[code] = open_price
    #检查有无符合加仓条件的持仓股
    for code,pos  in p.positions.items():
        if current_data[code].paused:
            continue
        if pos.today_amount==0 and pos.closeable_amount > 0:
            open_price = current_data[code].day_open
            last_entry = g.last_entry_prices[code]
            if (open_price - last_entry) / last_entry >= INC_POS_PF_RATE:
                order_value(code, each_cash)
                g.last_entry_prices[code] = open_price

## 收盘后运行函数
def after_market_close(context):
    log.info(str('函数运行时间(after_market_close):'+str(context.current_dt.time())))
    p = context.portfolio
    pos_level = p.positions_value / p.total_value
    record(pos_level=pos_level)
    trades = get_trades()
    for _trade in trades.values():
        log.info('成交记录:'+str(_trade))
    log.info('一天结束')
    log.info('##############################################################')

回测结果:

仓位管理与风险控制

3、按波动率分配资金

  • 计算一段时间内的ATR作为波动率
  • 每只股票允许承担一定百分比的风险,折算到具体的风险金额
  • 以该风险金额和波动率的值来确定建仓个股的资金头寸

波动率越大,分配的头寸越小
波动率越小,分配的头寸越大

代码实现:

# 导入函数库
from jqdata import *
import pandas as pd
# 均线
MA_WIN_1 = 10
MA_WIN_2 = 30

# #加仓判断阈值
# INC_POS_PF_RATE = 0.05
#ATR计算窗口大小
ATR_WIN_SIZE= 20 
#总风险因子
RISK_RATIO=0.001
#个股分析因子
STOCK_RISK_RATIO =1

# 初始化函数,设定基准等等
def initialize(context):
    # 设定沪深300作为基准
    set_benchmark('000300.XSHG')
    # 开启动态复权模式(真实价格)
    set_option('use_real_price', True)
    # 输出内容到日志 log.info()
    log.info('初始函数开始运行且全局只运行一次')
    # 过滤掉order系列API产生的比error级别低的log
    # log.set_level('order', 'error')

    ### 股票相关设定 ###
    # 股票类每笔交易时的手续费是:买入时佣金万分之三,卖出时佣金万分之三加千分之一印花税, 每笔交易佣金最低扣5块钱
    set_order_cost(OrderCost(close_tax=0.001, open_commission=0.0003, close_commission=0.0003, min_commission=5), type='stock')

    ## 运行函数(reference_security为运行时间的参考标的;传入的标的只做种类区分,因此传入'000300.XSHG'或'510300.XSHG'是一样的)
      # 开盘前运行
    run_daily(before_market_open, time='before_open', reference_security='000300.XSHG')
      # 每分钟运行
    run_daily(market_open, time='every_bar', reference_security='000300.XSHG')
      # 收盘后运行
    run_daily(after_market_close, time='after_close', reference_security='000300.XSHG')
    #股票池-上证50
    g.stock_pool = get_index_stocks('000016.XSHG',date=context.current_dt)
    g.init_cash = context.portfolio.starting_cash#启动资金
    g.last_entry_prices = {code:None for code in g.stock_pool}
    
## 开盘前运行函数
def before_market_open(context):
    look_ahead_n = max(MA_WIN_1,MA_WIN_2)+1
    g.up_cross_signaled = set()
    g.down_cross_signaled = set()
    for code in g.stock_pool:
        df = attribute_history(code,look_ahead_n,'1d',fields=['close'],skip_paused=True)
        if len(df) != look_ahead_n:
            continue
        close = df['close']
        ma_short = close.rolling(MA_WIN_1).mean()#短时均线
        ma_long = close.rolling(MA_WIN_2).mean()#长时均线
        uc_flags = (ma_short.shift(1) <= ma_long.shift(1)) & (ma_short > ma_long)#上穿标志
        dc_flags = (ma_short.shift(1) >= ma_long.shift(1)) & (ma_short < ma_long)#下穿标志
        if uc_flags.iloc[-1]:
            g.up_cross_signaled.add(code)
        if dc_flags.iloc[-1]:
            g.down_cross_signaled.add(code)
        print(g.up_cross_signaled)  

## 开盘时运行函数
def market_open(context):
    log.info('函数运行时间(market_open):'+str(context.current_dt.time()))
    p = context.portfolio#资金账户
    current_data = get_current_data()
    
    #每只股票分配的资金
    each_cash = g.init_cash/len(g.stock_pool)
    #卖出均线、死叉信号的持仓股
    for code ,pos in p.positions.items():
        if code in g.down_cross_signaled:
            order_target(code,0)
            g.last_entry_prices[code]= None#更完美的实现还要判断成交状态,后面同理
            
    #买入均线金叉信号的持仓股
    for code in g.up_cross_signaled:
        if code not in p.positions:
            if current_data[code].paused:
                continue
            df = attribute_history(code, ATR_WIN_SIZE+1, "1d", ["high", "low", "close"], skip_paused=True)
            if len(df) != ATR_WIN_SIZE+1:
                continue
            df["pdc"] = df["close"].shift(1)
            tr = df.apply(lambda x: max(x["high"]-x["low"], abs(x["high"]-x["pdc"]), abs(x["pdc"]-x["low"])), axis=1)
            atr = tr[-ATR_WIN_SIZE:].mean()
            num_to_buy = (g.init_cash * RISK_RATIO) / (atr * STOCK_RISK_RATIO) 
            
            order(code,num_to_buy)
  
## 收盘后运行函数
def after_market_close(context):
    log.info(str('函数运行时间(after_market_close):'+str(context.current_dt.time())))
    p = context.portfolio
    pos_level = p.positions_value / p.total_value
    record(pos_level=pos_level)
    trades = get_trades()
    for _trade in trades.values():
        log.info('成交记录:'+str(_trade))
    log.info('一天结束')
    log.info('##############################################################')

回测结果:

仓位管理与风险控制

4 回撤止盈(浮动止损)
算法描述:

  • 建仓后,时刻记录股价走势的最高点
  • 计算最新价相对于前期高点下跌幅度的百分比
  • 如果下跌百分比大于某个阈值,则止盈退出

基于按波动率分配资金的方案进行改进实验

  • 目标:减小回撤

代码实现:

# 导入函数库
from jqdata import *
import pandas as pd

# 回撤的幅度
MAX_DROP_RATE = 0.03
# 初始化函数,设定基准等等
def initialize(context):
    # 设定沪深300作为基准
    set_benchmark('000300.XSHG')
    # 开启动态复权模式(真实价格)
    set_option('use_real_price', True)
    # 输出内容到日志 log.info()
    log.info('初始函数开始运行且全局只运行一次')
    # 过滤掉order系列API产生的比error级别低的log
    # log.set_level('order', 'error')

    ### 股票相关设定 ###
    # 股票类每笔交易时的手续费是:买入时佣金万分之三,卖出时佣金万分之三加千分之一印花税, 每笔交易佣金最低扣5块钱
    set_order_cost(OrderCost(close_tax=0.001, open_commission=0.0003, close_commission=0.0003, min_commission=5), type='stock')

    ## 运行函数(reference_security为运行时间的参考标的;传入的标的只做种类区分,因此传入'000300.XSHG'或'510300.XSHG'是一样的)
      # 开盘前运行
    run_daily(before_market_open, time='before_open', reference_security='000300.XSHG')
      # 每分钟运行
    run_daily(market_open, time='every_bar', reference_security='000300.XSHG')
      # 收盘后运行
    run_daily(after_market_close, time='after_close', reference_security='000300.XSHG')
    #股票池-上证50
    g.stock_pool = get_index_stocks('000016.XSHG',date=context.current_dt)
    g.init_cash = context.portfolio.starting_cash#启动资金
    g.entry_dates = {code:None for code in g.stock_pool}
    
## 开盘前运行函数
def before_market_open(context):
    look_ahead_n = max(MA_WIN_1,MA_WIN_2)+1
    g.up_cross_signaled = set()
    g.down_cross_signaled = set()
    for code in g.stock_pool:
        df = attribute_history(code,look_ahead_n,'1d',fields=['close'],skip_paused=True)
        if len(df) != look_ahead_n:
            continue
        close = df['close']
        ma_short = close.rolling(MA_WIN_1).mean()#短时均线
        ma_long = close.rolling(MA_WIN_2).mean()#长时均线
        uc_flags = (ma_short.shift(1) <= ma_long.shift(1)) & (ma_short > ma_long)#上穿标志
        dc_flags = (ma_short.shift(1) >= ma_long.shift(1)) & (ma_short < ma_long)#下穿标志
        if uc_flags.iloc[-1]:
            g.up_cross_signaled.add(code)
        if dc_flags.iloc[-1]:
            g.down_cross_signaled.add(code)

## 开盘时运行函数
def market_open(context):
    log.info('函数运行时间(market_open):'+str(context.current_dt.time()))
    cur_dt = context.current_dt.date()  # 当前日期
    p = context.portfolio#资金账户
    current_data = get_current_data()
    
    #每只股票分配的资金
    each_cash = g.init_cash/len(g.stock_pool)
    #卖出均线、死叉信号的持仓股
    for code ,pos in p.positions.items():
        if code in g.down_cross_signaled:
            order_target(code,0)
            
    # 买入均线金叉信号的持仓股
    for code in g.up_cross_signaled:
        if code not in p.positions:
            if current_data[code].paused:
                continue
            df = attribute_history(code, ATR_WIN_SIZE+1, "1d", ["high", "low", "close"], skip_paused=True)
            if len(df) != ATR_WIN_SIZE+1:
                continue
            df["pdc"] = df["close"].shift(1)
            tr = df.apply(lambda x: max(x["high"]-x["low"], abs(x["high"]-x["pdc"]), abs(x["pdc"]-x["low"])), axis=1)
            atr = tr[-ATR_WIN_SIZE:].mean()
            num_to_buy = g.init_cash * RISK_RATIO / atr // 100 * 100
            order(code, num_to_buy)
            
            g.entry_dates[code] = cur_dt

    # 检查有无符合回撤止盈条件的持仓股
    for code, pos in p.positions.items():
        if current_data[code].paused:
            continue

        if pos.today_amount == 0 and pos.closeable_amount > 0:
            last_entry_date = g.entry_dates[code]
            prev_date = context.current_dt - timedelta(days=1)

            # 计算截止到前一天的HHV,避免未来函数,并且考虑到前复权问题,每个交易日都是重新取数据计算
            df = get_price(code, start_date=last_entry_date, end_date=prev_date, frequency="1d", fields=["high"], skip_paused=True)
            hhv = df["high"].max()

            drop_rate = (hhv - current_data[code].day_open) / hhv  # 以当日开盘价计算,或者,也可以用前一交易日的收盘价计算
            if drop_rate > MAX_DROP_RATE:
                order_target(code, 0)
  
## 收盘后运行函数
def after_market_close(context):
    log.info(str('函数运行时间(after_market_close):'+str(context.current_dt.time())))
    p = context.portfolio
    pos_level = p.positions_value / p.total_value
    record(pos_level=pos_level)
    trades = get_trades()
    for _trade in trades.values():
        log.info('成交记录:'+str(_trade))
    log.info('一天结束')
    log.info('##############################################################')

回测结果

仓位管理与风险控制