
etcd 解决这种问题的方案是 follower 节点也可以处理线性读请求。follower 节点通过和 leader 节点交互获取“最新”数据然后在本地存储引擎读取最新数据。这种方式大大减轻了 leader 节点的压力。笔者觉得这是非常巧妙的解决方案。func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) { ... // 1. 判断是否开始线性读 if !r.Serializable { // 2. 进入线性读处理逻辑 err s.linearizableReadNotify(ctx) ... if err ! nil { return nil, err } } ... // 3. get 用于读存储引擎中的数据 get : func() { resp, _, err txn.Range(ctx, s.Logger(), s.KV(), r) } // 4. 可串行化读流程会回调 get 获取数据 if serr : s.doSerialize(ctx, chk, get); serr ! nil { err serr return nil, err } // 5. 返回 resp 数据 return resp, err }整个读请求流程由EtcdServer.Range处理注意 leader 或 follower 都可以处理该请求。线性读和可串行化读的区别在于是否进入 2 线性读处理逻辑。这里 2 是核心我们在进入 2 之前先把可串行化读逻辑看一遍毕竟这部分是共用的。可串行化读可串行化读主要由 3 和 4 处理3 是一个闭包函数通过 4 回调。我们看下 4 在干嘛func (s *EtcdServer) doSerialize(ctx context.Context, chk func(*auth.AuthInfo) error, get func()) error { // 处理认证鉴权 ai, err : s.AuthInfoFromCtx(ctx) if err ! nil { return err } ... // 回调 get 函数 get() ... return nil }不难理解如果请求通过认证鉴权就会进入 get 函数处理该读请求func Range(ctx context.Context, lg *zap.Logger, kv mvcc.KV, r *pb.RangeRequest) (resp *pb.RangeResponse, trace *traceutil.Trace, err error) { ... // 调用 mvcc kv.Read 获取读事务 txnRead : kv.Read(mvcc.SharedBufReadTxMode, trace) defer txnRead.End() // executeRange 处理读请求 resp, err executeRange(ctx, lg, txnRead, r) return resp, trace, err } func executeRange(ctx context.Context, lg *zap.Logger, txnRead mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) { ... // 根据请求组装 mvcc.RangeOptions 结构 ro : mvcc.RangeOptions{ Limit: limit, Rev: r.Revision, Count: r.CountOnly, } // 读事务读请求的数据 rr, err : txnRead.Range(ctx, r.Key, mkGteRange(r.RangeEnd), ro) if err ! nil { return nil, err } // 对请求数据 rr 进行处理得到 resp 并返回 return resp, nil }核心工作流程是通过读事务的Range方法获取存储引擎中的数据。这是可串行化读的流程我们从之前的分析可知线性读需要和 leader 节点交互获取“最新”数据然后等本地更新最新数据之后才能走可串行化读流程这部分就是在linearizableReadNotify做的。线性读是时候看看期待已久的linearizableReadNotify在做什么了。func (s *EtcdServer) linearizableReadNotify(ctx context.Context) error { s.readMu.RLock() nc : s.readNotifier s.readMu.RUnlock() // signal linearizable loop for current notify if it hasnt been already select { case s.readwaitc - struct{}{}: default: } // wait for read state notification select { case -nc.c: return nc.err case -ctx.Done(): return ctx.Err() case -s.done: return errors.ErrStopped } }方法并不复杂不过逻辑很有意思。首先理清它的流程才能知道这个方法到底在干嘛。方法先把s.readNotifier赋给变量 nc注意这里用的是读锁意味着读请求都将获得s.readNotifier。接着进入 select它将struct{}{}发给s.readwaitc通道该通道是一个有缓冲容量为 1 的通道s.readwaitc make(chan struct{}, 1)这个 select 是在做什么呢我们想象下第一个请求进来往s.readwaitc发通知如果s.readwaitc还在处理请求那么同时来的其它请求会进入select:default分支。最后请求都会阻塞在nc.c等待处理结果。大致知道发送端的流程我们看消费端是怎么消费s.readwaitc的func (s *EtcdServer) linearizableReadLoop() { for { // 生成下一个请求的 id requestID : s.reqIDGen.Next() select { ... // 阻塞在 readwaitc 通道 case -s.readwaitc: ... } // 生成下一个 notifier nextnr : newNotifier() // 加写锁 s.readMu.Lock() // 将 s.readNotifier 赋值给变量 nr nr : s.readNotifier // 将下一个 notifier 赋值给 s.readNotifier s.readNotifier nextnr s.readMu.Unlock() confirmedIndex, err : s.requestCurrentIndex(leaderChangedNotifier, requestID) if isStopped(err) { return } if err ! nil { nr.notify(err) continue } // 获取本节点应用索引 appliedIndex : s.getAppliedIndex() // 如果本节点应用索引小于 confirmed 索引说明请求的数据还没应用到本节点 // 需要继续等 if appliedIndex confirmedIndex { select { case -s.applyWait.Wait(confirmedIndex): case -s.stopping: return } } // 如果本节点应用索引大于等于 confirmed 索引说明请求的数据已应用到本节点 // 直接返回即可 nr.notify(nil) ... } }可以把 readwaitc 理解为一个“批处理触发器”。第一个到达的读请求负责通知后台协程执行一次 ReadIndex。后续同时到达的读请求不会再次触发 ReadIndex而是挂到同一个 notifier 上等待结果。因此1000 个同时到达的线性读请求最终只会触发一次 Leader 交互。这正是 etcd 线性读高性能的重要原因。我们也可以用sync.Cond来实现如果多个请求发现已经有请求在处理了则休眠等待请求处理完被唤醒。但是sync.Cond不能天然的表示分批的概念不如通道实现来的优雅。etcd 用这种方式高效且巧妙的解决了高并发分批读请求。linearizableReadLoop接收到读请求通知后会和 leader 交互换取现在最新的 committed index后续根据该 committed index 从本地读数据。如果本地应用 index 已经大于等于 committed index则表示本地已经是最新了可以直接走可串行化读。如果本地应用 index 小于 committed index表示本地不是最新数据需要等待本地 raft 更新应用 index 到 committed index 才能开始可串行化读。这里的重点在// requestCurrentIndex 获取 confirmed index也就是 leader 的 committed index confirmedIndex, err : s.requestCurrentIndex(leaderChangedNotifier, requestID) func (s *EtcdServer) requestCurrentIndex(leaderChangedNotifier -chan struct{}, requestID uint64) (uint64, error) { // sendReadIndex 发送 ReadIndex 消息给 leader err : s.sendReadIndex(requestID) if err ! nil { return 0, err } ... for { select { case rs : -s.r.readStateC: // 阻塞等待 raftNode.readStateC 通道 requestIDBytes : uint64ToBigEndianBytes(requestID) // 判断返回的响应是否是本次请求的响应 gotOwnResponse : bytes.Equal(rs.RequestCtx, requestIDBytes) if !gotOwnResponse { ... // 如果不是继续阻塞等本次请求的响应 continue } // 如果是返回本次请求的 committed index return rs.Index, nil ... } ... }requestCurrentIndex的逻辑包含两块:sendReadIndex发送ReadIndex消息给 leaderfunc (s *EtcdServer) sendReadIndex(requestIndex uint64) error { ctxToSend : uint64ToBigEndianBytes(requestIndex) cctx, cancel : context.WithTimeout(context.Background(), s.Cfg.ReqTimeout()) // 调用本机 raft 发送 ReadIndex 消息给 leader err : s.r.ReadIndex(cctx, ctxToSend) cancel() .. return nil } func (n *node) ReadIndex(ctx context.Context, rctx []byte) error { // 发送 MsgReadIndex 给 leader return n.step(ctx, pb.Message{Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: rctx}}}) }阻塞等待本节点raftNode.readStateC通道获取 leader 的响应requestCurrentIndex 的核心目标只有一个向 Leader 询问当前已确认的 committed index。leader 处理线性读请求follower 将pb.MsgReadIndex消息发送给 leaderleader 是如何处理的呢我们继续看func stepLeader(r *raft, m pb.Message) error { switch m.Type { case pb.MsgReadIndex: // 判断是否只有一个投票节点leader 自己如果是的话 leader 直接返回它的 committed index if r.trk.IsSingleton() { if resp : r.responseToReadIndexReq(m, r.raftLog.committed); resp.To ! None { r.send(resp) } return nil } // 检查 leader 是否是新上任的 leader且当前 term 还没有写 index if !r.committedEntryInCurrentTerm() { // 如果是新上任 leader 且没有写 index 则进入 pendingReadIndexMessages r.pendingReadIndexMessages append(r.pendingReadIndexM