boxmoe_header_banner_img

Hello!

加载中

文章导读

基于Python的Excel表格自动汇总对比脚本


avatar
genan 2025-10-08 111

(注:此脚本由AI生成)

功能介绍:

  • 对表格中的数据进行排序汇总:表格中有家店的多个日期的销售金额数据,此脚本会将数据按照日期排列,然后将相同门店的数据汇总在一起,汇总方式有“求和‘、”最大值“、”最小值’等。
  • 数据对比功能:脚本运行后可以导入一个或者两个excel表,导入两个excel表后,可以实现数据之间的比对,首先选择”比对基准列“,以此为依据可使两张表的数据对上,然后再选择”比对列“,开始对比数据。
  • 在脚本的下方可以看到”汇总结果“、’对比结果‘和’差异详情‘。还可导出结果。

运行脚本前,你可能需要的包:

import tkinter as tk
from tkinter import ttk, filedialog, messagebox
import pandas as pd
from pandas import ExcelFile
import os

以下是完整代码,调整好编译环境和所需要的包后,可直接运行使用:

import tkinter as tk
from tkinter import ttk, filedialog, messagebox
import pandas as pd
import numpy as np
from pandas import ExcelFile
import os


class ExcelGroupAnalyzer:
    def __init__(self, root):
        self.root = root
        self.root.title("Excel数据分组汇总与比对工具")
        self.root.geometry("1200x800")

        self.file_path = None
        self.compare_file_path = None
        self.sheet_names = []
        self.compare_sheet_names = []
        self.current_df = None
        self.compare_df = None
        self.current_sheet = None
        self.compare_sheet = None

        self.setup_ui()

    def setup_ui(self):
        """设置用户界面"""
        # 主框架
        main_frame = ttk.Frame(self.root, padding="10")
        main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))

        # 文件选择区域
        file_frame = ttk.LabelFrame(main_frame, text="文件选择", padding="5")
        file_frame.grid(row=0, column=0, columnspan=4, sticky=(tk.W, tk.E), pady=5)

        ttk.Button(file_frame, text="选择主Excel文件", command=self.load_file).grid(row=0, column=0, padx=5)
        self.file_label = ttk.Label(file_frame, text="未选择文件")
        self.file_label.grid(row=0, column=1, padx=5)

        ttk.Button(file_frame, text="选择比对Excel文件", command=self.load_compare_file).grid(row=0, column=2, padx=5)
        self.compare_file_label = ttk.Label(file_frame, text="未选择比对文件")
        self.compare_file_label.grid(row=0, column=3, padx=5)

        # 工作表选择区域
        sheet_frame = ttk.LabelFrame(main_frame, text="工作表选择", padding="5")
        sheet_frame.grid(row=1, column=0, columnspan=4, sticky=(tk.W, tk.E), pady=5)

        ttk.Label(sheet_frame, text="主工作表:").grid(row=0, column=0, padx=5)
        self.sheet_combo = ttk.Combobox(sheet_frame, state="readonly", width=20)
        self.sheet_combo.grid(row=0, column=1, padx=5)
        self.sheet_combo.bind('<<ComboboxSelected>>', self.on_sheet_select)

        ttk.Label(sheet_frame, text="比对工作表:").grid(row=0, column=2, padx=5)
        self.compare_sheet_combo = ttk.Combobox(sheet_frame, state="readonly", width=20)
        self.compare_sheet_combo.grid(row=0, column=3, padx=5)
        self.compare_sheet_combo.bind('<<ComboboxSelected>>', self.on_compare_sheet_select)

        # 数据显示区域(使用Notebook实现标签页)
        self.notebook = ttk.Notebook(main_frame)
        self.notebook.grid(row=2, column=0, columnspan=4, sticky=(tk.W, tk.E, tk.N, tk.S), pady=5)

        # 主数据标签页
        main_data_frame = ttk.Frame(self.notebook)
        self.notebook.add(main_data_frame, text="主数据预览")
        self.setup_data_treeview(main_data_frame, "main")

        # 比对数据标签页
        compare_data_frame = ttk.Frame(self.notebook)
        self.notebook.add(compare_data_frame, text="比对数据预览")
        self.setup_data_treeview(compare_data_frame, "compare")

        # 分组设置区域
        group_frame = ttk.LabelFrame(main_frame, text="分组设置", padding="5")
        group_frame.grid(row=3, column=0, columnspan=4, sticky=(tk.W, tk.E), pady=5)

        ttk.Label(group_frame, text="基准列(分组依据):").grid(row=0, column=0, padx=5)
        self.group_combo = ttk.Combobox(group_frame, state="readonly", width=15)
        self.group_combo.grid(row=0, column=1, padx=5)

        ttk.Label(group_frame, text="汇总列:").grid(row=0, column=2, padx=5)
        self.sum_combo = ttk.Combobox(group_frame, state="readonly", width=15)
        self.sum_combo.grid(row=0, column=3, padx=5)

        ttk.Label(group_frame, text="汇总方式:").grid(row=0, column=4, padx=5)
        self.agg_combo = ttk.Combobox(group_frame, values=['求和', '平均值', '最大值', '最小值', '计数'],
                                      state="readonly", width=10)
        self.agg_combo.set('求和')
        self.agg_combo.grid(row=0, column=5, padx=5)

        # 比对设置区域
        compare_setting_frame = ttk.LabelFrame(main_frame, text="比对设置", padding="5")
        compare_setting_frame.grid(row=4, column=0, columnspan=4, sticky=(tk.W, tk.E), pady=5)

        ttk.Label(compare_setting_frame, text="比对基准列:").grid(row=0, column=0, padx=5)
        self.compare_group_combo = ttk.Combobox(compare_setting_frame, state="readonly", width=15)
        self.compare_group_combo.grid(row=0, column=1, padx=5)

        ttk.Label(compare_setting_frame, text="比对汇总列:").grid(row=0, column=2, padx=5)
        self.compare_sum_combo = ttk.Combobox(compare_setting_frame, state="readonly", width=15)
        self.compare_sum_combo.grid(row=0, column=3, padx=5)

        ttk.Label(compare_setting_frame, text="容差范围:").grid(row=0, column=4, padx=5)
        self.tolerance_entry = ttk.Entry(compare_setting_frame, width=10)
        self.tolerance_entry.insert(0, "0.01")
        self.tolerance_entry.grid(row=0, column=5, padx=5)

        # 操作按钮区域
        button_frame = ttk.Frame(main_frame)
        button_frame.grid(row=5, column=0, columnspan=4, pady=10)

        ttk.Button(button_frame, text="执行分组汇总", command=self.execute_grouping).grid(row=0, column=0, padx=5)
        ttk.Button(button_frame, text="执行数据比对", command=self.execute_comparison).grid(row=0, column=1, padx=5)
        ttk.Button(button_frame, text="导出结果", command=self.export_result).grid(row=0, column=2, padx=5)
        ttk.Button(button_frame, text="显示原始数据", command=self.show_original_data).grid(row=0, column=3, padx=5)

        # 结果显示区域(使用Notebook)
        result_notebook = ttk.Notebook(main_frame)
        result_notebook.grid(row=6, column=0, columnspan=4, sticky=(tk.W, tk.E, tk.N, tk.S), pady=5)

        # 汇总结果标签页
        summary_frame = ttk.Frame(result_notebook)
        result_notebook.add(summary_frame, text="汇总结果")
        self.setup_result_treeview(summary_frame, "summary")

        # 比对结果标签页
        compare_result_frame = ttk.Frame(result_notebook)
        result_notebook.add(compare_result_frame, text="比对结果")
        self.setup_result_treeview(compare_result_frame, "compare")

        # 差异详情标签页
        diff_frame = ttk.Frame(result_notebook)
        result_notebook.add(diff_frame, text="差异详情")
        self.setup_result_treeview(diff_frame, "diff")

        # 配置主框架的网格权重
        main_frame.columnconfigure(0, weight=1)
        main_frame.rowconfigure(2, weight=1)
        main_frame.rowconfigure(6, weight=1)

        self.root.columnconfigure(0, weight=1)
        self.root.rowconfigure(0, weight=1)

    def setup_data_treeview(self, parent, tree_type):
        """设置数据预览的Treeview"""
        tree = ttk.Treeview(parent, show='headings', height=8)
        scrollbar_y = ttk.Scrollbar(parent, orient=tk.VERTICAL, command=tree.yview)
        scrollbar_x = ttk.Scrollbar(parent, orient=tk.HORIZONTAL, command=tree.xview)
        tree.configure(yscrollcommand=scrollbar_y.set, xscrollcommand=scrollbar_x.set)

        tree.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
        scrollbar_y.grid(row=0, column=1, sticky=(tk.N, tk.S))
        scrollbar_x.grid(row=1, column=0, sticky=(tk.W, tk.E))

        parent.columnconfigure(0, weight=1)
        parent.rowconfigure(0, weight=1)

        if tree_type == "main":
            self.tree = tree
        else:
            self.compare_tree = tree

    def setup_result_treeview(self, parent, tree_type):
        """设置结果显示的Treeview"""
        tree = ttk.Treeview(parent, show='headings', height=10)
        scrollbar_y = ttk.Scrollbar(parent, orient=tk.VERTICAL, command=tree.yview)
        scrollbar_x = ttk.Scrollbar(parent, orient=tk.HORIZONTAL, command=tree.xview)
        tree.configure(yscrollcommand=scrollbar_y.set, xscrollcommand=scrollbar_x.set)

        tree.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
        scrollbar_y.grid(row=0, column=1, sticky=(tk.N, tk.S))
        scrollbar_x.grid(row=1, column=0, sticky=(tk.W, tk.E))

        parent.columnconfigure(0, weight=1)
        parent.rowconfigure(0, weight=1)

        if tree_type == "summary":
            self.result_tree = tree
        elif tree_type == "compare":
            self.compare_result_tree = tree
        else:
            self.diff_tree = tree

    def load_file(self):
        """加载主Excel文件"""
        file_path = filedialog.askopenfilename(
            title="选择主Excel文件",
            filetypes=[("Excel files", "*.xlsx *.xls"), ("All files", "*.*")]
        )

        if file_path:
            try:
                self.file_path = file_path
                self.file_label.config(text=os.path.basename(file_path))

                # 获取所有工作表名称
                excel_file = ExcelFile(file_path)
                self.sheet_names = excel_file.sheet_names
                self.sheet_combo['values'] = self.sheet_names

                if self.sheet_names:
                    self.sheet_combo.set(self.sheet_names[0])
                    self.load_sheet_data(self.sheet_names[0])

            except Exception as e:
                messagebox.showerror("错误", f"读取文件失败: {str(e)}")

    def load_compare_file(self):
        """加载比对Excel文件"""
        file_path = filedialog.askopenfilename(
            title="选择比对Excel文件",
            filetypes=[("Excel files", "*.xlsx *.xls"), ("All files", "*.*")]
        )

        if file_path:
            try:
                self.compare_file_path = file_path
                self.compare_file_label.config(text=os.path.basename(file_path))

                # 获取所有工作表名称
                excel_file = ExcelFile(file_path)
                self.compare_sheet_names = excel_file.sheet_names
                self.compare_sheet_combo['values'] = self.compare_sheet_names

                if self.compare_sheet_names:
                    self.compare_sheet_combo.set(self.compare_sheet_names[0])
                    # 修复:方法名拼写错误
                    self.load_compare_sheet_data(self.compare_sheet_names[0])

            except Exception as e:
                messagebox.showerror("错误", f"读取比对文件失败: {str(e)}")

    def load_compare_sheet_data(self, sheet_name):
        """加载比对工作表的数据 - 修复方法名"""
        self.load_sheet_data(sheet_name, "compare")

    def load_sheet_data(self, sheet_name, tree_type="main"):
        """加载指定工作表的数据"""
        try:
            if tree_type == "main":
                file_path = self.file_path
                df = pd.read_excel(file_path, sheet_name=sheet_name)
                self.current_df = df
                self.current_sheet = sheet_name
                tree = self.tree
            else:
                file_path = self.compare_file_path
                df = pd.read_excel(file_path, sheet_name=sheet_name)
                self.compare_df = df
                self.compare_sheet = sheet_name
                tree = self.compare_tree

            # 清空Treeview
            for item in tree.get_children():
                tree.delete(item)
            tree['columns'] = []

            # 设置列
            columns = list(df.columns)
            tree['columns'] = columns

            # 配置列标题
            for col in columns:
                tree.heading(col, text=col)
                tree.column(col, width=100, minwidth=50)

            # 插入数据(限制显示行数,避免界面卡顿)
            display_rows = min(100, len(df))  # 最多显示100行
            for i in range(display_rows):
                row = df.iloc[i]
                values = [str(x) if not pd.isna(x) else "" for x in row]
                tree.insert("", "end", values=values)

            if len(df) > display_rows:
                tree.insert("", "end",
                            values=[f"... 还有 {len(df) - display_rows} 行数据 ..."] + [""] * (len(columns) - 1))

            # 更新组合框
            if tree_type == "main":
                self.group_combo['values'] = columns
                self.sum_combo['values'] = columns

                if columns:
                    self.group_combo.set(columns[0])
                    if len(columns) > 1:
                        self.sum_combo.set(columns[1])
                    else:
                        self.sum_combo.set(columns[0])
            else:
                self.compare_group_combo['values'] = columns
                self.compare_sum_combo['values'] = columns

                if columns:
                    self.compare_group_combo.set(columns[0])
                    if len(columns) > 1:
                        self.compare_sum_combo.set(columns[1])
                    else:
                        self.compare_sum_combo.set(columns[0])

            messagebox.showinfo("成功", f"已加载工作表: {sheet_name}\n数据行数: {len(df)}")

        except Exception as e:
            messagebox.showerror("错误", f"加载工作表失败: {str(e)}")

    def on_sheet_select(self, event):
        """主工作表选择事件"""
        sheet_name = self.sheet_combo.get()
        if sheet_name:
            self.load_sheet_data(sheet_name, "main")

    def on_compare_sheet_select(self, event):
        """比对工作表选择事件"""
        sheet_name = self.compare_sheet_combo.get()
        if sheet_name:
            self.load_sheet_data(sheet_name, "compare")

    def execute_grouping(self):
        """执行分组汇总"""
        if self.current_df is None:
            messagebox.showwarning("警告", "请先加载主数据")
            return

        group_col = self.group_combo.get()
        sum_col = self.sum_combo.get()
        agg_method = self.agg_combo.get()

        if not group_col or not sum_col:
            messagebox.showwarning("警告", "请选择基准列和汇总列")
            return

        try:
            # 检查列是否存在
            if group_col not in self.current_df.columns or sum_col not in self.current_df.columns:
                messagebox.showerror("错误", "选择的列不存在")
                return

            # 映射汇总方法
            agg_map = {
                '求和': 'sum',
                '平均值': 'mean',
                '最大值': 'max',
                '最小值': 'min',
                '计数': 'count'
            }

            # 执行分组汇总
            if agg_method == '计数':
                grouped = self.current_df.groupby(group_col).agg({
                    sum_col: agg_map[agg_method]
                }).round(2)
            else:
                self.current_df[sum_col] = pd.to_numeric(self.current_df[sum_col], errors='coerce')
                grouped = self.current_df.groupby(group_col).agg({
                    sum_col: agg_map[agg_method]
                }).round(2)

            # 重置索引
            grouped = grouped.reset_index()

            # 显示结果
            self.display_result(grouped, group_col, sum_col, agg_method, "summary")

        except Exception as e:
            messagebox.showerror("错误", f"分组汇总失败: {str(e)}")

    def execute_comparison(self):
        """执行数据比对"""
        if self.current_df is None or self.compare_df is None:
            messagebox.showwarning("警告", "请先加载主数据和比对数据")
            return

        main_group_col = self.group_combo.get()
        main_sum_col = self.sum_combo.get()
        compare_group_col = self.compare_group_combo.get()
        compare_sum_col = self.compare_sum_combo.get()
        agg_method = self.agg_combo.get()

        if not all([main_group_col, main_sum_col, compare_group_col, compare_sum_col]):
            messagebox.showwarning("警告", "请完整设置比对参数")
            return

        try:
            # 获取容差
            tolerance = float(self.tolerance_entry.get())

            # 对主数据进行分组汇总
            self.current_df[main_sum_col] = pd.to_numeric(self.current_df[main_sum_col], errors='coerce')
            main_grouped = self.current_df.groupby(main_group_col).agg({
                main_sum_col: 'sum'
            }).round(2).reset_index()

            # 对比对数据进行分组汇总
            self.compare_df[compare_sum_col] = pd.to_numeric(self.compare_df[compare_sum_col], errors='coerce')
            compare_grouped = self.compare_df.groupby(compare_group_col).agg({
                compare_sum_col: 'sum'
            }).round(2).reset_index()

            # 修复:确保合并列的数据类型一致
            main_grouped[main_group_col] = main_grouped[main_group_col].astype(str)
            compare_grouped[compare_group_col] = compare_grouped[compare_group_col].astype(str)

            # 合并两个结果
            merged = pd.merge(main_grouped, compare_grouped,
                              left_on=main_group_col, right_on=compare_group_col,
                              how='outer', suffixes=('_主数据', '_比对数据'), indicator=True)

            # 处理合并后的数据
            # 填充NaN值为0,便于计算差异
            merged[f'{main_sum_col}_主数据'] = merged[f'{main_sum_col}_主数据'].fillna(0)
            merged[f'{compare_sum_col}_比对数据'] = merged[f'{compare_sum_col}_比对数据'].fillna(0)

            # 计算差异
            merged['差异'] = merged[f'{main_sum_col}_主数据'] - merged[f'{compare_sum_col}_比对数据']
            merged['差异绝对值'] = abs(merged['差异'])
            merged['是否一致'] = merged['差异绝对值'] <= tolerance

            # 重命名合并指示列
            merged = merged.rename(columns={'_merge': '数据来源'})
            merged['数据来源'] = merged['数据来源'].map({
                'left_only': '仅主数据存在',
                'right_only': '仅比对数据存在',
                'both': '两边都存在'
            })

            # 显示比对结果
            self.display_result(merged, main_group_col, main_sum_col, agg_method, "compare")

            # 显示差异详情
            diff_data = merged[~merged['是否一致']].copy()
            if not diff_data.empty:
                self.display_result(diff_data, main_group_col, main_sum_col, agg_method, "diff")
                messagebox.showinfo("比对完成",
                                    f"比对完成!\n"
                                    f"总记录数: {len(merged)}\n"
                                    f"一致记录: {len(merged[merged['是否一致']])}\n"
                                    f"差异记录: {len(diff_data)}\n"
                                    f"仅主数据存在: {len(merged[merged['数据来源'] == '仅主数据存在'])}\n"
                                    f"仅比对数据存在: {len(merged[merged['数据来源'] == '仅比对数据存在'])}")
            else:
                self.display_result(pd.DataFrame(), main_group_col, main_sum_col, agg_method, "diff")
                messagebox.showinfo("比对完成", "比对完成!所有数据一致!")

            # 保存比对结果
            self.comparison_result = merged

        except Exception as e:
            messagebox.showerror("错误", f"数据比对失败: {str(e)}")

    def display_result(self, result_df, group_col, sum_col, agg_method, result_type):
        """显示结果"""
        if result_type == "summary":
            tree = self.result_tree
            title = "汇总结果"
        elif result_type == "compare":
            tree = self.compare_result_tree
            title = "比对结果"
        else:
            tree = self.diff_tree
            title = "差异详情"

        # 清空Treeview
        for item in tree.get_children():
            tree.delete(item)
        tree['columns'] = []

        if result_df.empty:
            tree['columns'] = ['提示']
            tree.heading('提示', text='提示')
            tree.column('提示', width=300)
            tree.insert("", "end", values=["没有数据"])
            return

        # 设置列
        columns = list(result_df.columns)
        tree['columns'] = columns

        # 配置列标题
        for col in columns:
            display_name = col
            tree.heading(col, text=display_name)
            # 根据列内容调整列宽
            if any(isinstance(x, (int, float)) for x in result_df[col].head() if not pd.isna(x)):
                tree.column(col, width=120, minwidth=80)
            else:
                tree.column(col, width=150, minwidth=100)

        # 插入数据
        for i, row in result_df.iterrows():
            values = []
            for x in row:
                if isinstance(x, (int, float)) and not isinstance(x, bool):
                    values.append(f"{x:.2f}")
                else:
                    values.append(str(x) if not pd.isna(x) else "")
            tree.insert("", "end", values=values)

        # 保存结果用于导出
        if result_type == "summary":
            self.summary_result = result_df
        elif result_type == "compare":
            self.comparison_result = result_df

    def export_result(self):
        """导出结果到Excel"""
        if not hasattr(self, 'summary_result') and not hasattr(self, 'comparison_result'):
            messagebox.showwarning("警告", "没有可导出的结果数据")
            return

        file_path = filedialog.asksaveasfilename(
            title="保存结果",
            defaultextension=".xlsx",
            filetypes=[("Excel files", "*.xlsx"), ("All files", "*.*")]
        )

        if file_path:
            try:
                with pd.ExcelWriter(file_path, engine='openpyxl') as writer:
                    if hasattr(self, 'summary_result'):
                        self.summary_result.to_excel(writer, sheet_name='汇总结果', index=False)

                    if hasattr(self, 'comparison_result'):
                        self.comparison_result.to_excel(writer, sheet_name='比对结果', index=False)

                    if hasattr(self, 'current_df'):
                        self.current_df.to_excel(writer, sheet_name='主原始数据', index=False)

                    if hasattr(self, 'compare_df'):
                        self.compare_df.to_excel(writer, sheet_name='比对原始数据', index=False)

                messagebox.showinfo("成功", f"结果已导出到: {file_path}")
            except Exception as e:
                messagebox.showerror("错误", f"导出失败: {str(e)}")

    def show_original_data(self):
        """显示原始数据"""
        if self.current_df is not None:
            self.load_sheet_data(self.current_sheet, "main")
        if hasattr(self, 'compare_df') and self.compare_df is not None:
            self.load_sheet_data(self.compare_sheet, "compare")


def main():
    root = tk.Tk()
    app = ExcelGroupAnalyzer(root)
    root.mainloop()


if __name__ == "__main__":
    main()


评论(0)

查看评论列表

暂无评论


发表评论

表情 颜文字
插入代码