huice/futures_player_unified.py
Your Name 7f4f88e853 feat: 添加期货数据播放器及相关测试和文档
新增期货数据动态播放器功能,包括基础版和增强版实现,添加测试脚本和详细文档说明。主要变更包括:
1. 实现买卖盘深度可视化播放功能
2. 添加播放控制、速度调节和跳转功能
3. 提供统一价格轴显示优化版本
4. 添加测试脚本验证功能
5. 编写详细使用文档和README说明
2025-11-02 23:57:10 +08:00

625 lines
25 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Unified Futures Data Player - Combined Market Depth Visualization
基于au2512_20251013.parquet数据的统一价格轴买卖盘深度显示
优化特性:
- 买卖方挂单在同一价格轴图表中显示
- 真实最小tick刻度标注 (0.02)
- 显示买卖盘之间的价格间隙
- 统一价格轴,便于观察价差和流动性
"""
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import tkinter as tk
from tkinter import ttk, messagebox
import threading
import time
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg, NavigationToolbar2Tk
from matplotlib.figure import Figure
import matplotlib.gridspec as gridspec
class UnifiedFuturesPlayer:
def __init__(self, data_path='data/au2512_20251013.parquet'):
"""初始化统一价格轴期货数据播放器"""
self.data_path = data_path
self.df = None
self.current_index = 0
self.is_playing = False
self.base_speed = 500 # Base speed in milliseconds
self.speed_multiplier = 1.0
self.play_thread = None
self.min_tick = 0.02 # AU2512最小tick
# Load and prepare data
self.load_data()
# Create GUI
self.setup_gui()
# Initial display
self.update_display()
def load_data(self):
"""加载并预处理期货数据"""
print("Loading futures data...")
self.df = pd.read_parquet(self.data_path)
# Column mapping
columns_mapping = {
'UTC': 'UTC', 'UTC.1': 'UTC_1', '时间': 'time',
'累积成交量': 'cumulative_volume', '成交价': 'price', '成交额': 'amount',
'买1价': 'bid1_price', '卖1价': 'ask1_price',
'买1量': 'bid1_volume', '卖1量': 'ask1_volume',
'买2价': 'bid2_price', '卖2价': 'ask2_price',
'买2量': 'bid2_volume', '卖2量': 'ask2_volume',
'买3价': 'bid3_price', '卖3价': 'ask3_price',
'买3量': 'bid3_volume', '卖3量': 'ask3_volume',
'买4价': 'bid4_price', '卖4价': 'ask4_price',
'买4量': 'bid4_volume', '卖4量': 'ask4_volume',
'买5价': 'bid5_price', '卖5价': 'ask5_price',
'买5量': 'bid5_volume', '卖5量': 'ask5_volume'
}
self.df = self.df.rename(columns=columns_mapping)
# Clean data: remove invalid market depth data
bid_cols = [f'bid{i}_price' for i in range(1, 6)]
ask_cols = [f'ask{i}_price' for i in range(1, 6)]
bid_vol_cols = [f'bid{i}_volume' for i in range(1, 6)]
ask_vol_cols = [f'ask{i}_volume' for i in range(1, 6)]
# Filter out rows with invalid bid/ask data
valid_mask = (
self.df[bid_cols].notnull().all(axis=1) &
self.df[ask_cols].notnull().all(axis=1) &
(self.df[bid_vol_cols] > 0).any(axis=1) &
(self.df[ask_vol_cols] > 0).any(axis=1)
)
self.df = self.df[valid_mask].reset_index(drop=True)
print(f"Data loaded successfully! Valid data points: {len(self.df)}")
print(f"Time range: {self.df['time'].iloc[0]} - {self.df['time'].iloc[-1]}")
print(f"Price range: {self.df['price'].min():.2f} - {self.df['price'].max():.2f}")
def setup_gui(self):
"""设置GUI界面"""
self.root = tk.Tk()
self.root.title("Unified Futures Player - AU2512 Combined Market Depth")
self.root.geometry("1600x900")
# Create main container with paned window
main_paned = ttk.PanedWindow(self.root, orient=tk.HORIZONTAL)
main_paned.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
# Left frame for charts
chart_frame = ttk.Frame(main_paned)
main_paned.add(chart_frame, weight=3)
# Right frame for controls
control_frame = ttk.Frame(main_paned, width=350)
main_paned.add(control_frame, weight=1)
control_frame.pack_propagate(False)
# Create matplotlib figure with unified layout
self.setup_charts(chart_frame)
# Create control panel
self.setup_controls(control_frame)
# Configure style
style = ttk.Style()
style.theme_use('clam')
def setup_charts(self, parent):
"""设置统一图表布局"""
# Create figure with custom layout
self.fig = Figure(figsize=(12, 8), dpi=100)
self.fig.patch.set_facecolor('#f8f9fa')
# Create GridSpec for layout: main chart takes 3/4, smaller charts take 1/4
gs = gridspec.GridSpec(3, 2, height_ratios=[3, 1, 1], width_ratios=[2, 1],
hspace=0.3, wspace=0.2)
# Main unified market depth chart
self.ax_main = self.fig.add_subplot(gs[0, :])
# Price trend chart (bottom left)
self.ax_price = self.fig.add_subplot(gs[1, 0])
# Volume profile chart (bottom right)
self.ax_volume = self.fig.add_subplot(gs[1, 1])
# Statistics text area (bottom)
self.ax_stats = self.fig.add_subplot(gs[2, :])
self.ax_stats.axis('off')
# Embed in tkinter
self.canvas = FigureCanvasTkAgg(self.fig, parent)
self.canvas.draw()
self.canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)
# Add navigation toolbar
toolbar_frame = ttk.Frame(parent)
toolbar_frame.pack(fill=tk.X)
self.toolbar = NavigationToolbar2Tk(self.canvas, toolbar_frame)
self.toolbar.update()
def setup_controls(self, parent):
"""设置控制面板"""
# Title
title_label = ttk.Label(parent, text="Unified Market Depth Controls",
font=('Arial', 14, 'bold'))
title_label.pack(pady=10)
# Current status frame
status_frame = ttk.LabelFrame(parent, text="Current Status", padding=10)
status_frame.pack(fill=tk.X, pady=5)
self.index_label = ttk.Label(status_frame, text="Sequence: 0", font=('Arial', 10))
self.index_label.pack(anchor=tk.W)
self.time_label = ttk.Label(status_frame, text="Time: --", font=('Arial', 10))
self.time_label.pack(anchor=tk.W)
self.price_label = ttk.Label(status_frame, text="Last Price: --", font=('Arial', 10, 'bold'))
self.price_label.pack(anchor=tk.W)
self.spread_label = ttk.Label(status_frame, text="Bid-Ask Spread: --", font=('Arial', 10))
self.spread_label.pack(anchor=tk.W)
# Playback controls
playback_frame = ttk.LabelFrame(parent, text="Playback Controls", padding=10)
playback_frame.pack(fill=tk.X, pady=5)
# Play/Pause button
self.play_button = ttk.Button(
playback_frame,
text="▶ Play",
command=self.toggle_play,
width=25
)
self.play_button.pack(pady=3)
# Step controls frame
step_frame = ttk.Frame(playback_frame)
step_frame.pack(pady=5)
ttk.Button(step_frame, text="◀◀ -10", command=lambda: self.step_play(-10), width=10).pack(side=tk.LEFT, padx=2)
ttk.Button(step_frame, text="◀ -1", command=lambda: self.step_play(-1), width=10).pack(side=tk.LEFT, padx=2)
ttk.Button(step_frame, text="+1 ▶", command=lambda: self.step_play(1), width=10).pack(side=tk.LEFT, padx=2)
ttk.Button(step_frame, text="+10 ▶▶", command=lambda: self.step_play(10), width=10).pack(side=tk.LEFT, padx=2)
# Stop button
self.stop_button = ttk.Button(
playback_frame,
text="■ Stop",
command=self.stop_playback,
width=25
)
self.stop_button.pack(pady=3)
# Speed controls
speed_frame = ttk.LabelFrame(parent, text="Speed Control", padding=10)
speed_frame.pack(fill=tk.X, pady=5)
# Speed slider
self.speed_var = tk.DoubleVar(value=1.0)
speed_slider = ttk.Scale(
speed_frame,
from_=0.1,
to=10.0,
variable=self.speed_var,
orient=tk.HORIZONTAL,
command=self.update_speed
)
speed_slider.pack(fill=tk.X, pady=3)
self.speed_label = ttk.Label(speed_frame, text="Speed: 1.0x", font=('Arial', 10))
self.speed_label.pack()
# Quick speed buttons
speed_buttons_frame = ttk.Frame(speed_frame)
speed_buttons_frame.pack(pady=3)
ttk.Button(speed_buttons_frame, text="0.25x", command=lambda: self.set_speed(0.25), width=8).pack(side=tk.LEFT, padx=1)
ttk.Button(speed_buttons_frame, text="0.5x", command=lambda: self.set_speed(0.5), width=8).pack(side=tk.LEFT, padx=1)
ttk.Button(speed_buttons_frame, text="1x", command=lambda: self.set_speed(1.0), width=8).pack(side=tk.LEFT, padx=1)
ttk.Button(speed_buttons_frame, text="2x", command=lambda: self.set_speed(2.0), width=8).pack(side=tk.LEFT, padx=1)
ttk.Button(speed_buttons_frame, text="5x", command=lambda: self.set_speed(5.0), width=8).pack(side=tk.LEFT, padx=1)
ttk.Button(speed_buttons_frame, text="10x", command=lambda: self.set_speed(10.0), width=8).pack(side=tk.LEFT, padx=1)
# Navigation controls
nav_frame = ttk.LabelFrame(parent, text="Navigation", padding=10)
nav_frame.pack(fill=tk.X, pady=5)
# Quick jump buttons
quick_jump_frame = ttk.Frame(nav_frame)
quick_jump_frame.pack(pady=5)
ttk.Button(quick_jump_frame, text="Start", command=lambda: self.jump_to(0), width=10).pack(side=tk.LEFT, padx=2)
ttk.Button(quick_jump_frame, text="25%", command=lambda: self.jump_to(len(self.df) // 4), width=10).pack(side=tk.LEFT, padx=2)
ttk.Button(quick_jump_frame, text="50%", command=lambda: self.jump_to(len(self.df) // 2), width=10).pack(side=tk.LEFT, padx=2)
ttk.Button(quick_jump_frame, text="75%", command=lambda: self.jump_to(3 * len(self.df) // 4), width=10).pack(side=tk.LEFT, padx=2)
ttk.Button(quick_jump_frame, text="End", command=lambda: self.jump_to(len(self.df) - 1), width=10).pack(side=tk.LEFT, padx=2)
# Precise jump
jump_input_frame = ttk.Frame(nav_frame)
jump_input_frame.pack(fill=tk.X, pady=5)
ttk.Label(jump_input_frame, text="Jump to:").pack(side=tk.LEFT)
self.jump_entry = ttk.Entry(jump_input_frame, width=10)
self.jump_entry.pack(side=tk.LEFT, padx=5)
ttk.Button(jump_input_frame, text="Go", command=self.jump_to_entry, width=8).pack(side=tk.LEFT)
# Progress bar
progress_frame = ttk.LabelFrame(parent, text="Progress", padding=10)
progress_frame.pack(fill=tk.X, pady=5)
self.progress_var = tk.IntVar()
self.progress_bar = ttk.Progressbar(
progress_frame,
variable=self.progress_var,
maximum=len(self.df) - 1,
orient=tk.HORIZONTAL
)
self.progress_bar.pack(fill=tk.X, pady=3)
self.progress_label = ttk.Label(progress_frame, text=f"0 / {len(self.df) - 1}")
self.progress_label.pack()
def get_market_depth_data(self, row_data):
"""获取买卖盘深度数据"""
bid_prices = []
bid_volumes = []
ask_prices = []
ask_volumes = []
# 收集买卖盘数据
for i in range(1, 6):
bid_price = row_data[f'bid{i}_price']
bid_volume = row_data[f'bid{i}_volume']
ask_price = row_data[f'ask{i}_price']
ask_volume = row_data[f'ask{i}_volume']
if pd.notna(bid_price) and pd.notna(bid_volume) and bid_volume > 0:
bid_prices.append(bid_price)
bid_volumes.append(bid_volume)
if pd.notna(ask_price) and pd.notna(ask_volume) and ask_volume > 0:
ask_prices.append(ask_price)
ask_volumes.append(ask_volume)
return bid_prices, bid_volumes, ask_prices, ask_volumes
def update_display(self):
"""更新统一价格轴显示"""
if self.df is None or self.current_index >= len(self.df):
return
# Get current data
row_data = self.df.iloc[self.current_index]
current_price = row_data['price']
# Clear all axes
self.ax_main.clear()
self.ax_price.clear()
self.ax_volume.clear()
self.ax_stats.clear()
# Get market depth data
bid_prices, bid_volumes, ask_prices, ask_volumes = self.get_market_depth_data(row_data)
# === Main Unified Market Depth Chart ===
self.ax_main.set_title('Unified Market Depth - AU2512', fontsize=16, fontweight='bold', pad=20)
self.ax_main.set_xlabel('Volume', fontsize=12)
self.ax_main.set_ylabel('Price (¥)', fontsize=12)
self.ax_main.grid(True, alpha=0.3, axis='x')
if bid_prices and ask_prices:
# Define volume scale points (equal spacing)
volume_scale_points = [10, 30, 60, 150, 300]
# Create price range centered around current price
price_range = 2.0 # Show ±1元 around current price
min_price = current_price - price_range/2
max_price = current_price + price_range/2
# Generate price levels with minimum tick spacing
tick_prices = np.arange(np.floor(min_price / self.min_tick) * self.min_tick,
np.ceil(max_price / self.min_tick) * self.min_tick + self.min_tick,
self.min_tick)
# Prepare data for unified display
all_prices = []
all_volumes = []
all_colors = []
# Add bid data (green horizontal bars)
for bp, bv in zip(bid_prices, bid_volumes):
all_prices.append(bp)
all_volumes.append(bv)
all_colors.append('green')
# Add ask data (red horizontal bars)
for ap, av in zip(ask_prices, ask_volumes):
all_prices.append(ap)
all_volumes.append(av)
all_colors.append('red')
# Map volumes to scale positions
def map_volume_to_scale(volume):
"""Map actual volume to scale position"""
if volume <= 10:
return 10
elif volume <= 30:
return 30
elif volume <= 60:
return 60
elif volume <= 150:
return 150
elif volume <= 300:
return 300
else:
return 300 # Cap at 300 for display
# Map volumes to display scale
mapped_volumes = [map_volume_to_scale(v) for v in all_volumes]
# Create horizontal bar chart (volume on x-axis, price on y-axis)
bars = self.ax_main.barh(all_prices, mapped_volumes,
height=self.min_tick * 0.8, # Bar height based on tick size
color=all_colors, alpha=0.7,
edgecolor='darkgreen' if 'green' in all_colors else 'darkred',
linewidth=1)
# Add volume labels on bars
for price, volume, mapped_vol in zip(all_prices, all_volumes, mapped_volumes):
if volume > 0:
self.ax_main.text(mapped_vol + 5, price, f'{int(volume):,}',
ha='left', va='center', fontsize=8, fontweight='bold')
# Set y-axis with true tick spacing (price)
self.ax_main.set_yticks(tick_prices[::3]) # Show every 3rd tick to avoid crowding
self.ax_main.set_yticklabels([f'{p:.2f}' for p in tick_prices[::3]])
self.ax_main.set_ylim(min_price, max_price)
# Set x-axis with defined volume scale points
self.ax_main.set_xticks(volume_scale_points)
self.ax_main.set_xticklabels([f'{v}' for v in volume_scale_points])
self.ax_main.set_xlim(0, max(volume_scale_points) * 1.2)
# Add current price line
self.ax_main.axhline(y=current_price, color='blue', linestyle='--', alpha=0.8, linewidth=2,
label=f'Last: {current_price:.2f}')
# Add spread visualization
if bid_prices and ask_prices:
best_bid = max(bid_prices)
best_ask = min(ask_prices)
spread = best_ask - best_bid
# Highlight spread area
self.ax_main.axhspan(best_bid, best_ask, alpha=0.2, color='yellow',
label=f'Spread: {spread:.2f}')
# Add legend
bid_patch = mpatches.Patch(color='green', alpha=0.7, label='Bid Volume')
ask_patch = mpatches.Patch(color='red', alpha=0.7, label='Ask Volume')
self.ax_main.legend(handles=[bid_patch, ask_patch], loc='upper right')
else:
self.ax_main.text(0.5, 0.5, 'No Market Depth Data Available',
ha='center', va='center', transform=self.ax_main.transAxes,
fontsize=14, color='red')
# === Price Trend Chart ===
self.plot_price_trend()
# === Volume Profile Chart ===
self.plot_volume_profile()
# === Statistics Display ===
self.display_statistics(row_data, bid_prices, ask_prices)
# Update layout
self.fig.tight_layout()
self.canvas.draw()
# Update status labels
self.update_status_labels(row_data, bid_prices, ask_prices)
# Update progress
self.progress_var.set(self.current_index)
self.progress_label.config(text=f"{self.current_index} / {len(self.df) - 1}")
def plot_price_trend(self):
"""绘制价格趋势图"""
self.ax_price.set_title('Price Trend (Last 100 points)', fontsize=10)
self.ax_price.set_xlabel('Sequence', fontsize=8)
self.ax_price.set_ylabel('Price', fontsize=8)
self.ax_price.grid(True, alpha=0.3)
# Get recent data
start_idx = max(0, self.current_index - 99)
end_idx = self.current_index + 1
if end_idx > start_idx:
recent_data = self.df.iloc[start_idx:end_idx]
# Plot price line
self.ax_price.plot(recent_data.index, recent_data['price'],
'b-', linewidth=1.5, label='Price')
# Plot moving average
if len(recent_data) >= 10:
ma = recent_data['price'].rolling(window=10, min_periods=1).mean()
self.ax_price.plot(recent_data.index, ma, 'r--', linewidth=1,
label='10-MA', alpha=0.7)
# Mark current position
self.ax_price.plot(self.current_index, self.df.iloc[self.current_index]['price'],
'ro', markersize=6, label='Current')
self.ax_price.legend(loc='upper left', fontsize=8)
self.ax_price.tick_params(axis='both', labelsize=8)
def plot_volume_profile(self):
"""绘制成交量分析图"""
self.ax_volume.set_title('Volume Analysis', fontsize=10)
self.ax_volume.set_xlabel('Volume', fontsize=8)
self.ax_volume.set_ylabel('Cumulative Volume', fontsize=8)
self.ax_volume.grid(True, alpha=0.3)
# Get recent data
start_idx = max(0, self.current_index - 99)
end_idx = self.current_index + 1
if end_idx > start_idx:
recent_data = self.df.iloc[start_idx:end_idx]
# Plot cumulative volume
self.ax_volume.plot(recent_data['cumulative_volume'], recent_data['price'],
'g-', linewidth=1.5, alpha=0.7)
# Mark current position
current_row = self.df.iloc[self.current_index]
self.ax_volume.plot(current_row['cumulative_volume'], current_row['price'],
'ro', markersize=6)
self.ax_volume.tick_params(axis='both', labelsize=8)
def display_statistics(self, row_data, bid_prices, ask_prices):
"""显示统计信息"""
self.ax_stats.axis('off')
stats_text = f"Time: {row_data['time']} | "
stats_text += f"Last Price: ¥{row_data['price']:.2f} | "
stats_text += f"Cumulative Volume: {int(row_data['cumulative_volume']):,} | "
if bid_prices and ask_prices:
best_bid = max(bid_prices)
best_ask = min(ask_prices)
spread = best_ask - best_bid
total_bid_volume = sum([row_data[f'bid{i}_volume'] for i in range(1, 6)
if pd.notna(row_data[f'bid{i}_volume'])])
total_ask_volume = sum([row_data[f'ask{i}_volume'] for i in range(1, 6)
if pd.notna(row_data[f'ask{i}_volume'])])
stats_text += f"Best Bid: ¥{best_bid:.2f} ({total_bid_volume:,}) | "
stats_text += f"Best Ask: ¥{best_ask:.2f} ({total_ask_volume:,}) | "
stats_text += f"Spread: ¥{spread:.2f}"
self.ax_stats.text(0.5, 0.5, stats_text, ha='center', va='center',
transform=self.ax_stats.transAxes, fontsize=11,
bbox=dict(boxstyle='round,pad=0.5', facecolor='lightgray', alpha=0.8))
def update_status_labels(self, row_data, bid_prices, ask_prices):
"""更新状态标签"""
self.index_label.config(text=f"Sequence: {self.current_index}")
self.time_label.config(text=f"Time: {row_data['time']}")
self.price_label.config(text=f"Last Price: ¥{row_data['price']:.2f}")
if bid_prices and ask_prices:
best_bid = max(bid_prices)
best_ask = min(ask_prices)
spread = best_ask - best_bid
self.spread_label.config(text=f"Bid-Ask Spread: ¥{spread:.2f}")
else:
self.spread_label.config(text="Bid-Ask Spread: --")
def toggle_play(self):
"""切换播放/暂停"""
if not self.is_playing:
self.is_playing = True
self.play_button.config(text="⏸ Pause")
self.play_thread = threading.Thread(target=self.playback_loop, daemon=True)
self.play_thread.start()
else:
self.is_playing = False
self.play_button.config(text="▶ Play")
def stop_playback(self):
"""停止播放"""
self.is_playing = False
self.play_button.config(text="▶ Play")
self.current_index = 0
self.update_display()
def playback_loop(self):
"""播放循环"""
while self.is_playing and self.current_index < len(self.df) - 1:
self.current_index += 1
self.root.after(0, self.update_display)
delay = self.base_speed / self.speed_multiplier
time.sleep(delay / 1000.0)
if self.current_index >= len(self.df) - 1:
self.is_playing = False
self.root.after(0, lambda: self.play_button.config(text="▶ Play"))
def step_play(self, steps):
"""步进播放"""
self.current_index = max(0, min(len(self.df) - 1, self.current_index + steps))
self.update_display()
def jump_to(self, index):
"""跳转到指定位置"""
self.current_index = max(0, min(len(self.df) - 1, index))
self.update_display()
def jump_to_entry(self):
"""跳转到输入的位置"""
try:
target = int(self.jump_entry.get())
if 0 <= target < len(self.df):
self.jump_to(target)
else:
messagebox.showwarning("Invalid Input", f"Please enter a number between 0 and {len(self.df)-1}")
except ValueError:
messagebox.showwarning("Invalid Input", "Please enter a valid number")
def update_speed(self, value):
"""更新播放速度"""
self.speed_multiplier = float(value)
self.speed_label.config(text=f"Speed: {self.speed_multiplier:.1f}x")
def set_speed(self, speed):
"""设置播放速度"""
self.speed_var.set(speed)
self.speed_multiplier = speed
self.speed_label.config(text=f"Speed: {self.speed_multiplier:.1f}x")
def run(self):
"""运行应用"""
self.root.protocol("WM_DELETE_WINDOW", self.on_closing)
self.root.mainloop()
def on_closing(self):
"""关闭处理"""
self.is_playing = False
self.root.destroy()
def main():
"""主函数"""
print("=== Unified Futures Data Player ===")
print("Initializing unified market depth visualization...")
player = UnifiedFuturesPlayer()
print("Initialization complete!")
print("Features:")
print("- Unified price axis showing bid/ask orders together")
print("- True minimum tick spacing (0.02)")
print("- Real-time spread visualization")
print("- Combined market depth analysis")
player.run()
if __name__ == "__main__":
main()