python: Producer Consumer Pattern

python: Producer Consumer Pattern 项目结构# encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Producer Consumer Pattern 生产者消费者模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/19 15:48 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : settings.py # 全局配置集中管理便于维护和修改 QUEUE_MAX_SIZE 5 RAW_MATERIAL_TYPES [黄金原料, 钻石原石, 翡翠原石, 铂金原料] QUALITY_LEVELS [S级(顶级), A级(优质), B级(普通)] # 线程并发配置 PRODUCER_THREADS 1 PROCESS_THREADS 2 QUALITY_THREADS 1 WAREHOUSE_THREADS 1 SALE_THREADS 2 # 耗时模拟配置 TIME_RAW (1, 3) TIME_PROCESS (2, 4) TIME_QUALITY (1, 2) TIME_WAREHOUSE (0.5, 1) TIME_SALE (1, 3) # 新增业务控制 MAX_PRODUCE_COUNT 10 # 最多生产10件珠宝生产完自动停止 # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Producer Consumer Pattern 生产者消费者模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/19 15:49 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : logger.py import logging import sys class ColorFormatter(logging.Formatter): # 终端颜色码 COLOR_WHITE \033[37m COLOR_RED \033[31m COLOR_GREEN \033[32m COLOR_BLUE \033[34m COLOR_RESET \033[0m def format(self, record): msg super().format(record) # 按模块区分颜色 if ProducerService in record.name: return self.COLOR_GREEN msg self.COLOR_RESET elif ConsumerService in record.name: return self.COLOR_WHITE msg self.COLOR_RESET elif record.levelno logging.ERROR: return self.COLOR_RED msg self.COLOR_RESET return self.COLOR_BLUE msg self.COLOR_RESET class LoggerHelper(object): 日志工具 def get_logger(name: str) - logging.Logger: :return: logger logging.getLogger(name) logger.setLevel(logging.INFO) if logger.handlers: return logger fmt %(asctime)s | %(levelname)s | %(name)s | %(message)s date_fmt %Y-%m-%d %H:%M:%S console_handler logging.StreamHandler(sys.stdout) console_handler.setFormatter(ColorFormatter(fmt, datefmtdate_fmt)) logger.addHandler(console_handler) return logger # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Producer Consumer Pattern 生产者消费者模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/19 15:51 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : decorators.py import time import random from functools import wraps class Decorators(object): 通用装饰器 def simulate_cost(min_sec: float, max_sec: float): 模拟业务耗时装饰器 :param max_sec: :return: def decorator(func): wraps(func) def wrapper(*args, **kwargs): time.sleep(random.uniform(min_sec, max_sec)) return func(*args, **kwargs) return wrapper return decorator # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Producer Consumer Pattern 生产者消费者模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/19 15:53 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : jewelry.py from dataclasses import dataclass from enum import Enum # 数据模型 class JewelryStage(Enum): RAW 原料 PROCESSED 成品 QUALIFIED 已分级 WAREHOUSED 已入库 SALABLE 可销售 dataclass class Jewelry: 珠宝实体模型统一数据结构 id: str material: str stage: JewelryStage quality_level: str None def __str__(self): if self.quality_level: return f[{self.stage.value}]{self.quality_level}{self.material} return f[{self.stage.value}]{self.material} # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Producer Consumer Pattern 生产者消费者模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/19 15:54 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : queue_manager.py 队列核心 import queue import threading from ProducerConsumerPattern.config.settings import QUEUE_MAX_SIZE class QueueManager: 单例队列管理器全局唯一统一管理所有队列 _instance None def __new__(cls): if cls._instance is None: cls._instance super().__new__(cls) cls._instance.init_queues() return cls._instance def init_queues(self): :return: self.raw_queue queue.Queue(maxsizeQUEUE_MAX_SIZE) self.process_queue queue.Queue(maxsizeQUEUE_MAX_SIZE) self.quality_queue queue.Queue(maxsizeQUEUE_MAX_SIZE) self.sale_queue queue.Queue(maxsizeQUEUE_MAX_SIZE) # 新增线程安全停止信号 self._stop_event threading.Event() self.produced_count 0 self._count_lock threading.Lock() property def is_stop(self): :return: return self._stop_event.is_set() def stop(self): 发送全局停止信号 :return: self._stop_event.set() def add_produce_count(self): 线程安全累加生产计数 :return: with self._count_lock: self.produced_count 1 return self.produced_count # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Producer Consumer Pattern 生产者消费者模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/19 15:56 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : thread_manager.py import threading from ProducerConsumerPattern.utils.logger import LoggerHelper logger LoggerHelper.get_logger(ThreadManager) class ThreadManager: 线程生命周期管理器统一启动/管理/守护 def __init__(self): self.threads [] def add_thread(self, target, name: str, args: tuple ()): :param target: :param name: :param args: :return: t threading.Thread(targettarget, argsargs, namename, daemonTrue) self.threads.append(t) def start_all(self): :return: logger.info(启动所有业务线程...) for t in self.threads: t.start() logger.info(f线程已启动: {t.name})# encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Producer Consumer Pattern 生产者消费者模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/19 15:58 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : producer.py 生产者服务单一职责 import uuid import random from ProducerConsumerPattern.models.jewelry import Jewelry, JewelryStage from ProducerConsumerPattern.core.queue_manager import QueueManager from ProducerConsumerPattern.utils.logger import LoggerHelper from ProducerConsumerPattern.utils.decorators import Decorators from ProducerConsumerPattern.config.settings import RAW_MATERIAL_TYPES, TIME_RAW, MAX_PRODUCE_COUNT logger LoggerHelper.get_logger(ProducerService) queue_mgr QueueManager() class RawMaterialProducer: 原料采购生产者唯一职责 生产原料 staticmethod Decorators.simulate_cost(*TIME_RAW) def produce(): while not queue_mgr.is_stop: material random.choice(RAW_MATERIAL_TYPES) jewelry Jewelry( idstr(uuid.uuid4())[:8], materialmaterial, stageJewelryStage.RAW ) queue_mgr.raw_queue.put(jewelry) current_cnt queue_mgr.add_produce_count() logger.info( f原料采购完成 | {jewelry} | 队列剩余: {queue_mgr.raw_queue.qsize()} | 已生产:{current_cnt}/{MAX_PRODUCE_COUNT}) # 达到最大产量触发全局停止 if current_cnt MAX_PRODUCE_COUNT: logger.info(f已达到最大生产数量 {MAX_PRODUCE_COUNT}准备停止生产) queue_mgr.stop() break # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Producer Consumer Pattern 生产者消费者模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/19 16:00 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : consumer.py 消费者服务职责单一 import random import queue from ProducerConsumerPattern.models.jewelry import Jewelry, JewelryStage from ProducerConsumerPattern.core.queue_manager import QueueManager from ProducerConsumerPattern.utils.logger import LoggerHelper from ProducerConsumerPattern.utils.decorators import Decorators from ProducerConsumerPattern.config.settings import ( QUALITY_LEVELS, TIME_PROCESS, TIME_QUALITY, TIME_WAREHOUSE, TIME_SALE ) logger LoggerHelper.get_logger(ConsumerService) queue_mgr QueueManager() class ProcessConsumer: 加工消费者唯一职责 加工原料 staticmethod Decorators.simulate_cost(*TIME_PROCESS) def consume(): while not queue_mgr.is_stop or not queue_mgr.raw_queue.empty(): try: jewelry queue_mgr.raw_queue.get(timeout0.5) except queue.Empty: continue logger.info(f开始加工 | {jewelry}) jewelry.stage JewelryStage.PROCESSED queue_mgr.process_queue.put(jewelry) queue_mgr.raw_queue.task_done() logger.info(f加工完成 | {jewelry} | 队列剩余: {queue_mgr.process_queue.qsize()}) class QualityConsumer: 质检消费者 staticmethod Decorators.simulate_cost(*TIME_QUALITY) def consume(): :return: while not queue_mgr.is_stop or not queue_mgr.raw_queue.empty(): try: jewelry queue_mgr.raw_queue.get(timeout0.5) except queue.Empty: continue # jewelry queue_mgr.process_queue.get() logger.info(f开始质检 | {jewelry}) jewelry.quality_level random.choice(QUALITY_LEVELS) jewelry.stage JewelryStage.QUALIFIED queue_mgr.quality_queue.put(jewelry) queue_mgr.process_queue.task_done() logger.info(f质检完成 | {jewelry} | 队列剩余: {queue_mgr.quality_queue.qsize()}) class WarehouseConsumer: 仓储消费者 staticmethod Decorators.simulate_cost(*TIME_WAREHOUSE) def consume(): :return: while not queue_mgr.is_stop or not queue_mgr.raw_queue.empty(): try: jewelry queue_mgr.raw_queue.get(timeout0.5) except queue.Empty: continue # jewelry queue_mgr.quality_queue.get() logger.info(f开始入库 | {jewelry}) jewelry.stage JewelryStage.SALABLE queue_mgr.sale_queue.put(jewelry) queue_mgr.quality_queue.task_done() logger.info(f入库完成 | {jewelry} | 队列剩余: {queue_mgr.sale_queue.qsize()}) class SaleConsumer: 销售消费者 staticmethod Decorators.simulate_cost(*TIME_SALE) def consume(): :return: while not queue_mgr.is_stop or not queue_mgr.raw_queue.empty(): try: jewelry queue_mgr.raw_queue.get(timeout0.5) except queue.Empty: continue # jewelry queue_mgr.sale_queue.get() logger.info(f开始销售 | {jewelry}) queue_mgr.sale_queue.task_done() logger.info(f销售成功 | {jewelry}\n)调用# encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述 Producer Consumer Pattern 生产者消费者模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/19 16:03 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : ProducerConsumerBll.py import time from ProducerConsumerPattern.utils.logger import LoggerHelper from ProducerConsumerPattern.core.thread_manager import ThreadManager from ProducerConsumerPattern.core.queue_manager import QueueManager from ProducerConsumerPattern.services.producer import RawMaterialProducer from ProducerConsumerPattern.services.consumer import ProcessConsumer, QualityConsumer, WarehouseConsumer, SaleConsumer from ProducerConsumerPattern.config.settings import ( PRODUCER_THREADS, PROCESS_THREADS, QUALITY_THREADS, WAREHOUSE_THREADS, SALE_THREADS ) logger LoggerHelper.get_logger(ProducerConsumerBll) queue_mgr QueueManager() class ProducerConsumerBll(object): def demo(self): :return: logger.info( * 60) logger.info(珠宝全流程生产销售系统企业级生产者消费者模式) logger.info( * 60) thread_mgr ThreadManager() # 注册所有线程 for _ in range(PRODUCER_THREADS): thread_mgr.add_thread(RawMaterialProducer.produce, RawProducer) for _ in range(PROCESS_THREADS): thread_mgr.add_thread(ProcessConsumer.consume, ProcessConsumer) for _ in range(QUALITY_THREADS): thread_mgr.add_thread(QualityConsumer.consume, QualityConsumer) for _ in range(WAREHOUSE_THREADS): thread_mgr.add_thread(WarehouseConsumer.consume, WarehouseConsumer) for _ in range(SALE_THREADS): thread_mgr.add_thread(SaleConsumer.consume, SaleConsumer) # 启动 thread_mgr.start_all() try: # 主线程循环等待每1秒检查一次是否停止 while not queue_mgr.is_stop: time.sleep(1) logger.info(收到停止信号等待队列剩余任务处理完成...) # 等待所有业务线程自然退出 for t in thread_mgr.threads: t.join(timeout10) logger.info(所有业务线程已正常退出) logger.info(系统正常结束输出停止) except KeyboardInterrupt: logger.info(检测到 CtrlC触发优雅停止...) queue_mgr.stop() # 等待线程处理剩余任务 for t in thread_mgr.threads: t.join(timeout10) logger.info(系统手动终止完成输出停止)输出