`
flychao88
  • 浏览: 743556 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Dubbo源码分析系列1---Dubbo异步通信

 
阅读更多

1、client一个线程调用远程接口,生成一个唯一的ID(比如一段随机字符串,UUID等),Dubbo是使用AtomicLong从0开始累计数字。

 

2、将打包的方法调用信息(如调用的接口名称,方法名称,参数值列表等),和处理结果的回调对象callback,全部封装在一起,组成一个对象object。

 

3、向专门存放调用信息的全局ConcurrentHashMap里面put(ID, object)。

 

4、将ID和打包的方法调用信息封装成一对象connRequest,使用IoSession.write(connRequest)异步发送出去。

 

5、当前线程再使用callback的get()方法试图获取远程返回的结果,在get()内部,则使用synchronized获取回调对象callback的锁, 再先检测是否已经获取到结果,如果没有,然后调用callback的wait()方法,释放callback上的锁,让当前线程处于等待状态。

 

6、服务端接收到请求并处理后,将结果(此结果中包含了前面的ID,即回传)发送给客户端,客户端socket连接上专门监听消息的线程收到消息,分析结果,取到ID,再从前面的ConcurrentHashMap里面get(ID),从而找到callback,将方法调用结果设置到callback对象里。

 

7、监听线程接着使用synchronized获取回调对象callback的锁(因为前面调用过wait(),那个线程已释放callback的锁了),再notifyAll(),唤醒前面处于等待状态的线程继续执行(callback的get()方法继续执行就能拿到调用结果了),至此,整个过程结束。

 

客户端部分源码:

//同步调用远程接口
public Object invokeWithSync(Object appRequest, RequestControl control) throws RemotingException, InterruptedException {
        byte protocol = getProtocol(control);
        if (!TRConstants.isValidProtocol(protocol)) {
            throw new RemotingException("Invalid serialization protocol [" + protocol + "] on invokeWithSync.");
        }
        ResponseFuture future = invokeWithFuture(appRequest, control);
        return future.get();  //获取结果时让当前线程等待,ResponseFuture其实就是前面说的callback
}
public ResponseFuture invokeWithFuture(Object appRequest, RequestControl control) {
         byte protocol = getProtocol(control);
         long timeout = getTimeout(control);
         ConnectionRequest request = new ConnectionRequest(appRequest);
         request.setSerializeProtocol(protocol);
         Callback2FutureAdapter adapter = new Callback2FutureAdapter(request);
         connection.sendRequestWithCallback(request, adapter, timeout);
         return adapter;
}

 

回调部分源码如下:

Callback2FutureAdapter implements ResponseFuture
public Object get() throws RemotingException, InterruptedException {
    synchronized (this) {  // 旋锁
        while (!isDone) {  // 是否有结果了
            wait(); //没结果是释放锁,让当前线程处于等待状态
        }
    }
    if (errorCode == TRConstants.RESULT_TIMEOUT) {
         throw new TimeoutException("Wait response timeout, request["
         + connectionRequest.getAppRequest() + "].");
    }
    else if (errorCode > 0) {
        throw new RemotingException(errorMsg);
    }
    else {
         return appResp;
    }
}
客户端收到服务端结果后,回调时相关方法,即设置isDone = true并notifyAll()
public void handleResponse(Object _appResponse) {
         appResp = _appResponse; //将远程调用结果设置到callback中来
         setDone();
}
public void onRemotingException(int _errorType, String _errorMsg) {
         errorCode = _errorType;
         errorMsg = _errorMsg;
         setDone();
}
private void setDone() {
         isDone = true;
         synchronized (this) { //获取锁,因为前面wait()已经释放了callback的锁了
             notifyAll(); // 唤醒处于等待的线程
         }
}

 

Dubbo通信部分源码:

 
// 用来存放请求和回调的MAP
private final ConcurrentHashMap<Long, Object[]> requestResidents;
 
//发送消息出去
void sendRequestWithCallback(ConnectionRequest connRequest, ResponseCallback callback, long timeoutMs) {
         long requestId = connRequest.getId();
         long waitBegin = System.currentTimeMillis();
         long waitEnd = waitBegin + timeoutMs;
         Object[] queue = new Object[4];
         int idx = 0;
         queue[idx++] = waitEnd;
         queue[idx++] = waitBegin;   //用于记录日志
         queue[idx++] = connRequest; //用于记录日志
         queue[idx++] = callback;
         requestResidents.put(requestId, queue); // 记录响应队列
         write(connRequest);
 
         // 埋点记录等待响应的Map的大小
         StatLog.addStat("TBRemoting-ResponseQueues", "size", requestResidents.size(),
                   1L);
}
public void write(final Object connectionMsg) {
//mina里的IoSession.write()发送消息
         WriteFuture writeFuture = ioSession.write(connectionMsg);
         // 注册FutureListener,当请求发送失败后,能够立即做出响应
         writeFuture.addListener(new MsgWrittenListener(this, connectionMsg));
}
 
/**
* 在得到响应后,删除对应的请求队列,并执行回调
* 调用者:MINA线程
*/
public void putResponse(final ConnectionResponse connResp) {
         final long requestId = connResp.getRequestId();
         Object[] queue = requestResidents.remove(requestId);
         if (null == queue) {
             Object appResp = connResp.getAppResponse();
             String appRespClazz = (null == appResp) ? "null" : appResp.getClass().getName();
             StringBuilder sb = new StringBuilder();
             sb.append("Not found response receiver for requestId=[").append(requestId).append("],");
             sb.append("from [").append(connResp.getHost()).append("],");
             sb.append("response type [").append(appRespClazz).append("].");
             LOGGER.warn(sb.toString());
             return;
         }
         int idx = 0;
         idx++;
         long waitBegin = (Long) queue[idx++];
         ConnectionRequest connRequest = (ConnectionRequest) queue[idx++];
         ResponseCallback callback = (ResponseCallback) queue[idx++];
         // ** 把回调任务交给业务提供的线程池执行 **
         Executor callbackExecutor = callback.getExecutor();
         callbackExecutor.execute(new CallbackExecutorTask(connResp, callback));
 
         long duration = System.currentTimeMillis() - waitBegin; // 实际读响应时间
         logIfResponseError(connResp, duration, connRequest.getAppRequest());
}

 

以上代码是dubbo老版本的,但是思路却是和新版本完全一样的,在下一篇文章中会重点介绍新版本通信代码。。。。。未完待续。。。。。

分享到:
评论

相关推荐

    dubbo:dubbo深度解析源码-2.5.4

    远程处理:提供异步同步和请求响应消息传递的网络通信框架。 群集:具有负载平衡/故障转移/群集功能的远程过程调用抽象。 注册:用于服务注册和服务事件发布/订阅的服务目录框架 有关更多详细信息,请参考 。 ...

    DubboCode:dubbo原始码阅读注解----最新

    它包含三个关键部分,其中包括: 远程处理:提供异步同步和请求响应消息传递的网络通信框架。 群集:具有负载平衡/故障转移/群集功能的远程过程调用抽象。 注册:用于服务注册和服务事件发布/订阅的服务目录框架...

    dubbo协议、netty框架总结

    Dubbo是一个开源的分布式服务框架,旨在帮助开发人员快速而简单...总之,Dubbo协议和Netty框架是实现Dubbo框架的两个关键要素,Dubbo协议实现了分布式应用程序之间的通信,而Netty框架提供了高性能的网络应用程序框架。

    Dubbo 序列化协议 5 连问,你接得住不.PDF

    默认就是⾛ dubbo 协议,单⼀⻓连接,进⾏的是 NIO 异步通信,基于 hessian 作为序列化协议。使⽤的场景是:传输数据量⼩ (每次请求在 100kb 以内),但是并发量很⾼。 为了要⽀持⾼并发场景,⼀般是服务提供者就⼏...

    百度地图毕业设计源码-netty-projects:网络项目

    1、异步事件驱动NIO框架Netty介绍 简介:介绍Netty来源,版本,目前在哪些主流公司和产品框架使用 1、Netty是由JBOSS提供的一个java开源框架, 是业界最流行的NIO框架,整合了多种协议( 包括FTP、SMTP、HTTP等各种二...

    Java基于Netty实现的高性能分布式IM即时通信系统源码+项目说明.tar

    Java基于Netty实现的高性能分布式IM即时通信系统源码+项目说明.tar 介绍 `RIM`是基于Netty实现的面相开发者的高性能分布式即时通信系统,保证消息的实时性、有序性、可靠性。 ## 技术栈 | 名称 | 作用 | | -------...

    Dubbo面试28题答案详解:核心功能+服务治理+架构设计等

    1.Dubbo是什么? Dubbo 是一个分布式、高性能、透明化的 RPC 服务框架,提供服务自动注册、自动发现等高效服务治理方案, 可以和 Spring 框架无缝集成。 RPC 指的是远程调用协议,也就是说两个服务器交互数据。 2....

    Dubbo服务框架-其他

    Dubbo主要核心部件:Remoting:网络通信框架,实现了sync-over-async和request-response消息机制RPC:一个远程过程调用的抽象,支持负载均衡、容灾和集群功能Registry:服务目录框架用于服务的注册和服务事件发布和...

    dubbo-master

    远程处理:提供异步同步和请求响应消息传递的网络通信框架。 群集:具有负载平衡/故障转移/群集功能的远程过程调用抽象。 注册:用于服务注册和服务事件发布/订阅的服务目录框架 有关更多详细信息,请参考或 。 ...

    spring-boot示例项目

    RocketMQ|[Spring Cloud Alibaba(五)RocketMQ 异步通信实现](https://github.com/smltq/spring-boot-demo/blob/master/cloud-alibaba/README5.md) ### 其它 模块名称|主要内容 ---|--- leetcode|[力扣题解...

    SpringCloud(一).md

    3、MQ的程序:与MQ的通信(5种) RPC:专门的框架(dubbo) - 简单队列:一个生产者 一个消费者 - 工作队列:一个生产者 多个消费者(分担任务) 获取消息:公平竞争 - 消息的P/S:发布与订阅 本质:对消息进行过滤...

    pacebox-netty 是一个基于netty和pacebox-core封装的便捷工具包.rar

    Hermes是一款基于Netty的可以支持百万级别的并发连接的高性能、高度可扩展的的网络通讯框架,它参考了dubbo和sofa-bolt的网络通讯模块的设计,hemers可以使用在IM、长连接等领域,它具有以下的特性: 私有的通讯...

    基于Python实现的一个简单的分布式高并发RPC框架+源代码+文档说明

    &gt; + 不是想要造轮子,Dubbo、gRPC、Thift这些轮子已经非常好用了 &gt; + RPC在微服务、分布式系统、Web服务器方面应用太广泛了,需要对底层通信过程有基本认识 &gt; + Nignx、Hadoop、K8s、Tensorflow等系统或软件的底层...

    微服务面试专题及答案.pdf

    说说 Dubbo 的实现原理 dubbo 作为 rpc 框架,实现的效果就是调用远程的方法就像在本地调用一样。如何做到 呢?就是本地有对远程方法的描述,包括方法名、参数、返回值,在 dubbo 中是远程和本 地使用同样的接口;...

    基于springcloud+Netty+MQ+mysql的分布式即时聊天系统源码+数据库+项目说明.zip

    1、该资源包括项目的全部源码,下载可以直接使用! 2、本项目适合作为计算机、数学、电子信息等专业的课程设计、期末大作业和毕设项目,作为参考资料学习借鉴。 3、本资源作为“参考资料”如果需要实现其他功能,...

    JAVA上百实例源码以及开源项目源代码

    Java 源码包 Applet钢琴模拟程序java源码 2个目标文件,提供基本的音乐编辑功能。编辑音乐软件的朋友,这款实例会对你有所帮助。 Calendar万年历 1个目标文件 EJB 模拟银行ATM流程及操作源代码 6个目标文件,EJB来...

    Netty初探:掌握高性能网络通信框架,提升Java网络编程技能

    Netty是一个高性能、异步的网络通信框架,它解决了JDK自带的NIO API使用复杂的问题,提供了更高的性能和更低的延迟。Netty广泛应用于互联网行业、游戏行业和大数据领域,如阿里的Dubbo和RocketMQ等都使用了Netty作为...

    JAVA上百实例源码以及开源项目

    笔者当初为了学习JAVA,收集了很多经典源码,源码难易程度分为初级、中级、高级等,详情看源码列表,需要的可以直接下载! 这些源码反映了那时那景笔者对未来的盲目,对代码的热情、执着,对IT的憧憬、向往!此时此...

    基于springboot , zookeeper , redis 分布式事务强一致性方案+源代码+文档说明

    fat.redis.pool.max-wait=-1 # 连接池中的最大空闲连接 fat.redis.pool.max-idle=10 # 连接池中的最小空闲连接 fat.redis.pool.min-idle=2 # 连接超时时间(毫秒) fat.redis.timeout=1000 # 集群模式,如有配置,将...

    Netty高性能网络应用框架.rar

    Netty是一款高性能的网络通信框架,其应用也很广泛,比如常用的消息队列RocketMQ,RPC框架Dubbo在底层都有使用到Netty。在学习Netty之前,我们需要对IO模型要有一定的了解,其中最重要的就是NIO,所以今天打算先对NIO...

Global site tag (gtag.js) - Google Analytics