Qwen3-ASR-0.6B代码实例:Qwen3-ASR-0.6B批量处理目录下所有音频文件

Qwen3-ASR-0.6B代码实例:Qwen3-ASR-0.6B批量处理目录下所有音频文件 Qwen3-ASR-0.6B代码实例Qwen3-ASR-0.6B批量处理目录下所有音频文件1. 引言语音识别的批量处理需求在实际工作中我们经常需要处理大量的音频文件比如会议录音整理、播客内容转录、语音资料归档等。如果一个一个文件手动上传处理效率极低且容易出错。Qwen3-ASR-0.6B作为一款轻量级高性能语音识别模型提供了强大的API接口让我们能够通过编程方式实现批量处理。本文将带你一步步学习如何使用Python代码批量处理目录下的所有音频文件将语音转换为文字。无论你是技术小白还是有经验的开发者都能快速掌握这个实用技能。2. 环境准备与快速部署2.1 确认服务正常运行在开始编写批量处理代码之前首先确保Qwen3-ASR-0.6B服务已经正常运行。打开终端执行以下命令检查服务状态curl http://你的服务器IP:8080/api/health如果返回类似下面的响应说明服务正常{ status: healthy, model_loaded: true, gpu_available: true }2.2 安装必要的Python库我们需要安装几个Python库来编写批量处理脚本pip install requests tqdmrequests用于发送HTTP请求到语音识别服务tqdm用于显示处理进度条让批量处理过程更直观3. 批量处理代码实现3.1 基础批量处理脚本下面是一个完整的批量处理脚本可以处理指定目录下的所有音频文件import os import requests from tqdm import tqdm import time class AudioBatchProcessor: def __init__(self, server_urlhttp://localhost:8080): self.server_url server_url self.supported_formats [.wav, .mp3, .m4a, .flac, .ogg] def get_audio_files(self, directory_path): 获取目录下所有支持的音频文件 audio_files [] for file in os.listdir(directory_path): if any(file.lower().endswith(ext) for ext in self.supported_formats): audio_files.append(os.path.join(directory_path, file)) return audio_files def transcribe_audio(self, audio_path, languageNone): 转录单个音频文件 try: with open(audio_path, rb) as audio_file: files {audio_file: audio_file} data {language: language} if language else {} response requests.post( f{self.server_url}/api/transcribe, filesfiles, datadata, timeout300 # 5分钟超时 ) if response.status_code 200: result response.json() return result.get(text, ), True else: return f错误: {response.status_code} - {response.text}, False except Exception as e: return f异常: {str(e)}, False def process_directory(self, input_dir, output_filetranscriptions.txt, languageNone): 批量处理目录下的所有音频文件 audio_files self.get_audio_files(input_dir) if not audio_files: print(未找到支持的音频文件) return print(f找到 {len(audio_files)} 个音频文件开始处理...) # 打开输出文件 with open(output_file, w, encodingutf-8) as f_out: # 使用进度条显示处理进度 for audio_file in tqdm(audio_files, desc处理音频文件): text, success self.transcribe_audio(audio_file, language) # 写入结果 f_out.write(f文件: {os.path.basename(audio_file)}\n) f_out.write(f状态: {成功 if success else 失败}\n) f_out.write(f转录结果: {text}\n) f_out.write(- * 50 \n\n) # 避免频繁请求添加短暂延迟 time.sleep(0.5) print(f处理完成结果已保存到: {output_file}) # 使用示例 if __name__ __main__: processor AudioBatchProcessor(server_urlhttp://你的服务器IP:8080) processor.process_directory( input_dir./audio_files, # 音频文件所在目录 output_file./transcription_results.txt, # 输出文件 languageChinese # 可选指定语言 )3.2 增强版批量处理脚本如果你需要更高级的功能比如错误重试、结果统计等可以使用这个增强版脚本import os import requests from tqdm import tqdm import time import json from datetime import datetime class EnhancedAudioProcessor: def __init__(self, server_urlhttp://localhost:8080, max_retries3): self.server_url server_url self.supported_formats [.wav, .mp3, .m4a, .flac, .ogg] self.max_retries max_retries def get_audio_files(self, directory_path): 获取目录下所有支持的音频文件按修改时间排序 audio_files [] for file in os.listdir(directory_path): file_path os.path.join(directory_path, file) if any(file.lower().endswith(ext) for ext in self.supported_formats): audio_files.append(file_path) # 按修改时间排序最新的优先处理 audio_files.sort(keyos.path.getmtime, reverseTrue) return audio_files def transcribe_with_retry(self, audio_path, languageNone): 带重试机制的转录函数 for attempt in range(self.max_retries): try: with open(audio_path, rb) as audio_file: files {audio_file: audio_file} data {language: language} if language else {} response requests.post( f{self.server_url}/api/transcribe, filesfiles, datadata, timeout300 ) if response.status_code 200: result response.json() return result.get(text, ), True, attempt 1 else: print(f第{attempt 1}次尝试失败: {response.status_code}) time.sleep(2 ** attempt) # 指数退避 except Exception as e: print(f第{attempt 1}次尝试异常: {str(e)}) time.sleep(2 ** attempt) return 所有尝试均失败, False, self.max_retries def process_directory_enhanced(self, input_dir, output_dirresults, languageNone): 增强的批量处理函数支持更多功能 os.makedirs(output_dir, exist_okTrue) audio_files self.get_audio_files(input_dir) if not audio_files: print(未找到支持的音频文件) return # 创建详细统计信息 stats { total_files: len(audio_files), successful: 0, failed: 0, start_time: datetime.now().isoformat(), results: [] } print(f找到 {len(audio_files)} 个音频文件开始处理...) # 处理每个文件 for audio_file in tqdm(audio_files, desc处理音频文件): file_name os.path.basename(audio_file) text, success, attempts self.transcribe_with_retry(audio_file, language) # 保存单个文件的结果 result_data { filename: file_name, success: success, attempts: attempts, transcription: text, processing_time: datetime.now().isoformat() } # 保存到单独的文件 base_name os.path.splitext(file_name)[0] result_file os.path.join(output_dir, f{base_name}_result.json) with open(result_file, w, encodingutf-8) as f: json.dump(result_data, f, ensure_asciiFalse, indent2) # 更新统计信息 if success: stats[successful] 1 else: stats[failed] 1 stats[results].append(result_data) time.sleep(0.3) # 避免服务器过载 # 完成统计 stats[end_time] datetime.now().isoformat() stats[success_rate] f{(stats[successful] / stats[total_files] * 100):.1f}% # 保存总体统计信息 stats_file os.path.join(output_dir, processing_stats.json) with open(stats_file, w, encodingutf-8) as f: json.dump(stats, f, ensure_asciiFalse, indent2) # 生成汇总报告 self.generate_summary_report(stats, output_dir) print(f\n处理完成) print(f成功: {stats[successful]}/{stats[total_files]} ({stats[success_rate]})) print(f详细结果保存在: {output_dir}) def generate_summary_report(self, stats, output_dir): 生成文本格式的汇总报告 report_file os.path.join(output_dir, summary_report.txt) with open(report_file, w, encodingutf-8) as f: f.write(语音识别批量处理汇总报告\n) f.write( * 50 \n\n) f.write(f处理时间: {stats[start_time]} 至 {stats[end_time]}\n) f.write(f总文件数: {stats[total_files]}\n) f.write(f成功识别: {stats[successful]}\n) f.write(f识别失败: {stats[failed]}\n) f.write(f成功率: {stats[success_rate]}\n\n) f.write(详细结果:\n) f.write(- * 30 \n) for result in stats[results]: status 成功 if result[success] else f失败(尝试{result[attempts]}次) f.write(f{result[filename]}: {status}\n) if result[success] and result[transcription]: f.write(f 转录: {result[transcription][:100]}...\n) f.write(\n) # 使用示例 if __name__ __main__: processor EnhancedAudioProcessor( server_urlhttp://你的服务器IP:8080, max_retries3 ) processor.process_directory_enhanced( input_dir./audio_files, # 音频文件目录 output_dir./results, # 结果输出目录 languageChinese # 可选语言 )4. 实际应用场景与技巧4.1 处理不同语言的音频文件Qwen3-ASR-0.6B支持52种语言和方言你可以根据音频内容指定不同的语言参数# 处理英文音频 processor.process_directory(./english_audios, languageEnglish) # 处理粤语音频 processor.process_directory(./cantonese_audios, languageCantonese) # 让模型自动检测语言不指定language参数 processor.process_directory(./mixed_audios)4.2 定时批量处理任务你可以结合系统定时任务实现自动化的批量处理import schedule import time def daily_audio_processing(): 每天定时处理新音频文件 processor AudioBatchProcessor(server_urlhttp://你的服务器IP:8080) # 只处理今天新产生的文件 today datetime.now().strftime(%Y%m%d) output_file f./results/transcriptions_{today}.txt processor.process_directory(./daily_audios, output_file) # 每天凌晨2点执行 schedule.every().day.at(02:00).do(daily_audio_processing) while True: schedule.run_pending() time.sleep(60)4.3 处理网络音频URL除了本地文件还可以批量处理网络上的音频文件def process_urls(url_list, output_fileurl_transcriptions.txt): 批量处理网络音频URL results [] for url in tqdm(url_list, desc处理URL音频): try: response requests.post( http://你的服务器IP:8080/api/transcribe_url, json{audio_url: url}, timeout300 ) if response.status_code 200: text response.json().get(text, ) results.append(fURL: {url}\n转录: {text}\n) else: results.append(fURL: {url}\n错误: {response.status_code}\n) except Exception as e: results.append(fURL: {url}\n异常: {str(e)}\n) # 保存结果 with open(output_file, w, encodingutf-8) as f: f.writelines(results)5. 常见问题与解决方法5.1 处理速度优化如果处理大量文件时速度较慢可以尝试以下优化方法# 使用多线程加速处理 from concurrent.futures import ThreadPoolExecutor, as_completed def parallel_process_directory(input_dir, output_file, languageNone, max_workers4): 使用多线程并行处理 processor AudioBatchProcessor() audio_files processor.get_audio_files(input_dir) with ThreadPoolExecutor(max_workersmax_workers) as executor: # 提交所有任务 future_to_file { executor.submit(processor.transcribe_audio, file, language): file for file in audio_files } # 收集结果 results [] for future in tqdm(as_completed(future_to_file), totallen(audio_files)): file future_to_file[future] try: text, success future.result() results.append((os.path.basename(file), text, success)) except Exception as e: results.append((os.path.basename(file), f异常: {str(e)}, False)) # 保存结果 with open(output_file, w, encodingutf-8) as f: for filename, text, success in results: f.write(f文件: {filename}\n状态: {成功 if success else 失败}\n) f.write(f转录结果: {text}\n) f.write(- * 50 \n\n)5.2 大文件处理策略对于接近100MB限制的大文件建议先进行分割处理def split_large_audio(audio_path, chunk_size_mb50): 分割大音频文件需要安装pydub库 from pydub import AudioSegment import math audio AudioSegment.from_file(audio_path) file_size_mb os.path.getsize(audio_path) / (1024 * 1024) if file_size_mb chunk_size_mb: return [audio_path] # 无需分割 # 计算需要分割成几段 chunks math.ceil(file_size_mb / chunk_size_mb) chunk_length len(audio) // chunks output_files [] for i in range(chunks): start i * chunk_length end (i 1) * chunk_length if i chunks - 1 else len(audio) chunk audio[start:end] output_path f{os.path.splitext(audio_path)[0]}_part{i1}.mp3 chunk.export(output_path, formatmp3) output_files.append(output_path) return output_files6. 总结通过本文的代码实例你已经掌握了使用Qwen3-ASR-0.6B批量处理音频文件的完整方法。从基础的单文件处理到高级的批量并行处理从本地文件到网络URL这些代码都能满足不同的业务需求。关键要点回顾使用简单的Python脚本就能实现音频批量识别通过多线程和错误重试机制提升处理效率和稳定性支持多种语言和方言的自动识别提供完整的统计和报告功能实践建议先从少量文件测试确认服务正常运行根据实际需求调整并发数量和重试策略定期检查处理结果优化识别准确率结合定时任务实现自动化处理现在你可以将这些代码应用到实际工作中大幅提升语音识别的处理效率让机器帮你完成繁琐的转录工作。获取更多AI镜像想探索更多AI镜像和应用场景访问 CSDN星图镜像广场提供丰富的预置镜像覆盖大模型推理、图像生成、视频生成、模型微调等多个领域支持一键部署。