量化交易实战攻略,高频交易Python源码深度解析与分享

量化交易实战攻略,高频交易Python源码深度解析与分享"/

由于高频交易(High-Frequency Trading, HFT)涉及复杂的算法、大量的资金和极高的技术要求,且通常包含敏感的商业机密,因此公开分享完整的高频交易源码是非常罕见的。
然而,我可以提供一个简化的高频交易策略的示例代码,这个示例仅用于教育和研究目的,并不适合实际的高频交易应用。以下是一个简单的高频交易策略的Python伪代码示例,它使用模拟数据来演示如何执行一个简单的策略。
```python import numpy as np import pandas as pd import matplotlib.pyplot as plt
# 模拟数据生成 def generate_data(): # 假设我们有一个时间序列的数据集 dates = pd.date_range(start='20210101', periods=1000) prices = np.random.normal(loc=100, scale=10, size=len(dates)) data = pd.DataFrame({'Date': dates, 'Price': prices}) data.set_index('Date', inplace=True) return data
# 简单的高频交易策略 def high_frequency_trading_strategy(data): trades = [] for i in range(1, len(data) - 1): # 简单的规则:如果价格连续上涨,则买入 if data['Price'][i] > data['Price'][i - 1] and data['Price'][i] > data['Price'][i + 1]: trades.append(('

相关内容:

之前看到一篇介绍高频交易的博文,就想想着用python实现一下。于是把博文


把上面内容复制到豆包,让它根据上面内容编写一个运行于掘金量化平台的策略,可以使用talib和akshare。

然后把豆包生成的代码经过数轮优化和亲自调试,得到如下代码:

# -*- coding: utf-8 -*-
import numpy as np
import talib
from gm.api import *  # 使用掘金量化标准API

# === 策略参数配置 ===
def init_params(context):
    context.params = {
        # VWAP相关
        'vwap_window': 60 * 60,  # 使用多少秒计算VWAP(如60分钟=3600秒)
        'vwap_interval': 60,  # 每隔多少秒刷新一次VWAP计算

        # 乖离率交易参数
        'pergence_upper_threshold': 1.2,
        'pergence_lower_threshold': -1.2,

        # 四维共振模型参数
        'volume_multiplier': 1.3,
        'rsi_buy_threshold': 30,
        'macd_hist_trend_days': 2,

        # 动态止盈参数
        'atr_period': 30,
        'atr_take_profit_multiplier': 1.5,

        # 梯度止损参数
        'first_stop_loss_ratio': 0.997,
        'final_stop_loss_ratio': 0.988,

        # 仓位管理
        'max_position_ratio': 0.95,
        'max_holdings': 3,
    }

    # 交易标的和周期设置
    context.symbol = 'SHSE.600000'  # 示例股票,格式为交易所.证券代码
    context.frequency_seconds = '600s'  # K线周期:5分钟 = 300 秒
    context.bar_count = 300  # 获取历史K线条数

# === 计算当日 VWAP ===
def calculate_daily_vwap(context, bar):
    now = bar
    today = now.strftime('%Y-%m-%d')

    # 获取过去60分钟的1分钟K线 => 总共60条
    kline_1min = history_n(symbol=context.symbol, frequency='60s', count=int(context.params / 60),  # 从秒转为分钟级数量
        end_time=today, fields=,  df=True)

    if len(kline_1min) == 0:
        return None

    vwap = (kline_1min * kline_1min).cumsum() / kline_1min.cumsum()
    return vwap.iloc


# === 判断乖离率交易逻辑 ===
def check_pergence(context, bar, vwap):
    current_price = bar
    pergence = (current_price - vwap) / vwap * 100

    pos = context.account().position(symbol=context.symbol, side=PositionSide_Long)
    buy_thres = context.params
    sell_thres = context.params

    signals = 
    if pergence > sell_thres and pos and pos.volume > 0:
        signals.append(('sell', 'pergence'))
    elif pergence < buy_thres:
        signals.append(('buy', 'pergence'))

    return signals


# === 量价时空四维共振模型判断逻辑 ===
def check_four_dimension_resonance(context, bar, kline_5min):
    volume = bar
    recent_vol = history_n(symbol=context.symbol, frequency='60s', count=20, end_time=context.now, fields='volume',
                           df=True)
    avg_volume_20 = recent_vol.mean()

    if volume > avg_volume_20 * context.params:
        close = kline_5min
        macd, signal, hist = talib.MACD(close, fastperiod=12, slowperiod=26, signalperiod=9)
        rsi = talib.RSI(close, timeperiod=14).values
        if len(hist) >= context.params:
            recent_hist = hist:].values
            if all(recent_hist < recent_hist for i in range(len(recent_hist) - 1)) and 
                    rsi < context.params:
                return 
    return 


# === 动态止盈模型逻辑 ===
def check_dynamic_take_profit(context, bar, kline_5min):
    high = kline_5min
    low = kline_5min
    close = kline_5min
    atr = talib.ATR(high, low, close, timeperiod=context.params).values
    if len(atr) < 2:
        return 
    current_atr = atr
    prev_atr = atr

    if current_atr > prev_atr * context.params:
        pos = context.account().position(symbol=context.symbol, side=PositionSide_Long)
        if pos and pos.volume > 0:
            return 
    return 


# === 梯度止损策略逻辑 ===
def check_gradient_stop_loss(context, bar):
    pos = context.account().position(symbol=context.symbol, side=PositionSide_Long)
    if not pos or pos.volume <= 0:
        return 

    cost_price = pos.vwap
    current_price = bar

    signals = 

    # 第一止损位
    if current_price < cost_price * context.params:
        signals.append(('sell', 'stop_loss'))

    # 支撑位止损
    lows = history_n(symbol=context.symbol, frequency='60s', count=20, end_time=context.now, fields='low', df=True)
    support_level = np.min(lows)
    if current_price < support_level:
        signals.append(('sell', 'support_stop_loss'))

    # 终极止损位
    if current_price < cost_price * context.params:
        signals.append(('sell', 'final_stop_loss'))

    return signals


# === 统一决策层 ===
def trade_engine(context, bar, vwap, kline_5min):
    signals = 

    signals.extend(check_pergence(context, bar, vwap))
    signals.extend(check_four_dimension_resonance(context, bar, kline_5min))
    signals.extend(check_dynamic_take_profit(context, bar, kline_5min))
    signals.extend(check_gradient_stop_loss(context, bar))
    priority_order = 
    for action, signal_type in sorted(signals, key=lambda x: priority_order.index(x)):
        if action == 'buy':
            if len(context.account().positions()) >= context.params:
                print("达到最大持仓数量限制,无法买入")
                continue

            cash = context.account().cash.available
            price = bar
            max_shares = int(cash / price)
            if max_shares <= 0:
                print("资金不足,无法买入")
                continue

            target_value = cash * context.params
            volume = int(target_value / price)
            if volume <= 0:
                print("目标成交量小于等于零,无法下单")
                continue

            order_target_percent(symbol=context.symbol, percent=context.params,
                                 order_type=OrderType_Limit, price=price, position_side=1)
            break
        elif action == 'sell':
            pos = context.account().position(symbol=context.symbol, side=PositionSide_Long)
            vol = pos.volume - pos.volume_today
            if pos and vol > 0:
                order_target_volume(symbol=context.symbol, volume=pos.volume_today, order_type=OrderType_Limit, price=bar, position_side=2)
            break


# === 策略初始化函数 ===
def init(context):
    init_params(context)
    subscribe(symbols=context.symbol, frequency=context.frequency_seconds, count=10)

# === 每根K线回调函数 ===
def on_bar(context, bars):
    # 获取当前时间
    if not is_trading_time(context.now):
        return
    bar = bars
    current_time = bar

    try:
        # 获取5分钟K线数据
        kline_5min = history_n(symbol=context.symbol, frequency=context.frequency_seconds, count=context.bar_count,
                                end_time=current_time, fields="open, high, low, close, volume", df=True)
        # 计算VWAP
        vwap = calculate_daily_vwap(context, bar)
        if vwap is None:
            print(f"{current_time}: 无法获取VWAP数据,跳过本次处理")
            return

        # 执行统一决策层
        trade_engine(context, bar, vwap, kline_5min)

    except Exception as e:
        print(f"数据获取或计算出错: {e}")

def on_backtest_finished(context, indicator):
    print('累计收益率:', str(indicator.pnl_ratio * 100), ',年化收益率:', str(indicator.pnl_ratio_annual * 100), ',最大回撤:',
          indicator.max_drawdown * 100, ',夏普比率:', str(indicator.sharp_ratio), ',胜率:', indicator.win_ratio * 100,
        ',开仓次数:', indicator.open_count, ',平仓次数:', indicator.close_count)

if __name__ == '__main__':
    run(strategy_id='6c3dfc86-03fb-11ed-ba2d-c85b7639cf56',
        filename='main_test.py',
        mode=MODE_BACKTEST,
        token='2a73fd387f79b2c096806a91a6b13f915f981007',
        backtest_start_time='2024-04-03 09:30:00',
        backtest_end_time='2025-04-30 15:00:00',
        backtest_adjust=ADJUST_PREV,
        backtest_initial_cash=10000000,
        backtest_commission_ratio=0.00025,
        backtest_slippage_ratio=0.0001)

def on_order_status(context, order):
    # 标的代码
    symbol = order
    # 委托价格
    price = order
    # 委托数量
    volume = order
    # 查看下单后的委托状态,等于3代表委托全部成交
    status = order
    # 买卖方向,1为买入,2为卖出
    side = order
    # 开平仓类型,1为开仓,2为平仓
    effect = order
    # 委托类型,1为限价委托,2为市价委托
    order_type = order
    if status == 3:
        if effect == 1:
            if side == 1:
                side_effect = '开多仓'
            else:
                side_effect = '开空仓'
        else:
            if side == 1:
                side_effect = '平空仓'
            else:
                side_effect = '平多仓'
        order_type_word = '限价' if order_type == 1 else '市价'
        print('{}:标的:{},操作:以{}{},委托价格:{},委托数量:{}'.format(context.now, symbol, order_type_word, side_effect, price, volume))

def is_trading_time(current_time):
    """
    判断当前时间是否为交易时间
    """
    trading_sessions = 
    current_time_str = current_time.strftime('%H:%M:%S')
    for start, end in trading_sessions:
        if start <= current_time_str <= end:
            return True
    return False

该代码实现了一个基于VWAP、乖离率、四维共振模型、动态止盈和梯度止损的量化交易策略。主要功能包括:

init_params:初始化策略参数。

calculate_daily_vwap:计算当日VWAP。

check_pergence:判断乖离率交易信号。


check_four_dimension_resonance:判断四维共振模型信号。

check_dynamic_take_profit:判断动态止盈信号。

check_gradient_stop_loss:判断梯度止损信号。

trade_engine:统一决策层,根据信号执行买卖操作。

init:策略初始化。

on_bar:每根K线回调函数,执行交易逻辑。

on_backtest_finished:回测结束后的统计信息输出。

on_order_status:订单状态回调函数。

is_trading_time:判断当前是否为交易时间。

经过测试,数据订阅频率600s和3600s的收益相当,也就是说高频率并没有提升收益率。

此策略还只是单只股票的,虽然胜率很高,但是收益是根据股票的不同而变化,之后可以考虑通过选股程序选取优质股票,再加上判断股票是否强上升趋势。强上升趋势的票再执行此策略。估计收益还能进一步提高。

创作不易,对量有兴趣的朋友欢迎点赞,关注,转发,留言。我会不定期的分享自己编写的策略源码.

发布于 2025-07-02 14:47
收藏
1
上一篇:国家税务总局发布新公告,互联网平台企业须报送涉税信息 下一篇:深思,每次交易为何总是失败?你考虑过这些原因吗?