Downloading m3u8 Video and Uploading to Cloudflare R2 with Flask + Python: A Step-by-Step Configuration Guide That Steers Around the Pitfalls
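# Flask + Python in Practice: m3u8 Video Download and Cloudflare R2 Cloud Storage, End to End

When we need to fetch video from the web, m3u8 streaming playlists are a common format. This article builds a complete Flask application from scratch that downloads m3u8 video automatically, stores it locally, and backs it up to Cloudflare R2.

## 1. Environment Setup and Basic Configuration

Before writing any code, make sure the development environment is ready. Start by creating a clean Python virtual environment:

```bash
python -m venv m3u8_downloader
source m3u8_downloader/bin/activate  # Linux/Mac
m3u8_downloader\Scripts\activate     # Windows
```

Install the required dependencies:

```bash
pip install flask boto3 requests certifi
```

Next, configure the Cloudflare R2 credentials. After creating an R2 bucket in the Cloudflare dashboard, collect the following:

- Endpoint URL
- Access Key ID
- Secret Access Key
- Bucket Name

Store these sensitive values in environment variables rather than hardcoding them in the script:

```python
import os
from flask import Flask

app = Flask(__name__)
app.config['R2_ENDPOINT_URL'] = os.getenv('R2_ENDPOINT_URL')
app.config['R2_ACCESS_KEY_ID'] = os.getenv('R2_ACCESS_KEY_ID')
app.config['R2_SECRET_ACCESS_KEY'] = os.getenv('R2_SECRET_ACCESS_KEY')
app.config['R2_BUCKET_NAME'] = os.getenv('R2_BUCKET_NAME')
```

## 2. Building the Core m3u8 Download Logic

An m3u8 file is essentially a playlist: it lists the addresses of many .ts video segments. We first download the m3u8 file, parse out all the .ts segments, and then download them one by one.

Start with a downloader class that wraps the HTTP request logic:

```python
import requests
import certifi


class VideoDownloader:
    def __init__(self):
        self.session = requests.Session()
        # Verify TLS certificates against the certifi CA bundle
        self.session.verify = certifi.where()

    def set_headers(self, headers):
        """Set custom request headers."""
        self.session.headers.update(headers)

    def download(self, url, timeout=30):
        """Download a file and return its bytes, or None on failure."""
        try:
            response = self.session.get(url, timeout=timeout)
            response.raise_for_status()
            return response.content
        except requests.exceptions.RequestException as e:
            print(f'Download failed: {url}, error: {e}')
            return None
```

Next, implement the m3u8 parser:

```python
from urllib.parse import urljoin


class M3U8Parser:
    @staticmethod
    def parse_m3u8(content, base_url):
        """Parse m3u8 content and return the URLs of all ts files."""
        if not content:
            return []

        text = content.decode('utf-8')
        ts_urls = []
        for line in text.split('\n'):
            line = line.strip()
            # Keep non-comment lines that point at a .ts segment
            if line and not line.startswith('#') and line.endswith('.ts'):
                ts_urls.append(urljoin(base_url, line))
        return ts_urls
```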
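The parser above keeps only lines that end in `.ts`, so it can miss segments whose URIs carry a query string such as a signed token. The following is a minimal sketch of a more permissive variant, not the article's own method: it treats every non-blank line that does not start with `#` as a segment URI and resolves it against the playlist URL with `urljoin`. The function name `extract_segment_urls` and the sample URLs are made up for illustration, and the sketch assumes the input is a media playlist rather than a master playlist (where those lines would point at other playlists).

```python
from urllib.parse import urljoin


def extract_segment_urls(playlist_text, playlist_url):
    """Return absolute URLs for every URI line in a media playlist.

    Assumption: each non-blank line that does not start with '#'
    is a segment URI (true for media playlists, not master playlists).
    """
    urls = []
    for raw_line in playlist_text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith('#'):
            continue  # skip blank lines, tags and comments
        # urljoin leaves absolute URIs alone and resolves relative ones
        urls.append(urljoin(playlist_url, line))
    return urls


# Hypothetical playlist mixing relative, tokenized and absolute URIs
sample = """#EXTM3U
#EXT-X-TARGETDURATION:10
#EXTINF:10.0,
seg0.ts
#EXTINF:10.0,
seg1.ts?token=abc
#EXTINF:10.0,
https://cdn.example.com/other/seg2.ts
#EXT-X-ENDLIST
"""

segment_urls = extract_segment_urls(
    sample, 'https://example.com/video/index.m3u8'
)
for url in segment_urls:
    print(url)
```

Resolving against the playlist's own URL also means a caller would not need to supply a separate base URL when segment paths are relative to the playlist.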
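## 3. Local and Cloud Storage as a Double Backup

To keep the data safe, we save each video file both locally and to Cloudflare R2. First, local storage:

```python
import os
from pathlib import Path


class LocalStorage:
    @staticmethod
    def save(content, directory, filename):
        """Save a file to the local disk."""
        try:
            Path(directory).mkdir(parents=True, exist_ok=True)
            filepath = os.path.join(directory, filename)
            with open(filepath, 'wb') as f:
                f.write(content)
            return True
        except Exception as e:
            print(f'Local save failed: {e}')
            return False
```

Then the Cloudflare R2 storage. Because R2 is compatible with the S3 API, we can use the boto3 library:

```python
import boto3
from botocore.exceptions import ClientError


class R2Storage:
    def __init__(self, app):
        self.client = boto3.client(
            's3',
            endpoint_url=app.config['R2_ENDPOINT_URL'],
            aws_access_key_id=app.config['R2_ACCESS_KEY_ID'],
            aws_secret_access_key=app.config['R2_SECRET_ACCESS_KEY'],
        )
        self.bucket = app.config['R2_BUCKET_NAME']

    def upload(self, content, key):
        """Upload a file to R2."""
        try:
            self.client.put_object(Bucket=self.bucket, Key=key, Body=content)
            return True
        except ClientError as e:
            print(f'R2 upload failed: {e}')
            return False
```

## 4. Building the Flask API and Error Handling

Now we wire the modules into the Flask application and add error handling. First, define the API route:

```python
from flask import request, jsonify


@app.route('/api/download', methods=['POST'])
def download_video():
    # Tolerate a missing or non-JSON body instead of failing on None
    data = request.get_json(silent=True) or {}
    m3u8_url = data.get('m3u8_url')
    base_url = data.get('base_url')  # base URL for the ts files
    output_dir = data.get('output_dir', 'downloads')

    if not m3u8_url or not base_url:
        return jsonify({'error': 'Missing required parameters'}), 400

    downloader = VideoDownloader()
    if 'headers' in data:
        downloader.set_headers(data['headers'])

    # Download the m3u8 file
    m3u8_content = downloader.download(m3u8_url)
    if not m3u8_content:
        return jsonify({'error': 'Failed to download the m3u8 file'}), 500

    # Parse the list of ts files
    ts_urls = M3U8Parser.parse_m3u8(m3u8_content, base_url)
    if not ts_urls:
        return jsonify({'error': 'No valid ts files found'}), 500

    # Save the m3u8 file locally and to R2
    LocalStorage.save(m3u8_content, output_dir, 'index.m3u8')
    r2_storage = R2Storage(app)
    r2_storage.upload(m3u8_content, f'{output_dir}/index.m3u8')

    # Download every ts file
    results = []
    for ts_url in ts_urls:
        ts_content = downloader.download(ts_url)
        if ts_content:
            filename = ts_url.split('/')[-1]
            # Save locally
            LocalStorage.save(ts_content, output_dir, filename)
            # Upload to R2
            r2_key = f'{output_dir}/{filename}'
            r2_storage.upload(ts_content, r2_key)
            results.append({'url': ts_url, 'filename': filename, 'status': 'success'})
        else:
            results.append({'url': ts_url, 'filename': '', 'status': 'failed'})

    return jsonify({
        'message': 'Download task finished',
        'results': results,
        'success_count': len([r for r in results if r['status'] == 'success']),
    })
```

Add the necessary error handlers:

```python
@app.errorhandler(404)
def not_found(error):
    return jsonify({'error': 'Resource not found'}), 404


@app.errorhandler(500)
def internal_error(error):
    return jsonify({'error': 'Internal server error'}), 500
```

## 5. Performance Optimization and Practical Tips

With the basic functionality in place, several optimizations can improve performance and reliability.

### 5.1 Multithreaded Downloads

Use Python's `concurrent.futures` to download segments in multiple threads:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def download_single_ts(ts_url, output_dir):
    # Assumed helper: the article calls it below without defining it.
    # This version fetches one segment and saves it locally.
    content = VideoDownloader().download(ts_url)
    filename = ts_url.split('/')[-1]
    saved = bool(content) and LocalStorage.save(content, output_dir, filename)
    return {'url': ts_url, 'filename': filename,
            'status': 'success' if saved else 'failed'}


def download_all_ts(ts_urls, output_dir, max_workers=5):
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = []
        for ts_url in ts_urls:
            futures.append(executor.submit(download_single_ts, ts_url, output_dir))

        # as_completed yields in completion order, not playlist order
        results = []
        for future in as_completed(futures):
            results.append(future.result())
        return results
```

### 5.2 Resumable Downloads and Retries

The function below covers the retry half of this topic: it re-attempts a failed download with exponential backoff and re-raises the error once the attempts run out.

```python
import time

import requests


def download_with_retry(url, max_retries=3, timeout=30):
    for attempt in range(max_retries):
        try:
            response = requests.get(url, timeout=timeout)
            response.raise_for_status()
            return response.content
        except Exception:
            if attempt == max_retries - 1:
                raise
            time.sleep(2 ** attempt)  # exponential backoff
```

### 5.3 Progress Display

Add a download progress bar:

```python
import os

import requests
from tqdm import tqdm  # pip install tqdm


def download_with_progress(url, output_dir, filename):
    response = requests.get(url, stream=True)
    response.raise_for_status()  # do not write an error page to disk
    total_size = int(response.headers.get('content-length', 0))
    filepath = os.path.join(output_dir, filename)

    with open(filepath, 'wb') as f, tqdm(
        desc=filename,
        total=total_size,
        unit='B',
        unit_scale=True,
        unit_divisor=1024,
    ) as bar:
        for data in response.iter_content(chunk_size=1024):
            f.write(data)
            bar.update(len(data))
```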
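## 6. Deployment and Monitoring

Once development is done, we need to think about how to deploy the application and monitor it while it runs.

### 6.1 Deploying with Gunicorn

```bash
pip install gunicorn
gunicorn -w 4 -b :5000 app:app
```

### 6.2 Adding a Health Check Endpoint

```python
@app.route('/health')
def health_check():
    return jsonify({'status': 'healthy'})
```

### 6.3 Logging Configuration

```python
import logging
from logging.handlers import RotatingFileHandler

handler = RotatingFileHandler('app.log', maxBytes=10000, backupCount=3)
handler.setLevel(logging.INFO)
app.logger.addHandler(handler)
# The logger itself must also allow INFO, or those records never reach the handler
app.logger.setLevel(logging.INFO)
```

## 7. Security Hardening

Finally, we need to consider the application's security and guard against common threats.

### 7.1 Request Rate Limiting

```python
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address

# Keyword arguments avoid depending on positional order,
# which differs between Flask-Limiter major versions
limiter = Limiter(
    key_func=get_remote_address,
    app=app,
    default_limits=['200 per day', '50 per hour'],
)


@app.route('/api/download', methods=['POST'])
@limiter.limit('10 per minute')
def download_video():
    ...  # handler body elided in the original
```

### 7.2 Input Validation

```python
import re


def validate_url(url):
    """Validate the URL format."""
    pattern = re.compile(
        r'^(https?://)?'         # http:// or https://
        r'([a-zA-Z0-9.-]+)'      # domain
        r'(\.[a-zA-Z]{2,63})'    # .com, .org etc
        r'(:[0-9]{1,5})?'        # optional port
        r'(/.*)?$'               # optional path
    )
    return bool(pattern.match(url))
```

### 7.3 Protecting Sensitive Information

```python
from flask_talisman import Talisman

Talisman(app, force_https=True)
```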
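For the input validation step in 7.2, the regex accepts strings with no scheme at all and does not check that the port is in range. As an alternative sketch, not the article's method, the standard library's `urllib.parse.urlsplit` can do the structural parsing, leaving only the policy checks to write by hand. The function name `is_allowed_url` and the default scheme list are assumptions for illustration; the sketch checks structure only and does not resolve the hostname or block private addresses.

```python
from urllib.parse import urlsplit


def is_allowed_url(url, allowed_schemes=('http', 'https')):
    """Return True if url is an absolute URL with an allowed scheme and a host."""
    if not isinstance(url, str):
        return False
    try:
        parts = urlsplit(url.strip())
        # Reading .port raises ValueError for a non-numeric or out-of-range port
        parts.port
    except ValueError:
        return False
    return parts.scheme in allowed_schemes and bool(parts.hostname)


# Hypothetical inputs covering the accepted and rejected cases
candidates = [
    'https://example.com/video/index.m3u8',
    'http://example.com:8080/a.m3u8',
    'ftp://example.com/a.m3u8',
    'example.com/a.m3u8',
    'https://example.com:99999/a.m3u8',
    'https:///a.m3u8',
]
for candidate in candidates:
    print(candidate, is_allowed_url(candidate))
```

A check like this could run on `m3u8_url` and `base_url` before any download starts, returning a 400 response when it fails.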