|

楼主 |
发表于 2025-2-13 12:45
|
显示全部楼层
DeepSeek,给我了一个代码。但验证有些问题。其#条都是红色。请老师们看看。修改、修改!谢谢了
import pandas as pd
import os
import datetime
import csv
from openpyxl.utils import column_index_from_string
# ====================== 用户配置区域 ======================
SOURCE_FILE = r'D:\数据表\DTO-SNG4.xlsx' # 确保文件路径存在
SHEET_NAMES = ['A1', 'A2', 'b1', 'b2', 'c1'] # 工作表需实际存在
OUTPUT_DIR = r'D:\数据表' # 自动创建输出目录
PROJECT_CODE = "DTO-SNG4" # 项目标识(注意字母O和数字0)
ADD_TIMESTAMP = True # 输出文件添加时间戳
DEBUG_MODE = True # 显示详细错误信息
SPECIAL_CHARS_TO_REMOVE = [ # 完整特殊符号列表
'△', '▲', '☆', '★', # 三角形/星形符号
'|', '∣', '︱', '丨' # 竖线及其变体
]
# ========================================================
def clean_special_characters(text):
"""五层深度字符清洗(修复所有语法错误)"""
try:
if pd.isna(text) or text is None:
return ""
# 第一层:基础清洗
cleaned = str(text).strip()
# 第二层:移除特定符号
for char in SPECIAL_CHARS_TO_REMOVE:
cleaned = cleaned.replace(char, '')
# 第三层:过滤控制字符
cleaned = "".join(
c for c in cleaned
if c.isprintable() and c != '\ufeff' # 修正不可见字符过滤
)
# 第四层:处理Unicode异常
try:
cleaned = cleaned.encode('utf-16', 'surrogatepass').decode('utf-16')
except UnicodeError:
cleaned = cleaned.encode('utf-16', 'replace').decode('utf-16')
# 第五层:过滤四字节字符
cleaned = ''.join(
c for c in cleaned
if len(c.encode('utf-8', errors='replace')) < 4 # 修正编码参数
)
return cleaned
except Exception as e:
if DEBUG_MODE:
print(f"字符清洗失败: {str(e)}")
return ""
def get_actual_sheet_names(expected_names):
"""智能匹配实际存在的工作表(添加异常处理)"""
try:
with pd.ExcelFile(SOURCE_FILE, engine='openpyxl') as xls:
actual_sheets = xls.sheet_names
sheet_map = {}
for expected in expected_names:
expected_clean = expected.strip().lower()
for actual in actual_sheets:
if expected_clean == actual.strip().lower():
sheet_map[expected] = actual
break
if expected not in sheet_map:
print(f"⚠️ 警告:未找到工作表 '{expected}'")
return sheet_map
except Exception as e:
print(f"❌ Excel文件读取失败: {str(e)}")
return {}
def validate_row_data(row_values, row_num):
"""增强型数据验证(支持动态列范围)"""
try:
category = clean_special_characters(row_values[0]).upper()
if category not in {'A', 'B'}:
return None, f"行{row_num}: 列1 无效类别 '{category}'"
valid_values = []
# 处理从CM到BK的列(包含第63列BK)
columns_to_check = ['CM', 'CO', 'CQ', 'CS', 'CW', 'CY', 'DA', 'DC', 'BK'] # 新增BK列
for col in columns_to_check:
try:
col_idx = column_index_from_string(col) - 1 # 转换为0-based索引
if col_idx >= len(row_values):
return None, f"行{row_num}列{col}: 列不存在"
raw_value = row_values[col_idx]
cleaned_value = clean_special_characters(raw_value)
# 特殊处理竖线变体
cleaned_value = cleaned_value.replace('∣', '').replace('︱', '')
try:
num = float(cleaned_value)
if not 0.3 < num < 2:
return None, f"行{row_num}列{col}: 数值越界 {num:.4f}"
valid_values.append(num)
except ValueError:
bad_chars = set(raw_value) - set('0123456789.-')
return None, f"行{row_num}列{col}: 非法字符 {bad_chars} 在 '{raw_value}'"
except Exception as e:
return None, f"行{row_num}列{col}: 列处理失败 - {str(e)}"
return (category, valid_values), None
except Exception as e:
return None, f"行{row_num}: 验证失败 - {str(e)}"
def process_sheet(sheet_name):
"""动态列处理支持(完整实现)"""
stats = {
'total': 0, 'success': 0, 'empty': 0,
'invalid': 0, 'stopped': False
}
try:
df = pd.read_excel(
SOURCE_FILE,
sheet_name=sheet_name,
header=None,
dtype=str,
engine='openpyxl',
converters={
column_index_from_string('AO')-1: clean_special_characters,
column_index_from_string('CM')-1: clean_special_characters
}
)
valid_data = []
for excel_row in range(2, 77, 2):
stats['total'] += 1
pandas_row = excel_row - 1
current_excel_row = excel_row
if pandas_row >= len(df):
break
try:
# AO列验证
ao_col = column_index_from_string('AO') - 1
ao_cell = clean_special_characters(df.iloc[pandas_row, ao_col])
if ao_cell == '//':
stats['stopped'] = True
break
if not ao_cell:
stats['empty'] += 1
continue
except:
stats['empty'] += 1
continue
# 提取整行数据
columns_to_process = ['AO', 'CM', 'CO', 'CQ', 'CS', 'CW', 'CY', 'DA', 'DC', 'BK']
row_values = []
for col in columns_to_process:
try:
col_idx = column_index_from_string(col) - 1
row_values.append(df.iloc[pandas_row, col_idx])
except:
row_values.append('') # 处理不存在的列
validated, error = validate_row_data(row_values, current_excel_row)
if validated:
valid_data.append(validated)
stats['success'] += 1
else:
stats['invalid'] += 1
if DEBUG_MODE:
print(f"❌ 无效数据:{error}")
print(f"\n📊 工作表 [{sheet_name}] 处理结果:")
print(f"├─ 扫描行数:{stats['total']}")
print(f"├─ 成功行数:{stats['success']} ({stats['success']/stats['total']:.1%})")
print(f"├─ 空行/无效:{stats['empty'] + stats['invalid']}")
if stats['stopped']:
print(f"└─ 遇到终止符 '//' 提前结束")
return valid_data
except Exception as e:
print(f"❌ 处理工作表 [{sheet_name}] 时发生错误:{str(e)}")
return []
def main():
"""主处理流程(工业级健壮性)"""
print("\n" + "="*40 + " 数据汇总开始 " + "="*40)
try:
os.makedirs(OUTPUT_DIR, exist_ok=True)
print(f"📂 输出目录已验证:{OUTPUT_DIR}")
except Exception as e:
print(f"❌ 目录创建失败:{str(e)}")
return
sheet_map = get_actual_sheet_names(SHEET_NAMES)
if not sheet_map:
print("⛔ 没有有效工作表可处理")
return
all_data = []
for expected, actual in sheet_map.items():
print(f"\n🔍 正在处理:{expected} → {actual}")
if data := process_sheet(actual):
all_data.extend(data)
if all_data:
try:
df = pd.DataFrame(
all_data,
columns=['类别', 'CM', 'CO', 'CQ', 'CS', 'CW', 'CY', 'DA', 'DC', 'BK']
).sort_values('类别')
timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
output_file = f'质心数据汇总-{PROJECT_CODE}-{timestamp}.csv' if ADD_TIMESTAMP \
else f'质心数据汇总-{PROJECT_CODE}.csv'
output_path = os.path.join(OUTPUT_DIR, output_file)
df.to_csv(
output_path,
index=False,
encoding='utf_8_sig',
quoting=csv.QUOTE_NONNUMERIC,
float_format='%.4f'
)
print(f"\n✅ 文件已安全保存:{output_path}")
print("\n📋 数据摘要:")
print(f"总数据行:{len(df)}")
print(f"A类样本:{len(df[df['类别']=='A'])}")
print(f"B类样本:{len(df[df['类别']=='B'])}")
if DEBUG_MODE:
print("\n🔬 数据质量检查:")
print("前5行数据样本:")
print(df.head().to_string(index=False))
print("\n数值分布统计:")
print(df.iloc[:, 1:].describe().round(4))
except Exception as e:
print(f"❌ 文件保存失败:{str(e)}")
else:
print("\n⛔ 未提取到有效数据")
if __name__ == '__main__':
main()
print("\n" + "="*40 + " 处理完成 " + "="*40) |
|