1. 风云卫星数据下载的痛点与自动化需求每次手动下载风云卫星数据时你是不是也经历过这样的折磨先要登录网站然后在密密麻麻的列表里一个个找文件复制链接切换下载工具等待完成后再重复这个过程。特别是当需要下载几十甚至上百个数据文件时这种重复劳动简直让人崩溃。我最近帮一个气象研究所做项目时就遇到了这个问题。他们需要下载过去5年的风云卫星数据做气候分析总量超过200GB。如果手动操作估计得花上好几天时间。更糟心的是风云卫星数据官网最近改版了虽然界面更美观了但下载路径依然繁琐而且同时提供HTTP和FTP两种下载方式手动操作更加麻烦。这就是为什么我们需要一个自动化下载脚本。通过Python脚本我们可以实现一键批量下载把几天的工作量压缩到几小时内完成。更重要的是这个脚本可以重复使用以后需要更新数据时只需要运行一下脚本就能搞定。2. 基础脚本搭建与功能实现2.1 环境准备与项目结构首先我们需要准备一个简单的项目结构。创建一个项目文件夹里面包含两个关键内容一个名为target的文件夹用于存放下载文件列表一个Python脚本文件比如download_all.py建议使用Python 3.6或更高版本并安装必要的依赖库pip install requests2.2 核心下载功能实现脚本的核心是处理两种下载方式HTTP和FTP。我们先来看HTTP下载的实现def download_via_http(url): filename os.path.basename(url) filename filename.split(?)[0] # 处理可能存在的查询参数 response requests.get(url, streamTrue) if response.status_code 200: with open(filename, wb) as file: for chunk in response.iter_content(chunk_size8192): file.write(chunk) print(fDownloaded via HTTP: {filename})FTP下载稍微复杂一些需要处理认证信息def download_via_ftp(url, username, password): parsed_url urlparse(url) host url.split(/)[0] path parsed_url.path filename os.path.basename(path) with ftplib.FTP(host) as ftp: ftp.login(userusername, passwdpassword) with open(filename, wb) as file: ftp.retrbinary(fRETR {filename}, file.write) print(fDownloaded via FTP: {filename})2.3 文件列表处理我们需要一个函数来处理包含下载链接的文本文件def process_file(file_path): with open(file_path, r) as file: lines file.readlines() for line in lines: line line.strip() if line.startswith(ftp://): parts line.split() username_password parts[0].replace(ftp://, ).split(:) username username_password[0] password username_password[1] ftp_url parts[1] download_via_ftp(ftp_url, username, password) elif line.startswith(http://) or line.startswith(https://): download_via_http(line)3. 针对新版网站结构的适配优化3.1 解析新版网页结构风云卫星数据网站改版后下载链接的获取方式可能发生了变化。我们需要分析新网页的HTML结构找到数据文件链接的规律。通常这些链接会有特定的class或id我们可以使用BeautifulSoup来解析from bs4 import BeautifulSoup import re def extract_links_from_html(html_content): soup BeautifulSoup(html_content, html.parser) links [] # 查找包含数据文件链接的元素 for a in soup.find_all(a, hrefTrue): href a[href] if re.match(r.*\.(hdf|nc|tif)$, href, re.IGNORECASE): links.append(href) return links3.2 处理登录与会话保持很多数据网站需要登录才能下载。我们可以使用requests的Session对象来保持登录状态def login_to_website(username, password): session requests.Session() login_url https://example.com/login # 替换为实际登录地址 payload { username: username, password: password } response session.post(login_url, datapayload) if response.status_code 200: print(登录成功) return session else: print(登录失败) return None4. 性能优化多线程与异步下载4.1 多线程下载实现当需要下载大量文件时单线程下载效率太低。我们可以使用Python的concurrent.futures模块实现多线程下载from concurrent.futures import ThreadPoolExecutor def download_with_threads(file_list, max_workers5): with ThreadPoolExecutor(max_workersmax_workers) as executor: for file_info in file_list: if file_info[type] http: executor.submit(download_via_http, file_info[url]) elif file_info[type] ftp: executor.submit( download_via_ftp, file_info[url], file_info[username], file_info[password] )4.2 异步IO实现更高效率对于IO密集型任务异步IO能提供更好的性能。我们可以使用aiohttp和asyncio来实现import aiohttp import asyncio async def async_download_http(session, url): filename os.path.basename(url) async with session.get(url) as response: if response.status 200: with open(filename, wb) as f: while True: chunk await response.content.read(8192) if not chunk: break f.write(chunk) print(fDownloaded: {filename}) async def main_async(file_list): connector aiohttp.TCPConnector(limit10) # 限制并发连接数 async with aiohttp.ClientSession(connectorconnector) as session: tasks [] for file_info in file_list: if file_info[type] http: task async_download_http(session, file_info[url]) tasks.append(task) await asyncio.gather(*tasks)5. 实用技巧与注意事项5.1 断点续传实现大文件下载可能会中途中断我们可以实现断点续传功能def download_with_resume(url, filename): headers {} if os.path.exists(filename): downloaded os.path.getsize(filename) headers {Range: fbytes{downloaded}-} response requests.get(url, headersheaders, streamTrue) mode ab if headers else wb with open(filename, mode) as file: for chunk in response.iter_content(chunk_size8192): file.write(chunk)5.2 错误处理与重试机制网络下载难免会遇到各种错误我们需要添加重试机制from tenacity import retry, stop_after_attempt, wait_exponential retry(stopstop_after_attempt(3), waitwait_exponential(multiplier1, min4, max10)) def robust_download(url): response requests.get(url, streamTrue) if response.status_code ! 200: raise Exception(f下载失败状态码{response.status_code}) filename os.path.basename(url) with open(filename, wb) as file: for chunk in response.iter_content(chunk_size8192): file.write(chunk) return filename5.3 下载进度显示对于大文件下载显示进度条能提升用户体验from tqdm import tqdm def download_with_progress(url, filename): response requests.get(url, streamTrue) total_size int(response.headers.get(content-length, 0)) with open(filename, wb) as file, tqdm( descfilename, totaltotal_size, unitiB, unit_scaleTrue, unit_divisor1024, ) as bar: for chunk in response.iter_content(chunk_size8192): size file.write(chunk) bar.update(size)在实际项目中我发现这些优化措施能显著提升下载效率。特别是当需要下载数百个文件时多线程或异步下载可以将总时间从数小时缩短到几分钟。不过要注意服务器端的并发限制避免被封IP。建议开始时设置较小的并发数如5个线程然后根据实际情况逐步调整。
风云卫星数据自动化下载:Python脚本实战与优化
1. 风云卫星数据下载的痛点与自动化需求每次手动下载风云卫星数据时你是不是也经历过这样的折磨先要登录网站然后在密密麻麻的列表里一个个找文件复制链接切换下载工具等待完成后再重复这个过程。特别是当需要下载几十甚至上百个数据文件时这种重复劳动简直让人崩溃。我最近帮一个气象研究所做项目时就遇到了这个问题。他们需要下载过去5年的风云卫星数据做气候分析总量超过200GB。如果手动操作估计得花上好几天时间。更糟心的是风云卫星数据官网最近改版了虽然界面更美观了但下载路径依然繁琐而且同时提供HTTP和FTP两种下载方式手动操作更加麻烦。这就是为什么我们需要一个自动化下载脚本。通过Python脚本我们可以实现一键批量下载把几天的工作量压缩到几小时内完成。更重要的是这个脚本可以重复使用以后需要更新数据时只需要运行一下脚本就能搞定。2. 基础脚本搭建与功能实现2.1 环境准备与项目结构首先我们需要准备一个简单的项目结构。创建一个项目文件夹里面包含两个关键内容一个名为target的文件夹用于存放下载文件列表一个Python脚本文件比如download_all.py建议使用Python 3.6或更高版本并安装必要的依赖库pip install requests2.2 核心下载功能实现脚本的核心是处理两种下载方式HTTP和FTP。我们先来看HTTP下载的实现def download_via_http(url): filename os.path.basename(url) filename filename.split(?)[0] # 处理可能存在的查询参数 response requests.get(url, streamTrue) if response.status_code 200: with open(filename, wb) as file: for chunk in response.iter_content(chunk_size8192): file.write(chunk) print(fDownloaded via HTTP: {filename})FTP下载稍微复杂一些需要处理认证信息def download_via_ftp(url, username, password): parsed_url urlparse(url) host url.split(/)[0] path parsed_url.path filename os.path.basename(path) with ftplib.FTP(host) as ftp: ftp.login(userusername, passwdpassword) with open(filename, wb) as file: ftp.retrbinary(fRETR {filename}, file.write) print(fDownloaded via FTP: {filename})2.3 文件列表处理我们需要一个函数来处理包含下载链接的文本文件def process_file(file_path): with open(file_path, r) as file: lines file.readlines() for line in lines: line line.strip() if line.startswith(ftp://): parts line.split() username_password parts[0].replace(ftp://, ).split(:) username username_password[0] password username_password[1] ftp_url parts[1] download_via_ftp(ftp_url, username, password) elif line.startswith(http://) or line.startswith(https://): download_via_http(line)3. 针对新版网站结构的适配优化3.1 解析新版网页结构风云卫星数据网站改版后下载链接的获取方式可能发生了变化。我们需要分析新网页的HTML结构找到数据文件链接的规律。通常这些链接会有特定的class或id我们可以使用BeautifulSoup来解析from bs4 import BeautifulSoup import re def extract_links_from_html(html_content): soup BeautifulSoup(html_content, html.parser) links [] # 查找包含数据文件链接的元素 for a in soup.find_all(a, hrefTrue): href a[href] if re.match(r.*\.(hdf|nc|tif)$, href, re.IGNORECASE): links.append(href) return links3.2 处理登录与会话保持很多数据网站需要登录才能下载。我们可以使用requests的Session对象来保持登录状态def login_to_website(username, password): session requests.Session() login_url https://example.com/login # 替换为实际登录地址 payload { username: username, password: password } response session.post(login_url, datapayload) if response.status_code 200: print(登录成功) return session else: print(登录失败) return None4. 性能优化多线程与异步下载4.1 多线程下载实现当需要下载大量文件时单线程下载效率太低。我们可以使用Python的concurrent.futures模块实现多线程下载from concurrent.futures import ThreadPoolExecutor def download_with_threads(file_list, max_workers5): with ThreadPoolExecutor(max_workersmax_workers) as executor: for file_info in file_list: if file_info[type] http: executor.submit(download_via_http, file_info[url]) elif file_info[type] ftp: executor.submit( download_via_ftp, file_info[url], file_info[username], file_info[password] )4.2 异步IO实现更高效率对于IO密集型任务异步IO能提供更好的性能。我们可以使用aiohttp和asyncio来实现import aiohttp import asyncio async def async_download_http(session, url): filename os.path.basename(url) async with session.get(url) as response: if response.status 200: with open(filename, wb) as f: while True: chunk await response.content.read(8192) if not chunk: break f.write(chunk) print(fDownloaded: {filename}) async def main_async(file_list): connector aiohttp.TCPConnector(limit10) # 限制并发连接数 async with aiohttp.ClientSession(connectorconnector) as session: tasks [] for file_info in file_list: if file_info[type] http: task async_download_http(session, file_info[url]) tasks.append(task) await asyncio.gather(*tasks)5. 实用技巧与注意事项5.1 断点续传实现大文件下载可能会中途中断我们可以实现断点续传功能def download_with_resume(url, filename): headers {} if os.path.exists(filename): downloaded os.path.getsize(filename) headers {Range: fbytes{downloaded}-} response requests.get(url, headersheaders, streamTrue) mode ab if headers else wb with open(filename, mode) as file: for chunk in response.iter_content(chunk_size8192): file.write(chunk)5.2 错误处理与重试机制网络下载难免会遇到各种错误我们需要添加重试机制from tenacity import retry, stop_after_attempt, wait_exponential retry(stopstop_after_attempt(3), waitwait_exponential(multiplier1, min4, max10)) def robust_download(url): response requests.get(url, streamTrue) if response.status_code ! 200: raise Exception(f下载失败状态码{response.status_code}) filename os.path.basename(url) with open(filename, wb) as file: for chunk in response.iter_content(chunk_size8192): file.write(chunk) return filename5.3 下载进度显示对于大文件下载显示进度条能提升用户体验from tqdm import tqdm def download_with_progress(url, filename): response requests.get(url, streamTrue) total_size int(response.headers.get(content-length, 0)) with open(filename, wb) as file, tqdm( descfilename, totaltotal_size, unitiB, unit_scaleTrue, unit_divisor1024, ) as bar: for chunk in response.iter_content(chunk_size8192): size file.write(chunk) bar.update(size)在实际项目中我发现这些优化措施能显著提升下载效率。特别是当需要下载数百个文件时多线程或异步下载可以将总时间从数小时缩短到几分钟。不过要注意服务器端的并发限制避免被封IP。建议开始时设置较小的并发数如5个线程然后根据实际情况逐步调整。