【最新直播源码php】【外汇期权源码】【aide源码案例】dubbo 源码

时间:2024-12-26 01:39:59 编辑:delphi html源码 来源:政府源码下载

1.Dubbo调用超时那些事儿
2.Dubbo之SPI实现原理详解

dubbo 源码

Dubbo调用超时那些事儿

       其实之前很早就看过Dubbo源码中关于超时这部分的处理逻辑,但是没有记录下来,最近在某脉上看到有人问了这个问题,想着再回顾一下。最新直播源码php

开始

       从dubbo的请求开始,看看dubbo(2.6.6)在超时这块是怎么处理的:

com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel#request(java.lang.Object, int)@Overridepublic ResponseFuture request(Object request, int timeout) throws RemotingException { if (closed) { throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");}// create request.Request req = new Request();req.setVersion(Version.getProtocolVersion());req.setTwoWay(true);req.setData(request);DefaultFuture future = new DefaultFuture(channel, req, timeout);try { channel.send(req);} catch (RemotingException e) { future.cancel();throw e;}return future;}DefaultFuture

       从返回值ResponseFuture类型可以看出,这是一个异步方法(不等同于Dubbo的异步调用)。那么调用超时的关键可以从ResponseFuture来看:

public interface ResponseFuture { Object get() throws RemotingException;Object get(int timeoutInMillis) throws RemotingException;void setCallback(ResponseCallback callback);boolean isDone();}

       可以看到这是一个接口,从request方法可以得知实现类是DefaultFuture,从构造函数入手:

public DefaultFuture(Channel channel, Request request, int timeout) { this.channel = channel;this.request = request;this.id = request.getId();this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);// put into waiting map.FUTURES.put(id, this);CHANNELS.put(id, channel);}

       可以得知每一个DefaultFuture都有一个id,并且等于requestId,外汇期权源码timeout是从url中获取的配置,没有时默认ms。

       从代码的注释可以看到FUTURES这个map应该就是关键,是一个waiting map。

       DefaultFuture中还有一个方法:

public static void received(Channel channel,aide源码案例 Response response) { try { DefaultFuture future = FUTURES.remove(response.getId());if (future != null) { future.doReceived(response);} else { logger.warn("The timeout response finally returned at "+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))+ ", response " + response+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()+ " -> " + channel.getRemoteAddress()));}} finally { CHANNELS.remove(response.getId());}}

       可以看到调用的地方为:

       com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#received

@Overridepublic void received(Channel channel, Object message) throws RemotingException { //省略一些代码} else if (message instanceof Response) { handleResponse(channel, (Response) message);//省略一些代码}}

       com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#handleResponse

static void handleResponse(Channel channel, Response response) throws RemotingException { if (response != null && !response.isHeartbeat()) { DefaultFuture.received(channel, response);}}

       回到DefaultFuture.received,可以看到通过Response id从FUTURES中拿了一个DefaultFuture出来,然后调用了doReceived方法,也就是说Response id和Request id 相同。结下来看看doReceived做了什么:

private void doReceived(Response res) { lock.lock();try { response = res;if (done != null) { done.signal();}} finally { lock.unlock();}if (callback != null) { invokeCallback(callback);}}

       首先是加锁,然后通过唤醒了阻塞在Condition上的国外 指标源码线程。看看什么地方会阻塞在done这个条件上:

@Overridepublic Object get(int timeout) throws RemotingException { if (timeout <= 0) { timeout = Constants.DEFAULT_TIMEOUT;}if (!isDone()) { long start = System.currentTimeMillis();lock.lock();try { while (!isDone()) { done.await(timeout, TimeUnit.MILLISECONDS);if (isDone() || System.currentTimeMillis() - start > timeout) { break;}}} catch (InterruptedException e) { throw new RuntimeException(e);} finally { lock.unlock();}if (!isDone()) { throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));}}return returnFromResponse();}

       是get方法,get方法确实在request请求后被调用:

(Result) currentClient.request(inv, timeout).get()

       可以看到get方法的大致逻辑为,先获取锁,然后循环判断isDone,并阻塞等到条件,号码预测源码当条件超时,如果任务完成,或者超过timeout结束循环,接着判断isDone,如果超时抛出TimeoutException。并且通过sent(request请求时间)是否>0()来判断是clientSide还是serverSide超时。

       isDone逻辑如下:

@Overridepublic boolean isDone() { return response != null;}

       如果是正常Response,也有可能是超时的现象,可以看到get方法最后调用了一个函数:

public interface ResponseFuture { Object get() throws RemotingException;Object get(int timeoutInMillis) throws RemotingException;void setCallback(ResponseCallback callback);boolean isDone();}0TIMEOUT SIDE

       SERVER_TIMEOUT(服务端超时): 这个就是正常的我们消费端请求一个RPC接口,服务端由于性能等一些原因处理时间超过了timeout配置时间。

       CLIENT_TIMEOUT:我们可以看到是通过sent(上面有说sent>0)这个来判断是否clientTimeout,那么这个sent什么时候改变呢?就在发送请求的地方:

public interface ResponseFuture { Object get() throws RemotingException;Object get(int timeoutInMillis) throws RemotingException;void setCallback(ResponseCallback callback);boolean isDone();}1

       也就是说handler.sent一旦调用成功返回,那么就不算clientSide Timeout了。那么CLIENT_TIMEOUT大概率就是由于client端网络,系统等原因超时。

原文:/post/

Dubbo之SPI实现原理详解

         SPI全称为Service Provider Interface,是一种服务提供机制,比如在现实中我们经常会有这种场景,就是对于一个规范定义方而言(可以理解为一个或多个接口),具体的服务实现方是不可知的(可以理解为对这些接口的实现类),那么在定义这些规范的时候,就需要规范定义方能够通过一定的方式来获取到这些服务提供方具体提供的是哪些服务,而SPI就是进行这种定义的。

        说明:

       

       

        Dubbo 的扩展点加载是基于JDK 标准的 SPI 扩展点发现机制增强而来的,Dubbo 改进了 JDK 标准的 SPI 的以下问题:

       

        dubbo对于SPI的实现主要是在ExtensionLoader这个类中,这个类主要有三个方法:

       

        如下是getExtension()方法的源码:

       

createExtension()方法的源码:

        在createExtension()方法中,其主要做了三件事:

        关于wrapper对象,这里需要说明的是,其主要作用是为目标对象实现AOP。wrapper对象有两个特点:

       

getExtensionClasses()方法的源码

       

loadDirectory()方法的源码:

       

loadClass()方法的源码

        loadClass()方法主要作用是对子类进行划分,这里主要划分成了三部分:

        总结而言,getExtension()方法主要是获取指定名称对应的子类。在获取过程中,首先会从缓存中获取是否已经加载过该子类,如果没加载过则通过定义文件加载,并且使用获取到的wrapper对象封装目标对象返回。

       

       

getAdaptiveExtension()方法源码