1、client一个线程调用远程接口,生成一个唯一的ID(比如一段随机字符串,UUID等),Dubbo是使用AtomicLong从0开始累计数字。
2、将打包的方法调用信息(如调用的接口名称,方法名称,参数值列表等),和处理结果的回调对象callback,全部封装在一起,组成一个对象object。
3、向专门存放调用信息的全局ConcurrentHashMap里面put(ID, object)。
4、将ID和打包的方法调用信息封装成一对象connRequest,使用IoSession.write(connRequest)异步发送出去。
5、当前线程再使用callback的get()方法试图获取远程返回的结果,在get()内部,则使用synchronized获取回调对象callback的锁, 再先检测是否已经获取到结果,如果没有,然后调用callback的wait()方法,释放callback上的锁,让当前线程处于等待状态。
6、服务端接收到请求并处理后,将结果(此结果中包含了前面的ID,即回传)发送给客户端,客户端socket连接上专门监听消息的线程收到消息,分析结果,取到ID,再从前面的ConcurrentHashMap里面get(ID),从而找到callback,将方法调用结果设置到callback对象里。
7、监听线程接着使用synchronized获取回调对象callback的锁(因为前面调用过wait(),那个线程已释放callback的锁了),再notifyAll(),唤醒前面处于等待状态的线程继续执行(callback的get()方法继续执行就能拿到调用结果了),至此,整个过程结束。
客户端部分源码:
//同步调用远程接口 public Object invokeWithSync(Object appRequest, RequestControl control) throws RemotingException, InterruptedException { byte protocol = getProtocol(control); if (!TRConstants.isValidProtocol(protocol)) { throw new RemotingException("Invalid serialization protocol [" + protocol + "] on invokeWithSync."); } ResponseFuture future = invokeWithFuture(appRequest, control); return future.get(); //获取结果时让当前线程等待,ResponseFuture其实就是前面说的callback } public ResponseFuture invokeWithFuture(Object appRequest, RequestControl control) { byte protocol = getProtocol(control); long timeout = getTimeout(control); ConnectionRequest request = new ConnectionRequest(appRequest); request.setSerializeProtocol(protocol); Callback2FutureAdapter adapter = new Callback2FutureAdapter(request); connection.sendRequestWithCallback(request, adapter, timeout); return adapter; }
回调部分源码如下:
Callback2FutureAdapter implements ResponseFuture public Object get() throws RemotingException, InterruptedException { synchronized (this) { // 旋锁 while (!isDone) { // 是否有结果了 wait(); //没结果是释放锁,让当前线程处于等待状态 } } if (errorCode == TRConstants.RESULT_TIMEOUT) { throw new TimeoutException("Wait response timeout, request[" + connectionRequest.getAppRequest() + "]."); } else if (errorCode > 0) { throw new RemotingException(errorMsg); } else { return appResp; } } 客户端收到服务端结果后,回调时相关方法,即设置isDone = true并notifyAll() public void handleResponse(Object _appResponse) { appResp = _appResponse; //将远程调用结果设置到callback中来 setDone(); } public void onRemotingException(int _errorType, String _errorMsg) { errorCode = _errorType; errorMsg = _errorMsg; setDone(); } private void setDone() { isDone = true; synchronized (this) { //获取锁,因为前面wait()已经释放了callback的锁了 notifyAll(); // 唤醒处于等待的线程 } }
Dubbo通信部分源码:
// 用来存放请求和回调的MAP private final ConcurrentHashMap<Long, Object[]> requestResidents; //发送消息出去 void sendRequestWithCallback(ConnectionRequest connRequest, ResponseCallback callback, long timeoutMs) { long requestId = connRequest.getId(); long waitBegin = System.currentTimeMillis(); long waitEnd = waitBegin + timeoutMs; Object[] queue = new Object[4]; int idx = 0; queue[idx++] = waitEnd; queue[idx++] = waitBegin; //用于记录日志 queue[idx++] = connRequest; //用于记录日志 queue[idx++] = callback; requestResidents.put(requestId, queue); // 记录响应队列 write(connRequest); // 埋点记录等待响应的Map的大小 StatLog.addStat("TBRemoting-ResponseQueues", "size", requestResidents.size(), 1L); } public void write(final Object connectionMsg) { //mina里的IoSession.write()发送消息 WriteFuture writeFuture = ioSession.write(connectionMsg); // 注册FutureListener,当请求发送失败后,能够立即做出响应 writeFuture.addListener(new MsgWrittenListener(this, connectionMsg)); } /** * 在得到响应后,删除对应的请求队列,并执行回调 * 调用者:MINA线程 */ public void putResponse(final ConnectionResponse connResp) { final long requestId = connResp.getRequestId(); Object[] queue = requestResidents.remove(requestId); if (null == queue) { Object appResp = connResp.getAppResponse(); String appRespClazz = (null == appResp) ? "null" : appResp.getClass().getName(); StringBuilder sb = new StringBuilder(); sb.append("Not found response receiver for requestId=[").append(requestId).append("],"); sb.append("from [").append(connResp.getHost()).append("],"); sb.append("response type [").append(appRespClazz).append("]."); LOGGER.warn(sb.toString()); return; } int idx = 0; idx++; long waitBegin = (Long) queue[idx++]; ConnectionRequest connRequest = (ConnectionRequest) queue[idx++]; ResponseCallback callback = (ResponseCallback) queue[idx++]; // ** 把回调任务交给业务提供的线程池执行 ** Executor callbackExecutor = callback.getExecutor(); callbackExecutor.execute(new CallbackExecutorTask(connResp, callback)); long duration = System.currentTimeMillis() - waitBegin; // 实际读响应时间 logIfResponseError(connResp, duration, connRequest.getAppRequest()); }
以上代码是dubbo老版本的,但是思路却是和新版本完全一样的,在下一篇文章中会重点介绍新版本通信代码。。。。。未完待续。。。。。
相关推荐
远程处理:提供异步同步和请求响应消息传递的网络通信框架。 群集:具有负载平衡/故障转移/群集功能的远程过程调用抽象。 注册:用于服务注册和服务事件发布/订阅的服务目录框架 有关更多详细信息,请参考 。 ...
它包含三个关键部分,其中包括: 远程处理:提供异步同步和请求响应消息传递的网络通信框架。 群集:具有负载平衡/故障转移/群集功能的远程过程调用抽象。 注册:用于服务注册和服务事件发布/订阅的服务目录框架...
Dubbo是一个开源的分布式服务框架,旨在帮助开发人员快速而简单...总之,Dubbo协议和Netty框架是实现Dubbo框架的两个关键要素,Dubbo协议实现了分布式应用程序之间的通信,而Netty框架提供了高性能的网络应用程序框架。
默认就是⾛ dubbo 协议,单⼀⻓连接,进⾏的是 NIO 异步通信,基于 hessian 作为序列化协议。使⽤的场景是:传输数据量⼩ (每次请求在 100kb 以内),但是并发量很⾼。 为了要⽀持⾼并发场景,⼀般是服务提供者就⼏...
1、异步事件驱动NIO框架Netty介绍 简介:介绍Netty来源,版本,目前在哪些主流公司和产品框架使用 1、Netty是由JBOSS提供的一个java开源框架, 是业界最流行的NIO框架,整合了多种协议( 包括FTP、SMTP、HTTP等各种二...
Java基于Netty实现的高性能分布式IM即时通信系统源码+项目说明.tar 介绍 `RIM`是基于Netty实现的面相开发者的高性能分布式即时通信系统,保证消息的实时性、有序性、可靠性。 ## 技术栈 | 名称 | 作用 | | -------...
1.Dubbo是什么? Dubbo 是一个分布式、高性能、透明化的 RPC 服务框架,提供服务自动注册、自动发现等高效服务治理方案, 可以和 Spring 框架无缝集成。 RPC 指的是远程调用协议,也就是说两个服务器交互数据。 2....
Dubbo主要核心部件:Remoting:网络通信框架,实现了sync-over-async和request-response消息机制RPC:一个远程过程调用的抽象,支持负载均衡、容灾和集群功能Registry:服务目录框架用于服务的注册和服务事件发布和...
远程处理:提供异步同步和请求响应消息传递的网络通信框架。 群集:具有负载平衡/故障转移/群集功能的远程过程调用抽象。 注册:用于服务注册和服务事件发布/订阅的服务目录框架 有关更多详细信息,请参考或 。 ...
RocketMQ|[Spring Cloud Alibaba(五)RocketMQ 异步通信实现](https://github.com/smltq/spring-boot-demo/blob/master/cloud-alibaba/README5.md) ### 其它 模块名称|主要内容 ---|--- leetcode|[力扣题解...
3、MQ的程序:与MQ的通信(5种) RPC:专门的框架(dubbo) - 简单队列:一个生产者 一个消费者 - 工作队列:一个生产者 多个消费者(分担任务) 获取消息:公平竞争 - 消息的P/S:发布与订阅 本质:对消息进行过滤...
Hermes是一款基于Netty的可以支持百万级别的并发连接的高性能、高度可扩展的的网络通讯框架,它参考了dubbo和sofa-bolt的网络通讯模块的设计,hemers可以使用在IM、长连接等领域,它具有以下的特性: 私有的通讯...
> + 不是想要造轮子,Dubbo、gRPC、Thift这些轮子已经非常好用了 > + RPC在微服务、分布式系统、Web服务器方面应用太广泛了,需要对底层通信过程有基本认识 > + Nignx、Hadoop、K8s、Tensorflow等系统或软件的底层...
说说 Dubbo 的实现原理 dubbo 作为 rpc 框架,实现的效果就是调用远程的方法就像在本地调用一样。如何做到 呢?就是本地有对远程方法的描述,包括方法名、参数、返回值,在 dubbo 中是远程和本 地使用同样的接口;...
1、该资源包括项目的全部源码,下载可以直接使用! 2、本项目适合作为计算机、数学、电子信息等专业的课程设计、期末大作业和毕设项目,作为参考资料学习借鉴。 3、本资源作为“参考资料”如果需要实现其他功能,...
Java 源码包 Applet钢琴模拟程序java源码 2个目标文件,提供基本的音乐编辑功能。编辑音乐软件的朋友,这款实例会对你有所帮助。 Calendar万年历 1个目标文件 EJB 模拟银行ATM流程及操作源代码 6个目标文件,EJB来...
Netty是一个高性能、异步的网络通信框架,它解决了JDK自带的NIO API使用复杂的问题,提供了更高的性能和更低的延迟。Netty广泛应用于互联网行业、游戏行业和大数据领域,如阿里的Dubbo和RocketMQ等都使用了Netty作为...
笔者当初为了学习JAVA,收集了很多经典源码,源码难易程度分为初级、中级、高级等,详情看源码列表,需要的可以直接下载! 这些源码反映了那时那景笔者对未来的盲目,对代码的热情、执着,对IT的憧憬、向往!此时此...
fat.redis.pool.max-wait=-1 # 连接池中的最大空闲连接 fat.redis.pool.max-idle=10 # 连接池中的最小空闲连接 fat.redis.pool.min-idle=2 # 连接超时时间(毫秒) fat.redis.timeout=1000 # 集群模式,如有配置,将...
Netty是一款高性能的网络通信框架,其应用也很广泛,比如常用的消息队列RocketMQ,RPC框架Dubbo在底层都有使用到Netty。在学习Netty之前,我们需要对IO模型要有一定的了解,其中最重要的就是NIO,所以今天打算先对NIO...