huice/futures_player_enhanced.py
Your Name 7f4f88e853 feat: 添加期货数据播放器及相关测试和文档
新增期货数据动态播放器功能,包括基础版和增强版实现,添加测试脚本和详细文档说明。主要变更包括:
1. 实现买卖盘深度可视化播放功能
2. 添加播放控制、速度调节和跳转功能
3. 提供统一价格轴显示优化版本
4. 添加测试脚本验证功能
5. 编写详细使用文档和README说明
2025-11-02 23:57:10 +08:00

539 lines
21 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Enhanced Futures Data Player - Market Depth Time Series Visualization
Based on au2512_20251013.parquet data
Features:
- Play market depth changes by sequence number
- Support play/pause/speed up/slow down
- Jump to specific sequence number
- Base playback speed 500ms/data point
- English interface to avoid font issues
"""
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import tkinter as tk
from tkinter import ttk, messagebox
import threading
import time
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg, NavigationToolbar2Tk
from matplotlib.figure import Figure
import matplotlib.gridspec as gridspec
class EnhancedFuturesPlayer:
    """Interactive Tk/matplotlib player for five-level futures market depth.

    The constructor loads a parquet file, builds the window and renders the
    first snapshot; call :meth:`run` to enter the Tk main loop.  Playback
    advances ``current_index`` one row at a time and redraws every chart.

    Playback frames are scheduled with ``root.after`` so that every widget
    access happens on the Tk main-loop thread and a pending frame can be
    cancelled on pause, stop and window close.
    """

    # Number of bid/ask levels read from each row (bid1..bid5 / ask1..ask5).
    DEPTH_LEVELS = 5
    # Number of most recent rows shown in the price and volume trend charts.
    TREND_WINDOW = 100
    # Raw column name -> internal column name used throughout the class.
    COLUMN_MAPPING = {
        'UTC': 'UTC', 'UTC.1': 'UTC_1', '时间': 'time',
        '累积成交量': 'cumulative_volume', '成交价': 'price', '成交额': 'amount',
        '买1价': 'bid1_price', '卖1价': 'ask1_price',
        '买1量': 'bid1_volume', '卖1量': 'ask1_volume',
        '买2价': 'bid2_price', '卖2价': 'ask2_price',
        '买2量': 'bid2_volume', '卖2量': 'ask2_volume',
        '买3价': 'bid3_price', '卖3价': 'ask3_price',
        '买3量': 'bid3_volume', '卖3量': 'ask3_volume',
        '买4价': 'bid4_price', '卖4价': 'ask4_price',
        '买4量': 'bid4_volume', '卖4量': 'ask4_volume',
        '买5价': 'bid5_price', '卖5价': 'ask5_price',
        '买5量': 'bid5_volume', '卖5量': 'ask5_volume',
    }

    def __init__(self, data_path='data/au2512_20251013.parquet'):
        """Load the data file, build the GUI and draw the first snapshot.

        Args:
            data_path: Path of the parquet file passed to ``pd.read_parquet``.

        Raises:
            ValueError: If no row survives the depth validity filter.
        """
        self.data_path = data_path
        self.df = None
        self.current_index = 0
        self.is_playing = False
        self.base_speed = 500  # Base speed in milliseconds
        self.speed_multiplier = 1.0
        # Kept so existing readers of the attribute do not break; playback is
        # driven by root.after and never starts a thread.
        self.play_thread = None
        # Identifier of the pending root.after callback, or None when idle.
        self._play_job = None

        self.load_data()
        self.setup_gui()
        self.update_display()

    # ------------------------------------------------------------------
    # Data loading
    # ------------------------------------------------------------------
    def load_data(self):
        """Read the parquet file, normalise column names and drop invalid rows.

        Raises:
            ValueError: If the filtered frame is empty, since the GUI needs at
                least one snapshot to display.
        """
        print("Loading futures data...")
        self.df = self._clean_depth_frame(pd.read_parquet(self.data_path))
        if self.df.empty:
            raise ValueError(
                f"No valid market depth rows found in {self.data_path!r}"
            )
        print(f"Data loaded successfully! Valid data points: {len(self.df)}")
        print(f"Time range: {self.df['time'].iloc[0]} - {self.df['time'].iloc[-1]}")
        print(f"Price range: {self.df['price'].min():.2f} - {self.df['price'].max():.2f}")

    @classmethod
    def _clean_depth_frame(cls, raw):
        """Return *raw* with renamed columns and only usable depth rows.

        A row is kept when all five bid prices and all five ask prices are
        non-null and at least one bid volume and one ask volume is positive.
        The returned frame has a fresh 0..n-1 index.
        """
        frame = raw.rename(columns=cls.COLUMN_MAPPING)
        levels = range(1, cls.DEPTH_LEVELS + 1)
        bid_cols = [f'bid{i}_price' for i in levels]
        ask_cols = [f'ask{i}_price' for i in levels]
        bid_vol_cols = [f'bid{i}_volume' for i in levels]
        ask_vol_cols = [f'ask{i}_volume' for i in levels]
        valid_mask = (
            frame[bid_cols].notnull().all(axis=1)
            & frame[ask_cols].notnull().all(axis=1)
            & (frame[bid_vol_cols] > 0).any(axis=1)
            & (frame[ask_vol_cols] > 0).any(axis=1)
        )
        return frame[valid_mask].reset_index(drop=True)

    # ------------------------------------------------------------------
    # GUI construction
    # ------------------------------------------------------------------
    def setup_gui(self):
        """Create the root window with a chart pane (left) and control pane (right)."""
        self.root = tk.Tk()
        self.root.title("Enhanced Futures Data Player - AU2512 Market Depth Analysis")
        self.root.geometry("1600x900")

        main_paned = ttk.PanedWindow(self.root, orient=tk.HORIZONTAL)
        main_paned.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)

        chart_frame = ttk.Frame(main_paned)
        main_paned.add(chart_frame, weight=3)

        control_frame = ttk.Frame(main_paned, width=400)
        main_paned.add(control_frame, weight=1)

        self.setup_charts(chart_frame)
        self.setup_controls(control_frame)

    def setup_charts(self, parent):
        """Create the matplotlib figure with its four axes and embed it in *parent*."""
        self.fig = Figure(figsize=(12, 8), dpi=80)
        gs = gridspec.GridSpec(3, 2, figure=self.fig, height_ratios=[1, 1, 1], width_ratios=[1, 1])

        self.ax_bid = self.fig.add_subplot(gs[0, :])     # Bid depth, full-width top row
        self.ax_ask = self.fig.add_subplot(gs[1, :])     # Ask depth, full-width middle row
        self.ax_price = self.fig.add_subplot(gs[2, 0])   # Price trend, bottom left
        self.ax_volume = self.fig.add_subplot(gs[2, 1])  # Cumulative volume, bottom right

        # Placeholder styling shown until the first update_display() call.
        self.ax_bid.set_title('Bid Market Depth', fontsize=14, fontweight='bold', color='darkgreen')
        self.ax_ask.set_title('Ask Market Depth', fontsize=14, fontweight='bold', color='darkred')
        for ax in (self.ax_bid, self.ax_ask):
            ax.set_xlabel('Volume')
            ax.set_ylabel('Price')
            ax.grid(True, alpha=0.3)

        self.ax_price.set_title('Price Trend', fontsize=12, fontweight='bold')
        self.ax_price.set_xlabel('Data Points')
        self.ax_price.set_ylabel('Price')
        self.ax_price.grid(True, alpha=0.3)

        self.ax_volume.set_title('Volume Profile', fontsize=12, fontweight='bold')
        self.ax_volume.set_xlabel('Data Points')
        self.ax_volume.set_ylabel('Cumulative Volume')
        self.ax_volume.grid(True, alpha=0.3)

        self.fig.tight_layout()

        canvas_frame = ttk.Frame(parent)
        canvas_frame.pack(fill=tk.BOTH, expand=True)
        self.canvas = FigureCanvasTkAgg(self.fig, canvas_frame)
        self.canvas.draw()
        self.canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)

        toolbar = NavigationToolbar2Tk(self.canvas, canvas_frame)
        toolbar.update()

    def setup_controls(self, parent):
        """Build the whole right-hand control panel inside *parent*."""
        title_label = ttk.Label(parent, text="Playback Controls", font=('Arial', 16, 'bold'))
        title_label.pack(pady=10)

        self._build_status_panel(parent)
        self._build_playback_panel(parent)
        self._build_speed_panel(parent)
        self._build_jump_panel(parent)
        self._build_progress_panel(parent)
        self._build_stats_panel(parent)

    def _build_status_panel(self, parent):
        """Create the "Current Status" labels and store them in ``status_labels``."""
        status_frame = ttk.LabelFrame(parent, text="Current Status", padding=10)
        status_frame.pack(fill=tk.X, pady=10)

        self.status_labels = {}
        status_items = [
            ('index', 'Sequence: 0'),
            ('time', 'Time: --'),
            ('price', 'Price: --'),
            ('volume', 'Cumulative Volume: --'),
            ('spread', 'Bid-Ask Spread: --'),
        ]
        for key, text in status_items:
            label = ttk.Label(status_frame, text=text, font=('Arial', 10))
            label.pack(anchor=tk.W, pady=2)
            self.status_labels[key] = label

    def _build_playback_panel(self, parent):
        """Create the play/pause, stop and step buttons."""
        control_frame = ttk.LabelFrame(parent, text="Playback Controls", padding=10)
        control_frame.pack(fill=tk.X, pady=10)

        self.play_button = ttk.Button(
            control_frame, text="▶ Play", command=self.toggle_playback, width=25
        )
        self.play_button.pack(pady=5)

        self.stop_button = ttk.Button(
            control_frame, text="■ Stop", command=self.stop_playback, width=25
        )
        self.stop_button.pack(pady=5)

        step_frame = ttk.Frame(control_frame)
        step_frame.pack(pady=5)
        ttk.Button(step_frame, text="◀◀ Step -10", command=lambda: self.step_backward(10), width=12).pack(side=tk.LEFT, padx=2)
        ttk.Button(step_frame, text="◀ Step -1", command=lambda: self.step_backward(1), width=12).pack(side=tk.LEFT, padx=2)

        step_frame2 = ttk.Frame(control_frame)
        step_frame2.pack(pady=5)
        ttk.Button(step_frame2, text="Step +1 ▶", command=lambda: self.step_forward(1), width=12).pack(side=tk.LEFT, padx=2)
        ttk.Button(step_frame2, text="Step +10 ▶▶", command=lambda: self.step_forward(10), width=12).pack(side=tk.LEFT, padx=2)

    def _build_speed_panel(self, parent):
        """Create the speed slider, its label and the preset speed buttons."""
        speed_frame = ttk.LabelFrame(parent, text="Speed Control", padding=10)
        speed_frame.pack(fill=tk.X, pady=10)

        # The label object exists before the slider so update_speed() can
        # never run while self.speed_label is still undefined.
        self.speed_label = ttk.Label(speed_frame, text="Speed: 1.0x", font=('Arial', 11))

        self.speed_var = tk.DoubleVar(value=1.0)
        speed_slider = ttk.Scale(
            speed_frame,
            from_=0.1,
            to=10.0,
            variable=self.speed_var,
            orient=tk.HORIZONTAL,
            command=self.update_speed,
        )
        speed_slider.pack(fill=tk.X, pady=5)
        self.speed_label.pack()

        speed_buttons = ttk.Frame(speed_frame)
        speed_buttons.pack(pady=5)
        speeds = [(0.25, '0.25x'), (0.5, '0.5x'), (1.0, '1x'), (2.0, '2x'), (5.0, '5x'), (10.0, '10x')]
        for speed, text in speeds:
            # s=speed binds the loop value now; a plain closure would see only the last one.
            ttk.Button(speed_buttons, text=text, command=lambda s=speed: self.set_speed(s), width=6).pack(side=tk.LEFT, padx=2)

    def _build_jump_panel(self, parent):
        """Create the sequence entry box and the quick-jump buttons."""
        jump_frame = ttk.LabelFrame(parent, text="Jump Control", padding=10)
        jump_frame.pack(fill=tk.X, pady=10)
        ttk.Label(jump_frame, text="Jump to sequence:", font=('Arial', 10)).pack(anchor=tk.W)

        jump_input_frame = ttk.Frame(jump_frame)
        jump_input_frame.pack(fill=tk.X, pady=5)
        self.jump_entry = ttk.Entry(jump_input_frame, font=('Arial', 10))
        self.jump_entry.pack(side=tk.LEFT, fill=tk.X, expand=True)
        ttk.Button(jump_input_frame, text="Jump", command=self.jump_to_index, width=10).pack(side=tk.RIGHT, padx=(5, 0))

        quick_jump_frame = ttk.Frame(jump_frame)
        quick_jump_frame.pack(pady=5)
        total = len(self.df)
        jumps = [(0, 'Start'), (total // 4, '25%'), (total // 2, '50%'),
                 (3 * total // 4, '75%'), (total - 1, 'End')]
        for idx, text in jumps:
            ttk.Button(quick_jump_frame, text=text, command=lambda i=idx: self.jump_to_index_direct(i), width=8).pack(side=tk.LEFT, padx=2)

    def _build_progress_panel(self, parent):
        """Create the progress bar and the "current / last" label."""
        progress_frame = ttk.LabelFrame(parent, text="Progress", padding=10)
        progress_frame.pack(fill=tk.X, pady=10)

        last_index = len(self.df) - 1
        self.progress_var = tk.IntVar()
        self.progress_bar = ttk.Progressbar(
            progress_frame,
            variable=self.progress_var,
            # Keep the range positive even when only one row is loaded.
            maximum=max(last_index, 1),
            orient=tk.HORIZONTAL,
        )
        self.progress_bar.pack(fill=tk.X, pady=5)
        self.progress_label = ttk.Label(progress_frame, text=f"0 / {last_index}", font=('Arial', 10))
        self.progress_label.pack()

    def _build_stats_panel(self, parent):
        """Create the static "Data Statistics" summary for the loaded data."""
        stats_frame = ttk.LabelFrame(parent, text="Data Statistics", padding=10)
        stats_frame.pack(fill=tk.X, pady=10)

        prices = self.df['price']
        times = self.df['time']
        # Real newlines: the escaped "\\n" form showed a literal backslash-n.
        stats_text = "\n".join([
            f"Total Data Points: {len(self.df):,}",
            f"Time Span: {times.iloc[0]} - {times.iloc[-1]}",
            f"Price Range: ${prices.min():.2f} - ${prices.max():.2f}",
            f"Price Change: ${prices.iloc[-1] - prices.iloc[0]:+.2f}",
        ])
        ttk.Label(stats_frame, text=stats_text, font=('Arial', 9), justify=tk.LEFT).pack(anchor=tk.W)

    # ------------------------------------------------------------------
    # Rendering
    # ------------------------------------------------------------------
    def get_market_depth_data(self, row_data):
        """Extract the usable bid and ask levels from one data row.

        Args:
            row_data: Mapping-like row indexable by ``bid{i}_price``,
                ``bid{i}_volume``, ``ask{i}_price`` and ``ask{i}_volume``
                for i in 1..5.

        Returns:
            Tuple ``(bid_prices, bid_volumes, ask_prices, ask_volumes)`` of
            lists.  A level is included only when its price and volume are
            both non-null and the volume is positive; level order is kept.
        """
        bid_prices, bid_volumes = [], []
        ask_prices, ask_volumes = [], []
        sides = (
            ('bid', bid_prices, bid_volumes),
            ('ask', ask_prices, ask_volumes),
        )
        for level in range(1, self.DEPTH_LEVELS + 1):
            for side, prices, volumes in sides:
                price = row_data[f'{side}{level}_price']
                volume = row_data[f'{side}{level}_volume']
                if pd.notna(price) and pd.notna(volume) and volume > 0:
                    prices.append(price)
                    volumes.append(volume)
        return bid_prices, bid_volumes, ask_prices, ask_volumes

    def update_display(self):
        """Redraw all charts and status widgets for ``current_index``.

        Does nothing when no data is loaded or the index is past the end.
        """
        if self.df is None or self.current_index >= len(self.df):
            return
        row_data = self.df.iloc[self.current_index]

        for ax in (self.ax_bid, self.ax_ask, self.ax_price, self.ax_volume):
            ax.clear()

        bid_prices, bid_volumes, ask_prices, ask_volumes = self.get_market_depth_data(row_data)

        # Bids are listed from highest price down, asks from lowest price up.
        # No current-price marker is drawn on these two charts: their x-axis
        # is volume, so a vertical line at x=price would be meaningless.
        self._plot_depth(self.ax_bid, bid_prices, bid_volumes,
                         title='Bid Market Depth', cmap=plt.cm.Greens,
                         edge_color='darkgreen', descending=True)
        self._plot_depth(self.ax_ask, ask_prices, ask_volumes,
                         title='Ask Market Depth', cmap=plt.cm.Reds,
                         edge_color='darkred', descending=False)
        self._plot_trends()

        self.fig.tight_layout()
        self.canvas.draw()

        self._update_status(row_data, bid_prices, ask_prices)

    def _plot_depth(self, ax, prices, volumes, *, title, cmap, edge_color, descending):
        """Draw one side of the book on *ax* as horizontal volume bars."""
        ax.set_title(title, fontsize=14, fontweight='bold', color=edge_color)
        ax.set_xlabel('Volume')
        ax.set_ylabel('Price')
        ax.grid(True, alpha=0.3, axis='x')
        if not prices:
            return

        levels = sorted(zip(prices, volumes), reverse=descending)
        positions = list(range(len(levels)))
        level_volumes = [volume for _, volume in levels]
        max_volume = max(level_volumes)

        colors = cmap(np.linspace(0.3, 0.9, len(levels)))
        ax.barh(positions, level_volumes, color=colors, alpha=0.8,
                edgecolor=edge_color, linewidth=1.5)
        ax.set_yticks(positions)
        ax.set_yticklabels([f'${price:.2f}' for price, _ in levels])

        # Volume captions sit just right of each bar; the x-limit leaves
        # 30% headroom so the longest caption stays inside the axes.
        for position, volume in zip(positions, level_volumes):
            ax.text(volume + max_volume * 0.02, position, f'{int(volume):,}',
                    va='center', ha='left', fontsize=9, fontweight='bold')
        ax.set_xlim(0, max_volume * 1.3)

    def _plot_trends(self):
        """Draw price and cumulative volume for the most recent TREND_WINDOW rows."""
        stop = self.current_index + 1
        start = max(0, stop - self.TREND_WINDOW)

        price_slice = self.df['price'].iloc[start:stop]
        self.ax_price.plot(range(len(price_slice)), price_slice, 'b-', linewidth=2, label='Price')
        self.ax_price.scatter(len(price_slice) - 1, price_slice.iloc[-1], color='red', s=100, zorder=5, label='Current')
        self.ax_price.set_title('Price Trend (Recent 100 points)', fontsize=12, fontweight='bold')
        self.ax_price.set_xlabel('Data Points')
        self.ax_price.set_ylabel('Price ($)')
        self.ax_price.grid(True, alpha=0.3)
        self.ax_price.legend()

        volume_slice = self.df['cumulative_volume'].iloc[start:stop]
        self.ax_volume.plot(range(len(volume_slice)), volume_slice, 'g-', linewidth=2)
        self.ax_volume.scatter(len(volume_slice) - 1, volume_slice.iloc[-1], color='red', s=100, zorder=5)
        self.ax_volume.set_title('Cumulative Volume', fontsize=12, fontweight='bold')
        self.ax_volume.set_xlabel('Data Points')
        self.ax_volume.set_ylabel('Volume')
        self.ax_volume.grid(True, alpha=0.3)

    def _update_status(self, row_data, bid_prices, ask_prices):
        """Refresh the status labels and the progress widgets for *row_data*."""
        last_index = len(self.df) - 1
        current_price = row_data['price']
        cumulative = row_data['cumulative_volume']
        # int() cannot convert NaN, so show a placeholder for a missing volume.
        volume_text = f"{int(cumulative):,}" if pd.notna(cumulative) else "--"

        self.status_labels['index'].config(text=f"Sequence: {self.current_index:,} / {last_index:,}")
        self.status_labels['time'].config(text=f"Time: {row_data['time']}")
        self.status_labels['price'].config(text=f"Price: ${current_price:.2f}")
        self.status_labels['volume'].config(text=f"Cumulative Volume: {volume_text}")

        if bid_prices and ask_prices:
            spread = min(ask_prices) - max(bid_prices)
            if pd.notna(current_price) and current_price != 0:
                spread_pct = (spread / current_price) * 100
                spread_text = f"Bid-Ask Spread: ${spread:.2f} ({spread_pct:.3f}%)"
            else:
                # No usable last price: show the absolute spread only.
                spread_text = f"Bid-Ask Spread: ${spread:.2f}"
        else:
            spread_text = "Bid-Ask Spread: N/A"
        self.status_labels['spread'].config(text=spread_text)

        self.progress_var.set(self.current_index)
        self.progress_label.config(text=f"{self.current_index:,} / {last_index:,}")

    # ------------------------------------------------------------------
    # Playback
    # ------------------------------------------------------------------
    def _frame_delay_ms(self):
        """Return the delay between frames in whole milliseconds (at least 1)."""
        multiplier = self.speed_multiplier if self.speed_multiplier > 0 else 1.0
        return max(1, int(round(self.base_speed / multiplier)))

    def _cancel_pending_frame(self):
        """Cancel the scheduled playback callback, if there is one."""
        if self._play_job is not None:
            self.root.after_cancel(self._play_job)
            self._play_job = None

    def toggle_playback(self):
        """Start playback when paused, pause it when playing."""
        if self.is_playing:
            self.is_playing = False
            self._cancel_pending_frame()
            self.play_button.config(text="▶ Play")
            return

        # Cancelling first guarantees a single chain of scheduled frames even
        # when play and pause are clicked in quick succession.
        self._cancel_pending_frame()
        self.is_playing = True
        self.play_button.config(text="⏸ Pause")
        self.playback_loop()

    def stop_playback(self):
        """Stop playback and rewind to the first snapshot."""
        self.is_playing = False
        self._cancel_pending_frame()
        self.play_button.config(text="▶ Play")
        self.current_index = 0
        self.update_display()

    def playback_loop(self):
        """Advance one frame and schedule the next one while playing.

        Playback ends by itself, and the button is reset, once the last row
        has been displayed.
        """
        self._play_job = None
        if not self.is_playing:
            return

        last_index = len(self.df) - 1
        if self.current_index < last_index:
            self.current_index += 1
            self.update_display()

        if self.current_index >= last_index:
            self.is_playing = False
            self.play_button.config(text="▶ Play")
            return

        # The delay is re-read every frame so speed changes apply immediately.
        self._play_job = self.root.after(self._frame_delay_ms(), self.playback_loop)

    # ------------------------------------------------------------------
    # Navigation
    # ------------------------------------------------------------------
    def _clamp_index(self, index):
        """Return *index* limited to the valid row range ``0 .. len(df) - 1``."""
        last_index = max(len(self.df) - 1, 0)
        return min(max(index, 0), last_index)

    def step_forward(self, steps=1):
        """Move forward by *steps* rows, stopping at the last row."""
        self.current_index = self._clamp_index(self.current_index + steps)
        self.update_display()

    def step_backward(self, steps=1):
        """Move backward by *steps* rows, stopping at the first row."""
        self.current_index = self._clamp_index(self.current_index - steps)
        self.update_display()

    def update_speed(self, value):
        """Slider callback: apply *value* (number or numeric string) as the speed multiplier."""
        self.speed_multiplier = float(value)
        self.speed_label.config(text=f"Speed: {self.speed_multiplier:.1f}x")

    def set_speed(self, speed):
        """Preset-button callback: set the multiplier and move the slider to match."""
        self.speed_var.set(speed)
        self.speed_multiplier = speed
        self.speed_label.config(text=f"Speed: {self.speed_multiplier:.1f}x")

    def jump_to_index(self):
        """Jump to the sequence number typed in the entry box.

        Shows a warning dialog when the text is not an integer or is outside
        ``0 .. len(df) - 1``; the entry is cleared only after a valid jump.
        """
        try:
            target_index = int(self.jump_entry.get())
        except ValueError:
            messagebox.showwarning("Invalid Input", "Please enter a valid number")
            return

        if not 0 <= target_index < len(self.df):
            messagebox.showwarning("Invalid Input", f"Please enter a number between 0 and {len(self.df)-1}")
            return

        self.current_index = target_index
        self.update_display()
        self.jump_entry.delete(0, tk.END)

    def jump_to_index_direct(self, index):
        """Jump to *index* (clamped to the valid range) without validation dialogs."""
        self.current_index = self._clamp_index(index)
        self.update_display()

    # ------------------------------------------------------------------
    # Application lifecycle
    # ------------------------------------------------------------------
    def run(self):
        """Install the close handler and enter the Tk main loop."""
        self.root.protocol("WM_DELETE_WINDOW", self.on_closing)
        self.root.mainloop()

    def on_closing(self):
        """Stop playback, drop any pending frame and destroy the window."""
        self.is_playing = False
        self._cancel_pending_frame()
        self.root.destroy()
def main():
    """Create the player with its default data file, print a feature summary and run the GUI."""
    print("=== Enhanced Futures Data Player ===")
    print("Initializing...")
    player = EnhancedFuturesPlayer()
    print("Initialization complete!")
    # A real newline; the escaped "\\n" form printed a literal backslash-n.
    print("\nFeatures:")
    print("- Play/pause market depth changes")
    print("- Adjustable playback speed (0.1x to 10x)")
    print("- Step forward/backward controls")
    print("- Jump to specific sequence")
    print("- Real-time bid-ask spread display")
    print("- Price trend and volume charts")
    # Blocks in the Tk main loop until the window is closed.
    player.run()
# Start the GUI only when the file is executed as a script, not when imported.
if __name__ == "__main__":
    main()