1. 项目概述为什么我们需要为Z-Image-Turbo编写UI压力测试脚本最近在负责一个图像处理服务的性能保障工作这个服务内部代号叫Z-Image-Turbo。它本质上是一个提供图像压缩、格式转换、智能裁剪等功能的API服务前端会有一个管理后台UI界面来配置任务、查看处理队列和结果。随着用户量和并发处理请求的激增我们遇到了一个典型问题在管理后台进行批量操作时页面响应变慢甚至偶尔出现超时或操作失败。前端的同事反馈说按钮点了没反应后端的监控却显示API接口的响应时间和成功率都在正常范围内。这就引出了一个关键问题问题可能出在UI层与后端服务交互的链路上而不仅仅是单个API的性能。用户通过浏览器点击一个“批量压缩”按钮这个动作会触发一系列复杂的异步请求提交任务、轮询状态、获取进度、下载结果。传统的API压力测试工具比如JMeter虽然能模拟单个接口的并发调用但很难完整复现这种带有状态依赖、顺序逻辑的“用户操作流”。我们需要一种方法能够像真实用户一样自动化地、高并发地操作那个管理后台UI从而发现整个应用链路包括前端渲染、网络请求、后端异步任务处理、WebSocket推送等在压力下的真实表现。这就是“Z-Image-Turbo自动化测试模拟UI请求的压力测试脚本”项目的由来。它的核心目标不是替代JMeter而是补全测试场景。我们要写一个脚本这个脚本能自动登录管理后台模拟用户执行一系列UI操作比如连续提交10个图像处理任务并且能够以多线程或分布式的方式运行制造出高并发访问的压力从而评估整个系统的健壮性、响应能力和资源消耗情况。这比单纯压测一个/api/compress接口要有价值得多因为它更贴近真实业务场景。2. 核心思路与方案选型从“点”到“面”的压力模拟在设计这个压力测试方案时我首先梳理了需要模拟的核心用户操作流这直接决定了脚本的复杂度和技术选型。对于Z-Image-Turbo的管理后台一个典型的压力场景可能是“多个管理员同时上传并处理一批图片”。这个流程可以拆解为登录系统。进入“任务创建”页面。选择本地图片文件。设置处理参数如压缩质量、目标格式。提交任务获取任务ID。定期轮询或通过WebSocket监听该任务ID的处理状态。任务完成后模拟点击下载链接。这个流程涉及到了表单操作、文件上传、异步状态查询等多个环节。基于这个需求我评估了几个主流方案方案一基于Selenium/Playwright的浏览器自动化这是最直观的方案用代码控制一个真实的浏览器如Chrome去点击、输入。它的优点是能100%还原用户操作包括执行前端JavaScript、处理Cookie和Session。Playwright相比老牌的Selenium在API设计、执行速度和浏览器上下文隔离方面更有优势。但它的缺点也很明显资源消耗巨大。每个虚拟用户VU都需要启动一个浏览器实例对于需要模拟上百并发用户的压力测试来说对测试机内存和CPU是严峻考验而且运行速度相对较慢。方案二基于Requests等库直接模拟HTTP请求这种方案是“抓包”思路。我们用浏览器正常操作一遍通过开发者工具的网络面板Network记录下每个操作触发的HTTP请求包括URL、方法、Headers、Body。然后用Python的requests库或Node.js的axios库直接编写代码去发送这些请求。它的优点是轻量、高效一个进程就能模拟成百上千的并发非常适合压力测试。但挑战在于现代Web应用大量使用动态Token如CSRF token、JWT、复杂的会话管理和可能的前端加密直接模拟请求需要仔细处理这些认证和防伪机制有时甚至需要先执行一些请求来获取必要的Token。方案三混合模式本项目最终选择经过权衡我选择了混合模式作为本项目的技术基底。核心压力生成部分采用方案二直接模拟HTTP请求以保证并发效率和资源可控。但对于登录等涉及复杂前端交互或初始会话建立的环节则采用方案一Playwright来辅助获取关键凭证。例如可以用Playwright脚本自动登录一次获取登录后的Cookie和Session ID然后将这些凭证提供给后续大量并发的高效requests脚本使用。这样既保证了脚本的健壮性能处理复杂的登录逻辑又保证了压力测试阶段的高性能。工具栈最终确定如下Python 3.8: 作为主开发语言生态丰富requests,aiohttp等库非常适合网络请求。Playwright for Python: 用于处理复杂的、需要浏览器环境的前端交互主要是首次登录和获取Token。Requests / Aiohttp:requests用于编写清晰、易调试的同步请求逻辑aiohttp用于编写高性能的异步并发压力测试脚本。Pandas (可选): 用于管理测试用例数据比如从CSV文件中读取要上传的图片路径列表。Locust / JMeter (作为对比与补充): 虽然本项目核心是自研脚本但我们可以用Locust这个基于Python的压测框架来快速封装我们的测试逻辑它自带分布式压测和Web UI监控非常方便。JMeter则可以作为基准对比工具验证我们自研脚本产生的压力是否准确。注意选择直接模拟HTTP请求进行压测意味着你需要对你的Web应用的前后端交互协议有深入的理解。你必须清楚地知道每个按钮点击背后发送了什么请求服务器返回了什么以及会话是如何维持的。这是一个“白盒”测试思路虽然有一定学习成本但带来的控制力和效率提升是巨大的。3. 脚本核心模块设计与实现拆解一个健壮的压力测试脚本不能把所有逻辑都堆在一个文件里。为了提高可维护性和复用性我将脚本拆分成几个核心模块。3.1 认证与会话管理模块这是脚本的基石如果认证失败所有后续请求都无效。对于Z-Image-Turbo其管理后台采用常见的Cookie-Session或JWT认证。1. 登录凭证获取使用Playwright我们首先编写一个独立的auth_helper.py用Playwright实现自动登录并提取凭证。# auth_helper.py import asyncio from playwright.async_api import async_playwright import json class AuthHelper: def __init__(self, login_url, username, password): self.login_url login_url self.username username self.password self._decrypt_password(password) # 简单示例密码建议从环境变量读取 self.cookies None self.token None async def login_and_get_auth(self): 使用Playwright模拟登录并获取cookies或token async with async_playwright() as p: # 建议使用无头模式但调试时可设为False browser await p.chromium.launch(headlessTrue) context await browser.new_context() page await context.new_page() try: await page.goto(self.login_url) # 假设登录表单的input选择器为#username和#password await page.fill(#username, self.username) await page.fill(#password, self.password) await page.click(button[typesubmit]) # 等待登录成功后的跳转或某个元素出现 await page.wait_for_selector(#dashboard, timeout10000) # 假设登录后会出现id为dashboard的元素 # 方案A获取Cookies (适用于Session-Cookie认证) self.cookies {cookie[name]: cookie[value] for cookie in await context.cookies()} print(f获取到Cookies: {list(self.cookies.keys())}) # 方案B获取Token (适用于JWT假设Token在localStorage或某个API响应中) # 例如从localStorage获取 # token await page.evaluate(() localStorage.getItem(auth_token)) # self.token token except Exception as e: print(f登录失败: {e}) # 这里可以截图保存便于调试 await page.screenshot(pathlogin_failure.png) raise e finally: await browser.close() return self.cookies or self.token def _decrypt_password(self, encrypted_pwd): # 简单的解密逻辑实际项目中应从安全配置读取 return encrypted_pwd # 此处简化 # 使用示例 async def main(): helper AuthHelper(https://admin.z-image-turbo.test/login, admin, your_encrypted_password) auth_data await helper.login_and_get_auth() # 将auth_data保存到文件或环境变量供压力测试脚本使用 with open(auth_data.json, w) as f: json.dump(auth_data, f) if __name__ __main__: asyncio.run(main())2. 请求会话构造使用Requests压力测试主脚本pressure_test.py会读取上面保存的认证信息并构建一个持久化的会话Session这个会话会自动管理Cookies。# pressure_test.py 片段 import requests import json class TurboPressureTester: def __init__(self, base_url): self.base_url base_url self.session requests.Session() self._load_auth() def _load_auth(self): 从文件加载认证信息并设置到session中 try: with open(auth_data.json, r) as f: auth_data json.load(f) # 如果是cookies if isinstance(auth_data, dict): # 将cookies添加到session的cookies jar中 for name, value in auth_data.items(): self.session.cookies.set(name, value, domain.z-image-turbo.test) # 注意domain print(Cookies加载成功。) # 如果是token则设置请求头 elif isinstance(auth_data, str): self.session.headers.update({Authorization: fBearer {auth_data}}) print(Token已设置到请求头。) except FileNotFoundError: print(未找到认证文件请先运行auth_helper.py登录。) exit(1)3.2 核心业务流程模拟模块这个模块对应具体的UI操作。每个操作封装成一个函数内部使用配置好的self.session去发送请求。1. 创建图像处理任务这是最核心的操作通常是一个POST请求包含表单数据和文件。# pressure_test.py 继续 class TurboPressureTester: # ... __init__ 等代码 ... def create_processing_task(self, image_path, quality85, output_formatwebp): 模拟UI上的“创建任务”操作 :param image_path: 本地图片文件路径 :param quality: 压缩质量 (1-100) :param output_format: 输出格式如 webp, jpg, png :return: 任务ID url f{self.base_url}/api/tasks # 构建multipart/form-data数据模拟表单上传 files { image: (image.jpg, open(image_path, rb), image/jpeg) # 文件名可根据实际修改 } data { quality: quality, format: output_format, # 可能还有其他参数如width, height, crop_strategy等 } try: # 关键使用session.post它会自动携带认证cookies response self.session.post(url, filesfiles, datadata, timeout30) response.raise_for_status() # 如果状态码不是200-399抛出HTTPError result response.json() task_id result.get(data, {}).get(taskId) if not task_id: raise ValueError(f响应中未找到taskId: {result}) print(f任务创建成功ID: {task_id}) return task_id except requests.exceptions.RequestException as e: print(f创建任务请求失败: {e}) if hasattr(e, response) and e.response is not None: print(f错误响应: {e.response.text}) return None finally: if image in files: files[image][1].close() # 记得关闭文件2. 查询任务状态任务提交后是异步处理的我们需要定期轮询状态。def get_task_status(self, task_id): 轮询任务状态 url f{self.base_url}/api/tasks/{task_id}/status try: response self.session.get(url, timeout10) response.raise_for_status() result response.json() status result.get(data, {}).get(status) # 如 PENDING, PROCESSING, SUCCESS, FAILED progress result.get(data, {}).get(progress, 0) # 进度百分比 return status, progress except requests.exceptions.RequestException as e: print(f查询任务{task_id}状态失败: {e}) return ERROR, 03. 模拟下载结果可选当任务状态为SUCCESS时模拟点击下载按钮。def download_result(self, task_id, save_path): 下载处理后的图片 url f{self.base_url}/api/tasks/{task_id}/result try: # streamTrue用于下载大文件 with self.session.get(url, streamTrue, timeout60) as response: response.raise_for_status() with open(save_path, wb) as f: for chunk in response.iter_content(chunk_size8192): f.write(chunk) print(f任务{task_id}结果已下载至: {save_path}) return True except requests.exceptions.RequestException as e: print(f下载任务{task_id}结果失败: {e}) return False3.3 并发压力生成与调度模块单个请求的模拟不是压力测试。我们需要一个机制来并发执行上述业务流程。这里提供两种模式多线程ThreadPoolExecutor和异步IOasyncio aiohttp。对于IO密集型的HTTP请求测试异步模式效率更高。1. 多线程模式示例使用concurrent.futures库思路简单清晰。import concurrent.futures import time import random def worker(tester, image_path, user_id): 单个虚拟用户VU的执行逻辑 print(fVU-{user_id}: 开始执行任务流) # 可以在这里加入随机思考时间模拟真实用户 # time.sleep(random.uniform(0.5, 2.0)) task_id tester.create_processing_task(image_path) if not task_id: print(fVU-{user_id}: 任务创建失败) return # 轮询状态最多轮询10次每次间隔2秒 for i in range(10): status, progress tester.get_task_status(task_id) print(fVU-{user_id}: 任务{task_id} 状态[{status}] 进度[{progress}%]) if status SUCCESS: # tester.download_result(task_id, f./results/{task_id}.webp) print(fVU-{user_id}: 任务完成) break elif status FAILED: print(fVU-{user_id}: 任务失败) break time.sleep(2) # 轮询间隔 else: print(fVU-{user_id}: 任务{task_id} 轮询超时) def run_thread_pressure_test(base_url, image_paths, max_workers10): 启动多线程压力测试 # 为每个虚拟用户创建一个Tester实例重要避免Session共享导致状态混乱 testers [TurboPressureTester(base_url) for _ in range(len(image_paths))] with concurrent.futures.ThreadPoolExecutor(max_workersmax_workers) as executor: # 提交任务 future_to_user { executor.submit(worker, testers[i], image_paths[i], i): i for i in range(len(image_paths)) } # 等待所有任务完成 for future in concurrent.futures.as_completed(future_to_user): user_id future_to_user[future] try: future.result() # 获取结果如果有异常会在这里抛出 except Exception as exc: print(fVU-{user_id} 执行过程中产生异常: {exc})2. 异步模式示例推荐用于高性能压测使用aiohttp和asyncio。注意requests库是同步的在异步环境中使用会阻塞事件循环因此我们需要将核心的HTTP请求逻辑也用aiohttp重写或者使用aiohttp重写整个Tester类。这里展示一个使用aiohttp的异步Worker概念。# async_pressure_test.py import aiohttp import asyncio import aiofiles class AsyncTurboTester: def __init__(self, base_url, cookies): self.base_url base_url self.cookies cookies # 注意aiohttp的ClientSession通常作为上下文管理器在单个函数内使用或在类中统一管理。 # 这里为了简化我们在每个方法中创建。生产环境应考虑会话复用和连接池。 async def create_task(self, session, image_path, quality85): url f{self.base_url}/api/tasks # 异步读取文件 async with aiofiles.open(image_path, rb) as f: file_data await f.read() data aiohttp.FormData() data.add_field(image, file_data, filenamepressure_test.jpg, content_typeimage/jpeg) data.add_field(quality, str(quality)) async with session.post(url, datadata, cookiesself.cookies) as response: result await response.json() return result.get(data, {}).get(taskId) async def worker(self, session, image_path, user_id): 异步的单个用户逻辑 print(fAsync-VU-{user_id}: start) task_id await self.create_task(session, image_path) if task_id: # 异步轮询 for _ in range(10): status, progress await self.get_task_status(session, task_id) if status SUCCESS: break await asyncio.sleep(2) print(fAsync-VU-{user_id}: end) async def main_async(): base_url https://admin.z-image-turbo.test # 假设我们已经有了cookies (可以从之前的Playwright脚本获取) cookies {session_id: your_session_value} image_paths [./test1.jpg, ./test2.jpg, ./test3.jpg] * 10 # 模拟30个任务 # 创建一个全局的aiohttp会话连接池可以复用 async with aiohttp.ClientSession(cookiescookies) as session: tester AsyncTurboTester(base_url, cookies) tasks [] for i, img_path in enumerate(image_paths): # 创建30个并发任务 task asyncio.create_task(tester.worker(session, img_path, i)) tasks.append(task) # 等待所有并发任务完成 await asyncio.gather(*tasks) # 运行 # asyncio.run(main_async())实操心得Session隔离是关键。在多线程或异步并发场景下绝对不能让所有虚拟用户共享同一个requests.Session或aiohttp.ClientSession对象。因为Session内部会维护连接池和Cookie jar共享会导致用户间的Cookie污染和连接竞争测试结果完全失真。正确的做法是为每个虚拟用户或每批用户创建独立的Session实例。在上面的多线程例子中我们预先创建了len(image_paths)个Tester实例每个实例有自己的Session就是这个道理。4. 测试场景构造与数据驱动一个有效的压力测试需要有代表性的测试数据和场景。我们的脚本应该支持从外部文件如CSV、JSON读取测试参数实现数据驱动测试。1. 准备测试数据文件 (test_data.csv):image_path,quality,format,expected_size_kb ./samples/photo1.jpg,90,webp,150 ./samples/photo2.png,80,jpg,200 ./samples/photo3.bmp,95,png,8002. 在脚本中读取并驱动测试import csv def load_test_cases(csv_path): cases [] with open(csv_path, newline, encodingutf-8) as csvfile: reader csv.DictReader(csvfile) for row in reader: cases.append(row) return cases def run_data_driven_test(base_url, csv_path, concurrent_users5): test_cases load_test_cases(csv_path) image_paths [case[image_path] for case in test_cases] # 使用多线程或异步模式运行传入image_paths run_thread_pressure_test(base_url, image_paths, max_workersconcurrent_users) # 或 asyncio.run(main_async(...))3. 构造复杂场景真实的用户操作不是简单的“创建-轮询”。我们可以设计更复杂的场景比如混合场景70%的用户只上传一张小图30%的用户上传多张大图。峰值场景先以低并发运行1分钟然后突然在10秒内将并发用户数提升到峰值持续2分钟再缓慢下降。稳定性场景以恒定的并发用户数如50个持续运行数小时观察系统内存、CPU是否有泄漏响应时间是否稳定。要实现这些场景需要在调度模块中加入更复杂的逻辑比如使用time模块控制节奏或者使用更专业的压测框架如Locust的场景设置功能。5. 结果收集、监控与可视化压力测试不能光跑完就算了我们需要收集数据并进行分析。关键指标包括吞吐量 (Throughput)每秒完成的请求数RPS或事务数TPS。响应时间 (Response Time)平均响应时间、P9595%的请求在此时间内完成、P99响应时间。错误率 (Error Rate)失败请求数 / 总请求数。资源利用率服务器端的CPU、内存、磁盘I/O、网络I/O这通常需要服务器监控工具配合如PrometheusGrafana。1. 在脚本中集成简易指标收集我们可以在每个请求函数中加入计时和状态记录。import time from dataclasses import dataclass from typing import List import statistics dataclass class RequestMetric: endpoint: str status_code: int response_time_ms: float timestamp: float class MetricsCollector: def __init__(self): self.metrics: List[RequestMetric] [] def record(self, endpoint, status_code, response_time_ms): self.metrics.append(RequestMetric(endpoint, status_code, response_time_ms, time.time())) def generate_report(self): if not self.metrics: return No metrics collected. success_metrics [m for m in self.metrics if 200 m.status_code 300] error_count len(self.metrics) - len(success_metrics) error_rate error_count / len(self.metrics) * 100 if self.metrics else 0 response_times [m.response_time_ms for m in self.metrics] avg_rt statistics.mean(response_times) if response_times else 0 p95_rt statistics.quantiles(response_times, n20)[18] if len(response_times) 20 else 0 # 计算P95近似值 report f 压力测试报告 总请求数: {len(self.metrics)} 成功请求数: {len(success_metrics)} 错误请求数: {error_count} 错误率: {error_rate:.2f}% 平均响应时间: {avg_rt:.2f} ms P95响应时间: {p95_rt:.2f} ms return report # 在TurboPressureTester中集成 class TurboPressureTester: def __init__(self, base_url, collector: MetricsCollector): self.base_url base_url self.session requests.Session() self.collector collector self._load_auth() def create_processing_task(self, image_path, quality85, output_formatwebp): url f{self.base_url}/api/tasks files {...} data {...} start_time time.time() try: response self.session.post(url, filesfiles, datadata, timeout30) response_time_ms (time.time() - start_time) * 1000 # 记录指标 self.collector.record(/api/tasks, response.status_code, response_time_ms) response.raise_for_status() # ... 后续处理 except requests.exceptions.RequestException as e: # 记录错误例如超时或网络错误可设status_code为0或-1 response_time_ms (time.time() - start_time) * 1000 self.collector.record(/api/tasks, 0, response_time_ms) # ... 后续处理2. 与专业监控系统集成对于长期、大型的压力测试建议将数据发送到专业的监控系统如InfluxDB时序数据库结合Grafana可视化。可以在脚本中每次记录指标时通过InfluxDB的客户端库如influxdb-client-python将数据点写入。这样就能在Grafana上实时看到漂亮的压力测试仪表盘观察曲线变化。6. 常见问题、踩坑记录与排查技巧在实际编写和运行这个脚本的过程中我遇到了不少坑这里总结一下希望能帮你绕过。问题1登录成功但后续API请求返回401/403未授权。原因这是最常见的问题。可能的原因有Cookie作用域Domain/Path不匹配Playwright获取的Cookie的domain可能是admin.z-image-turbo.test而你用requests访问的API域名是api.z-image-turbo.test。Cookie默认不跨域发送。Token过期或失效JWT Token可能有很短的有效期或者服务器端有额外的验证机制如Token必须与登录IP绑定。缺少必要的请求头除了Cookie/Authorization头API可能还需要X-CSRF-TOKEN、X-Requested-With等头信息。排查用浏览器正常操作在开发者工具的Network面板里仔细对比你的脚本发送的请求和浏览器发送的请求。逐字对比URL、Method、Headers尤其是Cookie、Authorization、Content-Type以及其他自定义头、Request Body。检查服务器返回的登录响应除了Set-Cookie是否在响应Body里返回了Token。在脚本中打印出你实际发送的请求头和Cookie与浏览器抓到的进行比对。问题2文件上传失败服务器返回“无效的文件格式”或“文件损坏”。原因模拟文件上传时multipart/form-data的格式构造不正确或者文件读取方式有问题。排查与解决确保files参数字典的构造正确。requests库的files参数期望的格式是{field_name: (filename, fileobj, content_type)}。filename很重要有些后端会根据后缀名判断文件类型。使用aiohttp时FormData的添加方式要正确。可以先用一个简单的、确定能工作的文件比如一个小文本文件测试上传接口排除文件本身和路径的问题。对比浏览器上传时网络请求中“Payload”标签下的具体内容看boundary和每个part的headers是否一致。问题3高并发下错误率飙升但低并发时正常。原因服务器连接数或线程池耗尽后端服务如Nginx、Tomcat、应用服务器有最大连接数限制。数据库连接池耗尽应用连接数据库的连接池设置过小。资源竞争如图像处理服务同时处理的文件数达到上限或者临时目录磁盘空间不足。脚本自身问题测试机网络带宽或端口数被占满或者脚本没有正确管理Session/连接导致本地资源耗尽。排查监控服务器资源在压测时实时观察服务器的CPU、内存、磁盘IO、网络IO以及应用日志、数据库连接数。查看错误日志分析服务器返回的错误信息是“Connection refused”、“Gateway Timeout”还是“500 Internal Server Error”。梯度增加并发数从1个用户开始逐步增加到5、10、20、50...观察错误率开始显著上升的拐点这个点可能就是系统的瓶颈所在。检查脚本配置确保使用了连接池requests.Session或aiohttp.ClientSession会复用连接并合理设置超时时间避免大量请求因超时堆积。问题4如何模拟更真实的“用户思考时间”方案在脚本的每个操作步骤之间比如登录后、提交任务前加入随机的等待时间time.sleep(random.uniform(1, 5))。这可以避免所有请求在同一时刻爆发式到达服务器使测试结果更贴近真实场景。Locust等框架内置了wait_time功能可以很方便地配置。问题5测试结果数据如何分析和呈现基础分析像上面MetricsCollector那样计算基本的聚合指标。进阶分析将每次请求的详细指标时间戳、端点、响应时间、状态码写入CSV或数据库。然后用Python的pandas和matplotlib库进行分析和绘图比如绘制响应时间随时间变化的折线图或者绘制响应时间的分布直方图。黄金标准集成到CI/CD流水线中每次压测后自动生成报告并与历史基准进行对比如果核心指标如P95响应时间、错误率出现退化则自动标记构建失败。最后这个脚本的价值不仅仅在于发现性能瓶颈。在每次Z-Image-Turbo发布新版本前跑一遍这个压力测试可以作为回归测试的一部分确保新的代码修改没有引入性能回退。它从一个用户操作流程的完整视角为系统的稳定性和可靠性提供了另一重坚实保障。
基于HTTP请求模拟的Web应用UI压力测试实战:从原理到Z-Image-Turbo案例
1. 项目概述为什么我们需要为Z-Image-Turbo编写UI压力测试脚本最近在负责一个图像处理服务的性能保障工作这个服务内部代号叫Z-Image-Turbo。它本质上是一个提供图像压缩、格式转换、智能裁剪等功能的API服务前端会有一个管理后台UI界面来配置任务、查看处理队列和结果。随着用户量和并发处理请求的激增我们遇到了一个典型问题在管理后台进行批量操作时页面响应变慢甚至偶尔出现超时或操作失败。前端的同事反馈说按钮点了没反应后端的监控却显示API接口的响应时间和成功率都在正常范围内。这就引出了一个关键问题问题可能出在UI层与后端服务交互的链路上而不仅仅是单个API的性能。用户通过浏览器点击一个“批量压缩”按钮这个动作会触发一系列复杂的异步请求提交任务、轮询状态、获取进度、下载结果。传统的API压力测试工具比如JMeter虽然能模拟单个接口的并发调用但很难完整复现这种带有状态依赖、顺序逻辑的“用户操作流”。我们需要一种方法能够像真实用户一样自动化地、高并发地操作那个管理后台UI从而发现整个应用链路包括前端渲染、网络请求、后端异步任务处理、WebSocket推送等在压力下的真实表现。这就是“Z-Image-Turbo自动化测试模拟UI请求的压力测试脚本”项目的由来。它的核心目标不是替代JMeter而是补全测试场景。我们要写一个脚本这个脚本能自动登录管理后台模拟用户执行一系列UI操作比如连续提交10个图像处理任务并且能够以多线程或分布式的方式运行制造出高并发访问的压力从而评估整个系统的健壮性、响应能力和资源消耗情况。这比单纯压测一个/api/compress接口要有价值得多因为它更贴近真实业务场景。2. 核心思路与方案选型从“点”到“面”的压力模拟在设计这个压力测试方案时我首先梳理了需要模拟的核心用户操作流这直接决定了脚本的复杂度和技术选型。对于Z-Image-Turbo的管理后台一个典型的压力场景可能是“多个管理员同时上传并处理一批图片”。这个流程可以拆解为登录系统。进入“任务创建”页面。选择本地图片文件。设置处理参数如压缩质量、目标格式。提交任务获取任务ID。定期轮询或通过WebSocket监听该任务ID的处理状态。任务完成后模拟点击下载链接。这个流程涉及到了表单操作、文件上传、异步状态查询等多个环节。基于这个需求我评估了几个主流方案方案一基于Selenium/Playwright的浏览器自动化这是最直观的方案用代码控制一个真实的浏览器如Chrome去点击、输入。它的优点是能100%还原用户操作包括执行前端JavaScript、处理Cookie和Session。Playwright相比老牌的Selenium在API设计、执行速度和浏览器上下文隔离方面更有优势。但它的缺点也很明显资源消耗巨大。每个虚拟用户VU都需要启动一个浏览器实例对于需要模拟上百并发用户的压力测试来说对测试机内存和CPU是严峻考验而且运行速度相对较慢。方案二基于Requests等库直接模拟HTTP请求这种方案是“抓包”思路。我们用浏览器正常操作一遍通过开发者工具的网络面板Network记录下每个操作触发的HTTP请求包括URL、方法、Headers、Body。然后用Python的requests库或Node.js的axios库直接编写代码去发送这些请求。它的优点是轻量、高效一个进程就能模拟成百上千的并发非常适合压力测试。但挑战在于现代Web应用大量使用动态Token如CSRF token、JWT、复杂的会话管理和可能的前端加密直接模拟请求需要仔细处理这些认证和防伪机制有时甚至需要先执行一些请求来获取必要的Token。方案三混合模式本项目最终选择经过权衡我选择了混合模式作为本项目的技术基底。核心压力生成部分采用方案二直接模拟HTTP请求以保证并发效率和资源可控。但对于登录等涉及复杂前端交互或初始会话建立的环节则采用方案一Playwright来辅助获取关键凭证。例如可以用Playwright脚本自动登录一次获取登录后的Cookie和Session ID然后将这些凭证提供给后续大量并发的高效requests脚本使用。这样既保证了脚本的健壮性能处理复杂的登录逻辑又保证了压力测试阶段的高性能。工具栈最终确定如下Python 3.8: 作为主开发语言生态丰富requests,aiohttp等库非常适合网络请求。Playwright for Python: 用于处理复杂的、需要浏览器环境的前端交互主要是首次登录和获取Token。Requests / Aiohttp:requests用于编写清晰、易调试的同步请求逻辑aiohttp用于编写高性能的异步并发压力测试脚本。Pandas (可选): 用于管理测试用例数据比如从CSV文件中读取要上传的图片路径列表。Locust / JMeter (作为对比与补充): 虽然本项目核心是自研脚本但我们可以用Locust这个基于Python的压测框架来快速封装我们的测试逻辑它自带分布式压测和Web UI监控非常方便。JMeter则可以作为基准对比工具验证我们自研脚本产生的压力是否准确。注意选择直接模拟HTTP请求进行压测意味着你需要对你的Web应用的前后端交互协议有深入的理解。你必须清楚地知道每个按钮点击背后发送了什么请求服务器返回了什么以及会话是如何维持的。这是一个“白盒”测试思路虽然有一定学习成本但带来的控制力和效率提升是巨大的。3. 脚本核心模块设计与实现拆解一个健壮的压力测试脚本不能把所有逻辑都堆在一个文件里。为了提高可维护性和复用性我将脚本拆分成几个核心模块。3.1 认证与会话管理模块这是脚本的基石如果认证失败所有后续请求都无效。对于Z-Image-Turbo其管理后台采用常见的Cookie-Session或JWT认证。1. 登录凭证获取使用Playwright我们首先编写一个独立的auth_helper.py用Playwright实现自动登录并提取凭证。# auth_helper.py import asyncio from playwright.async_api import async_playwright import json class AuthHelper: def __init__(self, login_url, username, password): self.login_url login_url self.username username self.password self._decrypt_password(password) # 简单示例密码建议从环境变量读取 self.cookies None self.token None async def login_and_get_auth(self): 使用Playwright模拟登录并获取cookies或token async with async_playwright() as p: # 建议使用无头模式但调试时可设为False browser await p.chromium.launch(headlessTrue) context await browser.new_context() page await context.new_page() try: await page.goto(self.login_url) # 假设登录表单的input选择器为#username和#password await page.fill(#username, self.username) await page.fill(#password, self.password) await page.click(button[typesubmit]) # 等待登录成功后的跳转或某个元素出现 await page.wait_for_selector(#dashboard, timeout10000) # 假设登录后会出现id为dashboard的元素 # 方案A获取Cookies (适用于Session-Cookie认证) self.cookies {cookie[name]: cookie[value] for cookie in await context.cookies()} print(f获取到Cookies: {list(self.cookies.keys())}) # 方案B获取Token (适用于JWT假设Token在localStorage或某个API响应中) # 例如从localStorage获取 # token await page.evaluate(() localStorage.getItem(auth_token)) # self.token token except Exception as e: print(f登录失败: {e}) # 这里可以截图保存便于调试 await page.screenshot(pathlogin_failure.png) raise e finally: await browser.close() return self.cookies or self.token def _decrypt_password(self, encrypted_pwd): # 简单的解密逻辑实际项目中应从安全配置读取 return encrypted_pwd # 此处简化 # 使用示例 async def main(): helper AuthHelper(https://admin.z-image-turbo.test/login, admin, your_encrypted_password) auth_data await helper.login_and_get_auth() # 将auth_data保存到文件或环境变量供压力测试脚本使用 with open(auth_data.json, w) as f: json.dump(auth_data, f) if __name__ __main__: asyncio.run(main())2. 请求会话构造使用Requests压力测试主脚本pressure_test.py会读取上面保存的认证信息并构建一个持久化的会话Session这个会话会自动管理Cookies。# pressure_test.py 片段 import requests import json class TurboPressureTester: def __init__(self, base_url): self.base_url base_url self.session requests.Session() self._load_auth() def _load_auth(self): 从文件加载认证信息并设置到session中 try: with open(auth_data.json, r) as f: auth_data json.load(f) # 如果是cookies if isinstance(auth_data, dict): # 将cookies添加到session的cookies jar中 for name, value in auth_data.items(): self.session.cookies.set(name, value, domain.z-image-turbo.test) # 注意domain print(Cookies加载成功。) # 如果是token则设置请求头 elif isinstance(auth_data, str): self.session.headers.update({Authorization: fBearer {auth_data}}) print(Token已设置到请求头。) except FileNotFoundError: print(未找到认证文件请先运行auth_helper.py登录。) exit(1)3.2 核心业务流程模拟模块这个模块对应具体的UI操作。每个操作封装成一个函数内部使用配置好的self.session去发送请求。1. 创建图像处理任务这是最核心的操作通常是一个POST请求包含表单数据和文件。# pressure_test.py 继续 class TurboPressureTester: # ... __init__ 等代码 ... def create_processing_task(self, image_path, quality85, output_formatwebp): 模拟UI上的“创建任务”操作 :param image_path: 本地图片文件路径 :param quality: 压缩质量 (1-100) :param output_format: 输出格式如 webp, jpg, png :return: 任务ID url f{self.base_url}/api/tasks # 构建multipart/form-data数据模拟表单上传 files { image: (image.jpg, open(image_path, rb), image/jpeg) # 文件名可根据实际修改 } data { quality: quality, format: output_format, # 可能还有其他参数如width, height, crop_strategy等 } try: # 关键使用session.post它会自动携带认证cookies response self.session.post(url, filesfiles, datadata, timeout30) response.raise_for_status() # 如果状态码不是200-399抛出HTTPError result response.json() task_id result.get(data, {}).get(taskId) if not task_id: raise ValueError(f响应中未找到taskId: {result}) print(f任务创建成功ID: {task_id}) return task_id except requests.exceptions.RequestException as e: print(f创建任务请求失败: {e}) if hasattr(e, response) and e.response is not None: print(f错误响应: {e.response.text}) return None finally: if image in files: files[image][1].close() # 记得关闭文件2. 查询任务状态任务提交后是异步处理的我们需要定期轮询状态。def get_task_status(self, task_id): 轮询任务状态 url f{self.base_url}/api/tasks/{task_id}/status try: response self.session.get(url, timeout10) response.raise_for_status() result response.json() status result.get(data, {}).get(status) # 如 PENDING, PROCESSING, SUCCESS, FAILED progress result.get(data, {}).get(progress, 0) # 进度百分比 return status, progress except requests.exceptions.RequestException as e: print(f查询任务{task_id}状态失败: {e}) return ERROR, 03. 模拟下载结果可选当任务状态为SUCCESS时模拟点击下载按钮。def download_result(self, task_id, save_path): 下载处理后的图片 url f{self.base_url}/api/tasks/{task_id}/result try: # streamTrue用于下载大文件 with self.session.get(url, streamTrue, timeout60) as response: response.raise_for_status() with open(save_path, wb) as f: for chunk in response.iter_content(chunk_size8192): f.write(chunk) print(f任务{task_id}结果已下载至: {save_path}) return True except requests.exceptions.RequestException as e: print(f下载任务{task_id}结果失败: {e}) return False3.3 并发压力生成与调度模块单个请求的模拟不是压力测试。我们需要一个机制来并发执行上述业务流程。这里提供两种模式多线程ThreadPoolExecutor和异步IOasyncio aiohttp。对于IO密集型的HTTP请求测试异步模式效率更高。1. 多线程模式示例使用concurrent.futures库思路简单清晰。import concurrent.futures import time import random def worker(tester, image_path, user_id): 单个虚拟用户VU的执行逻辑 print(fVU-{user_id}: 开始执行任务流) # 可以在这里加入随机思考时间模拟真实用户 # time.sleep(random.uniform(0.5, 2.0)) task_id tester.create_processing_task(image_path) if not task_id: print(fVU-{user_id}: 任务创建失败) return # 轮询状态最多轮询10次每次间隔2秒 for i in range(10): status, progress tester.get_task_status(task_id) print(fVU-{user_id}: 任务{task_id} 状态[{status}] 进度[{progress}%]) if status SUCCESS: # tester.download_result(task_id, f./results/{task_id}.webp) print(fVU-{user_id}: 任务完成) break elif status FAILED: print(fVU-{user_id}: 任务失败) break time.sleep(2) # 轮询间隔 else: print(fVU-{user_id}: 任务{task_id} 轮询超时) def run_thread_pressure_test(base_url, image_paths, max_workers10): 启动多线程压力测试 # 为每个虚拟用户创建一个Tester实例重要避免Session共享导致状态混乱 testers [TurboPressureTester(base_url) for _ in range(len(image_paths))] with concurrent.futures.ThreadPoolExecutor(max_workersmax_workers) as executor: # 提交任务 future_to_user { executor.submit(worker, testers[i], image_paths[i], i): i for i in range(len(image_paths)) } # 等待所有任务完成 for future in concurrent.futures.as_completed(future_to_user): user_id future_to_user[future] try: future.result() # 获取结果如果有异常会在这里抛出 except Exception as exc: print(fVU-{user_id} 执行过程中产生异常: {exc})2. 异步模式示例推荐用于高性能压测使用aiohttp和asyncio。注意requests库是同步的在异步环境中使用会阻塞事件循环因此我们需要将核心的HTTP请求逻辑也用aiohttp重写或者使用aiohttp重写整个Tester类。这里展示一个使用aiohttp的异步Worker概念。# async_pressure_test.py import aiohttp import asyncio import aiofiles class AsyncTurboTester: def __init__(self, base_url, cookies): self.base_url base_url self.cookies cookies # 注意aiohttp的ClientSession通常作为上下文管理器在单个函数内使用或在类中统一管理。 # 这里为了简化我们在每个方法中创建。生产环境应考虑会话复用和连接池。 async def create_task(self, session, image_path, quality85): url f{self.base_url}/api/tasks # 异步读取文件 async with aiofiles.open(image_path, rb) as f: file_data await f.read() data aiohttp.FormData() data.add_field(image, file_data, filenamepressure_test.jpg, content_typeimage/jpeg) data.add_field(quality, str(quality)) async with session.post(url, datadata, cookiesself.cookies) as response: result await response.json() return result.get(data, {}).get(taskId) async def worker(self, session, image_path, user_id): 异步的单个用户逻辑 print(fAsync-VU-{user_id}: start) task_id await self.create_task(session, image_path) if task_id: # 异步轮询 for _ in range(10): status, progress await self.get_task_status(session, task_id) if status SUCCESS: break await asyncio.sleep(2) print(fAsync-VU-{user_id}: end) async def main_async(): base_url https://admin.z-image-turbo.test # 假设我们已经有了cookies (可以从之前的Playwright脚本获取) cookies {session_id: your_session_value} image_paths [./test1.jpg, ./test2.jpg, ./test3.jpg] * 10 # 模拟30个任务 # 创建一个全局的aiohttp会话连接池可以复用 async with aiohttp.ClientSession(cookiescookies) as session: tester AsyncTurboTester(base_url, cookies) tasks [] for i, img_path in enumerate(image_paths): # 创建30个并发任务 task asyncio.create_task(tester.worker(session, img_path, i)) tasks.append(task) # 等待所有并发任务完成 await asyncio.gather(*tasks) # 运行 # asyncio.run(main_async())实操心得Session隔离是关键。在多线程或异步并发场景下绝对不能让所有虚拟用户共享同一个requests.Session或aiohttp.ClientSession对象。因为Session内部会维护连接池和Cookie jar共享会导致用户间的Cookie污染和连接竞争测试结果完全失真。正确的做法是为每个虚拟用户或每批用户创建独立的Session实例。在上面的多线程例子中我们预先创建了len(image_paths)个Tester实例每个实例有自己的Session就是这个道理。4. 测试场景构造与数据驱动一个有效的压力测试需要有代表性的测试数据和场景。我们的脚本应该支持从外部文件如CSV、JSON读取测试参数实现数据驱动测试。1. 准备测试数据文件 (test_data.csv):image_path,quality,format,expected_size_kb ./samples/photo1.jpg,90,webp,150 ./samples/photo2.png,80,jpg,200 ./samples/photo3.bmp,95,png,8002. 在脚本中读取并驱动测试import csv def load_test_cases(csv_path): cases [] with open(csv_path, newline, encodingutf-8) as csvfile: reader csv.DictReader(csvfile) for row in reader: cases.append(row) return cases def run_data_driven_test(base_url, csv_path, concurrent_users5): test_cases load_test_cases(csv_path) image_paths [case[image_path] for case in test_cases] # 使用多线程或异步模式运行传入image_paths run_thread_pressure_test(base_url, image_paths, max_workersconcurrent_users) # 或 asyncio.run(main_async(...))3. 构造复杂场景真实的用户操作不是简单的“创建-轮询”。我们可以设计更复杂的场景比如混合场景70%的用户只上传一张小图30%的用户上传多张大图。峰值场景先以低并发运行1分钟然后突然在10秒内将并发用户数提升到峰值持续2分钟再缓慢下降。稳定性场景以恒定的并发用户数如50个持续运行数小时观察系统内存、CPU是否有泄漏响应时间是否稳定。要实现这些场景需要在调度模块中加入更复杂的逻辑比如使用time模块控制节奏或者使用更专业的压测框架如Locust的场景设置功能。5. 结果收集、监控与可视化压力测试不能光跑完就算了我们需要收集数据并进行分析。关键指标包括吞吐量 (Throughput)每秒完成的请求数RPS或事务数TPS。响应时间 (Response Time)平均响应时间、P9595%的请求在此时间内完成、P99响应时间。错误率 (Error Rate)失败请求数 / 总请求数。资源利用率服务器端的CPU、内存、磁盘I/O、网络I/O这通常需要服务器监控工具配合如PrometheusGrafana。1. 在脚本中集成简易指标收集我们可以在每个请求函数中加入计时和状态记录。import time from dataclasses import dataclass from typing import List import statistics dataclass class RequestMetric: endpoint: str status_code: int response_time_ms: float timestamp: float class MetricsCollector: def __init__(self): self.metrics: List[RequestMetric] [] def record(self, endpoint, status_code, response_time_ms): self.metrics.append(RequestMetric(endpoint, status_code, response_time_ms, time.time())) def generate_report(self): if not self.metrics: return No metrics collected. success_metrics [m for m in self.metrics if 200 m.status_code 300] error_count len(self.metrics) - len(success_metrics) error_rate error_count / len(self.metrics) * 100 if self.metrics else 0 response_times [m.response_time_ms for m in self.metrics] avg_rt statistics.mean(response_times) if response_times else 0 p95_rt statistics.quantiles(response_times, n20)[18] if len(response_times) 20 else 0 # 计算P95近似值 report f 压力测试报告 总请求数: {len(self.metrics)} 成功请求数: {len(success_metrics)} 错误请求数: {error_count} 错误率: {error_rate:.2f}% 平均响应时间: {avg_rt:.2f} ms P95响应时间: {p95_rt:.2f} ms return report # 在TurboPressureTester中集成 class TurboPressureTester: def __init__(self, base_url, collector: MetricsCollector): self.base_url base_url self.session requests.Session() self.collector collector self._load_auth() def create_processing_task(self, image_path, quality85, output_formatwebp): url f{self.base_url}/api/tasks files {...} data {...} start_time time.time() try: response self.session.post(url, filesfiles, datadata, timeout30) response_time_ms (time.time() - start_time) * 1000 # 记录指标 self.collector.record(/api/tasks, response.status_code, response_time_ms) response.raise_for_status() # ... 后续处理 except requests.exceptions.RequestException as e: # 记录错误例如超时或网络错误可设status_code为0或-1 response_time_ms (time.time() - start_time) * 1000 self.collector.record(/api/tasks, 0, response_time_ms) # ... 后续处理2. 与专业监控系统集成对于长期、大型的压力测试建议将数据发送到专业的监控系统如InfluxDB时序数据库结合Grafana可视化。可以在脚本中每次记录指标时通过InfluxDB的客户端库如influxdb-client-python将数据点写入。这样就能在Grafana上实时看到漂亮的压力测试仪表盘观察曲线变化。6. 常见问题、踩坑记录与排查技巧在实际编写和运行这个脚本的过程中我遇到了不少坑这里总结一下希望能帮你绕过。问题1登录成功但后续API请求返回401/403未授权。原因这是最常见的问题。可能的原因有Cookie作用域Domain/Path不匹配Playwright获取的Cookie的domain可能是admin.z-image-turbo.test而你用requests访问的API域名是api.z-image-turbo.test。Cookie默认不跨域发送。Token过期或失效JWT Token可能有很短的有效期或者服务器端有额外的验证机制如Token必须与登录IP绑定。缺少必要的请求头除了Cookie/Authorization头API可能还需要X-CSRF-TOKEN、X-Requested-With等头信息。排查用浏览器正常操作在开发者工具的Network面板里仔细对比你的脚本发送的请求和浏览器发送的请求。逐字对比URL、Method、Headers尤其是Cookie、Authorization、Content-Type以及其他自定义头、Request Body。检查服务器返回的登录响应除了Set-Cookie是否在响应Body里返回了Token。在脚本中打印出你实际发送的请求头和Cookie与浏览器抓到的进行比对。问题2文件上传失败服务器返回“无效的文件格式”或“文件损坏”。原因模拟文件上传时multipart/form-data的格式构造不正确或者文件读取方式有问题。排查与解决确保files参数字典的构造正确。requests库的files参数期望的格式是{field_name: (filename, fileobj, content_type)}。filename很重要有些后端会根据后缀名判断文件类型。使用aiohttp时FormData的添加方式要正确。可以先用一个简单的、确定能工作的文件比如一个小文本文件测试上传接口排除文件本身和路径的问题。对比浏览器上传时网络请求中“Payload”标签下的具体内容看boundary和每个part的headers是否一致。问题3高并发下错误率飙升但低并发时正常。原因服务器连接数或线程池耗尽后端服务如Nginx、Tomcat、应用服务器有最大连接数限制。数据库连接池耗尽应用连接数据库的连接池设置过小。资源竞争如图像处理服务同时处理的文件数达到上限或者临时目录磁盘空间不足。脚本自身问题测试机网络带宽或端口数被占满或者脚本没有正确管理Session/连接导致本地资源耗尽。排查监控服务器资源在压测时实时观察服务器的CPU、内存、磁盘IO、网络IO以及应用日志、数据库连接数。查看错误日志分析服务器返回的错误信息是“Connection refused”、“Gateway Timeout”还是“500 Internal Server Error”。梯度增加并发数从1个用户开始逐步增加到5、10、20、50...观察错误率开始显著上升的拐点这个点可能就是系统的瓶颈所在。检查脚本配置确保使用了连接池requests.Session或aiohttp.ClientSession会复用连接并合理设置超时时间避免大量请求因超时堆积。问题4如何模拟更真实的“用户思考时间”方案在脚本的每个操作步骤之间比如登录后、提交任务前加入随机的等待时间time.sleep(random.uniform(1, 5))。这可以避免所有请求在同一时刻爆发式到达服务器使测试结果更贴近真实场景。Locust等框架内置了wait_time功能可以很方便地配置。问题5测试结果数据如何分析和呈现基础分析像上面MetricsCollector那样计算基本的聚合指标。进阶分析将每次请求的详细指标时间戳、端点、响应时间、状态码写入CSV或数据库。然后用Python的pandas和matplotlib库进行分析和绘图比如绘制响应时间随时间变化的折线图或者绘制响应时间的分布直方图。黄金标准集成到CI/CD流水线中每次压测后自动生成报告并与历史基准进行对比如果核心指标如P95响应时间、错误率出现退化则自动标记构建失败。最后这个脚本的价值不仅仅在于发现性能瓶颈。在每次Z-Image-Turbo发布新版本前跑一遍这个压力测试可以作为回归测试的一部分确保新的代码修改没有引入性能回退。它从一个用户操作流程的完整视角为系统的稳定性和可靠性提供了另一重坚实保障。