#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ AU2512期货当前成交量分布分析工具 这个脚本专门用于分析期货交易中当前成交量的分布特征, 提供深入的成交量模式分析和可视化展示。 使用方法: python volume_distribution_analysis.py [data_file] 如果不指定文件,默认分析 data/au2512_20251013.parquet """ import pandas as pd import numpy as np import matplotlib.pyplot as plt from matplotlib.gridspec import GridSpec import seaborn as sns from datetime import datetime import warnings import os import sys import argparse from pathlib import Path from scipy import stats warnings.filterwarnings('ignore') class VolumeDistributionAnalyzer: """当前成交量分布分析器""" def __init__(self, data_file=None): """ 初始化分析器 Args: data_file (str): 数据文件路径,默认为 data/au2512_20251013.parquet """ self.data_file = data_file or "data/au2512_20251013.parquet" self.df = None self.current_volume = None self.setup_chinese_font() def setup_chinese_font(self): """设置中文字体支持""" try: # 直接使用已知可用的中文字体 chinese_fonts = ['Microsoft YaHei', 'SimHei', 'SimSun', 'KaiTi', 'FangSong', 'DengXian'] for font in chinese_fonts: try: plt.rcParams['font.sans-serif'] = [font] plt.rcParams['axes.unicode_minus'] = False plt.rcParams['font.size'] = 10 plt.rcParams['axes.titlesize'] = 12 plt.rcParams['axes.labelsize'] = 10 plt.rcParams['xtick.labelsize'] = 9 plt.rcParams['ytick.labelsize'] = 9 # 简单测试字体 fig, ax = plt.subplots(figsize=(1, 1)) ax.text(0.5, 0.5, '测试', fontsize=12) plt.close(fig) print(f"[*] 成功设置中文字体: {font}") return except: continue # 如果都失败,使用默认设置 print("[!] 无法设置中文字体,使用默认字体") plt.rcParams['font.sans-serif'] = ['DejaVu Sans'] plt.rcParams['axes.unicode_minus'] = False plt.rcParams['font.size'] = 10 except Exception as e: print(f"[!] 字体设置警告: {e}") plt.rcParams['font.sans-serif'] = ['DejaVu Sans'] plt.rcParams['axes.unicode_minus'] = False plt.rcParams['font.size'] = 10 def load_data(self): """加载数据""" try: if not os.path.exists(self.data_file): raise FileNotFoundError(f"数据文件不存在: {self.data_file}") print(f"正在加载数据: {self.data_file}") # 根据文件扩展名选择读取方式 if self.data_file.endswith('.parquet'): self.df = pd.read_parquet(self.data_file) elif self.data_file.endswith('.csv'): self.df = pd.read_csv(self.data_file) else: raise ValueError("不支持的文件格式,请使用 .parquet 或 .csv 文件") print(f"数据加载成功: {len(self.df):,} 条记录") return True except Exception as e: print(f"数据加载失败: {e}") return False def prepare_volume_data(self): """准备当前成交量数据""" print("\n=== 准备成交量数据 ===") # 确保有当前成交量列 if '当前成交量' not in self.df.columns: cumulative_col = None for col in self.df.columns: if '累计' in col and '成交量' in col: cumulative_col = col break if cumulative_col is None: raise ValueError("无法找到累计成交量列") print(f"使用累计成交量列: {cumulative_col}") self.df['当前成交量'] = self.df[cumulative_col].diff().fillna(0) self.df.loc[self.df.index[0], '当前成交量'] = self.df.loc[self.df.index[0], cumulative_col] # 提取当前成交量数据 self.current_volume = self.df['当前成交量'].copy() # 移除零值和异常值(超过99.9%分位数的值) q99_9 = self.current_volume.quantile(0.999) self.current_volume_clean = self.current_volume[ (self.current_volume > 0) & (self.current_volume <= q99_9) ] print(f"原始成交量数据点: {len(self.current_volume):,}") print(f"有效成交量数据点: {len(self.current_volume_clean):,}") print(f"移除零值和异常值后: {len(self.current_volume_clean):,} 个") return True def calculate_basic_statistics(self): """计算基础统计信息""" print("\n=== 基础统计分析 ===") self.stats = { 'total_ticks': len(self.current_volume), 'valid_ticks': len(self.current_volume_clean), 'zero_volume_ticks': sum(self.current_volume == 0), 'mean_volume': self.current_volume_clean.mean(), 'median_volume': self.current_volume_clean.median(), 'std_volume': self.current_volume_clean.std(), 'min_volume': self.current_volume_clean.min(), 'max_volume': self.current_volume_clean.max(), 'q25': self.current_volume_clean.quantile(0.25), 'q75': self.current_volume_clean.quantile(0.75), 'q90': self.current_volume_clean.quantile(0.90), 'q95': self.current_volume_clean.quantile(0.95), 'q99': self.current_volume_clean.quantile(0.99) } print(f"总Tick数: {self.stats['total_ticks']:,}") print(f"有效成交量Tick数: {self.stats['valid_ticks']:,}") print(f"零成交量Tick数: {self.stats['zero_volume_ticks']:,}") print(f"平均成交量: {self.stats['mean_volume']:.2f} 手") print(f"中位数成交量: {self.stats['median_volume']:.2f} 手") print(f"成交量标准差: {self.stats['std_volume']:.2f} 手") print(f"最小成交量: {self.stats['min_volume']:.2f} 手") print(f"最大成交量: {self.stats['max_volume']:.2f} 手") print(f"25%分位数: {self.stats['q25']:.2f} 手") print(f"75%分位数: {self.stats['q75']:.2f} 手") print(f"90%分位数: {self.stats['q90']:.2f} 手") print(f"95%分位数: {self.stats['q95']:.2f} 手") print(f"99%分位数: {self.stats['q99']:.2f} 手") # 计算变异系数 cv = self.stats['std_volume'] / self.stats['mean_volume'] print(f"变异系数 (CV): {cv:.3f}") # 检查分布偏度 skewness = stats.skew(self.current_volume_clean) print(f"分布偏度: {skewness:.3f}") self.stats['cv'] = cv self.stats['skewness'] = skewness def analyze_volume_patterns(self): """分析成交量模式""" print("\n=== 成交量模式分析 ===") # 分析大单交易 large_threshold = self.stats['q90'] large_trades = self.current_volume_clean[self.current_volume_clean > large_threshold] print(f"大单交易定义: > {large_threshold:.1f} 手 (90%分位数)") print(f"大单交易次数: {len(large_trades):,}") print(f"大单交易占比: {len(large_trades)/len(self.current_volume_clean)*100:.2f}%") print(f"大单成交量总和: {large_trades.sum():,} 手") print(f"大单成交量占比: {large_trades.sum()/self.current_volume_clean.sum()*100:.2f}%") # 分析成交量聚集模式 volume_groups = pd.cut(self.current_volume_clean, bins=[0, 1, 5, 10, 20, 50, 100, float('inf')], labels=['1手', '2-5手', '6-10手', '11-20手', '21-50手', '51-100手', '100+手']) print(f"\n成交量分组统计:") for group_name, group_data in self.current_volume_clean.groupby(volume_groups): if not pd.isna(group_name): count = len(group_data) percentage = count / len(self.current_volume_clean) * 100 volume_sum = group_data.sum() volume_percentage = volume_sum / self.current_volume_clean.sum() * 100 print(f" {group_name:>6}: {count:>6,} 次 ({percentage:>5.1f}%) | " f"成交量: {volume_sum:>8,.0f} 手 ({volume_percentage:>5.1f}%)") # 存储模式分析结果 self.pattern_analysis = { 'large_threshold': large_threshold, 'large_trades_count': len(large_trades), 'large_trades_percentage': len(large_trades)/len(self.current_volume_clean)*100, 'large_volume_sum': large_trades.sum(), 'large_volume_percentage': large_trades.sum()/self.current_volume_clean.sum()*100, 'volume_groups': volume_groups, 'group_stats': {} } for group_name, group_data in self.current_volume_clean.groupby(volume_groups): if not pd.isna(group_name): self.pattern_analysis['group_stats'][group_name] = { 'count': len(group_data), 'percentage': len(group_data)/len(self.current_volume_clean)*100, 'volume_sum': group_data.sum(), 'volume_percentage': group_data.sum()/self.current_volume_clean.sum()*100 } def create_distribution_charts(self): """创建分布图表""" print("\n=== 生成成交量分布图表 ===") # 确保字体设置正确 plt.rcParams['font.sans-serif'] = plt.rcParams['font.sans-serif'] plt.rcParams['axes.unicode_minus'] = False # 创建复合图表布局 fig = plt.figure(figsize=(20, 16)) gs = GridSpec(3, 3, figure=fig, hspace=0.3, wspace=0.3) # 1. 直方图 + 概率密度图 ax1 = fig.add_subplot(gs[0, 0]) n, bins, patches = ax1.hist(self.current_volume_clean, bins=50, alpha=0.7, color='skyblue', density=True, edgecolor='black', linewidth=0.5) # 拟合正态分布 mu, sigma = self.stats['mean_volume'], self.stats['std_volume'] x = np.linspace(self.current_volume_clean.min(), self.current_volume_clean.max(), 100) ax1.plot(x, stats.norm.pdf(x, mu, sigma), 'r-', linewidth=2, label=f'正态分布拟合\n(μ={mu:.1f}, σ={sigma:.1f})') ax1.set_title('当前成交量分布直方图', fontsize=14, fontweight='bold') ax1.set_xlabel('成交量 (手)', fontsize=12) ax1.set_ylabel('概率密度', fontsize=12) ax1.legend(fontsize=10) ax1.grid(True, alpha=0.3) # 2. 箱线图 ax2 = fig.add_subplot(gs[0, 1]) boxplot_data = [self.current_volume_clean[self.current_volume_clean <= q] for q in [self.stats['q90'], self.stats['q95'], self.stats['q99']]] boxplot_labels = ['90%分位数内', '95%分位数内', '99%分位数内'] bp = ax2.boxplot(boxplot_data, labels=boxplot_labels, patch_artist=True) for patch in bp['boxes']: patch.set_facecolor('lightgreen') patch.set_alpha(0.7) ax2.set_title('当前成交量箱线图', fontsize=14, fontweight='bold') ax2.set_ylabel('成交量 (手)', fontsize=12) ax2.grid(True, alpha=0.3) # 3. 累积分布函数 ax3 = fig.add_subplot(gs[0, 2]) sorted_data = np.sort(self.current_volume_clean) cumulative = np.arange(1, len(sorted_data) + 1) / len(sorted_data) ax3.plot(sorted_data, cumulative, linewidth=2, color='purple') ax3.set_title('当前成交量累积分布函数', fontsize=14, fontweight='bold') ax3.set_xlabel('成交量 (手)', fontsize=12) ax3.set_ylabel('累积概率', fontsize=12) ax3.grid(True, alpha=0.3) ax3.axhline(y=0.5, color='red', linestyle='--', alpha=0.7, label='中位数') ax3.axhline(y=0.9, color='orange', linestyle='--', alpha=0.7, label='90%分位数') ax3.legend(fontsize=10) # 4. 成交量时间序列 ax4 = fig.add_subplot(gs[1, :]) sample_indices = np.linspace(0, len(self.current_volume)-1, 1000, dtype=int) ax4.plot(sample_indices, self.current_volume.iloc[sample_indices], linewidth=0.8, alpha=0.8, color='steelblue') ax4.fill_between(sample_indices, 0, self.current_volume.iloc[sample_indices], alpha=0.3, color='steelblue') ax4.set_title('当前成交量时间序列 (采样显示)', fontsize=14, fontweight='bold') ax4.set_xlabel('时间序列索引', fontsize=12) ax4.set_ylabel('成交量 (手)', fontsize=12) ax4.grid(True, alpha=0.3) # 添加大单标记 large_threshold = self.stats['q90'] large_indices = np.where(self.current_volume > large_threshold)[0] if len(large_indices) > 0: sample_large_indices = np.intersect1d(sample_indices, large_indices) if len(sample_large_indices) > 0: ax4.scatter(sample_large_indices, self.current_volume.iloc[sample_large_indices], color='red', s=20, alpha=0.8, zorder=5, label=f'大单交易 (>{large_threshold:.0f}手)') ax4.legend(fontsize=10) # 5. 成交量分组柱状图 ax5 = fig.add_subplot(gs[2, 0]) group_names = list(self.pattern_analysis['group_stats'].keys()) group_counts = [self.pattern_analysis['group_stats'][name]['count'] for name in group_names] colors = plt.cm.Set3(np.linspace(0, 1, len(group_names))) bars = ax5.bar(group_names, group_counts, color=colors, alpha=0.8, edgecolor='black', linewidth=0.5) # 在柱子上添加数值标签 for bar, count in zip(bars, group_counts): height = bar.get_height() ax5.text(bar.get_x() + bar.get_width()/2., height + max(group_counts)*0.01, f'{count:,}', ha='center', va='bottom', fontsize=10, fontweight='bold') ax5.set_title('成交量分组分布 (按交易笔数)', fontsize=14, fontweight='bold') ax5.set_xlabel('成交量分组', fontsize=12) ax5.set_ylabel('交易笔数', fontsize=12) ax5.grid(True, alpha=0.3, axis='y') # 旋转x轴标签以便更好地显示 plt.setp(ax5.get_xticklabels(), rotation=45, ha='right') # 6. 成交量贡献度条形图 ax6 = fig.add_subplot(gs[2, 1]) volume_contributions = [self.pattern_analysis['group_stats'][name]['volume_percentage'] for name in group_names] bars = ax6.barh(group_names, volume_contributions, color=colors, alpha=0.7) ax6.set_title('成交量贡献度 (按成交量占比)', fontsize=14, fontweight='bold') ax6.set_xlabel('成交量占比 (%)', fontsize=12) ax6.grid(True, alpha=0.3, axis='x') # 在条形上添加数值标签 for i, (bar, contribution) in enumerate(zip(bars, volume_contributions)): ax6.text(contribution + 0.5, bar.get_y() + bar.get_height()/2, f'{contribution:.1f}%', va='center', fontsize=10) # 7. 统计摘要表 ax7 = fig.add_subplot(gs[2, 2]) ax7.axis('off') # 创建统计摘要文本 summary_text = f"""成交量统计摘要 数据量: {self.stats['total_ticks']:,} Tick 有效成交量: {self.stats['valid_ticks']:,} Tick 基础统计: * 平均值: {self.stats['mean_volume']:.2f} 手 * 中位数: {self.stats['median_volume']:.2f} 手 * 标准差: {self.stats['std_volume']:.2f} 手 * 变异系数: {self.stats['cv']:.3f} * 偏度: {self.stats['skewness']:.3f} 分位数统计: * 最小值: {self.stats['min_volume']:.2f} 手 * 25%分位: {self.stats['q25']:.2f} 手 * 75%分位: {self.stats['q75']:.2f} 手 * 90%分位: {self.stats['q90']:.2f} 手 * 95%分位: {self.stats['q95']:.2f} 手 * 最大值: {self.stats['max_volume']:.2f} 手 大单交易分析: * 大单阈值: {self.pattern_analysis['large_threshold']:.1f} 手 * 大单次数: {self.pattern_analysis['large_trades_count']:,} * 大单占比: {self.pattern_analysis['large_trades_percentage']:.2f}% * 大单成交量占比: {self.pattern_analysis['large_volume_percentage']:.2f}% """ ax7.text(0.05, 0.95, summary_text, transform=ax7.transAxes, fontsize=10, va='top', ha='left', wrap=True, bbox=dict(boxstyle='round,pad=0.5', facecolor='lightyellow', alpha=0.9)) # 设置总标题 fig.suptitle('AU2512期货当前成交量分布综合分析', fontsize=18, fontweight='bold', y=0.98) # 保存图表 plt.tight_layout() chart_file = 'au2512_volume_distribution_analysis.png' plt.savefig(chart_file, dpi=300, bbox_inches='tight', facecolor='white', edgecolor='none') plt.close() print(f"成交量分布分析图表已保存: {chart_file}") return chart_file def generate_insights_report(self): """生成业务洞察报告""" print("\n=== 业务洞察报告 ===") insights = [] # 成交量分布特征分析 if self.stats['skewness'] > 1: insights.append("1. 成交量分布呈现严重的右偏特征,表明大部分交易集中在小单,大单交易较少但影响显著") elif self.stats['skewness'] > 0.5: insights.append("1. 成交量分布呈现中等程度的右偏特征,存在一定比例的大单交易") else: insights.append("1. 成交量分布相对均匀,大小单交易分布较为均衡") # 变异系数分析 if self.stats['cv'] > 2: insights.append("2. 成交量波动极大,市场交易活跃度变化剧烈,需要关注市场情绪变化") elif self.stats['cv'] > 1: insights.append("2. 成交量波动较大,市场存在明显的活跃期和沉寂期") else: insights.append("2. 成交量波动相对稳定,市场交易活跃度较为一致") # 大单交易分析 if self.pattern_analysis['large_volume_percentage'] > 50: insights.append("3. 大单交易占据主导地位,可能表明机构投资者参与度较高") elif self.pattern_analysis['large_volume_percentage'] > 30: insights.append("3. 大单交易占比较为重要,对市场流动性有显著影响") else: insights.append("3. 小单交易为主,市场参与者可能以散户为主") # 交易模式分析 dominant_group = max(self.pattern_analysis['group_stats'].items(), key=lambda x: x[1]['volume_percentage']) insights.append(f"4. 主要成交量贡献来自{dominant_group[0]}的交易,占比{dominant_group[1]['volume_percentage']:.1f}%") # 市场活跃度分析 if self.stats['mean_volume'] > 20: insights.append("5. 市场整体活跃度较高,平均单笔成交量较大") elif self.stats['mean_volume'] > 10: insights.append("5. 市场活跃度适中,交易较为频繁") else: insights.append("5. 市场活跃度相对较低,交易较为谨慎") # 风险提示 if self.stats['max_volume'] / self.stats['mean_volume'] > 50: insights.append("6. 风险提示:存在超大单交易,可能导致价格剧烈波动") # 打印所有洞察 for insight in insights: print(f" {insight}") self.insights = insights return insights def print_detailed_report(self): """打印详细分析报告""" print("\n" + "="*80) print("AU2512期货当前成交量分布分析报告") print("="*80) # 数据概况 print(f"\n【数据概况】") print(f"数据文件: {self.data_file}") print(f"分析时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") print(f"总记录数: {self.stats['total_ticks']:,}") print(f"有效成交量记录: {self.stats['valid_ticks']:,}") print(f"数据完整性: {self.stats['valid_ticks']/self.stats['total_ticks']*100:.2f}%") # 基础统计 print(f"\n【基础统计】") print(f"平均成交量: {self.stats['mean_volume']:.2f} 手") print(f"中位数成交量: {self.stats['median_volume']:.2f} 手") print(f"成交量标准差: {self.stats['std_volume']:.2f} 手") print(f"变异系数: {self.stats['cv']:.3f}") print(f"分布偏度: {self.stats['skewness']:.3f}") # 分位数分析 print(f"\n【分位数分析】") percentiles = [0, 25, 50, 75, 90, 95, 99, 100] percentile_names = ['最小值', '25%分位', '中位数', '75%分位', '90%分位', '95%分位', '99%分位', '最大值'] for name, pct in zip(percentile_names, percentiles): value = self.current_volume_clean.quantile(pct/100) if pct > 0 else self.stats['min_volume'] if pct == 100: value = self.stats['max_volume'] print(f"{name:>8}: {value:>8.2f} 手") # 大单交易分析 print(f"\n【大单交易分析】") print(f"大单阈值 (90%分位数): {self.pattern_analysis['large_threshold']:.1f} 手") print(f"大单交易次数: {self.pattern_analysis['large_trades_count']:,}") print(f"大单交易占比: {self.pattern_analysis['large_trades_percentage']:.2f}%") print(f"大单成交量占比: {self.pattern_analysis['large_volume_percentage']:.2f}%") # 成交量分组分析 print(f"\n【成交量分组分析】") print(f"{'分组':>8} {'次数':>8} {'占比':>8} {'成交量':>10} {'占比':>8}") print("-" * 50) for group_name in self.pattern_analysis['group_stats']: stats = self.pattern_analysis['group_stats'][group_name] print(f"{group_name:>8} {stats['count']:>8,} {stats['percentage']:>7.1f}% " f"{stats['volume_sum']:>10,.0f} {stats['volume_percentage']:>7.1f}%") # 业务洞察 print(f"\n【业务洞察】") for insight in self.insights: print(f" {insight}") # 交易建议 print(f"\n【交易建议】") if self.stats['cv'] > 2: print(" * 建议关注市场情绪变化,适时调整交易策略") if self.pattern_analysis['large_volume_percentage'] > 40: print(" * 关注大单流向,可能预示价格趋势变化") if self.stats['mean_volume'] < 10: print(" * 市场活跃度较低,建议谨慎交易") else: print(" * 市场活跃度适中,适合正常交易策略") print(f"\n【生成的文件】") print(f" au2512_volume_distribution_analysis.png - 成交量分布综合分析图表") print(f"\n" + "="*80) print("分析完成!") print("="*80) def run_analysis(self): """运行完整分析""" print("开始AU2512期货当前成交量分布分析...") # 加载数据 if not self.load_data(): return False # 准备成交量数据 if not self.prepare_volume_data(): return False # 计算基础统计 self.calculate_basic_statistics() # 分析成交量模式 self.analyze_volume_patterns() # 创建图表 self.create_distribution_charts() # 生成业务洞察 self.generate_insights_report() # 打印详细报告 self.print_detailed_report() return True def main(): """主函数""" parser = argparse.ArgumentParser(description='AU2512期货当前成交量分布分析工具') parser.add_argument('data_file', nargs='?', default='data/au2512_20251013.parquet', help='数据文件路径 (默认: data/au2512_20251013.parquet)') parser.add_argument('--output-dir', '-o', default='.', help='输出目录 (默认: 当前目录)') args = parser.parse_args() # 检查数据文件是否存在 if not os.path.exists(args.data_file): print(f"错误: 数据文件不存在 - {args.data_file}") print("\n使用方法:") print(" python volume_distribution_analysis.py [数据文件路径]") print(f" python volume_distribution_analysis.py # 使用默认文件: {args.data_file}") sys.exit(1) # 创建分析器并运行 analyzer = VolumeDistributionAnalyzer(args.data_file) success = analyzer.run_analysis() if success: print(f"\n分析完成!图表已保存到: {args.output_dir}") else: print(f"\n分析失败!") sys.exit(1) if __name__ == "__main__": main()