huice/volume_price_sequence.py
Your Name e5dd5b5593 feat: 期货数据分析工具集 v2.0
## 核心功能
### 1. 成交量序列分析 (volume_price_sequence.py)
- 按累计成交量排序的价格趋势分析
- 三合一综合图表:价格序列+成交量分布+时间序列
- 关键价格水平自动标注

### 2. 成交量分布深度分析 (volume_distribution_analysis.py)
- 7种专业可视化图表
- 统计特征分析和分布拟合
- 交易模式识别和业务洞察

### 3. 大额订单分析工具集 (large_orders/)
- 买1/卖1量大单分析 (阈值99)
- 买卖挂单合计分析 (阈值200)
- 当前成交量分析 (阈值150)
- 信号抑制优化算法 (38%抑制率)

## 技术特性
- 信号抑制算法:有效减少重复信号干扰
- 多维度分析:支持多种信号类型
- 专业可视化:四宫格综合分析图
- 业务洞察:基于数据的交易建议

## 分析结果
- 卖1量大单:短期下跌,长期大幅上涨反转
- 买挂合计:各时间窗口小幅正收益
- 信号抑制:短期收益从-0.0778提升至+0.1347

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-02 15:15:53 +08:00

497 lines
21 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
AU2512期货成交量-成交价序列图生成工具
这个脚本专门用于生成高解释力的成交量-成交价序列图。
通过按累计成交量升序排列,清晰展示价格随交易推进的演变轨迹。
使用方法:
python volume_price_sequence.py [data_file]
如果不指定文件,默认分析 data/au2512_20251013.parquet
"""
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.gridspec import GridSpec
from datetime import datetime, timedelta
import pytz
import warnings
import os
import sys
import argparse
from pathlib import Path
warnings.filterwarnings('ignore')
class VolumePriceAnalyzer:
"""成交量-成交价序列分析器"""
def __init__(self, data_file=None):
"""
初始化分析器
Args:
data_file (str): 数据文件路径,默认为 data/au2512_20251013.parquet
"""
self.data_file = data_file or "data/au2512_20251013.parquet"
self.df = None
self.setup_chinese_font()
def setup_chinese_font(self):
"""设置中文字体支持"""
try:
# 尝试不同的中文字体
chinese_fonts = ['Microsoft YaHei', 'SimHei', 'SimSun', 'KaiTi', 'FangSong']
for font in chinese_fonts:
try:
plt.rcParams['font.sans-serif'] = [font]
plt.rcParams['axes.unicode_minus'] = False
# 测试字体是否可用
fig, ax = plt.subplots(figsize=(1, 1))
ax.text(0.5, 0.5, '测试', fontsize=12)
plt.close(fig)
print(f"使用中文字体: {font}")
return
except:
continue
# 如果都不行,使用默认字体
plt.rcParams['font.sans-serif'] = ['DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False
print("警告: 无法加载中文字体,使用默认字体")
except Exception as e:
print(f"字体设置警告: {e}")
plt.rcParams['font.sans-serif'] = ['DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False
def load_data(self):
"""加载数据"""
try:
if not os.path.exists(self.data_file):
raise FileNotFoundError(f"数据文件不存在: {self.data_file}")
print(f"正在加载数据: {self.data_file}")
# 根据文件扩展名选择读取方式
if self.data_file.endswith('.parquet'):
self.df = pd.read_parquet(self.data_file)
elif self.data_file.endswith('.csv'):
self.df = pd.read_csv(self.data_file)
else:
raise ValueError("不支持的文件格式,请使用 .parquet 或 .csv 文件")
print(f"数据加载成功: {len(self.df):,} 条记录")
return True
except Exception as e:
print(f"数据加载失败: {e}")
return False
def analyze_data(self):
"""分析数据并按成交量排序"""
print("\n=== 数据分析 ===")
# 按累计成交量升序排列
self.df = self.df.sort_values('累积成交量').reset_index(drop=True)
# 计算基础指标
self.total_volume = self.df['累积成交量'].max()
self.min_price = self.df['成交价'].min()
self.max_price = self.df['成交价'].max()
self.avg_price = self.df['成交价'].mean()
print(f"数据概况:")
print(f" 累计成交量: {self.total_volume:,}")
print(f" 价格区间: {self.min_price:.2f} - {self.max_price:.2f}")
print(f" 平均价格: {self.avg_price:.2f}")
print(f" 记录数量: {len(self.df):,}")
# 计算成交量变化
if '当前成交量' not in self.df.columns:
self.df['当前成交量'] = self.df['累积成交量'].diff().fillna(0)
self.df.loc[self.df.index[0], '当前成交量'] = self.df.loc[self.df.index[0], '累积成交量']
self.df['volume_change'] = self.df['当前成交量'] # 使用当前成交量
print(f" 平均单笔成交量: {self.df['volume_change'].mean():.1f}")
print(f" 最大单笔成交量: {self.df['volume_change'].max():.0f}")
print(f" 成交量标准差: {self.df['volume_change'].std():.1f}")
# 计算关键节点
self.calculate_key_milestones()
def calculate_key_milestones(self):
"""计算关键成交量节点"""
self.milestones = {
'起始': self.df.iloc[0],
'10%': self.df.iloc[len(self.df)//10],
'25%': self.df.iloc[len(self.df)//4],
'50%': self.df.iloc[len(self.df)//2],
'75%': self.df.iloc[3*len(self.df)//4],
'90%': self.df.iloc[9*len(self.df)//10],
'结束': self.df.iloc[-1]
}
print(f"\n关键成交量节点:")
for label, point in self.milestones.items():
print(f" {label}: 成交量 {point['累积成交量']:,} 手, 价格 {point['成交价']:.2f}")
def create_volume_price_chart(self):
"""创建成交量-成交价序列图(包含当前成交量柱状图)"""
print("\n=== 生成成交量-成交价序列图 ===")
# 创建复合图表布局:主图 + 右侧成交量分布图
fig = plt.figure(figsize=(28, 14))
gs = GridSpec(2, 3, figure=fig, width_ratios=[3, 0.1, 1], height_ratios=[3, 1],
hspace=0.1, wspace=0.05)
# 主图:成交价序列图
ax1 = fig.add_subplot(gs[0, 0])
# 右侧成交量分布图
ax_volume_dist = fig.add_subplot(gs[0, 2], sharey=ax1)
# 下方当前成交量图
ax2 = fig.add_subplot(gs[1, 0], sharex=ax1)
# 上方图表:成交价序列图
# 绘制主要序列图
ax1.plot(self.df['累积成交量'], self.df['成交价'],
linewidth=2.0, alpha=0.9, color='navy', label='成交价序列')
# 添加关键价格标注右侧成交量分布图中累计成交量最大的10个价格区间
# 使用与右侧成交量分布图相同的逻辑:按价格分组计算累计成交量
prices = self.df['成交价']
min_price = prices.min()
max_price = prices.max()
# 使用固定0.02价格间隔(与右侧图一致)
interval_size = 0.02
price_bins = np.arange(np.floor(min_price * 50) / 50,
np.ceil(max_price * 50) / 50 + interval_size,
interval_size)
bin_centers = (price_bins[:-1] + price_bins[1:]) / 2
# 确保有当前成交量列
if '当前成交量' not in self.df.columns:
self.df['当前成交量'] = self.df['累积成交量'].diff().fillna(0)
self.df.loc[self.df.index[0], '当前成交量'] = self.df.loc[self.df.index[0], '累积成交量']
# 计算每个价格分组的累计成交量(与右侧图完全一致)
volume_by_price = []
for i in range(len(price_bins) - 1):
mask = (self.df['成交价'] >= price_bins[i]) & (self.df['成交价'] < price_bins[i + 1])
volume_sum = self.df.loc[mask, '当前成交量'].sum()
volume_by_price.append(volume_sum)
volume_by_price = np.array(volume_by_price)
# 找出累计成交量最大的10个价格区间
top_indices = np.argsort(volume_by_price)[-10:] # 取最大的10个
top_volume_points = []
for idx in sorted(top_indices, reverse=True): # 按成交量降序排列
if volume_by_price[idx] > 0: # 只显示有成交量的区间
price = bin_centers[idx]
volume = volume_by_price[idx]
# 找到该价格区间内的任意一个数据点来获取累计成交量
mask = (self.df['成交价'] >= price_bins[idx]) & (self.df['成交价'] < price_bins[idx + 1])
if mask.any():
sample_point = self.df[mask].iloc[0]
top_volume_points.append({
'成交价': price,
'当前成交量': volume,
'累积成交量': sample_point['累积成交量']
})
# 获取最大累计成交量作为右边界
max_cumulative_volume = self.df['累积成交量'].max()
# 为每个关键价格添加浅绿色虚线标注
for point in top_volume_points:
price = point['成交价']
volume = point['当前成交量'] # 这是价格区间的累计成交量
cumulative_vol = point['累积成交量']
# 绘制水平虚线
ax1.axhline(y=price, color='lightgreen', linestyle='--', linewidth=1.5, alpha=0.7, zorder=1)
# 添加价格标注(左侧)
ax1.text(cumulative_vol, price, f'{price:.2f}',
fontsize=10, color='darkgreen', fontweight='bold',
ha='left', va='bottom',
bbox=dict(boxstyle='round,pad=0.2', facecolor='lightgreen', alpha=0.8))
# 添加成交量标注(右侧)
ax1.text(max_cumulative_volume, price, f'{volume:.0f}',
fontsize=9, color='darkgreen', fontweight='bold',
ha='right', va='bottom',
bbox=dict(boxstyle='round,pad=0.2', facecolor='lightyellow', alpha=0.9))
# 添加关键价格说明
ax1.text(0.02, 0.98, f'关键价格标注\n(右侧图中累计成交量前10)', transform=ax1.transAxes,
fontsize=11, ha='left', va='top', color='darkgreen', fontweight='bold',
bbox=dict(boxstyle='round,pad=0.4', facecolor='lightgreen', alpha=0.9))
# 设置上方图表标题和标签
ax1.set_title('AU2512期货成交价序列图 (按累计成交量升序排列)',
fontsize=20, fontweight='bold', pad=20)
ax1.set_ylabel('成交价', fontsize=16, labelpad=10)
ax1.grid(True, alpha=0.3, linestyle='-', linewidth=0.5)
# 添加分析结论到上方图表
final_price = self.df['成交价'].iloc[-1]
initial_price = self.df['成交价'].iloc[0]
total_change = final_price - initial_price
total_change_pct = (total_change / initial_price) * 100
conclusion_text = f'''分析结论:
• 整体价格趋势: {"上涨" if total_change > 0 else "下跌"} ({total_change:+.2f}, {total_change_pct:+.2f}%)
• 价格波动区间: {self.min_price:.2f} - {self.max_price:.2f}
• 总成交量: {self.total_volume:,}
• 总Tick数: {len(self.df):,}
• 平均每1000手成交量影响: {abs(total_change)/(self.total_volume/1000):.4f}'''
ax1.text(0.98, 0.02, conclusion_text, transform=ax1.transAxes,
fontsize=12, ha='right', va='bottom',
bbox=dict(boxstyle='round,pad=0.5', facecolor='lightgreen', alpha=0.9))
# 下方图表:当前成交量柱状图
# 确保有当前成交量列,如果没有则计算
if '当前成交量' not in self.df.columns:
self.df['当前成交量'] = self.df['累积成交量'].diff().fillna(0)
self.df.loc[self.df.index[0], '当前成交量'] = self.df.loc[self.df.index[0], '累积成交量']
# 直接绘制所有当前成交量数据点
# 使用细线图代替柱状图,因为数据点太多
ax2.plot(self.df['累积成交量'], self.df['当前成交量'],
linewidth=0.5, alpha=0.8, color='steelblue', label='当前成交量')
# 添加填充区域以增强视觉效果
ax2.fill_between(self.df['累积成交量'], 0, self.df['当前成交量'],
alpha=0.3, color='steelblue')
# 设置下方图表标题和标签
ax2.set_title('当前成交量分布 (按累计成交量)', fontsize=16, fontweight='bold', pad=10)
ax2.set_xlabel('累计成交量 (手)', fontsize=16, labelpad=10)
ax2.set_ylabel('当前成交量 (手)', fontsize=14, labelpad=10)
ax2.grid(True, alpha=0.3, linestyle='-', linewidth=0.5)
ax2.legend(fontsize=14, loc='upper right')
# 添加成交量统计信息
max_volume = self.df['当前成交量'].max()
avg_volume = self.df['当前成交量'].mean()
total_ticks = len(self.df)
volume_stats = f'''成交量统计:
• 最大单笔成交量: {max_volume:.0f}
• 平均成交量: {avg_volume:.1f}
• 总Tick数: {total_ticks:,}'''
ax2.text(0.02, 0.98, volume_stats, transform=ax2.transAxes,
fontsize=11, ha='left', va='top',
bbox=dict(boxstyle='round,pad=0.4', facecolor='lightyellow', alpha=0.9))
# 右侧图表:成交量在价格上的分布
# 创建价格分组0.02间隔)
prices = self.df['成交价']
min_price = prices.min()
max_price = prices.max()
# 使用固定0.02价格间隔
interval_size = 0.02
price_bins = np.arange(np.floor(min_price * 50) / 50,
np.ceil(max_price * 50) / 50 + interval_size,
interval_size)
bin_centers = (price_bins[:-1] + price_bins[1:]) / 2
# 计算每个价格分组的成交量
if '当前成交量' not in self.df.columns:
self.df['当前成交量'] = self.df['累积成交量'].diff().fillna(0)
self.df.loc[self.df.index[0], '当前成交量'] = self.df.loc[self.df.index[0], '累积成交量']
volume_by_price = []
for i in range(len(price_bins) - 1):
mask = (self.df['成交价'] >= price_bins[i]) & (self.df['成交价'] < price_bins[i + 1])
volume_sum = self.df.loc[mask, '当前成交量'].sum()
volume_by_price.append(volume_sum)
volume_by_price = np.array(volume_by_price)
# 绘制右侧成交量分布图(水平柱状图)
colors = plt.cm.RdYlBu_r(np.linspace(0, 1, len(bin_centers)))
bars = ax_volume_dist.barh(bin_centers, volume_by_price,
height=0.015, # 设置柱子高度
color=colors, alpha=0.8, edgecolor='black', linewidth=0.3)
# 设置右侧图表标题和标签
ax_volume_dist.set_title('成交量分布', fontsize=14, fontweight='bold', pad=10)
ax_volume_dist.set_xlabel('成交量 (手)', fontsize=12)
ax_volume_dist.grid(True, alpha=0.3, axis='x')
# 格式化x轴标签
ax_volume_dist.xaxis.set_major_formatter(plt.FuncFormatter(lambda x, p: f'{x/1000:.0f}K'))
# 隐藏右侧图表的y轴刻度与主图共享
ax_volume_dist.tick_params(axis='y', which='both', left=False, right=False,
labelleft=False, labelright=False)
# 找出成交量最大的几个价格区间并标注
max_volume = volume_by_price.max()
if max_volume > 0:
max_idx = np.argmax(volume_by_price)
max_price_range = f"{price_bins[max_idx]:.2f}-{price_bins[max_idx+1]:.2f}"
ax_volume_dist.annotate(f'{max_volume:.0f}\n{max_price_range}',
xy=(max_volume, bin_centers[max_idx]),
xytext=(max_volume * 0.7, bin_centers[max_idx]),
ha='right', va='center',
fontsize=10, fontweight='bold', color='red',
bbox=dict(boxstyle='round,pad=0.3', facecolor='yellow', alpha=0.8))
# 设置两个图表的坐标轴格式
from matplotlib.ticker import FuncFormatter
formatter = FuncFormatter(lambda x, p: f'{x/1000:.0f}K')
ax1.xaxis.set_major_formatter(formatter)
ax2.xaxis.set_major_formatter(formatter)
# 设置y轴格式
ax2.yaxis.set_major_formatter(FuncFormatter(lambda x, p: f'{x/1000:.1f}K' if x >= 1000 else f'{x:.0f}'))
# 调整布局
plt.tight_layout()
plt.subplots_adjust(wspace=0.05) # 调整子图间距
# 保存图表
chart_file = 'au2512_volume_price_sequence.png'
plt.savefig(chart_file, dpi=300, bbox_inches='tight',
facecolor='white', edgecolor='none')
plt.close()
print(f"成交量-成交价序列图已保存: {chart_file}")
return chart_file
def print_detailed_analysis(self):
"""打印详细分析结果"""
print("\n" + "="*60)
print("AU2512期货成交量-成交价序列分析报告")
print("="*60)
# 基础信息
print(f"\n【基础数据】")
print(f"数据文件: {self.data_file}")
print(f"记录数量: {len(self.df):,}")
print(f"累计成交量: {self.total_volume:,}")
# 价格分析
print(f"\n【价格分析】")
print(f"起始价格: {self.df['成交价'].iloc[0]:.2f}")
print(f"结束价格: {self.df['成交价'].iloc[-1]:.2f}")
print(f"最高价格: {self.max_price:.2f}")
print(f"最低价格: {self.min_price:.2f}")
print(f"平均价格: {self.avg_price:.2f}")
print(f"价格波动: {(self.max_price - self.min_price):.2f}")
# 成交量分析
print(f"\n【成交量分析】")
print(f"平均单笔成交量: {self.df['volume_change'].mean():.1f}")
print(f"最大单笔成交量: {self.df['volume_change'].max():.0f}")
print(f"成交量标准差: {self.df['volume_change'].std():.1f}")
# 价格变化分析
final_price = self.df['成交价'].iloc[-1]
initial_price = self.df['成交价'].iloc[0]
total_change = final_price - initial_price
total_change_pct = (total_change / initial_price) * 100
print(f"\n【价格变化分析】")
print(f"总体变化: {total_change:+.2f} ({total_change_pct:+.2f}%)")
print(f"最大涨幅: {self.max_price - initial_price:+.2f}")
print(f"最大跌幅: {self.min_price - initial_price:+.2f}")
# 关键节点分析
print(f"\n【关键节点分析】")
for label, point in self.milestones.items():
if label != '起始':
prev_point = self.milestones.get('起始', self.df.iloc[0])
if label in ['10%', '25%', '50%', '75%', '90%', '结束']:
price_change = point['成交价'] - prev_point['成交价']
price_change_pct = (price_change / prev_point['成交价']) * 100
volume_added = point['累积成交量'] - prev_point['累积成交量']
print(f" {label:>6}: 价格 {point['成交价']:7.2f} ({price_change_pct:+6.2f}%) | "
f"成交量 +{volume_added:7,}")
print(f"\n【数据质量】")
missing_values = self.df.isnull().sum().sum()
duplicates = self.df.duplicated().sum()
print(f"缺失值: {missing_values}")
print(f"重复记录: {duplicates}")
print(f"数据完整性: {'优秀' if missing_values == 0 else '需要关注'}")
print(f"\n【业务洞察】")
print(f"1. 价格随成交量变化的完整轨迹清晰可见")
print(f"2. 整体趋势: {'上涨' if total_change > 0 else '下跌'} ({total_change_pct:+.2f}%)")
print(f"3. 交易过程中价格呈现 {'逐步上涨' if total_change > 0 else '逐步下跌'} 趋势")
print(f"4. 关键成交量节点对价格的影响显著")
print(f"\n【生成的图表】")
print(f" au2512_volume_price_sequence.png - 成交量-成交价序列图(含当前成交量柱状图)")
print(f"\n" + "="*60)
print("分析完成!")
print("="*60)
def run_analysis(self):
"""运行完整分析"""
print("开始AU2512期货成交量-成交价序列分析...")
# 加载数据
if not self.load_data():
return False
# 分析数据
self.analyze_data()
# 创建图表
self.create_volume_price_chart()
# 打印详细分析
self.print_detailed_analysis()
return True
def main():
"""主函数"""
parser = argparse.ArgumentParser(description='AU2512期货成交量-成交价序列分析工具')
parser.add_argument('data_file', nargs='?',
default='data/au2512_20251013.parquet',
help='数据文件路径 (默认: data/au2512_20251013.parquet)')
parser.add_argument('--output-dir', '-o', default='.',
help='输出目录 (默认: 当前目录)')
args = parser.parse_args()
# 检查数据文件是否存在
if not os.path.exists(args.data_file):
print(f"错误: 数据文件不存在 - {args.data_file}")
print("\n使用方法:")
print(" python volume_price_sequence.py [数据文件路径]")
print(f" python volume_price_sequence.py # 使用默认文件: {args.data_file}")
sys.exit(1)
# 创建分析器并运行
analyzer = VolumePriceAnalyzer(args.data_file)
success = analyzer.run_analysis()
if success:
print(f"\n分析完成!图表已保存到: {args.output_dir}")
else:
print(f"\n分析失败!")
sys.exit(1)
if __name__ == "__main__":
main()