1234

ExcelHome技术论坛

用户名  找回密码
 免费注册

QQ登录

只需一步,快速开始

快捷登录

帖子
EH技术汇-专业的职场技能充电站 妙哉!函数段子手趣味讲函数 Excel服务器-会Excel,做管理系统 效率神器,一键搞定繁琐工作
HR薪酬管理数字化实战 Excel 2021函数公式学习大典 Excel数据透视表实战秘技 打造核心竞争力的职场宝典
让更多数据处理,一键完成 数据工作者的案头书 免费直播课集锦 ExcelHome出品 - VBA代码宝免费下载
用ChatGPT与VBA一键搞定Excel WPS表格从入门到精通 Excel VBA经典代码实践指南
查看: 342|回复: 5

[求助] 把多表数据汇总到一个表里的代码

[复制链接]

TA的精华主题

TA的得分主题

发表于 2025-2-11 19:25 | 显示全部楼层 |阅读模式
本帖最后由 27234aa 于 2025-2-13 12:40 编辑

DT0-SM84a.7z (270.43 KB, 下载次数: 6)

  请老师们Python帮我写一个把多表数据汇总到一个表里的代码,具体要求如下:
1、读取工作簿”DT0-SM84”里的表A1、A2、b1、b2、c1
所在的行列:
AO2到AO76
CM2到CM76
CO2到CO76
CQ2到CQ76
CS2到CS76
CW2到CW76
CY2到CY76
DA2到DA76
DC2到DC76里的数据到表”质心数据汇总-DT0-SM84.csv”里,
类别(AO2~AO76)放在A列,后面8个数依序排列,如下排列(假设数据示列):
A 0.86 0.78 0.63 0.54 0.89 1.03 1.00 1.08
B 0.3 1.04 0.61 0.35 0.84 0.78 0.47 0.98
...,....................................
读取时,遇有“//”时,该列(行)停止读取。
2.当工作簿“DT0-SM84”的路径是D:\数据表\DT0-SM84.xlsx
D:\数据表\DT0-SM84\质心数据汇总-DT0-SM84.cvs.xlsx
先谢谢了!

TA的精华主题

TA的得分主题

发表于 2025-2-13 09:50 | 显示全部楼层
不用写代码就用power query,要写代码就用pandas

TA的精华主题

TA的得分主题

 楼主| 发表于 2025-2-13 12:33 | 显示全部楼层

TA的精华主题

TA的得分主题

 楼主| 发表于 2025-2-13 12:45 | 显示全部楼层
DeepSeek,给我了一个代码。但验证有些问题。其#条都是红色。请老师们看看。修改、修改!谢谢了
import pandas as pd
import os
import datetime
import csv
from openpyxl.utils import column_index_from_string

# ====================== 用户配置区域 ======================
SOURCE_FILE = r'D:\数据表\DTO-SNG4.xlsx'    # 确保文件路径存在
SHEET_NAMES = ['A1', 'A2', 'b1', 'b2', 'c1']  # 工作表需实际存在
OUTPUT_DIR = r'D:\数据表'                  # 自动创建输出目录
PROJECT_CODE = "DTO-SNG4"                # 项目标识(注意字母O和数字0)
ADD_TIMESTAMP = True                     # 输出文件添加时间戳
DEBUG_MODE = True                        # 显示详细错误信息
SPECIAL_CHARS_TO_REMOVE = [              # 完整特殊符号列表
    '△', '▲', '☆', '★',                 # 三角形/星形符号
    '|', '∣', '︱', '丨'                # 竖线及其变体
]
# ========================================================

