import pandas as pd import numpy as np def add_volume_and_sequence_columns(input_file, output_file=None): """ 为期货数据添加当前成交量和数列号列 Parameters: - input_file: 输入的parquet文件路径 - output_file: 输出文件路径,如果为None则覆盖原文件 Returns: - 处理后的DataFrame """ # 读取数据 print(f"正在读取文件: {input_file}") df = pd.read_parquet(input_file) print(f"原始数据形状: {df.shape}") # 确定累计成交量列(第4列,索引为3) cumulative_volume_col = df.columns[3] # �ۻ��ɽ��� print(f"累计成交量列: {cumulative_volume_col}") # 计算当前成交量(累计成交量的差值) print("正在计算当前成交量...") df['当前成交量'] = df[cumulative_volume_col].diff() # 第一行的当前成交量设为第一行的累计成交量 df.loc[df.index[0], '当前成交量'] = df.loc[df.index[0], cumulative_volume_col] # 按累计成交量排序并添加数列号 print("正在按累计成交量排序并添加数列号...") df_sorted = df.sort_values(by=cumulative_volume_col, ascending=True).copy() df_sorted['数列号'] = range(1, len(df_sorted) + 1) # 恢复原始顺序(按时间顺序) df_final = df_sorted.sort_index().copy() # 显示添加的列信息 print("\n添加的新列信息:") print(f"当前成交量列统计:") print(f" 最小值: {df_final['当前成交量'].min()}") print(f" 最大值: {df_final['当前成交量'].max()}") print(f" 平均值: {df_final['当前成交量'].mean():.2f}") print(f" 总和: {df_final['当前成交量'].sum()}") print(f"数列号列统计:") print(f" 最小值: {df_final['数列号'].min()}") print(f" 最大值: {df_final['数列号'].max()}") # 验证计算是否正确 print("\n验证计算结果:") print(f"累计成交量最后一行: {df_final[cumulative_volume_col].iloc[-1]}") print(f"当前成交量总和: {df_final['当前成交量'].sum()}") print(f"两者是否相等: {df_final[cumulative_volume_col].iloc[-1] == df_final['当前成交量'].sum()}") # 显示前几行数据以验证 print("\n前5行数据(包含新列):") display_cols = [cumulative_volume_col, '当前成交量', '数列号'] print(df_final[display_cols].head()) # 保存文件 if output_file is None: output_file = input_file print(f"\n正在保存到文件: {output_file}") df_final.to_parquet(output_file, index=False) print("文件保存完成!") return df_final if __name__ == "__main__": # 处理au2512数据 input_file = "data/au2512_20251013.parquet" output_file = "data/au2512_20251013_updated.parquet" try: result_df = add_volume_and_sequence_columns(input_file, output_file) print(f"\n处理完成! 输出文件: {output_file}") print(f"最终数据形状: {result_df.shape}") except Exception as e: print(f"处理过程中出现错误: {e}")