`
flychao88
  • 浏览: 742334 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【转】使用Netty实现多路复用的client

 
阅读更多

       Netty只提供的异步传输数据的方式,但是并没有实现多路复用的client。

一个分布式的客户端代码基本是这个样子的:

1
2
3
4
public Response sent(final Request request) {
    channel.writeAndFlush(request);    
    return clientChannelInitializer.getResponse(request.getMessageId());  
}

 

首先通过channel发送一个请求到server,然后等待server返回的结果。

但是Netty是异步的,writeAndFlush这个方法,只是告诉server,我要发数据了,然后就马上返回了。所以这时直接调用getResponse,会得不到值。因为netty是在回调里面写返回值的。

解决的办法是,使用BlockingQueue接收返回的数据。一旦BlockingQueue有数据了,就take出来。如果没数据,就一直等待。

 

在单线程里,这个办法没问题。这就要确保Netty的client只被一个线程访问。如果是多线程同时访问,因为异步的原因,有可能第二个线程的返回值被第一个线程拿到。举个例子:

 

  1. 1、线程A使用client发送请求

  2. 线程A从BlockingQueue中取数据。这时Queue为空,线程A等待。

  3. server接受并开启一个线程处理请求A

  4. 2、线程B使用client发送请求

  5. 线程B从BlockingQueue中取数据。这时Queue为空,线程B也等待。

  6. 3、server接受并开启另一个线程处理请求B

  7. 线程B的请求处理速度较快,先返回

  8. 4、client将返回值B写入BlockingQueue

  9. BlockQueue有数据了,线程A take到数据,但是是线程B的结果

这就导致request和response不匹配。

 

要解决这个问题有3个办法。

  1. 1、使用短连接,每次请求new一个client,接收的response后close掉这个client。这个的缺点很明显,这就相当于http服务器了,不能发挥内网长连接的优势。

  2. 2、使用连接池,每次从连接池里拿一个client,接收完response后将这个client返还连接池。这个就有点像数据库连接池。这个方法没什么缺点,但是需要自己实现连接池。

  3. 3、使用单一client,但是在request中生成一个唯一的messageId,可以是nanoTime。然后server处理完后,在response中也返回这个messageId。这样在client不是维护一个BlockingQueue,而是维护一个ConcurrentHashMap,key是messageId,value是一个空的BlockingQueue。当client发送resquest到server时,在Map里写入messageId,并实例化一个BlockingQueue(为了优化,size可以是1)。然后等待这个BlockingQueue有值。在接收到response的回调方法里,根据messageId取出blockingQueue的值,然后删除掉这个Key。

  4.  

    我自己实现了第三种方式,具体代码请参见:

    https://github.com/terrymanu/miracle-framework/tree/master/miracle-framework-remote/miracle-framework-remote-netty 

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics