自动化测试(十一) 事件驱动测试-Kafka-RabbitMQ消息组件测试

自动化测试(十一) 事件驱动测试-Kafka-RabbitMQ消息组件测试 事件驱动测试Kafka/RabbitMQ消息组件测试微服务架构下服务间通信不只是HTTP调用消息队列Kafka/RabbitMQ也是重要一环。今天咱们聊聊怎么测试基于消息的系统——这比接口测试更复杂因为引入了异步和最终一致性。一、消息系统的测试挑战假设订单服务创建订单后发送一个OrderCreatedEvent到Kafka库存服务消费这个消息来扣减库存┌─────────────┐ OrderCreatedEvent ┌─────────────┐ │ 订单服务 │ ──────────────────────── │ 库存服务 │ │ order-svc │ (Kafka) │ stock-svc │ └─────────────┘ └─────────────┘测试难点难点说明异步性消息不是即时到达怎么断言最终结果顺序性消息顺序是否重要乱序消费怎么办重复消费消息可能重复投递业务逻辑幂等吗死信队列消费失败的消息去哪了怎么处理分区并行Kafka多分区消费怎么保证测试确定性二、Kafka 测试方案方案1嵌入式 KafkaSpring Kafka Test适合单元测试和集成测试不需要Docker。dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka-test/artifactIdscopetest/scope/dependencySpringBootTestEmbeddedKafka(partitions1,// 单分区保证顺序topics{order-events},// 自动创建topicbrokerProperties{listenersPLAINTEXT://localhost:9092,port9092})classOrderEventProducerTest{AutowiredOrderEventProducerproducer;AutowiredEmbeddedKafkaBrokerembeddedKafka;TestDisplayName(订单创建后发送事件到Kafka)voidshouldSendOrderCreatedEvent(){// GivenOrderCreatedEventeventnewOrderCreatedEvent(100L,ITEM-001,2,1L);// Whenproducer.send(event);// Then: 消费验证ConsumerString,StringconsumercreateConsumer();embeddedKafka.consumeFromAnEmbeddedTopic(consumer,order-events);ConsumerRecordString,StringrecordKafkaTestUtils.getSingleRecord(consumer,order-events,Duration.ofSeconds(5));assertThat(record).isNotNull();assertThat(record.key()).isEqualTo(100);OrderCreatedEventreceivedobjectMapper.readValue(record.value(),OrderCreatedEvent.class);assertThat(received.getOrderId()).isEqualTo(100L);assertThat(received.getSku()).isEqualTo(ITEM-001);consumer.close();}privateConsumerString,StringcreateConsumer(){MapString,ObjectpropsKafkaTestUtils.consumerProps(test-group,true,embeddedKafka);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,earliest);returnnewDefaultKafkaConsumerFactory(props,newStringDeserializer(),newStringDeserializer()).createConsumer();}}方案2Testcontainers Kafka更接近真实环境SpringBootTestTestcontainersclassOrderEventIntegrationTest{ContainerstaticKafkaContainerkafkanewKafkaContainer(DockerImageName.parse(confluentinc/cp-kafka:7.5.0));DynamicPropertySourcestaticvoidconfigureKafka(BootstrapRegistryregistry){registry.add(spring.kafka.bootstrap-servers,kafka::getBootstrapServers);}AutowiredOrderServiceorderService;AutowiredKafkaTemplateString,StringkafkaTemplate;TestDisplayName(创建订单触发库存扣减端到端)voidshouldDeductStockWhenOrderCreated()throwsException{// Given: 准备一个消费者来验证库存服务收到的消息ConsumerString,StringconsumercreateConsumer(stock-service-group);consumer.subscribe(List.of(order-events));// When: 创建订单OrderResultresultorderService.createOrder(newCreateOrderRequest(ITEM-001,2));assertThat(result.isSuccess()).isTrue();// Then: 验证消息被发送最多等5秒ConsumerRecordsString,Stringrecordsconsumer.poll(Duration.ofSeconds(5));assertThat(records.count()).isEqualTo(1);ConsumerRecordString,Stringrecordrecords.iterator().next();OrderCreatedEventeventobjectMapper.readValue(record.value(),OrderCreatedEvent.class);assertThat(event.getOrderId()).isEqualTo(result.getOrderId());assertThat(event.getEventType()).isEqualTo(ORDER_CREATED);consumer.close();}}方案3Kafka 消费者测试SpringBootTestEmbeddedKafka(topicsorder-events)classStockEventConsumerTest{AutowiredStockEventConsumerconsumer;AutowiredStockServicestockService;SpyBean// 部分Mock监控调用StockEventConsumerconsumerSpy;TestDisplayName(消费订单创建事件扣减库存)voidshouldDeductStockOnOrderCreated(){// GivenOrderCreatedEventeventnewOrderCreatedEvent(100L,ITEM-001,2,1L);ConsumerRecordString,StringrecordnewConsumerRecord(order-events,0,0L,100,toJson(event));// When: 直接调用消费者方法模拟Kafka投递consumer.onOrderCreated(record);// Thenverify(stockService).deduct(ITEM-001,2);}TestDisplayName(重复消费同一事件只扣减一次库存幂等性)voidshouldBeIdempotent(){OrderCreatedEventeventnewOrderCreatedEvent(100L,ITEM-001,2,1L);ConsumerRecordString,StringrecordnewConsumerRecord(order-events,0,0L,100,toJson(event));// 消费两次模拟重复投递consumer.onOrderCreated(record);consumer.onOrderCreated(record);// 只扣减一次verify(stockService,times(1)).deduct(ITEM-001,2);}TestDisplayName(库存不足时消息进入死信队列)voidshouldSendToDLQWhenInsufficientStock(){when(stockService.deduct(any(),any())).thenThrow(newInsufficientStockException());OrderCreatedEventeventnewOrderCreatedEvent(100L,ITEM-001,999,1L);ConsumerRecordString,StringrecordnewConsumerRecord(order-events,0,0L,100,toJson(event));// 消费失败assertThatThrownBy(()-consumer.onOrderCreated(record)).isInstanceOf(InsufficientStockException.class);// 验证重试后进入死信队列// ... 验证DLQ topic收到消息}}三、RabbitMQ 测试方案Testcontainers RabbitMQdependencygroupIdorg.testcontainers/groupIdartifactIdrabbitmq/artifactIdscopetest/scope/dependencySpringBootTestTestcontainersclassRabbitMQOrderEventTest{ContainerstaticRabbitMQContainerrabbitMQnewRabbitMQContainer(DockerImageName.parse(rabbitmq:3-alpine));DynamicPropertySourcestaticvoidconfigureProperties(DynamicPropertyRegistryregistry){registry.add(spring.rabbitmq.host,rabbitMQ::getHost);registry.add(spring.rabbitmq.port,rabbitMQ::getAmqpPort);registry.add(spring.rabbitmq.username,rabbitMQ::getAdminUsername);registry.add(spring.rabbitmq.password,rabbitMQ::getAdminPassword);}AutowiredRabbitTemplaterabbitTemplate;AutowiredOrderEventPublisherpublisher;TestDisplayName(发送订单事件到RabbitMQ)voidshouldPublishOrderEvent(){// GivenOrderCreatedEventeventnewOrderCreatedEvent(100L,ITEM-001,2,1L);// Whenpublisher.publish(event);// Then: 直接从队列取消息验证MessagemessagerabbitTemplate.receive(order-events-queue,5000);assertThat(message).isNotNull();OrderCreatedEventreceivedobjectMapper.readValue(newString(message.getBody()),OrderCreatedEvent.class);assertThat(received.getOrderId()).isEqualTo(100L);}}RabbitMQ 消费者测试SpringBootTestclassStockRabbitConsumerTest{AutowiredStockRabbitConsumerconsumer;AutowiredStockServicestockService;TestDisplayName(消费订单消息扣减库存)voidshouldConsumeAndDeduct(){OrderCreatedEventeventnewOrderCreatedEvent(100L,ITEM-001,2,1L);MessagemessageMessageBuilder.withBody(toJson(event).getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).build();// 直接调用RabbitListener方法consumer.handleOrderCreated(message);verify(stockService).deduct(ITEM-001,2);}}四、异步等待策略消息测试最头疼的是什么时候断言。方案1Awaitility推荐dependencygroupIdorg.awaitility/groupIdartifactIdawaitility/artifactIdversion4.2.0/versionscopetest/scope/dependencyTestDisplayName(订单创建后库存最终会被扣减)voidshouldEventuallyDeductStock(){// 创建订单orderService.createOrder(newCreateOrderRequest(ITEM-001,2));// 等待最多5秒直到库存被扣减await().atMost(Duration.ofSeconds(5)).pollInterval(Duration.ofMillis(200)).untilAsserted(()-{StockstockstockRepository.findBySku(ITEM-001);assertThat(stock.getAvailableQuantity()).isEqualTo(98);// 原100-2});}方案2CountDownLatch精确控制TestDisplayName(批量订单并发创建消息顺序消费)voidshouldConsumeInOrder()throwsInterruptedException{intorderCount10;CountDownLatchlatchnewCountDownLatch(orderCount);ListLongreceivedOrderIdsCollections.synchronizedList(newArrayList());// 设置消费者回调consumer.setMessageHandler(event-{receivedOrderIds.add(event.getOrderId());latch.countDown();});// 发送10个订单for(inti0;iorderCount;i){orderService.createOrder(newCreateOrderRequest(ITEM-001,1));}// 等待所有消息被消费booleanallConsumedlatch.await(10,TimeUnit.SECONDS);assertThat(allConsumed).isTrue();// 验证顺序assertThat(receivedOrderIds).hasSize(orderCount);assertThat(receivedOrderIds).isSorted();// 按顺序消费}五、消息测试的黄金法则原则说明生产者测试验证消息格式正确、发送到正确topic/queue消费者测试验证收到消息后业务逻辑正确、幂等性端到端测试验证消息从发送到消费的全链路死信测试验证消费失败时的降级和补偿机制性能测试验证高吞吐量下的消费能力和延迟六、小结今天咱们聊了事件驱动系统的测试工具场景特点EmbeddedKafkaKafka单元/集成测试轻量、快速、单分区保序Testcontainers Kafka接近真实的Kafka测试多分区、集群特性Testcontainers RabbitMQRabbitMQ测试完整AMQP特性Awaitility异步断言优雅等待条件满足CountDownLatch精确控制并发适合验证消息数量一句话总结消息测试的核心是异步等待——用Awaitility等工具优雅处理最终一致性同时别忘了验证幂等性和死信处理。