Python微服务通信模式

Python微服务通信模式 Python微服务通信模式实战 ——微服务间通信是分布式系统的核心挑战涉及同步、异步和服务发现等模式[1] 同步通信REST API (httpx)同步通信简单直观适合低延迟、实时性要求高的场景。httpx库提供了现代化的HTTP客户端。import httpxfrom typing import Dict, Any, Optionalclass OrderServiceClient:通过REST API同步调用订单服务def __init__(self, base_url: str):self.client httpx.Client(base_urlbase_url, timeout10.0)def create_order(self, order_data: Dict[str, Any]) - Dict[str, Any]:创建订单(POST请求)返回服务端响应response self.client.post(/api/orders, jsonorder_data)response.raise_for_status() # 非2xx状态码抛出异常return response.json()def get_order(self, order_id: str) - Optional[Dict[str, Any]]:查询订单(GET请求)不存在时返回Noneresponse self.client.get(f/api/orders/{order_id})if response.status_code 404:return Noneresponse.raise_for_status()return response.json()def close(self):关闭HTTP客户端释放连接池资源self.client.close()[2] 异步通信消息队列 (Pika RabbitMQ)异步通信通过消息代理解耦服务适合处理突发流量和长耗时任务。import pikaimport jsonfrom typing import Callableclass MessageBroker:基于RabbitMQ的异步消息代理封装def __init__(self, host: str localhost):self.connection pika.BlockingConnection(pika.ConnectionParameters(hosthost))self.channel self.connection.channel()def declare_queue(self, queue_name: str, durable: bool True):声明队列。durableTrue保证RabbitMQ重启后队列不丢失self.channel.queue_declare(queuequeue_name, durabledurable)def publish(self, queue: str, message: dict):发布消息到指定队列self.channel.basic_publish(exchange,routing_keyqueue,bodyjson.dumps(message).encode(utf-8),propertiespika.BasicProperties(delivery_mode2), # 持久化消息)def consume(self, queue: str, callback: Callable):注册消费者。callback接收(ch, method, properties, body)self.channel.basic_consume(queuequeue, on_message_callbackcallback)self.channel.start_consuming() # 阻塞等待消息def close(self):关闭连接self.connection.close()[3] gRPC 通信gRPC基于Protocol Buffers性能高、支持双向流适合微服务间内部通信。# gRPC需要先定义proto文件然后生成Python代码# 假设已生成 order_pb2 和 order_pb2_grpc 模块import grpcfrom concurrent import futuresclass OrderGrpcClient:gRPC客户端通过一元RPC调用服务端方法def __init__(self, target: str localhost:50051):self.channel grpc.insecure_channel(target)# self.stub order_pb2_grpc.OrderServiceStub(self.channel)def get_order(self, order_id: str):调用gRPC远程方法获取订单详情# request order_pb2.GetOrderRequest(order_idorder_id)# response self.stub.GetOrder(request)# return responsepass # 实际使用时取消注释[4] 服务发现模式 (Consul HTTP API)服务发现让服务动态找到彼此无需硬编码地址。import requestsclass ServiceDiscovery:通过Consul HTTP API实现服务注册与发现def __init__(self, consul_host: str localhost, consul_port: int 8500):self.base_url fhttp://{consul_host}:{consul_port}def register(self, service_name: str, port: int, host: str 127.0.0.1):向Consul注册服务实例payload {Name: service_name,Address: host,Port: port,Check: { # 健康检查配置HTTP: fhttp://{host}:{port}/health,Interval: 10s, # 每10秒检查一次Timeout: 2s,},}response requests.put(f{self.base_url}/v1/agent/service/register, jsonpayload)return response.status_code 200def discover(self, service_name: str) - list:发现指定服务的所有健康实例response requests.get(f{self.base_url}/v1/health/service/{service_name}?passingtrue)services response.json()return [{host: svc[Service][Address],port: svc[Service][Port],}for svc in services][5] 健康检查端点每个微服务应暴露健康检查端点供负载均衡和服务发现组件使用。from fastapi import FastAPIfrom fastapi.responses import JSONResponseapp FastAPI()app.get(/health)async def health_check():健康检查端点返回服务运行状态return JSONResponse(content{status: healthy,service: order-service,version: 1.0.0,})[6] 断路器模式 (pybreaker)断路器防止级联故障——当被调用服务不可用时快速失败。import pybreakerimport httpx# 创建断路器连续失败3次后断开30秒后尝试恢复order_breaker pybreaker.CircuitBreaker(fail_max3, # 最大失败次数reset_timeout30, # 重置超时秒)class OrderServiceWithBreaker:带断路器的订单服务客户端order_breakerdef call_create_order(self, order_data: dict) - dict:断路器保护的外部调用熔断时抛出异常with httpx.Client() as client:resp client.post(http://order-svc/api/orders, jsonorder_data)resp.raise_for_status()return resp.json()[7] 重试与指数退避 (tenacity)网络故障是分布式系统中的常态合理重试可大幅提升稳定性。from tenacity import (retry,stop_after_attempt,wait_exponential,retry_if_exception_type,)import httpxretry(stopstop_after_attempt(3), # 最多重试3次waitwait_exponential(multiplier1, min1, max10), # 等待1s→2s→4sretryretry_if_exception_type(httpx.RequestError), # 仅对网络错误重试)def fetch_user(user_id: int) - dict:带重试机制的用户查询with httpx.Client() as client:resp client.get(fhttp://user-svc/api/users/{user_id})resp.raise_for_status()return resp.json()[8] 综合示例服务间的可靠调用if __name__ __main__:# 服务发现discovery ServiceDiscovery()services discovery.discover(order-service)print(f发现 {len(services)} 个订单服务实例)# 同步调用带重试和断路器try:order OrderServiceWithBreaker().call_create_order({user_id: 42, items: [{sku: ABC, qty: 2}]})print(f订单创建成功: {order})except pybreaker.CircuitBreakerError:print(断路器打开服务暂不可用请稍后重试)except Exception as e:print(f通信失败: {e})