Thrift

Background

Our company's service framework is built on the Thrift protocol. I had been using it for a long time without ever looking into it closely.

Recently I had to build a streaming job that JOINs against external RPC data (a dimension-table JOIN). For performance reasons the RPC had to be called "asynchronously", so I finally dug into how Thrift works.

Introduction

Official site: https://thrift.apache.org

The Apache Thrift software framework, for scalable cross-language services development, combines a software stack with a code generation engine to build services that work efficiently and seamlessly between C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, JavaScript, Node.js, Smalltalk, OCaml and Delphi and other languages.

Thrift is not just a serialization protocol; it is a full framework.

All Thrift code referenced below is from version 0.9.3.

Serialization

Not much to say here: as with protobuf, you define an IDL (Interface Definition Language) file and generate code with the thrift compiler.
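
For example, assuming the service definition lives in a file named echo.thrift, the Java classes are generated with the thrift compiler (-r also generates code for include'd files):

thrift -r --gen java echo.thrift   # generated classes end up under gen-java/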

RPC

Basic concepts

  • Protocol: how data is encoded on the wire (the serialization format)
    • TBinaryProtocol
    • TJSONProtocol
    • TCompactProtocol
  • Transport: how data is transmitted
    • TSocket
    • THttpTransport
    • TFileTransport
    • TBufferedTransport
    • TFramedTransport
  • TServer: the server implementations
    • TSimpleServer
    • TThreadPoolServer
    • TNonblockingServer
    • TThreadedSelectorServer
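
These three layers compose into a stack. Below is a minimal client-side sketch of how they fit together; DemoServer is the client generated from the IDL shown in the code example section, and the host/port are placeholders.

import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

// TSocket (Transport) is wrapped by TFramedTransport (a decorating Transport),
// and TBinaryProtocol (Protocol) encodes RPC calls on top of that stack.
// If the client uses framed transport, the server must be configured with it too.
public class LayerSketch {
    public static void main(String[] args) throws Exception {
        TTransport transport = new TFramedTransport(new TSocket("127.0.0.1", 12345));
        TProtocol protocol = new TBinaryProtocol(transport);
        DemoServer.Client client = new DemoServer.Client(protocol); // generated stub
        transport.open();
        // client.echo(...) would issue RPCs over this stack
        transport.close();
    }
}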

Selected implementations

TBinaryProtocol

Transfers data using a plain binary encoding.
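
As a quick illustration, a generated struct can be round-tripped through this protocol with TSerializer / TDeserializer; EchoRequest and Base here are the structs generated from the IDL in the code example section below.

import org.apache.thrift.TDeserializer;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TBinaryProtocol;

// Serialize a generated struct to bytes with TBinaryProtocol, then read it back.
public class BinaryProtocolSketch {
    public static void main(String[] args) throws Exception {
        TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
        byte[] bytes = serializer.serialize(new EchoRequest("abc", new Base()));

        TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
        EchoRequest decoded = new EchoRequest();
        deserializer.deserialize(decoded, bytes);
        System.out.println(decoded.getInput()); // "abc"
    }
}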

TSocket

TSocket is a thin wrapper around a socket: data is sent and received over an ordinary blocking TCP connection.

TBufferedTransport and TFramedTransport

Both are decorators around another Transport. TBufferedTransport maintains an in-memory buffer to reduce the number of reads and writes issued to the underlying Transport.

In Go, for example, every read and write on a TSocket goes straight to the underlying net.Conn, which is inefficient; wrapping it in a TBufferedTransport cuts down the number of those calls.

// socket.go
func (p *TSocket) Read(buf []byte) (int, error) {
	if !p.IsOpen() {
		return 0, NewTTransportException(NOT_OPEN, "Connection not open")
	}
	p.pushDeadline(true, false)
	n, err := p.conn.Read(buf)
	return n, NewTTransportExceptionFromError(err)
}

func (p *TSocket) Write(buf []byte) (int, error) {
	if !p.IsOpen() {
		return 0, NewTTransportException(NOT_OPEN, "Connection not open")
	}
	p.pushDeadline(false, true)
	return p.conn.Write(buf)
}
// buffered_transport.go
func (p *TBufferedTransport) Read(b []byte) (int, error) {
	n, err := p.ReadWriter.Read(b)
	if err != nil {
		p.ReadWriter.Reader.Reset(p.tp)
	}
	return n, err
}

func (p *TBufferedTransport) Write(b []byte) (int, error) {
	n, err := p.ReadWriter.Write(b)
	if err != nil {
		p.ReadWriter.Writer.Reset(p.tp)
	}
	return n, err
}

In Java there is no corresponding class, because Java's TSocket already adds a buffering layer through BufferedInputStream / BufferedOutputStream.

// TSocket.java
public void open() throws TTransportException {
  if (isOpen()) {
    throw new TTransportException(TTransportException.ALREADY_OPEN, "Socket already connected.");
  }

  if (host_.length() == 0) {
    throw new TTransportException(TTransportException.NOT_OPEN, "Cannot open null host.");
  }
  if (port_ <= 0) {
    throw new TTransportException(TTransportException.NOT_OPEN, "Cannot open without port.");
  }

  if (socket_ == null) {
    initSocket();
  }

  try {
    socket_.connect(new InetSocketAddress(host_, port_), connectTimeout_);
    inputStream_ = new BufferedInputStream(socket_.getInputStream(), 1024);
    outputStream_ = new BufferedOutputStream(socket_.getOutputStream(), 1024);
  } catch (IOException iox) {
    close();
    throw new TTransportException(TTransportException.NOT_OPEN, iox);
  }
}

Next, TFramedTransport: it prefixes every message on the underlying Transport with a frame size (a 4-byte length header), so the reader knows how many bytes make up a complete frame.

// framed_transport.go
func (p *TFramedTransport) Read(buf []byte) (l int, err error) {
	if p.frameSize == 0 {
		p.frameSize, err = p.readFrameHeader()
		if err != nil {
			return
		}
	}
	if p.frameSize < uint32(len(buf)) {
		frameSize := p.frameSize
		tmp := make([]byte, p.frameSize)
		l, err = p.Read(tmp)
		copy(buf, tmp)
		if err == nil {
			// Note: It's important to only return an error when l
			// is zero.
			// In io.Reader.Read interface, it's perfectly fine to
			// return partial data and nil error, which means
			// "This is all the data we have right now without
			// blocking. If you need the full data, call Read again
			// or use io.ReadFull instead".
			// Returning partial data with an error actually means
			// there's no more data after the partial data just
			// returned, which is not true in this case
			// (it might be that the other end just haven't written
			// them yet).
			if l == 0 {
				err = NewTTransportExceptionFromError(fmt.Errorf("Not enough frame size %d to read %d bytes", frameSize, len(buf)))
			}
			return
		}
	}
	got, err := p.reader.Read(buf)
	p.frameSize = p.frameSize - uint32(got)
	// sanity check
	if p.frameSize < 0 {
		return 0, NewTTransportException(UNKNOWN_TRANSPORT_EXCEPTION, "Negative frame size")
	}
	return got, NewTTransportExceptionFromError(err)
}
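
On the wire, a frame is nothing more than a 4-byte big-endian length followed by the payload. A rough Java sketch of building one frame (the helper is made up for illustration, not the library's internal code):

import java.io.ByteArrayOutputStream;

// Mirrors what TFramedTransport does on flush(): write a 4-byte big-endian
// length prefix, then the serialized message bytes.
public class FrameSketch {
    static byte[] frame(byte[] payload) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int len = payload.length;
        out.write((len >> 24) & 0xff);
        out.write((len >> 16) & 0xff);
        out.write((len >> 8) & 0xff);
        out.write(len & 0xff);
        out.write(payload, 0, len);
        return out.toByteArray();
    }
}
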
TSimpleServer

A "toy" implementation: a single thread both accepts connections and processes requests.

public void serve() {
  try {
    serverTransport_.listen();
  } catch (TTransportException ttx) {
    LOGGER.error("Error occurred during listening.", ttx);
    return;
  }

  // Run the preServe event
  if (eventHandler_ != null) {
    eventHandler_.preServe();
  }

  setServing(true);

  while (!stopped_) {
    TTransport client = null;
    TProcessor processor = null;
    TTransport inputTransport = null;
    TTransport outputTransport = null;
    TProtocol inputProtocol = null;
    TProtocol outputProtocol = null;
    ServerContext connectionContext = null;
    try {
      client = serverTransport_.accept();
      if (client != null) {
        processor = processorFactory_.getProcessor(client);
        inputTransport = inputTransportFactory_.getTransport(client);
        outputTransport = outputTransportFactory_.getTransport(client);
        inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
        outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
        if (eventHandler_ != null) {
          connectionContext = eventHandler_.createContext(inputProtocol, outputProtocol);
        }
        while (true) {
          if (eventHandler_ != null) {
            eventHandler_.processContext(connectionContext, inputTransport, outputTransport);
          }
          if(!processor.process(inputProtocol, outputProtocol)) {
            break;
          }
        }
      }
    } catch (TTransportException ttx) {
      // Client died, just move on
    } catch (TException tx) {
      if (!stopped_) {
        LOGGER.error("Thrift error occurred during processing of message.", tx);
      }
    } catch (Exception x) {
      if (!stopped_) {
        LOGGER.error("Error occurred during processing of message.", x);
      }
    }

    if (eventHandler_ != null) {
      eventHandler_.deleteContext(connectionContext, inputProtocol, outputProtocol);
    }

    if (inputTransport != null) {
      inputTransport.close();
    }

    if (outputTransport != null) {
      outputTransport.close();
    }

  }
  setServing(false);
}
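
Wiring one up takes only a few lines; a minimal sketch, assuming DemoServer/EchoRequest/EchoResponse are the classes generated from the IDL shown later:

import org.apache.thrift.TException;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TSimpleServer;
import org.apache.thrift.transport.TServerSocket;

// Single-threaded echo server: one thread accepts and serves connections in turn.
// Fine for local testing, not for real traffic.
public class SimpleEchoServer {
    public static void main(String[] args) throws Exception {
        DemoServer.Processor<DemoServer.Iface> processor = new DemoServer.Processor<DemoServer.Iface>(
                new DemoServer.Iface() {
                    @Override
                    public EchoResponse echo(EchoRequest request) throws TException {
                        return new EchoResponse(request.input, new BaseResp());
                    }
                });
        TServerSocket serverTransport = new TServerSocket(12345);
        TServer server = new TSimpleServer(
                new TSimpleServer.Args(serverTransport).processor(processor));
        server.serve();
    }
}
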
TThreadPoolServer

The implementation is fairly simple: there is an internal thread pool, and every accepted connection is handed to the pool for processing.

public void serve() {
  try {
    serverTransport_.listen();
  } catch (TTransportException ttx) {
    LOGGER.error("Error occurred during listening.", ttx);
    return;
  }

  // Run the preServe event
  if (eventHandler_ != null) {
    eventHandler_.preServe();
  }

  stopped_ = false;
  setServing(true);
  int failureCount = 0;
  while (!stopped_) {
    try {
      TTransport client = serverTransport_.accept();
      WorkerProcess wp = new WorkerProcess(client);

      int retryCount = 0;
      long remainTimeInMillis = requestTimeoutUnit.toMillis(requestTimeout);
      while(true) {
        try {
          executorService_.execute(wp);
          break;
        } catch(Throwable t) {
          if (t instanceof RejectedExecutionException) {
            retryCount++;
            try {
              if (remainTimeInMillis > 0) {
                //do a truncated 20 binary exponential backoff sleep
                long sleepTimeInMillis = ((long) (random.nextDouble() *
                                                  (1L << Math.min(retryCount, 20)))) * beBackoffSlotInMillis;
                sleepTimeInMillis = Math.min(sleepTimeInMillis, remainTimeInMillis);
                TimeUnit.MILLISECONDS.sleep(sleepTimeInMillis);
                remainTimeInMillis = remainTimeInMillis - sleepTimeInMillis;
              } else {
                client.close();
                wp = null;
                LOGGER.warn("Task has been rejected by ExecutorService " + retryCount
                            + " times till timedout, reason: " + t);
                break;
              }
            } catch (InterruptedException e) {
              LOGGER.warn("Interrupted while waiting to place client on executor queue.");
              Thread.currentThread().interrupt();
              break;
            }
          } else if (t instanceof Error) {
            LOGGER.error("ExecutorService threw error: " + t, t);
            throw (Error)t;
          } else {
            //for other possible runtime errors from ExecutorService, should also not kill serve
            LOGGER.warn("ExecutorService threw error: " + t, t);
            break;
          }
        }
      }
    } catch (TTransportException ttx) {
      if (!stopped_) {
        ++failureCount;
        LOGGER.warn("Transport error occurred during acceptance of message.", ttx);
      }
    }
  }

  executorService_.shutdown();

  // Loop until awaitTermination finally does return without a interrupted
  // exception. If we don't do this, then we'll shut down prematurely. We want
  // to let the executorService clear it's task queue, closing client sockets
  // appropriately.
  long timeoutMS = stopTimeoutUnit.toMillis(stopTimeoutVal);
  long now = System.currentTimeMillis();
  while (timeoutMS >= 0) {
    try {
      executorService_.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
      break;
    } catch (InterruptedException ix) {
      long newnow = System.currentTimeMillis();
      timeoutMS -= (newnow - now);
      now = newnow;
    }
  }
  setServing(false);
}
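
The pool can be bounded through the Args builder. A sketch of just that part, reusing the variables from the synchronous-server example later in this post (the thread counts are placeholder values):

// Without explicit bounds, the worker pool can grow essentially unbounded under load.
TThreadPoolServer.Args tArgs = new TThreadPoolServer.Args(serverTransport)
        .processor(processor)
        .protocolFactory(new TBinaryProtocol.Factory())
        .minWorkerThreads(16)
        .maxWorkerThreads(128);
TServer server = new TThreadPoolServer(tArgs);
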
TNonblockingServer and TThreadedSelectorServer

Both of these TServer implementations are built on Java NIO, the standard recipe for modern high-performance servers. TNonblockingServer is the simpler of the two and, like TSimpleServer, is really a toy: all of its logic runs in a single thread, which is a serious limitation and makes it unsuitable for production. Business code almost always contains blocking calls (RPCs, database access, and so on), and a single thread can neither exploit multiple cores nor keep up under load.

The main logic of TNonblockingServer:

private void select() {
  try {
    // wait for io events.
    selector.select();

    // process the io events we received
    Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
    while (!stopped_ && selectedKeys.hasNext()) {
      SelectionKey key = selectedKeys.next();
      selectedKeys.remove();

      // skip if not valid
      if (!key.isValid()) {
        cleanupSelectionKey(key);
        continue;
      }

      // if the key is marked Accept, then it has to be the server
      // transport.
      if (key.isAcceptable()) {
        handleAccept();
      } else if (key.isReadable()) {
        // deal with reads
        handleRead(key);
      } else if (key.isWritable()) {
        // deal with writes
        handleWrite(key);
      } else {
        LOGGER.warn("Unexpected state in select! " + key.interestOps());
      }
    }
  } catch (IOException e) {
    LOGGER.warn("Got an IOException while selecting!", e);
  }
}

protected void handleRead(SelectionKey key) {
  FrameBuffer buffer = (FrameBuffer) key.attachment();
  if (!buffer.read()) {
    cleanupSelectionKey(key);
    return;
  }

  // if the buffer's frame read is complete, invoke the method.
  if (buffer.isFrameFullyRead()) {
    if (!requestInvoke(buffer)) {
      cleanupSelectionKey(key);
    }
  }
}

protected boolean requestInvoke(FrameBuffer frameBuffer) {
  frameBuffer.invoke();
  return true;
}
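
For completeness, wiring up a TNonblockingServer looks like the sketch below (the port is a placeholder, and the generated classes come from the IDL shown later). It must be given a TNonblockingServerSocket and relies on framed transport:

import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TNonblockingServer;
import org.apache.thrift.server.TServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;

// Single-threaded NIO server: the same thread accepts, reads, invokes and writes.
public class NonblockingServerSketch {
    public static void main(String[] args) throws Exception {
        DemoServer.Processor<DemoServer.Iface> processor = new DemoServer.Processor<DemoServer.Iface>(
                new DemoServer.Iface() {
                    @Override
                    public EchoResponse echo(EchoRequest request) throws TException {
                        return new EchoResponse(request.input, new BaseResp());
                    }
                });
        TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(12345);
        TNonblockingServer.Args tArgs = new TNonblockingServer.Args(serverTransport)
                .processor(processor)
                .transportFactory(new TFramedTransport.Factory())
                .protocolFactory(new TBinaryProtocol.Factory());
        TServer server = new TNonblockingServer(tArgs);
        server.serve();
    }
}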

TThreadedSelectorServer is more involved: it separates accepting connections from reading and writing them, and hands the actual business logic off to a worker thread pool.

protected boolean requestInvoke(FrameBuffer frameBuffer) {
  Runnable invocation = getRunnable(frameBuffer);
  if (invoker != null) {
    try {
      invoker.execute(invocation);
      return true;
    } catch (RejectedExecutionException rx) {
      LOGGER.warn("ExecutorService rejected execution!", rx);
      return false;
    }
  } else {
    // Invoke on the caller's thread
    invocation.run();
    return true;
  }
}

A few questions

Why must asynchronous calls use TFramedTransport?

A side note: "asynchronous" here is from the perspective of user code; internally it is still implemented with Java NIO (the TNonblockingServer and TThreadedSelectorServer described above). In network-programming terms this is not true asynchronous I/O, only synchronous non-blocking I/O.

The reason is that TNonblockingServer and TThreadedSelectorServer need to know the size of a message up front, wait until the entire frame has been read, and only then process it.

My feeling is that it could be done without TFramedTransport if you really wanted to, but it would add a fair amount of implementation complexity.

Why doesn't Go need all those different TServer implementations?

The Go implementation ships only a TSimpleServer: each incoming connection is simply handled in a new goroutine. That is exactly the great advantage of goroutines: with very little code, written in a plain synchronous style, you get a reasonably high-performance server.

Code example

Here is a bare-bones EchoServer implemented in Java.

IDL

The listing below contains two IDL files: base.thrift with the shared structs, followed by the service definition that includes it.

struct TrafficEnv {
    1: bool Open = false,
    2: string Env = "",
}

struct Base {
    1: string LogID = "",
    2: string Caller = "",
    3: string Addr = "",
    4: string Client = "",
    5: optional TrafficEnv TrafficEnv,
    6: optional map<string, string> Extra,
}

struct BaseResp {
    1: string StatusMessage = "",
    2: i32 StatusCode = 0,
}

include "base.thrift"

namespace py zyc.demo.demo
namespace go zyc.demo.demo

struct EchoRequest {
    1: required string input,
    255: required base.Base Base,
}

struct EchoResponse {
    1: required string output,
    255: required base.BaseResp BaseResp,
}

service DemoServer {
    EchoResponse echo(1: EchoRequest request)
}
Synchronous call
// server
public class Server {
    public static void main(String[] args) throws TTransportException {
        TServerSocket serverTransport = new TServerSocket(12345);
        TBinaryProtocol.Factory protocolFactory = new TBinaryProtocol.Factory();
        DemoServer.Processor<DemoServer.Iface> processor = new DemoServer.Processor<DemoServer.Iface>(
                new DemoServer.Iface() {
                    @Override
                    public EchoResponse echo(EchoRequest request) throws TException {
                        return new EchoResponse(request.input, new BaseResp());
                    }
                });
        TThreadPoolServer.Args tArgs = new TThreadPoolServer.Args(serverTransport)
                .processor(processor)
                .protocolFactory(protocolFactory);
        TServer server = new TThreadPoolServer(tArgs);
        server.serve();
    }
}


// client
public class Client {
    public static void main(String[] args) throws TException {
        TTransport transport = new TSocket("127.0.0.1", 12345);
        DemoServer.Client client = new DemoServer.Client(new TBinaryProtocol(transport));
        transport.open();
        System.out.println(client.echo(new EchoRequest("abc", new Base())).getOutput());
        transport.close();
    }
}
Asynchronous call
// server
public class Server {
    public static void main(String[] args) throws TTransportException {
        DemoServer.AsyncProcessor<DemoServer.AsyncIface> processor = new DemoServer.AsyncProcessor<>(new DemoServer.AsyncIface() {
            @Override
            public void echo(EchoRequest request, AsyncMethodCallback resultHandler) throws TException {
                resultHandler.onComplete(new EchoResponse(request.input, new BaseResp()));
            }
        });
        TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(12345);
        TThreadedSelectorServer.Args tArgs = new TThreadedSelectorServer.Args(serverTransport)
                .processor(processor)
                .transportFactory(new TFramedTransport.Factory())
                .protocolFactory(new TBinaryProtocol.Factory())
                .selectorThreads(2)
                .workerThreads(10);
        TServer server = new TThreadedSelectorServer(tArgs);
        server.serve();
    }
}

// client
public class Client {
    public static void main(String[] args) throws IOException, TException, InterruptedException {
        DemoServer.AsyncClient client = new DemoServer.AsyncClient(
                new TBinaryProtocol.Factory(),
                new TAsyncClientManager(),
                new TNonblockingSocket("127.0.0.1", 12345)
        );

        client.echo(new EchoRequest("abc", new Base()), new AsyncMethodCallback<DemoServer.AsyncClient.echo_call>() {
            @Override
            public void onComplete(DemoServer.AsyncClient.echo_call result) {
                EchoResponse resp = null;
                try {
                    resp = result.getResult();
                    System.out.println(resp.getOutput());
                } catch (TException e) {
                    e.printStackTrace();
                }
            }

            @Override
            public void onError(Exception e) {
                System.out.println(e);
            }
        });
        Thread.sleep(1000);
    }
}
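
The Thread.sleep(1000) at the end is only there to keep the JVM alive until the callback fires. In real code you would block on something like a CountDownLatch instead; a sketch of just the waiting part (create the latch before the call, count it down inside onComplete()/onError()):

// Replace Thread.sleep(1000) with an explicit wait for the callback.
final java.util.concurrent.CountDownLatch latch = new java.util.concurrent.CountDownLatch(1);
// ... pass a callback that calls latch.countDown() in both onComplete() and onError() ...
latch.await(5, java.util.concurrent.TimeUnit.SECONDS);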