def clean_special_characters(text):
    """五层深度字符清洗(修复所有语法错误)"""
    try:
        if pd.isna(text) or text is None:
            return ""
        
        # 第一层:基础清洗
        cleaned = str(text).strip()
        
        # 第二层:移除特定符号
        for char in SPECIAL_CHARS_TO_REMOVE:
            cleaned = cleaned.replace(char, '')
        
        # 第三层:过滤控制字符
        cleaned = "".join(
            c for c in cleaned
            if c.isprintable() and c != '\ufeff'  # 修正不可见字符过滤
        )
        
        # 第四层:处理Unicode异常
        try:
            cleaned = cleaned.encode('utf-16', 'surrogatepass').decode('utf-16')
        except UnicodeError:
            cleaned = cleaned.encode('utf-16', 'replace').decode('utf-16')
        
        # 第五层:过滤四字节字符
        cleaned = ''.join(
            c for c in cleaned
            if len(c.encode('utf-8', errors='replace')) < 4  # 修正编码参数
        )
        
        return cleaned
    except Exception as e:
        if DEBUG_MODE:
            print(f"字符清洗失败: {str(e)}")
        return ""

def get_actual_sheet_names(expected_names):
    """智能匹配实际存在的工作表(添加异常处理)"""
    try:
        with pd.ExcelFile(SOURCE_FILE, engine='openpyxl') as xls:
            actual_sheets = xls.sheet_names
        
        sheet_map = {}
        for expected in expected_names:
            expected_clean = expected.strip().lower()
            for actual in actual_sheets:
                if expected_clean == actual.strip().lower():
                    sheet_map[expected] = actual
                    break
            if expected not in sheet_map:
                print(f"&#9888;&#65039; 警告:未找到工作表 '{expected}'")
        return sheet_map
    except Exception as e:
        print(f"&#10060; Excel文件读取失败: {str(e)}")
        return {}

def validate_row_data(row_values, row_num):
    """增强型数据验证(支持动态列范围)"""
    try:
        category = clean_special_characters(row_values[0]).upper()
        if category not in {'A', 'B'}:
            return None, f"行{row_num}: 列1 无效类别 '{category}'"
        
        valid_values = []
        # 处理从CM到BK的列(包含第63列BK)
        columns_to_check = ['CM', 'CO', 'CQ', 'CS', 'CW', 'CY', 'DA', 'DC', 'BK']  # 新增BK列
        
        for col in columns_to_check:
            try:
                col_idx = column_index_from_string(col) - 1  # 转换为0-based索引
                if col_idx >= len(row_values):
                    return None, f"行{row_num}列{col}: 列不存在"
               
                raw_value = row_values[col_idx]
                cleaned_value = clean_special_characters(raw_value)
               
                # 特殊处理竖线变体
                cleaned_value = cleaned_value.replace('∣', '').replace('︱', '')
               
                try:
                    num = float(cleaned_value)
                    if not 0.3 < num < 2:
                        return None, f"行{row_num}列{col}: 数值越界 {num:.4f}"
                    valid_values.append(num)
                except ValueError:
                    bad_chars = set(raw_value) - set('0123456789.-')
                    return None, f"行{row_num}列{col}: 非法字符 {bad_chars} 在 '{raw_value}'"
            except Exception as e:
                return None, f"行{row_num}列{col}: 列处理失败 - {str(e)}"
        
        return (category, valid_values), None
    except Exception as e:
        return None, f"行{row_num}: 验证失败 - {str(e)}"

