本文先简单介绍一下brpc,然后是brpc官方对stream方式使用的介绍,再看brpc官方的stream方式的使用示例。不过brpc官方示例是client通过stream一直向server发消息,而笔者希望能通过stream进行双向通信,所以对示例进行了修改,以实现client和server的通过stream方式的双向通信。
bRPC简介
bRPC基础介绍。
什么是RPC?
互联网上的机器大都通过TCP/IP协议相互访问,但TCP/IP只是往远端发送了一段二进制数据,为了建立服务还有很多问题需要抽象:
数据以什么格式传输?不同机器间,网络间可能是不同的字节序,直接传输内存数据显然是不合适的;随着业务变化,数据字段往往要增加或删减,怎么兼容前后不同版本的格式?
一个TCP连接可以被多个请求复用以减少开销么?多个请求可以同时发往一个TCP连接么?
如何管理和访问很多机器?
连接断开时应该干什么?
万一server不发送回复怎么办?
…
RPC可以解决这些问题,它把网络交互类比为“client访问server上的函数”:client向server发送request后开始等待,直到server收到、处理、回复client后,client又再度恢复并根据response做出反应。
我们来看看上面的一些问题是如何解决的:数据需要序列化,protobuf在这方面做的不错。用户填写protobuf::Message类型的request,RPC结束后,从同为protobuf::Message类型的response中取出结果。protobuf有较好的前后兼容性,方便业务调整字段。http广泛使用json作为序列化方法。
用户无需关心连接如何建立,但可以选择不同的连接方式:短连接,连接池,单连接。
大量机器一般通过命名服务被发现,可基于DNS, ZooKeeper, etcd等实现。在百度内,我们使用BNS (Baidu Naming Service)。brpc也提供“list://“和”file://“。用户可以指定负载均衡算法,让RPC每次选出一台机器发送请求,包括: round-robin, randomized, consistent-hashing(murmurhash3 or md5)和 locality-aware.
连接断开时可以重试。
如果server没有在给定时间内回复,client会返回超时错误。
哪里可以使用RPC?
几乎所有的网络交互。
RPC不是万能的抽象,否则我们也不需要TCP/IP这一层了。但是在我们绝大部分的网络交互中,RPC既能解决问题,又能隔离更底层的网络问题。
对于RPC常见的质疑有:
- 我的数据非常大,用protobuf序列化太慢了。首先这可能是个伪命题,你得用profiler证明慢了才是真的慢,其次很多协议支持携带二进制数据以绕过序列化。
- 我传输的是流数据,RPC表达不了。事实上brpc中很多协议支持传递流式数据,包括http中的ProgressiveReader, h2的streams, streaming rpc, 和专门的流式协议RTMP。
- 我的场景不需要回复。简单推理可知,你的场景中请求可丢可不丢,可处理也可不处理,因为client总是无法感知,你真的确认这是OK的?即使场景真的不需要,我们仍然建议用最小的结构体回复,因为这不大会是瓶颈,并且追查复杂bug时可能是很有价值的线索。
什么是brpc?
百度内最常使用的工业级RPC框架, 有1,000,000+个实例(不包含client)和上千种服务, 在百度内叫做”baidu-rpc”. 目前只开源C++版本。
你可以使用它:
- 搭建能在一个端口支持多协议的服务, 或访问各种服务
- Server能同步或异步处理请求。
- Client支持同步、异步、半同步,或使用组合channels简化复杂的分库或并发访问。
- 通过http界面调试服务, 使用cpu, heap, contention profilers.
- 获得更好的延时和吞吐.
- 把你组织中使用的协议快速地加入brpc,或定制各类组件, 包括命名服务 (dns, zk, etcd), 负载均衡 (rr, random, consistent hashing)
流式(stream)RPC概述
在一些应用场景中, client或server需要向对面发送大量数据,这些数据非常大或者持续地在产生以至于无法放在一个RPC的附件中。比如一个分布式系统的不同节点间传递replica或snapshot。client/server之间虽然可以通过多次RPC把数据切分后传输过去,但存在如下问题:
- 如果这些RPC是并行的,无法保证接收端有序地收到数据,拼接数据的逻辑相当复杂。
- 如果这些RPC是串行的,每次传递都得等待一次网络RTT+处理数据的延时,特别是后者的延时可能是难以预估的。
为了让大块数据以流水线的方式在client/server之间传递, 我们提供了Streaming RPC这种交互模型。Streaming RPC让用户能够在client/service之间建立用户态连接,称为Stream, 同一个TCP连接之上能同时存在多个Stream。 Stream的传输数据以消息为基本单位, 输入端可以源源不断的往Stream中写入消息, 接收端会按输入端写入顺序收到消息。
Streaming RPC保证: - 有消息边界。
- 接收消息的顺序和发送消息的顺序严格一致。
- 全双工。
- 支持流控。
- 提供超时提醒
目前的实现还没有自动切割过大的消息,同一个tcp连接上的多个Stream之间可能有Head-of-line blocking问题,请尽量避免过大的单个消息,实现自动切割后我们会告知并更新文档。
例子见example/streaming_echo_c++。
建立Stream
目前Stream都由Client端建立。Client先在本地创建一个Stream,再通过一次RPC(必须使用baidu_std协议)与指定的Service建立一个Stream,如果Service在收到请求之后选择接受这个Stream, 那在response返回Client后Stream就会建立成功。过程中的任何错误都把RPC标记为失败,同时也意味着Stream创建失败。用linux下建立连接的过程打比方,Client先创建socket(创建Stream),再调用connect尝试与远端建立连接(通过RPC建立Stream),远端accept后连接就建立了(service接受后创建成功)。
如果Client尝试向不支持Streaming RPC的老Server建立Stream,将总是失败。
程序中我们用StreamId代表一个Stream,对Stream的读写,关闭操作都将作用在这个Id上。
1 | struct StreamOptions |
读取Stream
在建立或者接受一个Stream的时候, 用户可以继承StreamInputHandler并把这个handler填入StreamOptions中. 通过这个handler,用户可以处理对端的写入数据,连接关闭以及idle timeout
1 | class StreamInputHandler { |
第一次收到请求的时间
在client端,如果建立过程是一次同步RPC, 那在等待的线程被唤醒之后,on_received_message就可能会被调用到。 如果是异步RPC请求, 那等到这次请求的done->Run() 执行完毕之后, on_received_message就可能会被调用。
在server端, 当框架传入的done->Run()被调用完之后, on_received_message就可能会被调用。
写入Stream
1 | // Write |message| into |stream_id|. The remote-side handler will received the |
流控
当存在较多已发送但未接收的数据时,发送端的Write操作会立即失败(返回EAGAIN), 这时候可以通过同步或异步的方式等待对端消费掉数据。
1 | // Wait util the pending buffer size is less than |max_buf_size| or error occurs |
关闭Stream
1 | // Close |stream_id|, after this function is called: |
对原始例子的修改
brpc stream使用的官方原始例子为https://github.com/apache/incubator-brpc/tree/master/example/streaming_echo_c++。
不过官方的原始例子只是streaming_echo_client端一直往streaming_echo_server端发stream流消息。
client.cpp代码为:streaming_echo_client端代码
server.cpp代码为:https://github.com/apache/incubator-brpc/blob/master/example/streaming_echo_c%2B%2B/server.cpp
修改后client.cpp代码为:
1 |
|
修改后server.cpp代码为:
1 |
|
关键的是在在建立(StreamCreate方法)或者接受(brpc::StreamAccept(&_sd, *cntl, &stream_options))一个Stream的时候, 把继承StreamInputHandler的具体实现的handler填入StreamOptions中,然后放入到StreamCreate方法和StreamAccept方法。
修改后运行streaming_echo_server端打印信息:
修改后运行streaming_echo_client端打印信息:
通过如上的修改,就达到了client和server端通过stream双向通信的目的 :)