python: Parallelism Pattern 项目结构# encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Parallelism Pattern 并行模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/17 20:44 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : settings.py import os # 基础路径 BASE_DIR os.path.dirname(os.path.dirname(os.path.abspath(__file__))) IMAGE_SAVE_DIR os.path.join(BASE_DIR, jewelry_images) # 并行配置 THREAD_MAX_WORKERS 16 PROCESS_MAX_WORKERS 4 # 业务模拟配置 JEWELRY_COUNT 100 ORDER_COUNT 50 WAREHOUSE_COUNT 5 # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Parallelism Pattern 并行模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/17 20:46 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : entities.py from dataclasses import dataclass from ParallelismPattern.config.settings import JEWELRY_COUNT, ORDER_COUNT, WAREHOUSE_COUNT dataclass class Jewelry(object): id: str dataclass class Order(object): id: str dataclass class Warehouse(object): id: str class EntityFactory(object): 实体工厂统一生成测试数据 staticmethod def get_jewelries(): :return: return [Jewelry(f珠宝_{i:03d}) for i in range(1, JEWELRY_COUNT 1)] staticmethod def get_orders(): :return: return [Order(f订单_{i:03d}) for i in range(1, ORDER_COUNT 1)] staticmethod def get_warehouses(): :return: return [Warehouse(f仓库_{chr(65 i)}) for i in range(WAREHOUSE_COUNT)] # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Parallelism Pattern 并行模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/17 20:44 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : logger.py import logging import os from datetime import datetime from ParallelismPattern.config.settings import BASE_DIR # 日志目录 LOG_DIR os.path.join(BASE_DIR, logs) os.makedirs(LOG_DIR, exist_okTrue) # 日志格式 logging.basicConfig( levellogging.INFO, format%(asctime)s | %(levelname)s | %(module)s | %(message)s, handlers[ logging.FileHandler(os.path.join(LOG_DIR, fjewelry_{datetime.now().date()}.log), encodingutf-8), logging.StreamHandler() ] ) logger logging.getLogger(jewelry_system) # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Parallelism Pattern 并行模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/17 20:44 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : parallel.py from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed from ParallelismPattern.utils.logger import logger from ParallelismPattern.config.settings import THREAD_MAX_WORKERS, PROCESS_MAX_WORKERS class ParallelExecutor(object): 通用并行执行器支持线程/进程完全解耦业务 并行执行器核心通用组件 staticmethod def run_thread_tasks(task_func, task_list, max_workersTHREAD_MAX_WORKERS): 执行多线程任务IO密集 :param task_func: :param task_list: :param max_workers: :return: return ParallelExecutor._run_tasks( executor_clsThreadPoolExecutor, task_functask_func, task_listtask_list, max_workersmax_workers ) staticmethod def run_process_tasks(task_func, task_list, max_workersPROCESS_MAX_WORKERS): 执行多进程任务CPU密集 :param task_func: :param task_list: :param max_workers: :return: return ParallelExecutor._run_tasks( executor_clsProcessPoolExecutor, task_functask_func, task_listtask_list, max_workersmax_workers ) staticmethod def _run_tasks(executor_cls, task_func, task_list, max_workers): 内部通用执行逻辑 :param executor_cls: :param task_func: :param task_list: :param max_workers: :return: results [] try: with executor_cls(max_workersmax_workers) as executor: future_map {executor.submit(task_func, item): item for item in task_list} for future in as_completed(future_map): try: res future.result() results.append(res) logger.info(res) except Exception as e: task_item future_map[future] err_msg f任务执行失败 | 函数{task_func.__name__} | 项{task_item} | 错误{str(e)} logger.error(err_msg) results.append(err_msg) except Exception as e: logger.error(f并行执行器异常{str(e)}) return results# encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Parallelism Pattern 并行模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/17 20:46 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : quality_service.py import time import random class QualityService(object): 质检服务 staticmethod def inspect(jewelry): :param jewelry: :return: time.sleep(0.2) result random.choice([合格, 不合格]) return f【质检】{jewelry.id}{result} # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Parallelism Pattern 并行模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/17 20:50 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : image_service.py import os from PIL import Image, ImageDraw from ParallelismPattern.config.settings import IMAGE_SAVE_DIR from ParallelismPattern.utils.logger import logger import time class ImageService(object): 图片服务 staticmethod def process(jewelry): :param jewelry: :return: try: os.makedirs(IMAGE_SAVE_DIR, exist_okTrue) img Image.new(RGB, (200, 200), colorgold) draw ImageDraw.Draw(img) draw.text((10, 10), jewelry.id, fillblack) img.save(os.path.join(IMAGE_SAVE_DIR, f{jewelry.id}.jpg)) time.sleep(0.15) return f【图片处理】{jewelry.id}完成 except Exception as e: logger.error(f图片处理失败 {jewelry.id}: {e}) return f【图片处理】{jewelry.id}失败 # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Parallelism Pattern 并行模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/17 20:54 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : stock_service.py import time import random class StockService(object): 库存服务 staticmethod def check(warehouse): :param warehouse: :return: time.sleep(0.3) num random.randint(50, 200) return f【库存盘点】{warehouse.id}库存{num}件 # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Parallelism Pattern 并行模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/17 20:55 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : notice_service.py import time class NoticeService(object): 通知服务 staticmethod def send_ship(order): time.sleep(0.1) return f【发货通知】{order.id}已推送短信邮件 # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Parallelism Pattern 并行模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/17 20:57 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : cert_service.py import time import random class CertService(object): 证书服务 staticmethod def verify(jewelry): time.sleep(0.25) cert fNGTC{random.randint(100000,999999)} return f【证书核验】{jewelry.id}证书{cert} 有效 # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述 Parallelism Pattern 并行模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/17 21:06 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : price_service.py import time import random class PriceService(object): 定价服务 staticmethod def calculate(jewelry): time.sleep(0.22) price random.randint(1000, 50000) return f【定价核算】{jewelry.id}定价{price}元 # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Parallelism Pattern 并行模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/17 21:07 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : task_manager.py from ParallelismPattern.models.entities import EntityFactory from ParallelismPattern.services.quality_service import QualityService from ParallelismPattern.services.image_service import ImageService from ParallelismPattern.services.stock_service import StockService from ParallelismPattern.services.notice_service import NoticeService from ParallelismPattern.services.cert_service import CertService from ParallelismPattern.services.price_service import PriceService from ParallelismPattern.utils.parallel import ParallelExecutor class JewelryTaskManager(object): 任务编排中心 新增业务只需要在这里加一个方法 一个服务无需动并行核心 def __init__(self): self.jewelries EntityFactory.get_jewelries() self.orders EntityFactory.get_orders() self.warehouses EntityFactory.get_warehouses() def run_all_tasks(self): 执行全流程任务 :return: # IO密集 → 线程池 ParallelExecutor.run_thread_tasks(NoticeService.send_ship, self.orders) ParallelExecutor.run_thread_tasks(StockService.check, self.warehouses) ParallelExecutor.run_thread_tasks(ImageService.process, self.jewelries) # CPU密集 → 进程池 ParallelExecutor.run_process_tasks(QualityService.inspect, self.jewelries) ParallelExecutor.run_process_tasks(CertService.verify, self.jewelries) ParallelExecutor.run_process_tasks(PriceService.calculate, self.jewelries)调用# encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Parallelism Pattern 并行模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/17 21:09 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : ParallelismBll.py import time from ParallelismPattern.tasks.task_manager import JewelryTaskManager from ParallelismPattern.utils.logger import logger class ParallelismBll(object): def demo(self): :return: logger.info( 企业级珠宝并行任务系统 启动 ) start time.time() # 启动任务 manager JewelryTaskManager() manager.run_all_tasks() end time.time() logger.info(f 系统执行完成 | 总耗时{end - start:.2f}s )输出