def process_sheet(sheet_name):
    """动态列处理支持(完整实现)"""
    stats = {
        'total': 0, 'success': 0, 'empty': 0,
        'invalid': 0, 'stopped': False
    }
   
    try:
        df = pd.read_excel(
            SOURCE_FILE,
            sheet_name=sheet_name,
            header=None,
            dtype=str,
            engine='openpyxl',
            converters={
                column_index_from_string('AO')-1: clean_special_characters,
                column_index_from_string('CM')-1: clean_special_characters
            }
        )
        valid_data = []
        
        for excel_row in range(2, 77, 2):
            stats['total'] += 1
            pandas_row = excel_row - 1
            current_excel_row = excel_row
            
            if pandas_row >= len(df):
                break
            
            try:
                # AO列验证
                ao_col = column_index_from_string('AO') - 1
                ao_cell = clean_special_characters(df.iloc[pandas_row, ao_col])
                if ao_cell == '//':
                    stats['stopped'] = True
                    break
                if not ao_cell:
                    stats['empty'] += 1
                    continue
            except:
                stats['empty'] += 1
                continue
            
            # 提取整行数据
            columns_to_process = ['AO', 'CM', 'CO', 'CQ', 'CS', 'CW', 'CY', 'DA', 'DC', 'BK']
            row_values = []
            for col in columns_to_process:
                try:
                    col_idx = column_index_from_string(col) - 1
                    row_values.append(df.iloc[pandas_row, col_idx])
                except:
                    row_values.append('')  # 处理不存在的列
            
            validated, error = validate_row_data(row_values, current_excel_row)
            if validated:
                valid_data.append(validated)
                stats['success'] += 1
            else:
                stats['invalid'] += 1
                if DEBUG_MODE:
                    print(f"&#10060; 无效数据:{error}")
        
        print(f"\n&#128202; 工作表 [{sheet_name}] 处理结果:")
        print(f"├─ 扫描行数:{stats['total']}")
        print(f"├─ 成功行数:{stats['success']} ({stats['success']/stats['total']:.1%})")
        print(f"├─ 空行/无效:{stats['empty'] + stats['invalid']}")
        if stats['stopped']:
            print(f"└─ 遇到终止符 '//' 提前结束")
        
        return valid_data
    except Exception as e:
        print(f"&#10060; 处理工作表 [{sheet_name}] 时发生错误:{str(e)}")
        return []

def main():
    """主处理流程(工业级健壮性)"""
    print("\n" + "="*40 + " 数据汇总开始 " + "="*40)
   
    try:
        os.makedirs(OUTPUT_DIR, exist_ok=True)
        print(f"&#128194; 输出目录已验证:{OUTPUT_DIR}")
    except Exception as e:
        print(f"&#10060; 目录创建失败:{str(e)}")
        return
   
    sheet_map = get_actual_sheet_names(SHEET_NAMES)
    if not sheet_map:
        print("&#9940; 没有有效工作表可处理")
        return
   
    all_data = []
    for expected, actual in sheet_map.items():
        print(f"\n&#128269; 正在处理:{expected} → {actual}")
        if data := process_sheet(actual):
            all_data.extend(data)
   
    if all_data:
        try:
            df = pd.DataFrame(
                all_data,
                columns=['类别', 'CM', 'CO', 'CQ', 'CS', 'CW', 'CY', 'DA', 'DC', 'BK']
            ).sort_values('类别')
            
            timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
            output_file = f'质心数据汇总-{PROJECT_CODE}-{timestamp}.csv' if ADD_TIMESTAMP \
                else f'质心数据汇总-{PROJECT_CODE}.csv'
            
            output_path = os.path.join(OUTPUT_DIR, output_file)
            
            df.to_csv(
                output_path,
                index=False,
                encoding='utf_8_sig',
                quoting=csv.QUOTE_NONNUMERIC,
                float_format='%.4f'
            )
            print(f"\n&#9989; 文件已安全保存:{output_path}")
            
            print("\n&#128203; 数据摘要:")
            print(f"总数据行:{len(df)}")
            print(f"A类样本:{len(df[df['类别']=='A'])}")
            print(f"B类样本:{len(df[df['类别']=='B'])}")
            
            if DEBUG_MODE:
                print("\n&#128300; 数据质量检查:")
                print("前5行数据样本:")
                print(df.head().to_string(index=False))
                print("\n数值分布统计:")
                print(df.iloc[:, 1:].describe().round(4))
        except Exception as e:
            print(f"&#10060; 文件保存失败:{str(e)}")
    else:
        print("\n&#9940; 未提取到有效数据")

if __name__ == '__main__':
    main()
    print("\n" + "="*40 + " 处理完成 " + "="*40)

