原创

Dubbo服务并发通信原理及源码解析

dubbo默认采用netty进行TCP通讯。TCP是传输层协议,在应用层,往往会拓展自定义的协议,一是可以处理TCP本身的粘包拆包问题,二是约定通讯过程的其他细节。
所以dubbo默认采用自定义的dubbo协议。文档描述:
Dubbo缺省协议采用单一长连接和NIO异步通讯,适合于小数据量大并发的服务调用,以及服务消费者机器数远大于服务提供者机器数的情况。

缺省协议,使用基于netty3.2.5+hessian3.2.1交互。
连接个数:单连接
连接方式:长连接
传输协议:TCP
传输方式:NIO异步传输
序列化:Hessian二进制序列化
适用范围:传入传出参数数据包较小(建议小于100K),消费者比提供者个数多,单一消费者无法压满提供者,尽量不要用dubbo协议传输大文件或超大字符串。
适用场景:常规远程服务方法调用

dubbo协议中对字节流的处理在com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec类中,包含了对request请求的编码和解码,response响应的编码和解码。
dubbo协议采用固定长度的消息头(16字节)和不定长度的消息体来进行数据传输,消息头定义了一些通讯框架netty在IO线程处理时需要的信息。具体dubbo协议的报文格式如下:
dubbo协议报文消息头详解:
magic:类似java字节码文件里的魔数,用来判断是不是dubbo协议的数据包。魔数是常量0xdabb
flag:标志位, 一共8个地址位。低四位用来表示消息体数据用的序列化工具的类型(默认hessian),高四位中,第一位为1表示是request请求,第二位为1表示双向传输(即有返回response),第三位为1表示是心跳ping事件。
status:状态位, 设置请求响应状态,dubbo定义了一些响应的类型。具体类型见com.alibaba.dubbo.remoting.exchange.Response
invoke id:消息id, long 类型。每一个请求的唯一识别id(由于采用异步通讯的方式,用来把请求request和返回的response对应上)
body length:消息体 body 长度, int 类型,即记录Body Content有多少个字节。

因为采用单一长连接,所以如果消费者多线程请求,服务端处理完消息后返回,就会造成消息错乱的问题。解决这个问题的思路跟解决socket中的粘包问题类似。
socket粘包问题解决方法:用的最多的其实是定义一个定长的数据包头,其中包含了完整数据包的长度,以此来完成服务器端拆包工作。
类似的,那么dubbo解决上述问题的方法:就是给包头中添加一个全局唯一标识id,服务器端响应请求时也要携带这个id,供客户端多线程领取对应的响应数据提供线索。

接下来我们直接分析一下源码:
在HeaderExchangeChannel这个类中,先来看看其中的request方法

public ResponseFuture request(Object request, int timeout) throws RemotingException {
   if (closed) {
       throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
   }
   // create request.
   Request req = new Request();
   req.setVersion("2.0.0");
   req.setTwoWay(true);
   req.setData(request);

//这个future就是前面我们提到的:客户端并发请求线程阻塞的对象
   DefaultFuture future = new DefaultFuture(channel, req, timeout);
   try{
       channel.send(req);  //非阻塞调用
   }catch (RemotingException e) {
       future.cancel();
       throw e;
   }
   return future;
}

注意这个方法返回的ResponseFuture对象,当前处理客户端请求的线程在经过一系列调用后,会拿到ResponseFuture对象,最终该线程会阻塞在这个对象的get()方法调用上,如下:

public Object get(int timeout) throws RemotingException {
    if (timeout <= 0) {
        timeout = Constants.DEFAULT_TIMEOUT;
    }
    if (! isDone()) {
        long start = System.currentTimeMillis();
        lock.lock();
        try {
            while (! isDone()) {    //无限连
                done.await(timeout, TimeUnit.MILLISECONDS);
                if (isDone() || System.currentTimeMillis() - start > timeout) {
                    break;
                }
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
        if (! isDone()) {
            throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
        }
    }
    return returnFromResponse();
}

上面我已经看到请求线程已经阻塞,那么又是如何被唤醒的呢?现在来仔细看一下HeaderExchangeHandler类的定义,先看一下它定义的received方法,下面是代码片段:

public void received(Channel channel, Object message) throws RemotingException {
    channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
    ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
    try {
        if (message instanceof Request) {
          .....
        } else if (message instanceof Response) {   
            //这里就是作为消费者的dubbo客户端在接收到响应后,触发通知对应等待线程的起点
            handleResponse(channel, (Response) message);
        } else if (message instanceof String) {
           .....
        } else {
            handler.received(exchangeChannel, message);
        }
    } finally {
        HeaderExchangeChannel.removeChannelIfDisconnected(channel);
    }
}

我们主要看中间的那个条件分支,它是用来处理响应消息的,也就是说当dubbo客户端接收到来自服务端的响应后会执行到这个分支,它简单的调用了handleResponse方法:

static void handleResponse(Channel channel, Response response) throws RemotingException {
    if (response != null && !response.isHeartbeat()) {  //排除心跳类型的响应
        DefaultFuture.received(channel, response);
    }
}

这里的DefaultFuture,它是实现了我们上面说的ResponseFuture接口类型。
继续来看一下DefaultFuture.received方法的实现细节:

public static void received(Channel channel, Response response) {
    try {
        DefaultFuture future = FUTURES.remove(response.getId());
        if (future != null) {
            future.doReceived(response);
        } else {
            logger.warn("The timeout response finally returned at " 
                        + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) 
                        + ", response " + response 
                        + (channel == null ? "" : ", channel: " + channel.getLocalAddress() 
                            + " -> " + channel.getRemoteAddress()));
        }
    } finally {
        CHANNELS.remove(response.getId());
    }
}

通过前面说的标志id,DefaultFuture.FUTURES可以拿到具体的那个DefaultFuture对象,它就是上面我们提到的,阻塞请求线程的那个对象。找到目标后,调用它的doReceived方法唤醒对应的线程往下执行:

private void doReceived(Response res) {
    lock.lock();
    try {
        response = res;
        if (done != null) {
            done.signal();
        }
    } finally {
        lock.unlock();
    }
    if (callback != null) {
        invokeCallback(callback);
    }
}

对上面的源码总结成以下的步骤:
1.消费者一个线程调用远程接口,生成一个唯一的ID(比如一段随机字符串,UUID等),Dubbo是使用AtomicLong从0开始累计数字的。
2.将方法调用信息(如调用的接口名称,方法名称,参数值列表等)和处理结果的回调对象callback(即ResponseFuture对象),全部封装在一起组成一个对象object。向专门存放调用信息的全局ConcurrentHashMap里面put(ID, object)。
3.将ID和打包的方法调用信息封装成一对象connRequest,通过Netty异步发送出去。
4.当前线程再使用callback的get()方法试图获取远程返回的结果,在get()内部,则使用lock.lock()获取回调对象callback的锁, 再先检测是否已经获取到结果,如果没有,然后调用callback的wait()方法,释放callback上的锁,让当前线程处于等待状态。
5.服务端接收到请求并处理后,将结果(此结果中包含了前面的ID,即回传)发送给客户端,客户端socket连接上专门监听消息的线程收到消息,分析结果,取到ID,再从前面的ConcurrentHashMap里面get(ID),从而找到callback,将方法调用结果设置到callback对象里。
6.监听线程接着使用lock.lock()获取回调对象callback的锁,再调用signal()(类似notifyAll())方法唤醒前面处于等待状态的线程继续执行。至此,整个过程结束。

正文到此结束
Loading...