gRPC 实战教程:从 Protobuf 到 FastAPI,一篇讲清核心原理与开发实践 PRCgRPC为什么会选择 gRPCgRPC 为什么常常更快gRPC 和 REST 对比protobuf相关依赖工具DemoProtobuf 常用数据类型Protobuf 常用关键字生成文件怎么看四种 RPC 方式Unary一元 RPCServer streaming服务端流Client streaming客户端流Bidirectional streaming双向流错误处理Metadata请求元数据压缩按需拦截器进程模型按需gRPC-Web 与 Envoy浏览器需要时再用HTTP/JSON 转码与反射按需gRPC 与 FastAPI 共存FastAPI 作为 gRPC 客户端在同一进程暴露 HTTP 和 gRPC如何选择RPCRemote Procedure Call远程过程调用是一种分布式通信抽象调用方像调用本地函数一样调用另一个进程或另一台机器上的服务。这个“像本地调用”的体验来自客户端桩、服务端桩和通信框架网络延迟、超时、重试、部分失败等分布式系统问题并不会因此消失。gRPCgRPC 是一个开源、高性能的 RPC 框架。它通常使用 Protocol Buffers下文简称 Protobuf定义服务接口和消息并在传输层使用基于 HTTP/2 的 gRPC 协议。编译.proto文件后可以为 Python、Go、Java、C、C#、Node.js 等语言生成客户端 Stub本地调用代理和服务端骨架。名称说明gRPC 官方 FAQ 将 gRPC 解释为递归缩写“gRPC Remote Procedure Calls”不宜写成“Google Remote Procedure Call”。为什么会选择 gRPCREST 更像一种面向资源的架构风格gRPC 更强调“调用哪个服务方法”。两者解决问题的视角不同并非谁天然取代谁。gRPC 常用于内部服务通信主要价值有接口契约明确服务、方法、请求和响应都写在.proto中字段类型和编号构成可检查、可演进的契约。跨语言代码生成调用方通常不必手写 URL、JSON 映射和响应解析生成的 stub 把远程调用包装成目标语言中的方法调用。传输开销较紧凑Protobuf 使用二进制字段编码常比等价的文本 JSON 更小实际收益取决于字段类型、数据规模、压缩方式和实现不能用固定倍数概括。复用连接与并发请求HTTP/2 可在一个连接上承载多个并发 stream减少频繁建连和 HTTP/1.1 队头阻塞带来的开销。Channel 是客户端到服务端的长期通信通道应复用连接达到并发 stream 上限后请求仍会排队。原生流式 RPC除一问一答的 Unary RPC 外还支持服务端流、客户端流和双向流适合持续推送、批量上传和双向会话。统一的 RPC 语义deadline最晚完成时间、取消、状态码和 metadata请求元数据等能力都围绕一次 RPC 组织。gRPC 为什么常常更快“快”通常来自一组因素叠加而不是“二进制一定比 JSON 快”这一条Protobuf 消息通常更紧凑网络传输字节更少编码和解码效率也常有优势。HTTP/2 长连接和多路复用降低了重复建连开销并允许多个 RPC 并发共享连接。流式 RPC 可在同一个长生命周期调用中持续传输多条消息避免反复创建 RPC 的固定成本。生成代码减少了动态字段映射和手写协议适配也让客户端与服务端更容易保持一致。但 gRPC不是在所有场景都更快。对很小的本地请求框架和序列化开销可能占主导浏览器接入需要 gRPC-Web 或网关排障工具和可读性不如纯文本 HTTP/JSON 直接。是否采用应以端到端延迟、吞吐、带宽、开发成本和兼容性测试为准。gRPC 和 REST 对比维度gRPCREST/HTTP API接口模型面向服务与方法通常由.proto定义面向资源使用 URI 与 HTTP 方法表达操作常见数据格式Protobuf也可使用其他编解码方案JSON 最常见也可使用文本或二进制格式传输原生 gRPC 通常基于 HTTP/2可运行在 HTTP/1.1、HTTP/2 或 HTTP/3 上取决于服务端与客户端类型与代码生成强契约通常生成客户端/服务端代码OpenAPI 也可描述契约并生成代码但不是 REST 的强制要求流式能力原生支持四种 RPC 形态可使用 SSE、WebSocket、流式响应等接口模型不同浏览器接入通常需要 gRPC-Web、Connect 或网关代理浏览器和通用 HTTP 工具可直接访问适合场景内部服务、跨语言调用、低延迟和流式通信公共 API、浏览器接口、资源型 CRUD 与开放生态参考《gRPC——我们为什么要用 gRPCgRPC 快在哪里》、gRPC 核心概念、gRPC 性能最佳实践。protobufProtobufProtocol Buffers是 Google 开源的语言无关、平台无关、可扩展的结构化数据序列化机制。它由 schema、编译器和各语言运行时组成不只是一个“类似 JSON 的工具库”。Protobuf 通常比等价 JSON 更紧凑、解析更高效但不存在适用于所有数据和语言实现的固定“3–5 倍”结论性能应以真实业务数据基准测试为准。.proto是接口定义文件也是客户端与服务端共同遵守的契约。消息字段后的数字如name 1是序列化编号不是默认值一旦发布就不要改号或把旧编号分配给新字段。相关依赖工具conda create -n grpc_learn python3.11 conda activate grpc_learn # 安装相关依赖 python -m pip install grpcio grpcio-tools protobufgrpcio是运行时grpcio-tools用于从.proto生成 Python 代码protobuf提供消息的序列化与反序列化实现。Demo# file_name:hello_grpc.proto syntax proto3; // 包名 package test; // 定义服务 service HelloRpc { // 定义服务函数 rpc HelloAnchor(HelloAnchorReq) returns (HelloAnchorReply){} } // 定义信息数据格式 message HelloAnchorReq { string name 1; int32 age 2; } message HelloAnchorReply { string result 1; }先把.proto编译为 Python 消息类和 gRPC 接口代码python -m grpc_tools.protoc -I. --python_out. --grpc_python_out. hello_grpc.proto随后生成hello_grpc_pb2.py和hello_grpc_pb2_grpc.py。# service.py import grpc import hello_grpc_pb2 as pb2 import hello_grpc_pb2_grpc as pb2_grpc from concurrent import futures class HelloRpc(pb2_grpc.HelloRpcServicer): def HelloAnchor(self, request, context): name request.name age request.age result fHello, {name}!, your age is {age}! return pb2.HelloAnchorReply(resultresult) def run(): grpc_server grpc.server( # gRPC Python 默认使用线程池处理请求 # max_workers 控制线程池可并发执行的服务方法数量gRPC 核心并非‘仅支持一个线程’ futures.ThreadPoolExecutor(max_workers4) ) # 注册服务到 grpc pb2_grpc.add_HelloRpcServicer_to_server(HelloRpc(), grpc_server) # 绑定端口 grpc_server.add_insecure_port(localhost:5000) print(Starting hello_grpc... at localhost:5000) # 启动服务 grpc_server.start() # 常驻 try: while True: grpc_server.wait_for_termination(3600) except KeyboardInterrupt: # 键盘中止安全退出 grpc_server.stop(0) if __name__ __main__: run()# 启动服务端 python service.py# client.py import grpc import hello_grpc_pb2 as pb2 import hello_grpc_pb2_grpc as pb2_grpc def run(): # Channel 负责连接管理应在多个 RPC 之间复用 conn grpc.insecure_channel(localhost:5000) # 绑定频道到对应的客户端 client pb2_grpc.HelloRpcStub(channelconn) response client.HelloAnchor( pb2.HelloAnchorReq(nameanchor, age18), timeout3, ) print(response.result) if __name__ __main__: run()# 启动客户端 python client.pyinsecure_channel和add_insecure_port不启用 TLS只适合本机 Demo 或受信任网络跨主机部署应配置凭据并使用安全 Channel。Protobuf 常用数据类型类型说明stringUTF-8 编码或 7-bit ASCII 文本二进制数据应使用 bytesbytes任意字节序列bool布尔类型int3232位整型int6464位整型float浮点类型repeated重复字段Python 中通常表现为容器repeated string data 1;map映射字段Python 中通常表现为映射容器mapstring, string data 1;Protobuf 常用关键字类型说明package包名syntaxProtobuf版本service定义服务rpc定义服务中的方法stream定义的方法传输为流传输message定义消息体 message User{}extend扩展消息体 extend User{}import导入其他.proto文件//注释生成文件怎么看执行grpc_tools.protoc后会得到两个文件hello_grpc_pb2.py包含 Protobuf 消息类和描述符。hello_grpc_pb2_grpc.py包含客户端 Stub、服务端 Servicer 基类和注册函数。通常不需要阅读或修改生成文件。业务代码只需要服务端继承HelloRpcServicer客户端创建HelloRpcStub.proto变化后重新生成即可。四种 RPC 方式是否写stream决定请求和响应是一条消息还是消息序列方式请求响应典型用途Unary一条一条查询、普通命令Server streaming一条多条持续推送结果Client streaming多条一条分片上传、批量汇总Bidirectional streaming多条多条独立双向会话Unary一元 RPC最常见的一问一答。调用会一直等待到服务端返回、发生错误或超过 deadline。# file_name:hello_grpc.proto service HelloRpc { rpc HelloAnchor(HelloAnchorReq) returns (HelloAnchorReply){} } message HelloAnchorReq { string name 1; int32 age 2; } message HelloAnchorReply { string result 1; }# service.py class HelloRpc(pb2_grpc.HelloRpcServicer): def HelloAnchor(self, request, context): name request.name age request.age result fHello, {name}!, your age is {age}! return pb2.HelloAnchorReply(resultresult)# client.py def run(): conn grpc.insecure_channel(localhost:5000) client pb2_grpc.HelloRpcStub(channelconn) response client.HelloAnchor(pb2.HelloAnchorReq( nameanchor, age18 )) print(response.result)Server streaming服务端流客户端发送一条请求服务端按顺序返回多条响应客户端迭代响应直到流结束。# file_name:hello_grpc.proto service HelloRpc { rpc TestClientRecvStream(TestClientRecvStreamReq) returns (stream TestClientRecvStreamReply){} } message TestClientRecvStreamReq { string data 1; } message TestClientRecvStreamReply { string result 1; }# service.py class HelloRpc(pb2_grpc.HelloRpcServicer): def TestClientRecvStream(self, request, context): index 0 while context.is_active(): data request.data if data close: return time.sleep(1) index 1 result send %d %s %(index, data) print(result) yield pb2.TestClientRecvStreamReply( resultresult )# client.py def run(): conn grpc.insecure_channel(localhost:5000) client pb2_grpc.HelloRpcStub(channelconn) response client.TestClientRecvStream(pb2.TestClientRecvStreamReq( dataclose )) for item in response: print(item.result)Client streaming客户端流客户端通过迭代器发送多条请求服务端消费完请求流后返回一条响应。# file_name:hello_grpc.proto service HelloRpc { rpc TestClientSendStream(stream TestClientSendStreamReq) returns (TestClientSendStreamReply){} } message TestClientSendStreamReq { string data 1; } message TestClientSendStreamReply { string result 1; }# service.py class HelloRpc(pb2_grpc.HelloRpcServicer): def TestClientSendStream(self, request_iterator, context): index 0 for request in request_iterator: print(request.data, :, index) if index 10: break index 1 return pb2.TestClientSendStreamReply(resultok)# client.py def test(): index 0 while True: time.sleep(1) data str(random.randint(1,100)) if index 5: break print(index) index 1 yield pb2.TestClientSendStreamReq(datadata) def run(): conn grpc.insecure_channel(localhost:5000) client pb2_grpc.HelloRpcStub(channelconn) response client.TestClientSendStream((test())) print(response.result)Bidirectional streaming双向流双方都可发送多条消息两条流彼此独立“双向”不表示服务端收到一条后必须立即回复一条。# file_name:hello_grpc.proto service HelloRpc { rpc TestTwoWayStream(stream TestTwoWayStreamReq) returns (stream TestTwoWayStreamReply){} } message TestTwoWayStreamReq { string data 1; } message TestTwoWayStreamReply { string result 1; }# service.py class HelloRpc(pb2_grpc.HelloRpcServicer): def TestTwoWayStream(self, request_iterator, context): for request in request_iterator: data request.data yield pb2.TestTwoWayStreamReply(resultservice send client %s % data)# client.py def test_two(): index 0 while True: time.sleep(1) data str(random.randint(1,100)) if index 5: break print(index) index 1 yield pb2.TestTwoWayStreamReq(datadata) def run(): conn grpc.insecure_channel(localhost:5000) client pb2_grpc.HelloRpcStub(channelconn) # 超时异常 response client.TestTwoWayStream(test_two(), timeout10) for res in response: print(res.result)错误处理服务端使用标准状态码结束 RPC客户端捕获grpc.RpcError。状态码描述调用结果响应消息承载业务数据不要再在响应体中设计一套重复的“成功/失败码”。# service.pydefHelloAnchor(self,request,context):ifrequest.age0:context.abort(grpc.StatusCode.INVALID_ARGUMENT,age must be greater than or equal to 0,)returnpb2.HelloAnchorReply(resultfHello,{request.name})# client.pytry:responsestub.HelloAnchor(request,timeout3)exceptgrpc.RpcErrorasexc:print(exc.code(),exc.details())常用状态码包括INVALID_ARGUMENT、NOT_FOUND、UNAUTHENTICATED、PERMISSION_DENIED、UNAVAILABLE和DEADLINE_EXCEEDED。客户端应设置 timeout只有在请求具备幂等性且策略明确时才重试。Metadata请求元数据Metadata 是一次 RPC 携带的键值对常用于认证、追踪和租户信息。文本 key 使用小写 ASCII二进制 key 以-bin结尾。大块业务数据应放在 Protobuf 消息中。# service.pyclassHelloRpc(pb2_grpc.HelloRpcServicer):defHelloAnchor(self,request,context):metadatadict(context.invocation_metadata())print(metadata.get(trace-id))context.set_trailing_metadata(((server-version,1),))returnpb2.HelloAnchorReply(resultok)# client.pyresponse,callstub.HelloAnchor.with_call(pb2.HelloAnchorReq(nameanchor,age18),metadata((trace-id,abc-123),),timeout3,)print(dict(call.trailing_metadata()))压缩按需压缩适合体积较大、可压缩率高的消息小消息可能因为 CPU 和压缩头开销反而更慢。先压测再在 channel、server 或单次调用上启用channelgrpc.insecure_channel(localhost:5000,compressiongrpc.Compression.Gzip,)responsestub.HelloAnchor(request,compressiongrpc.Compression.Gzip,timeout3,)不要在应用层再手工 gzip Protobuf 字节除非协议明确要求这样做。拦截器拦截器类似中间件适合统一处理日志、指标、认证和 trace 传播。不要把具体业务规则塞进拦截器也不要为了单个方法引入它直接在方法中处理通常更清楚。服务端实现grpc.ServerInterceptor.intercept_service()客户端根据 RPC 形态实现对应 interceptor然后在创建 server 或 channel 时注册。认证失败应返回UNAUTHENTICATED权限不足返回PERMISSION_DENIED不应返回UNIMPLEMENTED。进程模型按需同步 gRPC Python 通过线程池执行服务方法max_workers应根据阻塞时间、CPU 使用率和压测结果设置。CPU 密集任务不要仅靠扩大线程池应交给独立进程、任务队列或可横向扩展的服务实例。多进程共享端口依赖操作系统和SO_REUSEPORT行为跨平台差异较大。生产环境更简单可靠的做法通常是每个进程监听独立实例端口由容器平台或负载均衡器分发流量。gRPC-Web 与 Envoy浏览器需要时再用浏览器不能直接使用完整的原生 gRPC 协议因此前端通常调用 gRPC-Web再由 Envoy 等代理转为后端 gRPC。Browser ── gRPC-Web ── Envoy ── gRPC/HTTP2 ── gRPC Server生成客户端protoc-I. hello_grpc.proto\--js_outimport_stylecommonjs,binary:./src/api\--grpc-web_outimport_stylecommonjs,modegrpcwebtext:./src/apiEnvoy 的最小职责是启用grpc_webfilter、将请求路由到 gRPC cluster并为 upstream 显式启用 HTTP/2。跨域部署时再配置 CORSTLS、鉴权和限流也应按实际边界增加而不是复制一份“万能配置”。官方 gRPC-Web 客户端支持 Unary RPCgrpcwebtext模式还支持服务端流。客户端流和双向流不是标准 gRPC-Web 的通用能力。HTTP/JSON 转码与反射按需这两个能力解决不同问题JSON 转码让普通 HTTP/JSON 客户端调用 gRPC 服务。需要在.proto中声明google.api.http规则并给 Envoy 的grpc_json_transcoder提供 descriptor set。Server Reflection让grpcurl等工具在运行时查询服务描述方便调试。它不等于“客户端从此不需要契约”也不应替代正常的.proto发布和版本管理。fromgrpc_reflection.v1alphaimportreflection service_names(pb2.DESCRIPTOR.services_by_name[HelloRpc].full_name,reflection.SERVICE_NAME,)reflection.enable_server_reflection(service_names,grpc_server)动态拼装消息和 RPC 路径适合通用调试工具不适合普通业务客户端它牺牲了生成代码带来的类型检查、IDE 提示和契约可读性。gRPC 与 FastAPI 共存FastAPI 和 gRPC 都能暴露服务接口但擅长的入口不同。FastAPI 适合浏览器、第三方调用方和资源型 HTTP APIgRPC 适合契约明确的服务间调用及流式通信。项目不必机械地“两套都上”服务数量少、调用量不高或开放性优先时只用 FastAPI 往往更简单跨语言服务增多、延迟和带宽敏感或需要流式 RPC 时再引入 gRPC 更划算。一种常见但并非唯一的分工是Browser / Third-party Client │ HTTP/JSON ▼ FastAPI API Layer │ gRPC ▼ Internal ServiceFastAPI 负责 HTTP 路由、参数校验、鉴权、文件上传和 OpenAPI 文档gRPC 负责内部服务契约与 RPC 调用。服务发现、负载均衡、TLS 和可观测性并不由“使用 gRPC”自动解决仍需结合 DNS、服务网格、代理或平台能力配置。FastAPI 作为 gRPC 客户端FastAPI 路由若声明为async def不应在事件循环中直接调用同步 Stub否则网络等待会阻塞同一 worker 的其他协程。grpc.aio是 gRPC Python 的异步 API可以和 FastAPI 共用事件循环并在应用生命周期内复用 ChannelfromcontextlibimportasynccontextmanagerimportgrpcfromfastapiimportFastAPI,Requestimporthello_grpc_pb2aspb2importhello_grpc_pb2_grpcaspb2_grpcasynccontextmanagerasyncdeflifespan(app:FastAPI):channelgrpc.aio.insecure_channel(127.0.0.1:50051)app.state.grpc_channelchannel app.state.hello_stubpb2_grpc.HelloRpcStub(channel)try:yieldfinally:awaitchannel.close()appFastAPI(lifespanlifespan)app.get(/hello/{name})asyncdefhello(name:str,request:Request):replyawaitrequest.app.state.hello_stub.HelloAnchor(pb2.HelloAnchorReq(namename,age18),timeout3.0,)return{result:reply.result}这里有三个容易忽略的点channel 和 stub 应复用不要每个 HTTP 请求都重新创建连接。为 RPC 设置合理的 deadline/timeout并把grpc.aio.AioRpcError映射为合适的 HTTP 状态码。客户端取消 HTTP 请求时可按业务需要继续向下游传播取消信号。如果现有代码只能使用同步 stub可以把 FastAPI 路由写成普通def让 FastAPI 在线程池中执行或者显式放入线程池这是一种兼容方案不等于同步调用变成了异步调用。在同一进程暴露 HTTP 和 gRPC同一 Python 进程可以同时监听两个端口。与手工启动裸线程相比使用grpc.aio.server()更容易和 FastAPI/ASGI 共享事件循环并能在 lifespan 中完成优雅关闭fromcontextlibimportasynccontextmanagerimportgrpcfromfastapiimportFastAPIimporthello_grpc_pb2_grpcaspb2_grpcasynccontextmanagerasyncdeflifespan(app:FastAPI):grpc_servergrpc.aio.server()pb2_grpc.add_HelloRpcServicer_to_server(HelloRpc(),grpc_server)grpc_server.add_insecure_port([::]:50051)awaitgrpc_server.start()try:yieldfinally:awaitgrpc_server.stop(grace5)appFastAPI(lifespanlifespan)启动 HTTP 服务uvicorn app:app--host0.0.0.0--port8000这种方式适合开发、小型部署或两个入口必须共享进程内状态的场景但不是天然的“生产最佳实践”。需要注意Uvicorn 开多个 worker 时每个 worker 都会执行 lifespan若都绑定50051通常会发生端口冲突。HTTP 与 gRPC 共用进程故障、CPU、内存和发布周期也会耦合。生产环境通常更容易运维的做法是把 FastAPI 和 gRPC 服务拆成独立进程或容器分别扩缩容并通过平台完成健康检查与优雅终止。如何选择只有一个 Python 服务、主要面向浏览器或第三方先使用 FastAPI保持系统简单。内部服务跨语言、接口契约容易漂移考虑 gRPC 和代码生成。需要服务端流、客户端流或双向流优先评估原生 gRPC并确认代理、负载均衡和超时策略支持长连接。需要浏览器直接调用 gRPC 服务使用 gRPC-Web 与代理同时接受其流式能力限制。同时提供公共 HTTP API 和内部 gRPC让两个入口共享业务层而不是在两套路由处理器里复制业务逻辑。是否使用 gRPC关键不在于它能否比“本地函数调用”更快——跨网络调用从来不是本地函数调用——而在于系统是否需要跨进程、跨机器或跨语言的强契约通信。即使需要这些能力也应以真实链路压测和运维复杂度为依据而不是把“对外 FastAPI、对内 gRPC”当作不可变的规则。参考FastAPI Lifespan、gRPC Python AsyncIO API、gRPC-Web 基础教程。