TA的精华主题

TA的得分主题

发表于 2025-2-24 15:51 | 显示全部楼层
import os import pandas as pd from openpyxl import load_workbook  def check_paths(workbook_path, csv_path):     """     检查工作簿路径和 CSV 文件路径是否有效。     如果工作簿文件不存在,抛出 FileNotFoundError 异常;     如果 CSV 文件所在目录不存在,创建该目录。      :param workbook_path: 工作簿文件的路径     :param csv_path: CSV 文件的路径     """     if not os.path.exists(workbook_path):         raise FileNotFoundError(f"工作簿文件 {workbook_path} 不存在。")     csv_dir = os.path.dirname(csv_path)     if not os.path.exists(csv_dir):         os.makedirs(csv_dir)  def read_column(sheet, col_range):     """     从指定工作表中读取指定列范围的数据,遇到 "//" 停止读取。      :param sheet: 工作表对象     :param col_range: 列范围,格式如 'AO2:AO76'     :return: 读取到的列数据列表     """     start_col, start_row, end_col, end_row = col_range[0:2], int(col_range[2:4]), col_range[5:7], int(col_range[7:9])     col_data = []     for row in sheet.iter_rows(min_row=start_row, max_row=end_row, min_col=sheet[start_col + str(start_row)].column, max_col=sheet[end_col + str(end_row)].column):         cell_value = row[0].value         if cell_value == "//":             break         col_data.append(cell_value)     return col_data  def aggregate_data(workbook, sheet_names, column_ranges):     """     汇总多个工作表中指定列范围的数据。      :param workbook: 工作簿对象     :param sheet_names: 要读取的工作表名称列表     :param column_ranges: 要读取的列范围列表     :return: 汇总后的数据列表     """     all_data = []     for sheet_name in sheet_names:         sheet = workbook[sheet_name]         for i, col_range in enumerate(column_ranges):             col_data = read_column(sheet, col_range)             if i == 0:                 categories = col_data             else:                 for j, value in enumerate(col_data):                     if j < len(all_data):                         all_data[j].append(value)                     else:                         all_data.append([categories[j], value])     return all_data  def main():     # 定义工作簿路径和 CSV 文件路径     workbook_path = r"D:\数据表\DT0-SM84.xlsx"     csv_path = r"D:\数据表\DT0-SM84\质心数据汇总-DT0-SM84.csv"      # 检查路径有效性     check_paths(workbook_path, csv_path)      # 加载工作簿     try:         workbook = load_workbook(workbook_path)     except Exception as e:         print(f"加载工作簿时出现错误: {e}")         return      # 定义要读取的工作表和列范围     sheet_names = ['A1', 'A2', 'b1', 'b2', 'c1']     column_ranges = [         'AO2:AO76',         'CM2:CM76',         'CO2:CO76',         'CQ2:CQ76',         'CS2:CS76',         'CW2:CW76',         'CY2:CY76',         'DA2:DA76',         'DC2:DC76'     ]      # 汇总数据     all_data = aggregate_data(workbook, sheet_names, column_ranges)      # 创建 DataFrame     columns = ['类别'] + [f'数据{i}' for i in range(1, len(column_ranges))]     df = pd.DataFrame(all_data, columns=columns)      # 保存为 CSV 文件     try:         df.to_csv(csv_path, index=False)         print(f"数据已成功汇总到 {csv_path}")     except Exception as e:         print(f"保存 CSV 文件时出现错误: {e}")  if __name__ == "__main__":     main()

TA的精华主题

TA的得分主题

发表于 2025-2-24 15:51 | 显示全部楼层
import os
import pandas as pd
from openpyxl import load_workbook

