Exchange服务器之Dubbo Exchange 信息交换层
白羽 2018-07-26 来源 :网络 阅读 1489 评论 0

摘要:本文将带你了解Exchange服务器之Dubbo Exchange 信息交换层,希望本文对大家学Exchange有所帮助

ReferenceCountExchangeClient:将请求交HeaderExchangeClient处理,不进行任何其他操作。


public ResponseFuture request(Object request) throws RemotingException {return client.request(request);}


HeaderExchangeClient:提供心跳检查功能;将send、request、close等事件转由HeaderExchangeChannel处理,HeaderExchangeChannel对象中的Channel为所选的NIO框架对应的client对象;以request为例,调用流程如下:HeaderExchangeClient.request(Object request)->HeaderExchangeChannel.request(Object request)->(NettyClient)AbstractPeer.send(Object message)->(NettyClient)AbstractClient.send(Object message,boolean sent)。


public ResponseFuture request(Object request) throws RemotingException {return channel.request(request);//HeaderExchangeChannel}


HeaderExchangeChannel:主要是完成同步转异步,在request(Object request,int timeout)方法中,将请求转换成Request对象,将请求消息设置到data属性上,构建DefaultFuture对象,调用NIO框架对应的Client对象(默认NettyClient)的send方法将消息发送出去,返回DefultFuture对象。


public ResponseFuture request(Object request, int timeout) throws RemotingException {if (closed) {throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");}// create request.Request req = new Request();req.setVersion("2.0.0");req.setTwoWay(true);req.setData(request);DefaultFuture future = new DefaultFuture(channel, req, timeout);try{channel.send(req);}catch (RemotingException e) {future.cancel();throw e;}return future;}



NettyClient:完成消息的发送。在调用链的最后一个方法AbstractClient.send(Object message, boolean sent)中,首先通过调用NettyClient.getChannel()获取NettyChannel对象,在构建对象时封装了NIOSocketChannel对象(在初始化NettyClient对象时,根据nettyclient和server端建立连接时获取的socket通道)、统一数据模型URL以及channelHandler对象(NettyClient对象自身),然后调用NettyChannel对象的send方法,将Request消息写入NIOSocketChannel通道中,完成消息发送。


public void send(Object message, boolean sent) throws RemotingException {//AbstractClientif (send_reconnect && !isConnected()){connect();}Channel channel = getChannel();//TODO getChannel返回的状态是否包含null需要改进if (channel == null || ! channel.isConnected()) {throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());}channel.send(message, sent);}




@Overrideprotected com.sitech.hsf.remoting.Channel getChannel() {//nettyclientChannel c = channel;if (c == null || ! c.isConnected())return null;return NettyChannel.getOrAddChannel(c, getUrl(), this);}




public void send(Object message, boolean sent) throws RemotingException {//NettyChannelsuper.send(message, sent);boolean success = true;int timeout = 0;try {//调用Netty框架ChannelFuture future = channel.write(message);if (sent) {//sent=true等待发送失败将抛出异常,false不等待消息发出,将消息放入IO队列,即刻返回。timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);success = future.await(timeout);}Throwable cause = future.getCause();if (cause != null) {throw cause;}} catch (Throwable e) {throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);}if(! success) {throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()+ "in timeout(" + timeout + "ms) limit");}}




HeaderExchangeServer:提供心跳检查功能;启动心跳监测线程池,该线程初始化了一个线程,在线程中调用类HeartBeatTask进行心跳检查,HeartBeatTask处理心跳的规则:

(1)若通道的最新的写入时间或者最新的读取时间与当前时间相比,已经超过了心跳间隔时间,则发送心跳请求;

(2)如果通道的最新的读取时间与当前时间相比,已经超过了心跳的超时时间,对于客户端来说则重连,对于服务端来说则关闭通道。


public HeaderExchangeServer(Server server) {if (server == null) {throw new IllegalArgumentException("server == null");}this.server = server;this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);//this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);if (heartbeatTimeout < heartbeat * 2) {            throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");        }        startHeatbeatTimer();    }
3位心跳时长为心跳超时,如果不设置,默认心跳为0。



private void startHeatbeatTimer() {//HeaderExchangeServerstopHeartbeatTimer();if (heartbeat > 0) {heatbeatTimer = scheduled.scheduleWithFixedDelay(new HeartBeatTask( new HeartBeatTask.ChannelProvider() {public Collection<Channel> getChannels() {return Collections.unmodifiableCollection(HeaderExchangeServer.this.getChannels() );}}, heartbeat, heartbeatTimeout),heartbeat, heartbeat,TimeUnit.MILLISECONDS);}}

如果心跳为0,不执行心跳检测功能。


public void run() {//HeartBeatTasktry {long now = System.currentTimeMillis();//获取当前时间for ( Channel channel : channelProvider.getChannels() ) {if (channel.isClosed()) {//已经关闭了continue;}try {                 //最后一次读的时间戳Long lastRead = ( Long ) channel.getAttribute(HeaderExchangeHandler.KEY_READ_TIMESTAMP );//最后一次写的时间戳Long lastWrite = ( Long ) channel.getAttribute(HeaderExchangeHandler.KEY_WRITE_TIMESTAMP );//如果当前时间距离最后一次写或读超过一个心跳时间if ( ( lastRead != null && now - lastRead > heartbeat )|| ( lastWrite != null && now - lastWrite > heartbeat ) ) {Request req = new Request();req.setVersion( "2.0.0" );req.setTwoWay( true );req.setEvent( Request.HEARTBEAT_EVENT );channel.send( req );if ( logger.isDebugEnabled() ) {logger.debug( "Send heartbeat to remote channel " + channel.getRemoteAddress()+ ", cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms" );}}//如果上一次读距离现在已经超过心路超时时长,尝试重连if ( lastRead != null && now - lastRead > heartbeatTimeout ) {logger.warn( "Close channel " + channel+ ", because heartbeat read idle time out: " + heartbeatTimeout + "ms" );if (channel instanceof Client) {                         try {                         ((Client)channel).reconnect();                         }catch (Exception e) { //do nothing }} else {                         channel.close();}}} catch ( Throwable t ) {logger.warn( "Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t );}}} catch ( Throwable t ) {logger.warn( "Unhandled exception when heartbeat, cause: " + t.getMessage(), t );}}

在起动NettyClient和NettyServer连接时,都添加了一个HeartbeatHandler


public void received(Channel channel, Object message) throws RemotingException {setReadTimestamp(channel);if (isHeartbeatRequest(message)) {//如果是心跳请求Request req = (Request) message;if (req.isTwoWay()) {Response res = new Response(req.getId(), req.getVersion());res.setEvent(Response.HEARTBEAT_EVENT);channel.send(res);if (logger.isInfoEnabled()) {int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);if(logger.isDebugEnabled()) {logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress()+ ", cause: The channel has no data-transmission exceeds a heartbeat period"+ (heartbeat > 0 ? ": " + heartbeat + "ms" : ""));} }}return;}if (isHeartbeatResponse(message)) {//如果是心跳响应if (logger.isDebugEnabled()) {             logger.debug(new StringBuilder(32).append("Receive heartbeat response in thread ").append(Thread.currentThread().getName()).toString());}return;}handler.received(channel, message);}


多线程并发请求与单一长连接

如果客户端多线程并发请求的话,服务端通过单一长连接接受并返回响应信息,如果客户端不加控制就会导制通道中的数据变成无序,无法正确的处理请求。为此,Dubbo给每个请求添加一个唯一的标识ID,服务端响应请求也要携带此ID,供客户端多线程领取对应的响应数据,主要采用多线程编程中的Future模式来解决此问题。客户端的实现具体如下:

(1)当客户端发起远程请求时,最终调用HeaderExchangeClient.request方法,在该方法中调用HeaderExchangeChannel.request方法,并返回DefaultFutrue对象。首先创建Request对象,请求消息作为Data值,并创建唯一标识ID;然后在初始化DefaultFuture对象的过程中,将自身this对象以及channel对象存入全局变量DefaultFuture.FUTURES:ConcurrenthashMap和DefaultFuture.CHANNELS:ConcurrentHashMap中,以标识ID为key。

public ResponseFuture request(Object request, int timeout) throws RemotingException {//HeaderExchangeChannel.requestif (closed) {throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");}// create request.Request req = new Request();req.setVersion("2.0.0");req.setTwoWay(true);req.setData(request);DefaultFuture future = new DefaultFuture(channel, req, timeout);try{channel.send(req);}catch (RemotingException e) {future.cancel();throw e;}return future;}

(2)该方法返回DefaultFuture对象给客户端线程,该线程会在DefaultFuture对象的get方法上面阻塞,有两种情况唤醒该线程:(一)接收到响应消息并调用received方法,根据响应消息中返回的ID从前面的ConcurrentHashMap里面get(ID)里面获取DefaultFuture对象,然后更新该对象的Response变量值。(二)RemotingInvocationTimeoutScan线程,定时扫描响应是否超时,若超时,则从FUTURES:ConcurrentHashMap中删除掉Future对象并且将Response变量设置为超时信息。


同步转异步的逻辑

<dubbo:method>标签中async属性表示是否异步招待;oninvoke表示方法执行前的拦截方法,onreturn表示方法执行返回后的拦截方法,onthrow表示方法执行有异常的拦截方法。

FutureFilter为客户端的过滤器,只有客户端才使用该过过滤器。在FutureFilter的invoke方法中,首先调用oninvoker属性配置的方法,然后调用invoker链的invoke方法,最后调用onreturn和onthrow配置的方法。

(1)当async=true时表示异步执行,获取本地线程类ThreadLocal<RpcContetext>的DefaultFuture对象,设置回调用类callback(该回调用类实现了ResponseCallback接口的done和caught方法),在DefaultFuture对象中若收到响应之后调用回调用类的done方法。该done方法中调用客户端配置的onreturn和onthrow方法。

(2)当async=false时表示由不,在调用完invoker链的invoke方法后继续用客户端配置的onreturn和onthrow方法。

本文由职坐标整理并发布,希望对同学们有所帮助。了解更多详情请关注职坐标系统运维之Exchange频道!

本文由 @白羽 发布于职坐标。未经许可,禁止转载。
喜欢 | 0 不喜欢 | 0
看完这篇文章有何感觉?已经有0人表态,0%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式AI+学习就业服务平台 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved