PDF-Extract-Kit-1.0代码实例:Python封装Shell脚本实现批量PDF处理Pipeline

PDF-Extract-Kit-1.0代码实例:Python封装Shell脚本实现批量PDF处理Pipeline PDF-Extract-Kit-1.0代码实例Python封装Shell脚本实现批量PDF处理Pipeline1. 引言告别繁琐拥抱自动化如果你经常和PDF文档打交道尤其是需要批量处理大量PDF文件提取其中的表格、公式或者分析文档布局那么你一定对重复执行各种命令行工具感到头疼。不同的工具需要不同的命令、参数和环境手动操作不仅效率低下还容易出错。PDF-Extract-Kit-1.0 镜像为我们提供了一个强大的工具箱里面集成了表格识别、布局推理、公式识别与推理等多个功能。但它的使用方式是通过执行不同的Shell脚本来调用比如表格识别.sh、公式推理.sh。想象一下如果你有100个PDF文件需要依次进行这四种处理手动操作将是多么巨大的工作量。本文要解决的问题正是这个痛点。我们将一起动手用Python写一个“指挥官”把那些分散的Shell脚本命令封装起来构建一个自动化、可配置的批量PDF处理流水线。学完本文你将掌握如何用几十行Python代码让机器自动为你完成所有繁琐的PDF处理任务而你只需要喝杯咖啡等待结果。2. 核心思路Python作为流程调度器在开始写代码之前我们先理清思路。我们的目标不是重新发明轮子去写识别算法而是利用现有的、已经验证过的工具PDF-Extract-Kit-1.0中的脚本通过Python来编排和自动化它们的执行流程。2.1 我们想做什么批量处理指定一个包含多个PDF文件的文件夹程序能自动处理每一个文件。流水线作业对一个PDF文件可以按顺序执行多个处理步骤例如先识别布局再提取表格最后识别公式。灵活配置可以自由选择要对文件执行哪些处理步骤而不是每次都运行全套。结果管理自动将不同步骤的输出结果保存到有组织的目录结构中方便后续查找和使用。状态记录与容错处理成功或失败都有记录某个文件处理失败不影响其他文件。2.2 Python如何与Shell脚本协作PDF-Extract-Kit-1.0中的.sh脚本是已经写好的、功能完整的命令。Python的subprocess模块就是我们的“遥控器”它可以启动并控制这些外部程序Shell脚本的运行。我们将通过Python来拼接出正确的Shell命令。执行这些命令并捕获输出。根据命令执行的成功与否来决定下一步动作。简单来说Python负责逻辑控制和流程编排Shell脚本负责具体的重型PDF处理任务。这是一种非常经典且高效的工程实践。3. 环境准备与基础代码首先确保你已经按照指引部署了PDF-Extract-Kit-1.0镜像并进入了Jupyter环境。3.1 创建我们的Python项目目录虽然工具包在/root/PDF-Extract-Kit但为了不干扰原始文件我们新建一个工作目录。# 在你的工作空间例如 /root 或 /home创建新目录 mkdir -p ~/pdf_auto_pipeline cd ~/pdf_auto_pipeline3.2 编写核心封装函数我们创建一个名为pdf_pipeline.py的Python文件。第一个核心功能是封装执行Shell脚本的动作。# pdf_pipeline.py import subprocess import os import sys import time from pathlib import Path import json import logging # 设置日志方便查看运行情况 logging.basicConfig(levellogging.INFO, format%(asctime)s - %(levelname)s - %(message)s) logger logging.getLogger(__name__) # PDF-Extract-Kit 工具包根目录根据你的实际路径调整 KIT_ROOT Path(/root/PDF-Extract-Kit) def run_shell_script(script_name, pdf_file_path, output_dirNone, extra_argsNone): 执行指定的Shell脚本处理单个PDF文件。 参数: script_name (str): 脚本文件名如 表格识别.sh pdf_file_path (str or Path): 要处理的PDF文件绝对路径 output_dir (str or Path, optional): 指定输出目录。默认为None使用脚本默认输出。 extra_args (list, optional): 传递给脚本的额外参数列表。 返回: bool: 脚本执行成功返回True失败返回False str: 脚本的标准输出内容 script_path KIT_ROOT / script_name if not script_path.exists(): logger.error(f脚本不存在: {script_path}) return False, # 构建命令列表 cmd [sh, str(script_path), str(pdf_file_path)] # 添加额外参数 if extra_args: cmd.extend(extra_args) # 如果指定了输出目录可以作为一个参数传递给脚本前提是脚本支持 # 这里假设脚本支持 -o 或 --output 参数具体需要查看脚本说明。 # 示例cmd.extend([-o, str(output_dir)]) # 由于原始脚本可能不支持我们先注释掉。更通用的做法是在脚本外层控制输出位置。 logger.info(f执行命令: { .join(cmd)}) try: # 执行命令捕获输出和错误 result subprocess.run(cmd, checkTrue, # 如果返回非零状态码则抛出异常 capture_outputTrue, textTrue, cwdKIT_ROOT) # 在工具包目录下执行确保相对路径正确 logger.info(f脚本 {script_name} 执行成功。) logger.debug(f标准输出:\n{result.stdout}) if result.stderr: logger.debug(f标准错误:\n{result.stderr}) return True, result.stdout except subprocess.CalledProcessError as e: logger.error(f脚本 {script_name} 执行失败返回码: {e.returncode}) logger.error(f错误输出:\n{e.stderr}) return False, e.stderr except Exception as e: logger.error(f执行过程中发生未知错误: {e}) return False, str(e)代码解释run_shell_script函数是我们的基础积木。它接受脚本名和PDF文件路径然后去调用真正的处理脚本。使用subprocess.run来安全地执行外部命令。checkTrue会在命令失败时抛出异常让我们能捕获错误。capture_outputTrue和textTrue让我们能获取脚本打印的内容便于记录和调试。cwdKIT_ROOT非常重要它确保Shell脚本在执行时其内部的相对路径比如引用其他模型文件是正确的。4. 构建批量处理流水线有了执行单个任务的基础函数我们现在来搭建完整的流水线。4.1 定义处理步骤与流程我们在pdf_pipeline.py中继续添加以下代码class PDFProcessingPipeline: PDF批量处理流水线 # 定义可用的处理步骤及其对应的脚本 AVAILABLE_TASKS { layout: 布局推理.sh, table: 表格识别.sh, formula_recog: 公式识别.sh, formula_reason: 公式推理.sh, # 未来可以扩展更多任务 } def __init__(self, input_pdf_dir, output_base_dir): 初始化流水线。 参数: input_pdf_dir (str or Path): 存放待处理PDF的文件夹路径 output_base_dir (str or Path): 所有输出结果的根目录 self.input_dir Path(input_pdf_dir) self.output_base Path(output_base_dir) self.output_base.mkdir(parentsTrue, exist_okTrue) # 用于记录处理状态 self.processing_log [] def process_single_pdf(self, pdf_file, tasks): 对单个PDF文件执行一系列任务。 参数: pdf_file (Path): PDF文件对象 tasks (list): 要执行的任务名列表如 [layout, table] 返回: dict: 该文件处理状态的详细记录 file_record { filename: pdf_file.name, filepath: str(pdf_file), tasks: {}, overall_success: True, start_time: time.time(), end_time: None } logger.info(f开始处理文件: {pdf_file.name}) for task_name in tasks: if task_name not in self.AVAILABLE_TASKS: logger.warning(f未知任务 {task_name}已跳过。) continue script_name self.AVAILABLE_TASKS[task_name] task_start time.time() # 为每个任务创建独立的输出子目录 task_output_dir self.output_base / pdf_file.stem / task_name task_output_dir.mkdir(parentsTrue, exist_okTrue) logger.info(f 执行任务: {task_name} - {script_name}) # 调用我们之前写好的封装函数 # 注意这里假设脚本支持将结果输出到指定目录。 # 如果原脚本不支持我们需要更复杂的处理比如先运行到默认位置再移动文件。 # 这里我们传递一个输出目录参数假设脚本支持 -o。 success, output run_shell_script( script_name, pdf_file, output_dirtask_output_dir, extra_args[-o, str(task_output_dir)] # 这个参数需要根据实际脚本调整 ) task_duration time.time() - task_start file_record[tasks][task_name] { success: success, duration_seconds: round(task_duration, 2), output_sample: output[:500] if output else # 只记录前500字符 } if not success: file_record[overall_success] False logger.error(f 文件 {pdf_file.name} 的任务 {task_name} 失败。) # 这里可以选择是否继续执行该文件的其他任务 # break # 如果某个任务失败则终止该文件后续任务 file_record[end_time] time.time() total_time file_record[end_time] - file_record[start_time] file_record[total_duration_seconds] round(total_time, 2) status 成功 if file_record[overall_success] else 失败 logger.info(f文件 {pdf_file.name} 处理{status} 总耗时: {total_time:.2f}秒) return file_record def run_batch(self, tasksNone, pdf_pattern*.pdf): 批量处理输入目录下的所有PDF文件。 参数: tasks (list, optional): 要执行的任务列表。默认为None执行所有可用任务。 pdf_pattern (str): 用于匹配PDF文件的通配符模式。 返回: list: 所有文件的处理记录列表 if tasks is None: tasks list(self.AVAILABLE_TASKS.keys()) # 默认执行全部任务 logger.info(f未指定任务将执行所有可用任务: {tasks}) # 查找所有PDF文件 pdf_files list(self.input_dir.glob(pdf_pattern)) if not pdf_files: logger.warning(f在目录 {self.input_dir} 中未找到匹配 {pdf_pattern} 的PDF文件。) return [] logger.info(f找到 {len(pdf_files)} 个待处理的PDF文件。) all_records [] for pdf_file in pdf_files: record self.process_single_pdf(pdf_file, tasks) all_records.append(record) self.processing_log.append(record) # 保存到实例日志 # 保存详细的处理日志到JSON文件 log_file self.output_base / fprocessing_log_{time.strftime(%Y%m%d_%H%M%S)}.json with open(log_file, w, encodingutf-8) as f: json.dump(all_records, f, ensure_asciiFalse, indent2) logger.info(f详细处理日志已保存至: {log_file}) # 打印简要报告 self._print_summary(all_records) return all_records def _print_summary(self, records): 打印批量处理摘要报告 total_files len(records) successful_files sum(1 for r in records if r[overall_success]) failed_files total_files - successful_files print(\n *50) print(PDF批量处理流水线 - 执行摘要) print(*50) print(f处理时间: {time.strftime(%Y-%m-%d %H:%M:%S)}) print(f输入目录: {self.input_dir}) print(f输出根目录: {self.output_base}) print(f总文件数: {total_files}) print(f成功文件数: {successful_files}) print(f失败文件数: {failed_files}) print(-*50) if failed_files 0: print(失败文件列表:) for r in records: if not r[overall_success]: failed_tasks [t for t, info in r[tasks].items() if not info[success]] print(f - {r[filename]} (失败任务: {, .join(failed_tasks)})) print(*50)4.2 如何使用这个流水线现在让我们写一个简单的使用示例。在同一目录下创建一个run_pipeline.py文件。# run_pipeline.py from pdf_pipeline import PDFProcessingPipeline from pathlib import Path if __name__ __main__: # 1. 配置路径 # 假设你的PDF文件都放在这个目录 input_directory /path/to/your/pdf/files # 请修改为你的PDF文件夹路径 # 所有结果将输出到这个目录下 output_directory /path/to/your/output/results # 请修改为你想要的输出路径 # 2. 创建流水线实例 pipeline PDFProcessingPipeline(input_directory, output_directory) # 3. 定义要执行的任务序列 # 例如先分析布局再识别表格和公式 my_tasks [layout, table, formula_recog] # 如果你想运行所有任务可以不传tasks参数或者传入 my_tasks [layout, table, formula_recog, formula_reason] # 4. 运行批量处理 print(启动PDF批量处理流水线...) results pipeline.run_batch(tasksmy_tasks) print(\n处理完成请查看输出目录获取结果。)5. 进阶优化与实战技巧上面的代码已经是一个可用的流水线但在实际生产中我们还可以让它更强大、更健壮。5.1 处理原脚本的输出定位问题一个常见的挑战是PDF-Extract-Kit中的Shell脚本可能将结果输出到固定的、或由脚本内部决定的目录而不是我们指定的目录。我们有几种策略应对策略一修改Shell脚本如果允许最简单的方法是直接修改.sh脚本增加一个输出目录参数。例如在表格识别.sh开头添加参数解析#!/bin/bash # 原脚本内容可能已存在 # 我们在开头添加 OUTPUT_DIR./default_output # 默认值 while [[ $# -gt 0 ]]; do case $1 in -o|--output) OUTPUT_DIR$2 shift 2 ;; *) PDF_FILE$1 shift ;; esac done # 确保输出目录存在 mkdir -p $OUTPUT_DIR # ... 后续脚本命令中将所有输出重定向到 $OUTPUT_DIR ...策略二Python层进行结果文件搬运更通用如果不便修改原脚本我们可以在Python执行完脚本后将结果从默认位置移动到我们指定的目录。这需要你先了解每个脚本的输出默认在哪里。修改run_shell_script函数或process_single_pdf方法在执行成功后添加文件移动逻辑import shutil # ... 在 run_shell_script 函数成功执行后 ... if success and output_dir: # 假设你知道脚本默认输出到 KIT_ROOT / outputs / pdf_file.stem default_output_path KIT_ROOT / outputs / pdf_file.stem if default_output_path.exists(): # 将整个目录移动到我们指定的位置 shutil.copytree(default_output_path, output_dir, dirs_exist_okTrue) logger.info(f已将结果从 {default_output_path} 移动到 {output_dir}) # 可选删除原默认输出以节省空间 # shutil.rmtree(default_output_path)5.2 增加并行处理提升速度处理大量文件时顺序执行会很慢。我们可以使用Python的concurrent.futures模块实现多进程并行处理。# 在 PDFProcessingPipeline 类中添加新方法 import concurrent.futures def run_batch_parallel(self, tasksNone, pdf_pattern*.pdf, max_workers2): 使用多进程并行批量处理PDF文件。 注意并行处理可能大幅增加内存和CPU负载请根据机器配置调整max_workers。 if tasks is None: tasks list(self.AVAILABLE_TASKS.keys()) pdf_files list(self.input_dir.glob(pdf_pattern)) if not pdf_files: logger.warning(f未找到PDF文件。) return [] all_records [] logger.info(f开始并行处理 {len(pdf_files)} 个文件最大工人数: {max_workers}) # 使用进程池 with concurrent.futures.ProcessPoolExecutor(max_workersmax_workers) as executor: # 提交所有任务 future_to_file { executor.submit(self._process_single_wrapper, pdf_file, tasks): pdf_file for pdf_file in pdf_files } # 收集结果 for future in concurrent.futures.as_completed(future_to_file): pdf_file future_to_file[future] try: record future.result() all_records.append(record) logger.info(f完成: {pdf_file.name}) except Exception as exc: logger.error(f文件 {pdf_file.name} 在并行处理中产生异常: {exc}) # 创建一个失败记录 all_records.append({ filename: pdf_file.name, filepath: str(pdf_file), overall_success: False, error: str(exc) }) # 后续的日志保存和摘要打印与 run_batch 相同 # ... (省略重复代码) ... return all_records def _process_single_wrapper(self, pdf_file, tasks): 一个包装函数用于适配并行执行接口。 # 注意在多进程中需要确保每个进程有独立的配置和资源。 # 这里简单调用原方法复杂场景可能需要复制环境。 return self.process_single_pdf(pdf_file, tasks)使用并行处理 只需在run_pipeline.py中将pipeline.run_batch(...)替换为pipeline.run_batch_parallel(tasksmy_tasks, max_workers4)。max_workers的数量建议设置为你的CPU核心数或更少具体取决于每个任务的内存消耗。5.3 生成可视化报告除了文本日志我们还可以生成一个简单的HTML报告更直观地展示处理结果。# 在 pdf_pipeline.py 中添加一个报告生成函数 def generate_html_report(records, output_filereport.html): 生成HTML格式的处理报告 html_content !DOCTYPE html html head titlePDF处理流水线报告/title style body { font-family: sans-serif; margin: 40px; } table { border-collapse: collapse; width: 100%; margin-top: 20px; } th, td { border: 1px solid #ddd; padding: 12px; text-align: left; } th { background-color: #4CAF50; color: white; } tr:nth-child(even) { background-color: #f2f2f2; } .success { color: green; font-weight: bold; } .failure { color: red; font-weight: bold; } .summary { background-color: #e7f3fe; padding: 15px; border-radius: 5px; } /style /head body h1 PDF批量处理流水线执行报告/h1 div classsummary h2执行摘要/h2 pstrong总文件数:/strong {total_files}/p pstrong成功文件数:/strong span classsuccess{success_count}/span/p pstrong失败文件数:/strong span classfailure{fail_count}/span/p pstrong生成时间:/strong {gen_time}/p /div h2详细处理记录/h2 table tr th文件名/th th状态/th th总耗时(秒)/th th执行任务/th th备注/th /tr {rows} /table /body /html total len(records) success sum(1 for r in records if r.get(overall_success, False)) failed total - success rows for r in records: status 成功 if r.get(overall_success) else 失败 status_class success if r.get(overall_success) else failure tasks_done , .join(r.get(tasks, {}).keys()) duration r.get(total_duration_seconds, N/A) rows f tr td{r.get(filename)}/td tdspan class{status_class}{status}/span/td td{duration}/td td{tasks_done}/td td{任务全部完成 if r.get(overall_success) else 存在失败任务}/td /tr final_html html_content.format( total_filestotal, success_countsuccess, fail_countfailed, gen_timetime.strftime(%Y-%m-%d %H:%M:%S), rowsrows ) with open(output_file, w, encodingutf-8) as f: f.write(final_html) logger.info(fHTML报告已生成: {output_file})在run_batch方法末尾调用它generate_html_report(all_records, self.output_base / report.html)。6. 总结通过本文的实践我们完成了一个从简单封装到复杂流水线的构建过程。回顾一下我们实现的核心价值自动化将手动执行多个Shell脚本的重复劳动变成了一个命令就能启动的自动化流程。批量化轻松应对成百上千个PDF文件的处理需求效率提升数十倍。可配置化可以自由组合处理步骤布局、表格、公式适应不同的业务场景。健壮性完善的日志记录和错误处理让处理过程清晰可控部分文件失败不影响整体。可扩展性代码结构清晰很容易添加新的处理步骤只需更新AVAILABLE_TASKS字典或优化策略如并行处理、结果搬运。这个项目不仅仅是一个脚本更是一个工程化思维的体现。面对已有的、分散的工具我们通过编写“胶水代码”Glue Code将它们系统地串联起来创造出更大的生产力。你可以基于这个框架继续扩展更多功能比如集成邮件通知处理完成后发送报告。添加对压缩文件ZIP的支持自动解压处理。将流水线部署为Web服务或定时任务。与云存储如S3集成直接处理云端文件。希望这个实例能为你处理类似批量任务提供清晰的思路和实用的代码模板。自动化不是为了替代思考而是将我们从重复中解放出来去解决更有价值的问题。获取更多AI镜像想探索更多AI镜像和应用场景访问 CSDN星图镜像广场提供丰富的预置镜像覆盖大模型推理、图像生成、视频生成、模型微调等多个领域支持一键部署。