def check_paths(workbook_path, csv_path):
    """
    检查工作簿路径和 CSV 文件路径是否有效。
    如果工作簿文件不存在,抛出 FileNotFoundError 异常;
    如果 CSV 文件所在目录不存在,创建该目录。

    :param workbook_path: 工作簿文件的路径
    :param csv_path: CSV 文件的路径
    """
    if not os.path.exists(workbook_path):
        raise FileNotFoundError(f"工作簿文件 {workbook_path} 不存在。")
    csv_dir = os.path.dirname(csv_path)
    if not os.path.exists(csv_dir):
        os.makedirs(csv_dir)

def read_column(sheet, col_range):
    """
    从指定工作表中读取指定列范围的数据,遇到 "//" 停止读取。

    :param sheet: 工作表对象
    :param col_range: 列范围,格式如 'AO2:AO76'
    :return: 读取到的列数据列表
    """
    start_col, start_row, end_col, end_row = col_range[0:2], int(col_range[2:4]), col_range[5:7], int(col_range[7:9])
    col_data = []
    for row in sheet.iter_rows(min_row=start_row, max_row=end_row, min_col=sheet[start_col + str(start_row)].column, max_col=sheet[end_col + str(end_row)].column):
        cell_value = row[0].value
        if cell_value == "//":
            break
        col_data.append(cell_value)
    return col_data

def aggregate_data(workbook, sheet_names, column_ranges):
    """
    汇总多个工作表中指定列范围的数据。

    :param workbook: 工作簿对象
    :param sheet_names: 要读取的工作表名称列表
    :param column_ranges: 要读取的列范围列表
    :return: 汇总后的数据列表
    """
    all_data = []
    for sheet_name in sheet_names:
        sheet = workbook[sheet_name]
        for i, col_range in enumerate(column_ranges):
            col_data = read_column(sheet, col_range)
            if i == 0:
                categories = col_data
            else:
                for j, value in enumerate(col_data):
                    if j < len(all_data):
                        all_data[j].append(value)
                    else:
                        all_data.append([categories[j], value])
    return all_data

def main():
    # 定义工作簿路径和 CSV 文件路径
    workbook_path = r"D:\数据表\DT0-SM84.xlsx"
    csv_path = r"D:\数据表\DT0-SM84\质心数据汇总-DT0-SM84.csv"

    # 检查路径有效性
    check_paths(workbook_path, csv_path)

    # 加载工作簿
    try:
        workbook = load_workbook(workbook_path)
    except Exception as e:
        print(f"加载工作簿时出现错误: {e}")
        return

    # 定义要读取的工作表和列范围
    sheet_names = ['A1', 'A2', 'b1', 'b2', 'c1']
    column_ranges = [
        'AO2:AO76',
        'CM2:CM76',
        'CO2:CO76',
        'CQ2:CQ76',
        'CS2:CS76',
        'CW2:CW76',
        'CY2:CY76',
        'DA2:DA76',
        'DC2:DC76'
    ]

    # 汇总数据
    all_data = aggregate_data(workbook, sheet_names, column_ranges)

    # 创建 DataFrame
    columns = ['类别'] + [f'数据{i}' for i in range(1, len(column_ranges))]
    df = pd.DataFrame(all_data, columns=columns)

    # 保存为 CSV 文件
    try:
        df.to_csv(csv_path, index=False)
        print(f"数据已成功汇总到 {csv_path}")
    except Exception as e:
        print(f"保存 CSV 文件时出现错误: {e}")

if __name__ == "__main__":
    main()
您需要登录后才可以回帖 登录 | 免费注册

本版积分规则

1234

手机版|关于我们|联系我们|ExcelHome

GMT+8, 2025-3-28 04:33 , Processed in 1.039388 second(s), 24 queries , Gzip On.

Powered by Discuz! X3.4

© 1999-2023 Wooffice Inc.

沪公网安备 31011702000001号 沪ICP备11019229号-2

本论坛言论纯属发表者个人意见,任何违反国家相关法律的言论,本站将协助国家相关部门追究发言者责任!     本站特聘法律顾问:李志群律师

快速回复 返回顶部 返回列表