#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 期货数据动态播放器 - 买卖盘深度时序可视化 基于au2512_20251013.parquet数据 功能特性: - 按数列号顺序播放买卖盘深度变化 - 支持播放/暂停/加速/减速 - 可跳转到指定数列号 - 基础播放速度500ms/数据点 """ import pandas as pd import numpy as np import matplotlib.pyplot as plt import matplotlib.animation as animation from matplotlib.widgets import Button, Slider import tkinter as tk from tkinter import ttk import threading import time from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg import matplotlib.patches as mpatches from datetime import datetime import matplotlib.font_manager as fm # 设置matplotlib支持中文显示 plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'Arial Unicode MS', 'DejaVu Sans'] plt.rcParams['axes.unicode_minus'] = False class FuturesDataPlayer: def __init__(self, data_path='data/au2512_20251013.parquet'): """初始化期货数据播放器""" self.data_path = data_path self.df = None self.current_index = 0 self.is_playing = False self.play_speed = 500 # 基础播放速度(毫秒) self.speed_multiplier = 1.0 # 加载数据 self.load_data() # 创建GUI界面 self.setup_gui() def load_data(self): """加载并预处理期货数据""" print("正在加载数据...") self.df = pd.read_parquet(self.data_path) # 列名映射 columns_mapping = { 'UTC': 'UTC', 'UTC.1': 'UTC_1', '时间': 'time', '累积成交量': 'cumulative_volume', '成交价': 'price', '成交额': 'amount', '买1价': 'bid1_price', '卖1价': 'ask1_price', '买1量': 'bid1_volume', '卖1量': 'ask1_volume', '买2价': 'bid2_price', '卖2价': 'ask2_price', '买2量': 'bid2_volume', '卖2量': 'ask2_volume', '买3价': 'bid3_price', '卖3价': 'ask3_price', '买3量': 'bid3_volume', '卖3量': 'ask3_volume', '买4价': 'bid4_price', '卖4价': 'ask4_price', '买4量': 'bid4_volume', '卖4量': 'ask4_volume', '买5价': 'bid5_price', '卖5价': 'ask5_price', '买5量': 'bid5_volume', '卖5量': 'ask5_volume' } # 重命名列 self.df = self.df.rename(columns=columns_mapping) # 数据清洗:去除无效的买卖盘数据 bid_cols = [f'bid{i}_price' for i in range(1, 6)] ask_cols = [f'ask{i}_price' for i in range(1, 6)] # 过滤掉买卖盘价格无效的数据 valid_mask = ( self.df[bid_cols].notnull().all(axis=1) & self.df[ask_cols].notnull().all(axis=1) ) self.df = self.df[valid_mask].reset_index(drop=True) print(f"数据加载完成!有效数据点: {len(self.df)}") print(f"时间范围: {self.df['time'].iloc[0]} - {self.df['time'].iloc[-1]}") print(f"价格范围: {self.df['price'].min():.2f} - {self.df['price'].max():.2f}") def setup_gui(self): """设置GUI界面""" # 创建主窗口 self.root = tk.Tk() self.root.title("期货数据动态播放器 - AU2512买卖盘深度分析") self.root.geometry("1400x900") # 创建主框架 main_frame = ttk.Frame(self.root) main_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10) # 左侧:图表区域 chart_frame = ttk.Frame(main_frame) chart_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) # 右侧:控制面板 control_frame = ttk.Frame(main_frame, width=300) control_frame.pack(side=tk.RIGHT, fill=tk.Y, padx=(10, 0)) control_frame.pack_propagate(False) # 创建matplotlib图表 self.setup_chart(chart_frame) # 创建控制面板 self.setup_controls(control_frame) # 初始化显示 self.update_display() def setup_chart(self, parent): """设置图表""" # 创建图形和轴 self.fig, (self.ax1, self.ax2) = plt.subplots(2, 1, figsize=(10, 8)) self.fig.patch.set_facecolor('#f0f0f0') # 买盘深度图(上方) self.ax1.set_title('买盘深度 (Bid Depth)', fontsize=14, fontweight='bold') self.ax1.set_xlabel('价格') self.ax1.set_ylabel('累积成交量') self.ax1.grid(True, alpha=0.3) self.ax1.set_facecolor('#ffffff') # 卖盘深度图(下方) self.ax2.set_title('卖盘深度 (Ask Depth)', fontsize=14, fontweight='bold') self.ax2.set_xlabel('价格') self.ax2.set_ylabel('累积成交量') self.ax2.grid(True, alpha=0.3) self.ax2.set_facecolor('#ffffff') plt.tight_layout() # 嵌入到tkinter self.canvas = FigureCanvasTkAgg(self.fig, parent) self.canvas.draw() self.canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True) def setup_controls(self, parent): """设置控制面板""" # 标题 title_label = ttk.Label(parent, text="播放控制", font=('Arial', 16, 'bold')) title_label.pack(pady=10) # 当前状态显示 status_frame = ttk.LabelFrame(parent, text="当前状态", padding=10) status_frame.pack(fill=tk.X, pady=10) self.index_label = ttk.Label(status_frame, text="数列号: 0", font=('Arial', 11)) self.index_label.pack(anchor=tk.W) self.time_label = ttk.Label(status_frame, text="时间: --", font=('Arial', 11)) self.time_label.pack(anchor=tk.W) self.price_label = ttk.Label(status_frame, text="成交价: --", font=('Arial', 11)) self.price_label.pack(anchor=tk.W) self.volume_label = ttk.Label(status_frame, text="累积成交量: --", font=('Arial', 11)) self.volume_label.pack(anchor=tk.W) # 播放控制按钮 control_frame = ttk.LabelFrame(parent, text="播放控制", padding=10) control_frame.pack(fill=tk.X, pady=10) # 播放/暂停按钮 self.play_button = ttk.Button( control_frame, text="▶ 播放", command=self.toggle_play, width=20 ) self.play_button.pack(pady=5) # 停止按钮 self.stop_button = ttk.Button( control_frame, text="■ 停止", command=self.stop_playback, width=20 ) self.stop_button.pack(pady=5) # 速度控制 speed_frame = ttk.LabelFrame(parent, text="速度控制", padding=10) speed_frame.pack(fill=tk.X, pady=10) # 速度滑块 self.speed_var = tk.DoubleVar(value=1.0) speed_slider = ttk.Scale( speed_frame, from_=0.1, to=5.0, variable=self.speed_var, orient=tk.HORIZONTAL, command=self.update_speed ) speed_slider.pack(fill=tk.X, pady=5) self.speed_label = ttk.Label(speed_frame, text="播放速度: 1.0x") self.speed_label.pack() # 快速速度按钮 speed_buttons_frame = ttk.Frame(speed_frame) speed_buttons_frame.pack(pady=5) ttk.Button(speed_buttons_frame, text="0.5x", command=lambda: self.set_speed(0.5), width=8).pack(side=tk.LEFT, padx=2) ttk.Button(speed_buttons_frame, text="1x", command=lambda: self.set_speed(1.0), width=8).pack(side=tk.LEFT, padx=2) ttk.Button(speed_buttons_frame, text="2x", command=lambda: self.set_speed(2.0), width=8).pack(side=tk.LEFT, padx=2) ttk.Button(speed_buttons_frame, text="5x", command=lambda: self.set_speed(5.0), width=8).pack(side=tk.LEFT, padx=2) # 跳转控制 jump_frame = ttk.LabelFrame(parent, text="跳转控制", padding=10) jump_frame.pack(fill=tk.X, pady=10) # 数列号输入 ttk.Label(jump_frame, text="跳转到数列号:").pack(anchor=tk.W) jump_input_frame = ttk.Frame(jump_frame) jump_input_frame.pack(fill=tk.X, pady=5) self.jump_entry = ttk.Entry(jump_input_frame) self.jump_entry.pack(side=tk.LEFT, fill=tk.X, expand=True) ttk.Button(jump_input_frame, text="跳转", command=self.jump_to_index, width=10).pack(side=tk.RIGHT, padx=(5, 0)) # 进度条 progress_frame = ttk.LabelFrame(parent, text="播放进度", padding=10) progress_frame.pack(fill=tk.X, pady=10) self.progress_var = tk.IntVar() self.progress_bar = ttk.Progressbar( progress_frame, variable=self.progress_var, maximum=len(self.df) - 1, orient=tk.HORIZONTAL ) self.progress_bar.pack(fill=tk.X, pady=5) self.progress_label = ttk.Label(progress_frame, text=f"0 / {len(self.df) - 1}") self.progress_label.pack() # 数据统计 stats_frame = ttk.LabelFrame(parent, text="数据统计", padding=10) stats_frame.pack(fill=tk.X, pady=10) stats_text = f"总数据点: {len(self.df)}\\n" stats_text += f"时间跨度: {self.df['time'].iloc[0]} - {self.df['time'].iloc[-1]}\\n" stats_text += f"价格区间: {self.df['price'].min():.2f} - {self.df['price'].max():.2f}" ttk.Label(stats_frame, text=stats_text, font=('Arial', 9)).pack(anchor=tk.W) def get_market_depth_data(self, row_data): """获取买卖盘深度数据""" bid_prices = [] bid_volumes = [] ask_prices = [] ask_volumes = [] # 收集买卖盘数据(从1到5档) for i in range(1, 6): bid_price = row_data[f'bid{i}_price'] bid_volume = row_data[f'bid{i}_volume'] ask_price = row_data[f'ask{i}_price'] ask_volume = row_data[f'ask{i}_volume'] if pd.notna(bid_price) and pd.notna(bid_volume) and bid_volume > 0: bid_prices.append(bid_price) bid_volumes.append(bid_volume) if pd.notna(ask_price) and pd.notna(ask_volume) and ask_volume > 0: ask_prices.append(ask_price) ask_volumes.append(ask_volume) return bid_prices, bid_volumes, ask_prices, ask_volumes def update_display(self): """更新显示内容""" if self.df is None or self.current_index >= len(self.df): return # 获取当前行数据 row_data = self.df.iloc[self.current_index] # 清空图表 self.ax1.clear() self.ax2.clear() # 重新设置图表属性 self.ax1.set_title('买盘深度 (Bid Depth)', fontsize=14, fontweight='bold') self.ax1.set_xlabel('价格') self.ax1.set_ylabel('成交量') self.ax1.grid(True, alpha=0.3) self.ax1.set_facecolor('#ffffff') self.ax2.set_title('卖盘深度 (Ask Depth)', fontsize=14, fontweight='bold') self.ax2.set_xlabel('价格') self.ax2.set_ylabel('成交量') self.ax2.grid(True, alpha=0.3) self.ax2.set_facecolor('#ffffff') # 获取买卖盘深度数据 bid_prices, bid_volumes, ask_prices, ask_volumes = self.get_market_depth_data(row_data) # 绘制买盘深度(绿色,从左到右递减) if bid_prices: # 按价格排序(买盘价格从高到低) bid_data = sorted(zip(bid_prices, bid_volumes), reverse=True) sorted_bid_prices, sorted_bid_volumes = zip(*bid_data) # 计算累积成交量 cumulative_bid_volumes = np.cumsum(sorted_bid_volumes) # 绘制买盘柱状图 colors_bid = plt.cm.Greens(np.linspace(0.4, 0.9, len(sorted_bid_prices))) bars1 = self.ax1.barh(range(len(sorted_bid_prices)), sorted_bid_volumes, color=colors_bid, alpha=0.8, edgecolor='darkgreen', linewidth=1) # 设置Y轴标签为价格 self.ax1.set_yticks(range(len(sorted_bid_prices))) self.ax1.set_yticklabels([f'{p:.2f}' for p in sorted_bid_prices]) # 添加成交量标签 for i, (price, volume) in enumerate(bid_data): self.ax1.text(volume + max(sorted_bid_volumes) * 0.01, i, f'{int(volume)}', va='center', ha='left', fontsize=9) # 设置X轴范围 max_bid_vol = max(sorted_bid_volumes) self.ax1.set_xlim(0, max_bid_vol * 1.3) else: self.ax1.text(0.5, 0.5, '无买盘数据', ha='center', va='center', transform=self.ax1.transAxes, fontsize=12) # 绘制卖盘深度(红色,从左到右递增) if ask_prices: # 按价格排序(卖盘价格从低到高) ask_data = sorted(zip(ask_prices, ask_volumes)) sorted_ask_prices, sorted_ask_volumes = zip(*ask_data) # 绘制卖盘柱状图 colors_ask = plt.cm.Reds(np.linspace(0.4, 0.9, len(sorted_ask_prices))) bars2 = self.ax2.barh(range(len(sorted_ask_prices)), sorted_ask_volumes, color=colors_ask, alpha=0.8, edgecolor='darkred', linewidth=1) # 设置Y轴标签为价格 self.ax2.set_yticks(range(len(sorted_ask_prices))) self.ax2.set_yticklabels([f'{p:.2f}' for p in sorted_ask_prices]) # 添加成交量标签 for i, (price, volume) in enumerate(ask_data): self.ax2.text(volume + max(sorted_ask_volumes) * 0.01, i, f'{int(volume)}', va='center', ha='left', fontsize=9) # 设置X轴范围 max_ask_vol = max(sorted_ask_volumes) self.ax2.set_xlim(0, max_ask_vol * 1.3) else: self.ax2.text(0.5, 0.5, '无卖盘数据', ha='center', va='center', transform=self.ax2.transAxes, fontsize=12) # 添加当前成交价标记 current_price = row_data['price'] self.ax1.axvline(x=current_price, color='blue', linestyle='--', alpha=0.7, label=f'成交价: {current_price:.2f}') self.ax2.axvline(x=current_price, color='blue', linestyle='--', alpha=0.7, label=f'成交价: {current_price:.2f}') self.ax1.legend(loc='upper right') self.ax2.legend(loc='upper right') plt.tight_layout() self.canvas.draw() # 更新状态标签 self.index_label.config(text=f"数列号: {self.current_index}") self.time_label.config(text=f"时间: {row_data['time']}") self.price_label.config(text=f"成交价: {current_price:.2f}") self.volume_label.config(text=f"累积成交量: {int(row_data['cumulative_volume']):,}") # 更新进度条 self.progress_var.set(self.current_index) self.progress_label.config(text=f"{self.current_index} / {len(self.df) - 1}") def toggle_play(self): """切换播放/暂停状态""" if not self.is_playing: self.is_playing = True self.play_button.config(text="⏸ 暂停") # 启动播放线程 self.play_thread = threading.Thread(target=self.playback_loop, daemon=True) self.play_thread.start() else: self.is_playing = False self.play_button.config(text="▶ 播放") def stop_playback(self): """停止播放""" self.is_playing = False self.play_button.config(text="▶ 播放") self.current_index = 0 self.update_display() def playback_loop(self): """播放循环""" while self.is_playing and self.current_index < len(self.df) - 1: self.current_index += 1 # 在主线程中更新显示 self.root.after(0, self.update_display) # 根据速度设置延迟 delay = self.play_speed / self.speed_multiplier time.sleep(delay / 1000.0) # 转换为秒 # 播放结束 if self.current_index >= len(self.df) - 1: self.is_playing = False self.root.after(0, lambda: self.play_button.config(text="▶ 播放")) def update_speed(self, value): """更新播放速度""" self.speed_multiplier = float(value) self.speed_label.config(text=f"播放速度: {self.speed_multiplier:.1f}x") def set_speed(self, speed): """设置播放速度""" self.speed_var.set(speed) self.speed_multiplier = speed self.speed_label.config(text=f"播放速度: {self.speed_multiplier:.1f}x") def jump_to_index(self): """跳转到指定数列号""" try: target_index = int(self.jump_entry.get()) if 0 <= target_index < len(self.df): self.current_index = target_index self.update_display() else: tk.messagebox.showwarning("输入错误", f"请输入0到{len(self.df)-1}之间的数列号") except ValueError: tk.messagebox.showwarning("输入错误", "请输入有效的数字") def run(self): """运行应用""" # 绑定关闭事件 self.root.protocol("WM_DELETE_WINDOW", self.on_closing) # 启动主循环 self.root.mainloop() def on_closing(self): """关闭应用时的处理""" self.is_playing = False self.root.destroy() def main(): """主函数""" print("=== 期货数据动态播放器 ===") print("正在初始化...") # 创建播放器实例 player = FuturesDataPlayer() print("初始化完成!") print("\\n使用说明:") print("- 点击'播放'按钮开始播放买卖盘深度变化") print("- 使用速度控制调整播放速度") print("- 可以跳转到指定的数列号") print("- 观察买盘(绿色)和卖盘(红色)的深度变化") # 运行应用 player.run() if __name__ == "__main__": main()