Background
Our company's service framework is built on the Thrift protocol. I have been using it all along, but had never looked into it very carefully.
Recently I needed to build a streaming job that joins a stream against external RPC data (a dimension-table join). For performance reasons the RPC calls had to be made "asynchronously", so I took a closer look at how Thrift works.
Introduction
Official site: https://thrift.apache.org
The Apache Thrift software framework, for scalable cross-language services development, combines a software stack with a code generation engine to build services that work efficiently and seamlessly between C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, JavaScript, Node.js, Smalltalk, OCaml and Delphi and other languages.
Thrift is not just a serialization protocol; it is a complete framework.
All Thrift code referenced below is from version 0.9.3.
Serialization
There is not much to say here. As with protobuf, you define an IDL (Interface Definition Language) file and run the compiler to generate code, e.g. thrift -r --gen java demo.thrift.
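As a minimal sketch of what the generated code gives you, here is plain serialization in Java (EchoRequest and Base are assumed to be generated from the IDL shown at the end of this post):
// SerializeDemo.java: serialize a generated struct to bytes and back.
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TBinaryProtocol;

public class SerializeDemo {
    public static void main(String[] args) throws TException {
        EchoRequest request = new EchoRequest("abc", new Base());

        // Encode the struct into a byte[] with the binary protocol.
        TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
        byte[] bytes = serializer.serialize(request);

        // Decode the bytes back into a fresh instance.
        EchoRequest decoded = new EchoRequest();
        new TDeserializer(new TBinaryProtocol.Factory()).deserialize(decoded, bytes);
        System.out.println(decoded.getInput()); // abc
    }
}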
Remote invocation
Basic concepts
- Protocol: the data serialization format
- TBinaryProtocol
- TJSONProtocol
- TCompactProtocol
- Transport: how the data is transmitted
- TSocket
- THttpTransport
- TFileTransport
- …
- TBufferedTransport
- TFramedTransport
- …
- TServer: server implementations
- TSimpleServer
- TThreadPoolServer
- TNonblockingServer
- TThreadedSelectorServer
Selected implementations
TBinaryProtocol
Encodes data in a straightforward binary format: fixed-width big-endian integers, and strings as a length prefix followed by the raw bytes.
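As a quick illustration (a sketch, not an official example): writing a couple of values through TBinaryProtocol into an in-memory transport shows how compact and fixed the encoding is.
// BinaryProtocolDemo.java: count the bytes TBinaryProtocol emits for simple values.
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TMemoryBuffer;

public class BinaryProtocolDemo {
    public static void main(String[] args) throws TException {
        TMemoryBuffer buffer = new TMemoryBuffer(64);
        TBinaryProtocol protocol = new TBinaryProtocol(buffer);

        protocol.writeI32(42);       // 4 bytes, big-endian
        protocol.writeString("abc"); // 4-byte length prefix + 3 bytes of UTF-8

        System.out.println(buffer.length()); // 11
    }
}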
TSocket
TSocket is a thin wrapper around a socket: network data is transferred over a plain TCP connection.
TBufferedTransport 和 TFramedTransport
Both are decorators over another Transport. TBufferedTransport maintains a buffer so that the underlying Transport is read from and written to less often.
For example, in Go, every read and write on a TSocket goes straight to the underlying net.Conn, which is inefficient; wrapping it in a TBufferedTransport reduces the number of those calls.
// socket.go
func (p *TSocket) Read(buf []byte) (int, error) {
if !p.IsOpen() {
return 0, NewTTransportException(NOT_OPEN, "Connection not open")
}
p.pushDeadline(true, false)
n, err := p.conn.Read(buf)
return n, NewTTransportExceptionFromError(err)
}
func (p *TSocket) Write(buf []byte) (int, error) {
if !p.IsOpen() {
return 0, NewTTransportException(NOT_OPEN, "Connection not open")
}
p.pushDeadline(false, true)
return p.conn.Write(buf)
}
// buffered_transport.go
func (p *TBufferedTransport) Read(b []byte) (int, error) {
n, err := p.ReadWriter.Read(b)
if err != nil {
p.ReadWriter.Reader.Reset(p.tp)
}
return n, err
}
func (p *TBufferedTransport) Write(b []byte) (int, error) {
n, err := p.ReadWriter.Write(b)
if err != nil {
p.ReadWriter.Writer.Reset(p.tp)
}
return n, err
}
In Java there is no equivalent class, because the Java TSocket already adds a layer of buffering via BufferedInputStream / BufferedOutputStream.
// TSocket.java
public void open() throws TTransportException {
if (isOpen()) {
throw new TTransportException(TTransportException.ALREADY_OPEN, "Socket already connected.");
}
if (host_.length() == 0) {
throw new TTransportException(TTransportException.NOT_OPEN, "Cannot open null host.");
}
if (port_ <= 0) {
throw new TTransportException(TTransportException.NOT_OPEN, "Cannot open without port.");
}
if (socket_ == null) {
initSocket();
}
try {
socket_.connect(new InetSocketAddress(host_, port_), connectTimeout_);
inputStream_ = new BufferedInputStream(socket_.getInputStream(), 1024);
outputStream_ = new BufferedOutputStream(socket_.getOutputStream(), 1024);
} catch (IOException iox) {
close();
throw new TTransportException(TTransportException.NOT_OPEN, iox);
}
}
Next, TFramedTransport. It adds a frameSize on top of the underlying Transport: every message is prefixed with its size, so the reader knows how many bytes belong to the current frame (a rough sketch of the resulting wire layout follows the Go code below).
// framed_transport.go
func (p *TFramedTransport) Read(buf []byte) (l int, err error) {
if p.frameSize == 0 {
p.frameSize, err = p.readFrameHeader()
if err != nil {
return
}
}
if p.frameSize < uint32(len(buf)) {
frameSize := p.frameSize
tmp := make([]byte, p.frameSize)
l, err = p.Read(tmp)
copy(buf, tmp)
if err == nil {
// Note: It's important to only return an error when l
// is zero.
// In io.Reader.Read interface, it's perfectly fine to
// return partial data and nil error, which means
// "This is all the data we have right now without
// blocking. If you need the full data, call Read again
// or use io.ReadFull instead".
// Returning partial data with an error actually means
// there's no more data after the partial data just
// returned, which is not true in this case
// (it might be that the other end just haven't written
// them yet).
if l == 0 {
err = NewTTransportExceptionFromError(fmt.Errorf("Not enough frame size %d to read %d bytes", frameSize, len(buf)))
}
return
}
}
got, err := p.reader.Read(buf)
p.frameSize = p.frameSize - uint32(got)
// sanity check
if p.frameSize < 0 {
return 0, NewTTransportException(UNKNOWN_TRANSPORT_EXCEPTION, "Negative frame size")
}
return got, NewTTransportExceptionFromError(err)
}
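So on the wire, each message is just a 4-byte big-endian length followed by the payload. A rough sketch (not the real implementation) of the framing:
// FrameSketch.java: the wire layout TFramedTransport produces for one message.
import java.nio.ByteBuffer;

public class FrameSketch {
    // Prefix an already-serialized payload (produced by the Protocol layer) with its size.
    static byte[] frame(byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(4 + payload.length);
        buf.putInt(payload.length); // the frameSize header, big-endian by default
        buf.put(payload);
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] framed = frame("hello".getBytes());
        System.out.println(framed.length); // 9 = 4-byte header + 5-byte payload
    }
}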
TSimpleServer
A "toy" implementation: a single thread both accepts connections and processes requests.
public void serve() {
try {
serverTransport_.listen();
} catch (TTransportException ttx) {
LOGGER.error("Error occurred during listening.", ttx);
return;
}
// Run the preServe event
if (eventHandler_ != null) {
eventHandler_.preServe();
}
setServing(true);
while (!stopped_) {
TTransport client = null;
TProcessor processor = null;
TTransport inputTransport = null;
TTransport outputTransport = null;
TProtocol inputProtocol = null;
TProtocol outputProtocol = null;
ServerContext connectionContext = null;
try {
client = serverTransport_.accept();
if (client != null) {
processor = processorFactory_.getProcessor(client);
inputTransport = inputTransportFactory_.getTransport(client);
outputTransport = outputTransportFactory_.getTransport(client);
inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
if (eventHandler_ != null) {
connectionContext = eventHandler_.createContext(inputProtocol, outputProtocol);
}
while (true) {
if (eventHandler_ != null) {
eventHandler_.processContext(connectionContext, inputTransport, outputTransport);
}
if(!processor.process(inputProtocol, outputProtocol)) {
break;
}
}
}
} catch (TTransportException ttx) {
// Client died, just move on
} catch (TException tx) {
if (!stopped_) {
LOGGER.error("Thrift error occurred during processing of message.", tx);
}
} catch (Exception x) {
if (!stopped_) {
LOGGER.error("Error occurred during processing of message.", x);
}
}
if (eventHandler_ != null) {
eventHandler_.deleteContext(connectionContext, inputProtocol, outputProtocol);
}
if (inputTransport != null) {
inputTransport.close();
}
if (outputTransport != null) {
outputTransport.close();
}
}
setServing(false);
}
TThreadPoolServer
The implementation is fairly simple: an internal thread pool, and every incoming connection is handed off to the pool for processing.
public void serve() {
try {
serverTransport_.listen();
} catch (TTransportException ttx) {
LOGGER.error("Error occurred during listening.", ttx);
return;
}
// Run the preServe event
if (eventHandler_ != null) {
eventHandler_.preServe();
}
stopped_ = false;
setServing(true);
int failureCount = 0;
while (!stopped_) {
try {
TTransport client = serverTransport_.accept();
WorkerProcess wp = new WorkerProcess(client);
int retryCount = 0;
long remainTimeInMillis = requestTimeoutUnit.toMillis(requestTimeout);
while(true) {
try {
executorService_.execute(wp);
break;
} catch(Throwable t) {
if (t instanceof RejectedExecutionException) {
retryCount++;
try {
if (remainTimeInMillis > 0) {
//do a truncated 20 binary exponential backoff sleep
long sleepTimeInMillis = ((long) (random.nextDouble() *
(1L << Math.min(retryCount, 20)))) * beBackoffSlotInMillis;
sleepTimeInMillis = Math.min(sleepTimeInMillis, remainTimeInMillis);
TimeUnit.MILLISECONDS.sleep(sleepTimeInMillis);
remainTimeInMillis = remainTimeInMillis - sleepTimeInMillis;
} else {
client.close();
wp = null;
LOGGER.warn("Task has been rejected by ExecutorService " + retryCount
+ " times till timedout, reason: " + t);
break;
}
} catch (InterruptedException e) {
LOGGER.warn("Interrupted while waiting to place client on executor queue.");
Thread.currentThread().interrupt();
break;
}
} else if (t instanceof Error) {
LOGGER.error("ExecutorService threw error: " + t, t);
throw (Error)t;
} else {
//for other possible runtime errors from ExecutorService, should also not kill serve
LOGGER.warn("ExecutorService threw error: " + t, t);
break;
}
}
}
} catch (TTransportException ttx) {
if (!stopped_) {
++failureCount;
LOGGER.warn("Transport error occurred during acceptance of message.", ttx);
}
}
}
executorService_.shutdown();
// Loop until awaitTermination finally does return without a interrupted
// exception. If we don't do this, then we'll shut down prematurely. We want
// to let the executorService clear it's task queue, closing client sockets
// appropriately.
long timeoutMS = stopTimeoutUnit.toMillis(stopTimeoutVal);
long now = System.currentTimeMillis();
while (timeoutMS >= 0) {
try {
executorService_.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
break;
} catch (InterruptedException ix) {
long newnow = System.currentTimeMillis();
timeoutMS -= (newnow - now);
now = newnow;
}
}
setServing(false);
}
TNonblockingServer and TThreadedSelectorServer
Both of these TServer implementations are built on Java NIO, the standard recipe for modern high-performance servers. TNonblockingServer is relatively simple and, like TSimpleServer, is probably best treated as a toy: all of its logic runs on a single thread, which is a serious limitation and makes it a poor fit for production. Business code almost always contains blocking operations (RPC calls, database access, etc.), and a single thread cannot make use of multiple cores and is easily overwhelmed.
The core logic of TNonblockingServer:
private void select() {
try {
// wait for io events.
selector.select();
// process the io events we received
Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
while (!stopped_ && selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
selectedKeys.remove();
// skip if not valid
if (!key.isValid()) {
cleanupSelectionKey(key);
continue;
}
// if the key is marked Accept, then it has to be the server
// transport.
if (key.isAcceptable()) {
handleAccept();
} else if (key.isReadable()) {
// deal with reads
handleRead(key);
} else if (key.isWritable()) {
// deal with writes
handleWrite(key);
} else {
LOGGER.warn("Unexpected state in select! " + key.interestOps());
}
}
} catch (IOException e) {
LOGGER.warn("Got an IOException while selecting!", e);
}
}
protected void handleRead(SelectionKey key) {
FrameBuffer buffer = (FrameBuffer) key.attachment();
if (!buffer.read()) {
cleanupSelectionKey(key);
return;
}
// if the buffer's frame read is complete, invoke the method.
if (buffer.isFrameFullyRead()) {
if (!requestInvoke(buffer)) {
cleanupSelectionKey(key);
}
}
}
protected boolean requestInvoke(FrameBuffer frameBuffer) {
frameBuffer.invoke();
return true;
}
TThreadedSelectorServer is somewhat more involved: it separates accepting connections from the read/write selector threads, and adds a worker thread pool for the business logic.
protected boolean requestInvoke(FrameBuffer frameBuffer) {
Runnable invocation = getRunnable(frameBuffer);
if (invoker != null) {
try {
invoker.execute(invocation);
return true;
} catch (RejectedExecutionException rx) {
LOGGER.warn("ExecutorService rejected execution!", rx);
return false;
}
} else {
// Invoke on the caller's thread
invocation.run();
return true;
}
}
A few questions
Why do asynchronous calls require TFramedTransport?
As an aside, "asynchronous" here is from the point of view of the user code; internally it is still implemented with Java NIO (i.e. the TNonblockingServer and TThreadedSelectorServer described above). In network-programming terms this is not true asynchronous I/O, only synchronous non-blocking I/O.
The reason is that TNonblockingServer and TThreadedSelectorServer need to know the size of each message up front, buffer it until the whole frame has been read, and only then process it.
Personally I suspect it could be done without TFramedTransport if one really wanted to, but it would make the implementation considerably more complex.
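To make the constraint concrete, here is a rough sketch (not Thrift's actual FrameBuffer) of what a length-prefixed frame buys the selector thread: it can keep reading without ever blocking, and only hands the message off for processing once the whole frame has arrived.
// FrameReaderSketch.java: accumulate a length-prefixed frame from a non-blocking channel.
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

public class FrameReaderSketch {
    private final ByteBuffer header = ByteBuffer.allocate(4);
    private ByteBuffer body;

    // Called whenever the selector reports the channel is readable.
    // Returns the complete frame, or null if more bytes are still needed.
    public byte[] readFrame(SocketChannel channel) throws IOException {
        if (body == null) {
            channel.read(header);        // may fill only part of the 4-byte header
            if (header.hasRemaining()) {
                return null;             // header incomplete, wait for the next readable event
            }
            header.flip();
            body = ByteBuffer.allocate(header.getInt()); // now we know the frame size
        }
        channel.read(body);              // may fill only part of the body
        if (body.hasRemaining()) {
            return null;                 // frame incomplete
        }
        byte[] frame = body.array();     // whole frame buffered: safe to invoke the processor
        header.clear();                  // reset state for the next frame
        body = null;
        return frame;
    }
}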
Why does Go not need this whole zoo of TServer implementations?
The Go implementation only has a TSimpleServer: for every incoming connection it simply spawns a new goroutine to handle it. This is exactly the big advantage of goroutines: at the code level, very little code written in a plain synchronous style is enough for a reasonably high-performance server.
Code examples
Here is the simplest possible EchoServer in Java. The IDL is split into two files, base.thrift (shared structs) and demo.thrift (the service), shown below.
IDL
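// base.thrift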
struct TrafficEnv {
1: bool Open = false,
2: string Env = "",
}
struct Base {
1: string LogID = "",
2: string Caller = "",
3: string Addr = "",
4: string Client = "",
5: optional TrafficEnv TrafficEnv,
6: optional map<string, string> Extra,
}
struct BaseResp {
1: string StatusMessage = "",
2: i32 StatusCode = 0,
}
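// demo.thrift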
include "base.thrift"
namespace py zyc.demo.demo
namespace go zyc.demo.demo
struct EchoRequest {
1: required string input,
255: required base.Base Base,
}
struct EchoResponse {
1: required string output,
255: required base.BaseResp BaseResp,
}
service DemoServer {
EchoResponse echo(1: EchoRequest request)
}
Synchronous call
// server
public class Server {
public static void main(String[] args) throws TTransportException {
TServerSocket serverTransport = new TServerSocket(12345);
TBinaryProtocol.Factory protocolFactory = new TBinaryProtocol.Factory();
DemoServer.Processor<DemoServer.Iface> processor = new DemoServer.Processor<DemoServer.Iface>(
new DemoServer.Iface() {
@Override
public EchoResponse echo(EchoRequest request) throws TException {
return new EchoResponse(request.input, new BaseResp());
}
});
TThreadPoolServer.Args tArgs = new TThreadPoolServer.Args(serverTransport)
.processor(processor)
.protocolFactory(protocolFactory);
TServer server = new TThreadPoolServer(tArgs);
server.serve();
}
}
// client
public class Client {
public static void main(String[] args) throws TException {
TTransport transport = new TSocket("127.0.0.1", 12345);
DemoServer.Client client = new DemoServer.Client(new TBinaryProtocol(transport));
transport.open();
System.out.println(client.echo(new EchoRequest("abc", new Base())).getOutput());
transport.close();
}
}
Asynchronous call
// server
public class Server {
public static void main(String[] args) throws TTransportException {
DemoServer.AsyncProcessor<DemoServer.AsyncIface> processor = new DemoServer.AsyncProcessor<>(new DemoServer.AsyncIface() {
@Override
public void echo(EchoRequest request, AsyncMethodCallback resultHandler) throws TException {
resultHandler.onComplete(new EchoResponse(request.input, new BaseResp()));
}
});
TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(12345);
TThreadedSelectorServer.Args tArgs = new TThreadedSelectorServer.Args(serverTransport)
.processor(processor)
.transportFactory(new TFramedTransport.Factory())
.protocolFactory(new TBinaryProtocol.Factory())
.selectorThreads(2)
.workerThreads(10);
TServer server = new TThreadedSelectorServer(tArgs);
server.serve();
}
}
// client
public class Client {
public static void main(String[] args) throws IOException, TException, InterruptedException {
DemoServer.AsyncClient client = new DemoServer.AsyncClient(
new TBinaryProtocol.Factory(),
new TAsyncClientManager(),
new TNonblockingSocket("127.0.0.1", 12345)
);
client.echo(new EchoRequest("abc", new Base()), new AsyncMethodCallback<DemoServer.AsyncClient.echo_call>() {
@Override
public void onComplete(DemoServer.AsyncClient.echo_call result) {
EchoResponse resp = null;
try {
resp = result.getResult();
System.out.println(resp.getOutput());
} catch (TException e) {
e.printStackTrace();
}
}
@Override
public void onError(Exception e) {
System.out.println(e);
}
});
Thread.sleep(1000);
}
}