huice/futures_data_player.py
Your Name 7f4f88e853 feat: 添加期货数据播放器及相关测试和文档
新增期货数据动态播放器功能,包括基础版和增强版实现,添加测试脚本和详细文档说明。主要变更包括:
1. 实现买卖盘深度可视化播放功能
2. 添加播放控制、速度调节和跳转功能
3. 提供统一价格轴显示优化版本
4. 添加测试脚本验证功能
5. 编写详细使用文档和README说明
2025-11-02 23:57:10 +08:00

482 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
期货数据动态播放器 - 买卖盘深度时序可视化
基于au2512_20251013.parquet数据
功能特性:
- 按数列号顺序播放买卖盘深度变化
- 支持播放/暂停/加速/减速
- 可跳转到指定数列号
- 基础播放速度500ms/数据点
"""
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from matplotlib.widgets import Button, Slider
import tkinter as tk
from tkinter import ttk
import threading
import time
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
import matplotlib.patches as mpatches
from datetime import datetime
import matplotlib.font_manager as fm
# 设置matplotlib支持中文显示
plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'Arial Unicode MS', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False
class FuturesDataPlayer:
def __init__(self, data_path='data/au2512_20251013.parquet'):
"""初始化期货数据播放器"""
self.data_path = data_path
self.df = None
self.current_index = 0
self.is_playing = False
self.play_speed = 500 # 基础播放速度(毫秒)
self.speed_multiplier = 1.0
# 加载数据
self.load_data()
# 创建GUI界面
self.setup_gui()
def load_data(self):
"""加载并预处理期货数据"""
print("正在加载数据...")
self.df = pd.read_parquet(self.data_path)
# 列名映射
columns_mapping = {
'UTC': 'UTC',
'UTC.1': 'UTC_1',
'时间': 'time',
'累积成交量': 'cumulative_volume',
'成交价': 'price',
'成交额': 'amount',
'买1价': 'bid1_price', '卖1价': 'ask1_price',
'买1量': 'bid1_volume', '卖1量': 'ask1_volume',
'买2价': 'bid2_price', '卖2价': 'ask2_price',
'买2量': 'bid2_volume', '卖2量': 'ask2_volume',
'买3价': 'bid3_price', '卖3价': 'ask3_price',
'买3量': 'bid3_volume', '卖3量': 'ask3_volume',
'买4价': 'bid4_price', '卖4价': 'ask4_price',
'买4量': 'bid4_volume', '卖4量': 'ask4_volume',
'买5价': 'bid5_price', '卖5价': 'ask5_price',
'买5量': 'bid5_volume', '卖5量': 'ask5_volume'
}
# 重命名列
self.df = self.df.rename(columns=columns_mapping)
# 数据清洗:去除无效的买卖盘数据
bid_cols = [f'bid{i}_price' for i in range(1, 6)]
ask_cols = [f'ask{i}_price' for i in range(1, 6)]
# 过滤掉买卖盘价格无效的数据
valid_mask = (
self.df[bid_cols].notnull().all(axis=1) &
self.df[ask_cols].notnull().all(axis=1)
)
self.df = self.df[valid_mask].reset_index(drop=True)
print(f"数据加载完成!有效数据点: {len(self.df)}")
print(f"时间范围: {self.df['time'].iloc[0]} - {self.df['time'].iloc[-1]}")
print(f"价格范围: {self.df['price'].min():.2f} - {self.df['price'].max():.2f}")
def setup_gui(self):
"""设置GUI界面"""
# 创建主窗口
self.root = tk.Tk()
self.root.title("期货数据动态播放器 - AU2512买卖盘深度分析")
self.root.geometry("1400x900")
# 创建主框架
main_frame = ttk.Frame(self.root)
main_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
# 左侧:图表区域
chart_frame = ttk.Frame(main_frame)
chart_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
# 右侧:控制面板
control_frame = ttk.Frame(main_frame, width=300)
control_frame.pack(side=tk.RIGHT, fill=tk.Y, padx=(10, 0))
control_frame.pack_propagate(False)
# 创建matplotlib图表
self.setup_chart(chart_frame)
# 创建控制面板
self.setup_controls(control_frame)
# 初始化显示
self.update_display()
def setup_chart(self, parent):
"""设置图表"""
# 创建图形和轴
self.fig, (self.ax1, self.ax2) = plt.subplots(2, 1, figsize=(10, 8))
self.fig.patch.set_facecolor('#f0f0f0')
# 买盘深度图(上方)
self.ax1.set_title('买盘深度 (Bid Depth)', fontsize=14, fontweight='bold')
self.ax1.set_xlabel('价格')
self.ax1.set_ylabel('累积成交量')
self.ax1.grid(True, alpha=0.3)
self.ax1.set_facecolor('#ffffff')
# 卖盘深度图(下方)
self.ax2.set_title('卖盘深度 (Ask Depth)', fontsize=14, fontweight='bold')
self.ax2.set_xlabel('价格')
self.ax2.set_ylabel('累积成交量')
self.ax2.grid(True, alpha=0.3)
self.ax2.set_facecolor('#ffffff')
plt.tight_layout()
# 嵌入到tkinter
self.canvas = FigureCanvasTkAgg(self.fig, parent)
self.canvas.draw()
self.canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)
def setup_controls(self, parent):
"""设置控制面板"""
# 标题
title_label = ttk.Label(parent, text="播放控制", font=('Arial', 16, 'bold'))
title_label.pack(pady=10)
# 当前状态显示
status_frame = ttk.LabelFrame(parent, text="当前状态", padding=10)
status_frame.pack(fill=tk.X, pady=10)
self.index_label = ttk.Label(status_frame, text="数列号: 0", font=('Arial', 11))
self.index_label.pack(anchor=tk.W)
self.time_label = ttk.Label(status_frame, text="时间: --", font=('Arial', 11))
self.time_label.pack(anchor=tk.W)
self.price_label = ttk.Label(status_frame, text="成交价: --", font=('Arial', 11))
self.price_label.pack(anchor=tk.W)
self.volume_label = ttk.Label(status_frame, text="累积成交量: --", font=('Arial', 11))
self.volume_label.pack(anchor=tk.W)
# 播放控制按钮
control_frame = ttk.LabelFrame(parent, text="播放控制", padding=10)
control_frame.pack(fill=tk.X, pady=10)
# 播放/暂停按钮
self.play_button = ttk.Button(
control_frame,
text="▶ 播放",
command=self.toggle_play,
width=20
)
self.play_button.pack(pady=5)
# 停止按钮
self.stop_button = ttk.Button(
control_frame,
text="■ 停止",
command=self.stop_playback,
width=20
)
self.stop_button.pack(pady=5)
# 速度控制
speed_frame = ttk.LabelFrame(parent, text="速度控制", padding=10)
speed_frame.pack(fill=tk.X, pady=10)
# 速度滑块
self.speed_var = tk.DoubleVar(value=1.0)
speed_slider = ttk.Scale(
speed_frame,
from_=0.1,
to=5.0,
variable=self.speed_var,
orient=tk.HORIZONTAL,
command=self.update_speed
)
speed_slider.pack(fill=tk.X, pady=5)
self.speed_label = ttk.Label(speed_frame, text="播放速度: 1.0x")
self.speed_label.pack()
# 快速速度按钮
speed_buttons_frame = ttk.Frame(speed_frame)
speed_buttons_frame.pack(pady=5)
ttk.Button(speed_buttons_frame, text="0.5x", command=lambda: self.set_speed(0.5), width=8).pack(side=tk.LEFT, padx=2)
ttk.Button(speed_buttons_frame, text="1x", command=lambda: self.set_speed(1.0), width=8).pack(side=tk.LEFT, padx=2)
ttk.Button(speed_buttons_frame, text="2x", command=lambda: self.set_speed(2.0), width=8).pack(side=tk.LEFT, padx=2)
ttk.Button(speed_buttons_frame, text="5x", command=lambda: self.set_speed(5.0), width=8).pack(side=tk.LEFT, padx=2)
# 跳转控制
jump_frame = ttk.LabelFrame(parent, text="跳转控制", padding=10)
jump_frame.pack(fill=tk.X, pady=10)
# 数列号输入
ttk.Label(jump_frame, text="跳转到数列号:").pack(anchor=tk.W)
jump_input_frame = ttk.Frame(jump_frame)
jump_input_frame.pack(fill=tk.X, pady=5)
self.jump_entry = ttk.Entry(jump_input_frame)
self.jump_entry.pack(side=tk.LEFT, fill=tk.X, expand=True)
ttk.Button(jump_input_frame, text="跳转", command=self.jump_to_index, width=10).pack(side=tk.RIGHT, padx=(5, 0))
# 进度条
progress_frame = ttk.LabelFrame(parent, text="播放进度", padding=10)
progress_frame.pack(fill=tk.X, pady=10)
self.progress_var = tk.IntVar()
self.progress_bar = ttk.Progressbar(
progress_frame,
variable=self.progress_var,
maximum=len(self.df) - 1,
orient=tk.HORIZONTAL
)
self.progress_bar.pack(fill=tk.X, pady=5)
self.progress_label = ttk.Label(progress_frame, text=f"0 / {len(self.df) - 1}")
self.progress_label.pack()
# 数据统计
stats_frame = ttk.LabelFrame(parent, text="数据统计", padding=10)
stats_frame.pack(fill=tk.X, pady=10)
stats_text = f"总数据点: {len(self.df)}\\n"
stats_text += f"时间跨度: {self.df['time'].iloc[0]} - {self.df['time'].iloc[-1]}\\n"
stats_text += f"价格区间: {self.df['price'].min():.2f} - {self.df['price'].max():.2f}"
ttk.Label(stats_frame, text=stats_text, font=('Arial', 9)).pack(anchor=tk.W)
def get_market_depth_data(self, row_data):
"""获取买卖盘深度数据"""
bid_prices = []
bid_volumes = []
ask_prices = []
ask_volumes = []
# 收集买卖盘数据从1到5档
for i in range(1, 6):
bid_price = row_data[f'bid{i}_price']
bid_volume = row_data[f'bid{i}_volume']
ask_price = row_data[f'ask{i}_price']
ask_volume = row_data[f'ask{i}_volume']
if pd.notna(bid_price) and pd.notna(bid_volume) and bid_volume > 0:
bid_prices.append(bid_price)
bid_volumes.append(bid_volume)
if pd.notna(ask_price) and pd.notna(ask_volume) and ask_volume > 0:
ask_prices.append(ask_price)
ask_volumes.append(ask_volume)
return bid_prices, bid_volumes, ask_prices, ask_volumes
def update_display(self):
"""更新显示内容"""
if self.df is None or self.current_index >= len(self.df):
return
# 获取当前行数据
row_data = self.df.iloc[self.current_index]
# 清空图表
self.ax1.clear()
self.ax2.clear()
# 重新设置图表属性
self.ax1.set_title('买盘深度 (Bid Depth)', fontsize=14, fontweight='bold')
self.ax1.set_xlabel('价格')
self.ax1.set_ylabel('成交量')
self.ax1.grid(True, alpha=0.3)
self.ax1.set_facecolor('#ffffff')
self.ax2.set_title('卖盘深度 (Ask Depth)', fontsize=14, fontweight='bold')
self.ax2.set_xlabel('价格')
self.ax2.set_ylabel('成交量')
self.ax2.grid(True, alpha=0.3)
self.ax2.set_facecolor('#ffffff')
# 获取买卖盘深度数据
bid_prices, bid_volumes, ask_prices, ask_volumes = self.get_market_depth_data(row_data)
# 绘制买盘深度(绿色,从左到右递减)
if bid_prices:
# 按价格排序(买盘价格从高到低)
bid_data = sorted(zip(bid_prices, bid_volumes), reverse=True)
sorted_bid_prices, sorted_bid_volumes = zip(*bid_data)
# 计算累积成交量
cumulative_bid_volumes = np.cumsum(sorted_bid_volumes)
# 绘制买盘柱状图
colors_bid = plt.cm.Greens(np.linspace(0.4, 0.9, len(sorted_bid_prices)))
bars1 = self.ax1.barh(range(len(sorted_bid_prices)), sorted_bid_volumes,
color=colors_bid, alpha=0.8, edgecolor='darkgreen', linewidth=1)
# 设置Y轴标签为价格
self.ax1.set_yticks(range(len(sorted_bid_prices)))
self.ax1.set_yticklabels([f'{p:.2f}' for p in sorted_bid_prices])
# 添加成交量标签
for i, (price, volume) in enumerate(bid_data):
self.ax1.text(volume + max(sorted_bid_volumes) * 0.01, i, f'{int(volume)}',
va='center', ha='left', fontsize=9)
# 设置X轴范围
max_bid_vol = max(sorted_bid_volumes)
self.ax1.set_xlim(0, max_bid_vol * 1.3)
else:
self.ax1.text(0.5, 0.5, '无买盘数据', ha='center', va='center',
transform=self.ax1.transAxes, fontsize=12)
# 绘制卖盘深度(红色,从左到右递增)
if ask_prices:
# 按价格排序(卖盘价格从低到高)
ask_data = sorted(zip(ask_prices, ask_volumes))
sorted_ask_prices, sorted_ask_volumes = zip(*ask_data)
# 绘制卖盘柱状图
colors_ask = plt.cm.Reds(np.linspace(0.4, 0.9, len(sorted_ask_prices)))
bars2 = self.ax2.barh(range(len(sorted_ask_prices)), sorted_ask_volumes,
color=colors_ask, alpha=0.8, edgecolor='darkred', linewidth=1)
# 设置Y轴标签为价格
self.ax2.set_yticks(range(len(sorted_ask_prices)))
self.ax2.set_yticklabels([f'{p:.2f}' for p in sorted_ask_prices])
# 添加成交量标签
for i, (price, volume) in enumerate(ask_data):
self.ax2.text(volume + max(sorted_ask_volumes) * 0.01, i, f'{int(volume)}',
va='center', ha='left', fontsize=9)
# 设置X轴范围
max_ask_vol = max(sorted_ask_volumes)
self.ax2.set_xlim(0, max_ask_vol * 1.3)
else:
self.ax2.text(0.5, 0.5, '无卖盘数据', ha='center', va='center',
transform=self.ax2.transAxes, fontsize=12)
# 添加当前成交价标记
current_price = row_data['price']
self.ax1.axvline(x=current_price, color='blue', linestyle='--', alpha=0.7, label=f'成交价: {current_price:.2f}')
self.ax2.axvline(x=current_price, color='blue', linestyle='--', alpha=0.7, label=f'成交价: {current_price:.2f}')
self.ax1.legend(loc='upper right')
self.ax2.legend(loc='upper right')
plt.tight_layout()
self.canvas.draw()
# 更新状态标签
self.index_label.config(text=f"数列号: {self.current_index}")
self.time_label.config(text=f"时间: {row_data['time']}")
self.price_label.config(text=f"成交价: {current_price:.2f}")
self.volume_label.config(text=f"累积成交量: {int(row_data['cumulative_volume']):,}")
# 更新进度条
self.progress_var.set(self.current_index)
self.progress_label.config(text=f"{self.current_index} / {len(self.df) - 1}")
def toggle_play(self):
"""切换播放/暂停状态"""
if not self.is_playing:
self.is_playing = True
self.play_button.config(text="⏸ 暂停")
# 启动播放线程
self.play_thread = threading.Thread(target=self.playback_loop, daemon=True)
self.play_thread.start()
else:
self.is_playing = False
self.play_button.config(text="▶ 播放")
def stop_playback(self):
"""停止播放"""
self.is_playing = False
self.play_button.config(text="▶ 播放")
self.current_index = 0
self.update_display()
def playback_loop(self):
"""播放循环"""
while self.is_playing and self.current_index < len(self.df) - 1:
self.current_index += 1
# 在主线程中更新显示
self.root.after(0, self.update_display)
# 根据速度设置延迟
delay = self.play_speed / self.speed_multiplier
time.sleep(delay / 1000.0) # 转换为秒
# 播放结束
if self.current_index >= len(self.df) - 1:
self.is_playing = False
self.root.after(0, lambda: self.play_button.config(text="▶ 播放"))
def update_speed(self, value):
"""更新播放速度"""
self.speed_multiplier = float(value)
self.speed_label.config(text=f"播放速度: {self.speed_multiplier:.1f}x")
def set_speed(self, speed):
"""设置播放速度"""
self.speed_var.set(speed)
self.speed_multiplier = speed
self.speed_label.config(text=f"播放速度: {self.speed_multiplier:.1f}x")
def jump_to_index(self):
"""跳转到指定数列号"""
try:
target_index = int(self.jump_entry.get())
if 0 <= target_index < len(self.df):
self.current_index = target_index
self.update_display()
else:
tk.messagebox.showwarning("输入错误", f"请输入0到{len(self.df)-1}之间的数列号")
except ValueError:
tk.messagebox.showwarning("输入错误", "请输入有效的数字")
def run(self):
"""运行应用"""
# 绑定关闭事件
self.root.protocol("WM_DELETE_WINDOW", self.on_closing)
# 启动主循环
self.root.mainloop()
def on_closing(self):
"""关闭应用时的处理"""
self.is_playing = False
self.root.destroy()
def main():
"""主函数"""
print("=== 期货数据动态播放器 ===")
print("正在初始化...")
# 创建播放器实例
player = FuturesDataPlayer()
print("初始化完成!")
print("\\n使用说明")
print("- 点击'播放'按钮开始播放买卖盘深度变化")
print("- 使用速度控制调整播放速度")
print("- 可以跳转到指定的数列号")
print("- 观察买盘(绿色)和卖盘(红色)的深度变化")
# 运行应用
player.run()
if __name__ == "__main__":
main()