Dubbo服务调用源码


前言

前面对于服务导出和服务引入有了比较详细的解释,这里主要是对服务的整个调用过程进行解析。首先我们看一下Dubbo在整个调用过程情况。官方源码解析;

  1. 应用启动时,服务消费者首先会从注册中心拉取服务存到服务目录。
  2. 调用的时候会先从服务目录吧当前集群所有DubboInvokers全部撸出来。
  3. 根据路由配置规则从DubboInvokers中筛选得出满足条件的DubboInvokers
  4. 根据负载均衡从满足条件的DubboInvokers中得到一个DubboInvokers
  5. 执行消费者filter链
  6. 通过Exchange封装请求对象
  7. 对请求对象进行编码通过Netty(Dubbo默认网络框架,也可以是别的)发送数据
  8. 服务提供方接受到请求,进行解码
  9. 将这个请求对象封装成runable对象,由Dispatcher决定这个请求由IO线程或业务线程池执行
  10. 执行invoker链
  11. 执行提供者filter链
  12. invoker反射调用方法(实现类)
  13. 结果进行编码并返回
  14. 消费者对响应数据进行解码,并返回

服务消费方发送请求

大家都知道我们DubboReference下的是一个代理类,在这个代理类里面会调用InvokerInvocationHandler#invoke方法(从官方对代理类反编译得知)。

在这个方法里面比较核心的主要是一句代码invoker.invoke(rpcInvocation).recreate()

这句代码里面有2点

  • invoke:开始调用逻辑
  • recreate:会调用AppResponse中的recreate方法,如果AppResponse对象中存在exception信息,此方法中会throw

之后通过invoker.invoke开始调用一系列的Invoker,这里我们主要看一下MockClusterInvoker、FailoverClusterInvoker、DubboInvoker

MockClusterInvoker

这里逻辑比较简单,了解mock应用基本可以很快过掉这段代码

public Result invoke(Invocation invocation) throws RpcException {
    Result result = null;
    // // 获取 mock 配置值
    String value = getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
    // 没有mock逻辑
    if (value.length() == 0 || "false".equalsIgnoreCase(value)) {
        //no mock
        result = this.invoker.invoke(invocation);
    // 强制mock返回,不发起远程调用
    } else if (value.startsWith("force")) {
        if (logger.isWarnEnabled()) {
            logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + getUrl());
        }
        //force:direct mock
        result = doMockInvoke(invocation, null);
    // 消费方调用失败执行 mock 逻辑,不抛出异常
    } else {
        //fail-mock
        try {
            result = this.invoker.invoke(invocation);

            //fix:#4585
            if(result.getException() != null && result.getException() instanceof RpcException){
                RpcException rpcException= (RpcException)result.getException();
                if(rpcException.isBiz()){
                    throw  rpcException;
                }else {
                    result = doMockInvoke(invocation, rpcException);
                }
            }

        } catch (RpcException e) {
            if (e.isBiz()) {
                throw e;
            }

            if (logger.isWarnEnabled()) {
                logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + getUrl(), e);
            }
            result = doMockInvoke(invocation, e);
        }
    }
    return result;
}

FailoverClusterInvoker

这个Invoker主要负责的是集群容错,包括负载均衡,异常重试

public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    List<Invoker<T>> copyInvokers = invokers;
    checkInvokers(copyInvokers, invocation);
    String methodName = RpcUtils.getMethodName(invocation);
    // 重试次数
    int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
    if (len <= 0) {
        len = 1;
    }
    // retry loop.
    RpcException le = null; // last exception.
    // 存储已经调用过的Invoker
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
    Set<String> providers = new HashSet<String>(len);
    // 循环调用,循环重试次数
    for (int i = 0; i < len; i++) {
        //Reselect before retry to avoid a change of candidate `invokers`.
        //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
        if (i > 0) {
            checkWhetherDestroyed();
            copyInvokers = list(invocation);
            // check again
            checkInvokers(copyInvokers, invocation);
        }
        // 通过负载均衡选择 Invoker
        Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
        // 添加到 invoker 到 invoked 列表中,,这个列表表示调用过的Invoker,这里存了之后可以方便我们负载均衡的时候筛选掉已经调用过的服务
        invoked.add(invoker);
        // 设置 invoked 到 RPC 上下文中
        RpcContext.getContext().setInvokers((List) invoked);
        try {
            // 调用目标 Invoker 的 invoke 方法
            Result result = invoker.invoke(invocation);
            if (le != null && logger.isWarnEnabled()) {
                logger.warn();
            }
            // 调用成功返回
            return result;
        } catch (RpcException e) {
            // 调用异常 重试
            if (e.isBiz()) { // biz exception.
                throw e;
            }
            le = e;
        } catch (Throwable e) {
            le = new RpcException(e.getMessage(), e);
        } finally {
            providers.add(invoker.getUrl().getAddress());
        }
    }
    throw new RpcException();
}

DubboInvoker

protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    inv.setAttachment(PATH_KEY, getUrl().getPath());
    inv.setAttachment(VERSION_KEY, version);

    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        // 表示只需要调用,不需要返回结果,单向通信
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        // 当前方法超时时间
        int timeout = calculateTimeout(invocation, methodName);
        if (isOneway) {
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            return AsyncRpcResult.newDefaultAsyncResult(invocation);
        } else {
            ExecutorService executor = getCallbackExecutor(getUrl(), inv);
            // 异步去请求,得到一个CompletableFuture
            // 这里会通过ReferenceCountExchangeClient发送请求
            CompletableFuture<AppResponse> appResponseFuture =
                    currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
            // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
            FutureContext.getContext().setCompatibleFuture(appResponseFuture);
            AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
            result.setExecutor(executor);
            return result;
        }
    } catch (TimeoutException e) {
        throw new RpcException();
    } catch (RemotingException e) {
        throw new RpcException();
    }
}

这里会通过ReferenceCountExchangeClient#request继续往后调用,直到调用到HeaderExchangeChannel#request会发送请求直到调用到NettyChannel#send逻辑

下面是调用逻辑图

NettyChannel#send

调用NioSocketChannel的writeAndFlush发送数据,然后判断send如果是true,那么则阻塞url中指定的timeout时间,因为如果sent是false,在HeaderExchangeChannel中会阻塞timeout时间

public void send(Object message, boolean sent) throws RemotingException {
    // whether the channel is closed
    super.send(message, sent);

    boolean success = true;
    int timeout = 0;
    try {
        ChannelFuture future = channel.writeAndFlush(message);
        if (sent) {
            // wait timeout ms
            timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
            success = future.await(timeout);
        }
        Throwable cause = future.cause();
        if (cause != null) {
            throw cause;
        }
    } catch (Throwable e) {
        removeChannelIfDisconnected(channel);
        throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
    }
    if (!success) {
        throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress()
                                    + "in timeout(" + timeout + "ms) limit");
    }
}

服务提供方接受请求

服务消费者通过Netty发送请求给提供方,提供方会通过Netty接收请求。这这里的逻辑在NettyServer中体现,这里构造了netty(服务导出的时候构建)。

所以从netty里面的pipeline大致可以看出他的编解码逻辑和handler是谁

编解码逻辑在这里就不细说了,具体代码细节可以看官网

protected void doOpen() throws Throwable {
    bootstrap = new ServerBootstrap();

    bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss");
    workerGroup = NettyEventLoopFactory.eventLoopGroup(
            getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
            "NettyServerWorker");

    final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
    channels = nettyServerHandler.getChannels();

    bootstrap.group(bossGroup, workerGroup)
            .channel(NettyEventLoopFactory.serverSocketChannelClass())
            .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
            .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    // FIXME: should we use getTimeout()?
                    int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                    if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
                        ch.pipeline().addLast("negotiation",
                                SslHandlerInitializer.sslServerHandler(getUrl(), nettyServerHandler));
                    }
                    ch.pipeline()
                            .addLast("decoder", adapter.getDecoder())
                            .addLast("encoder", adapter.getEncoder())
                            .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                            .addLast("handler", nettyServerHandler);
                }
            });
    // bind
    ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
    channelFuture.syncUninterruptibly();
    channel = channelFuture.channel();

}

从这段代码可以很明显的发现处理请求的时NettyServerHandler

NettyServerHandler接受到请求会交由channelRead方法处理

NettyServerHandler

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
    handler.received(channel, msg);
}

MultiMessageHandler

判断接收到的数据是否是MultiMessage,如果是则获取MultiMessage中的单个Message

HeartbeatHandler

判断是不是心跳消息,如果是直接返回

AllChannelHandler

把接收到的Message封装为一个ChannelEventRunnable对象,扔给线程池进行处理

ChannelEventRunnable中会调用DecodeHandler

public void run() {
    if (state == ChannelState.RECEIVED) {
        try {
            handler.received(channel, message);
        } catch (Exception e) {
            logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                    + ", message is " + message, e);
        }
    } else {
        switch (state) {
        case CONNECTED:
            try {
                handler.connected(channel);
            } catch (Exception e) {
                logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
            }
            break;
        case DISCONNECTED:
            try {
                handler.disconnected(channel);
            } catch (Exception e) {
                logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
            }
            break;
        case SENT:
            try {
                handler.sent(channel, message);
            } catch (Exception e) {
                logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                        + ", message is " + message, e);
            }
            break;
        case CAUGHT:
            try {
                handler.caught(channel, exception);
            } catch (Exception e) {
                logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                        + ", message is: " + message + ", exception is " + exception, e);
            }
            break;
        default:
            logger.warn("unknown state: " + state + ", message is " + message);
        }
    }

}

DecodeHandler

按Dubbo协议的数据格式,解析当前请求的path,version,方法,方法参数等等,然后把解析好了的请求交给HeaderExchangeHandler

HeaderExchangeHandler

处理Request数据,首先构造一个Response对象,然后调用ExchangeHandlerAdapter得到一个CompletionStage future,然后给future通过whenComplete绑定一个回调函数,当future执行完了之后,就可以从回调函数中得到ExchangeHandlerAdapter的执行结果,并把执行结果设置给Response对象,通过channel发送出去

ExchangeHandlerAdapter这个在DubboProtocol重写了

所以我们来到DubboProtocol#ExchangeHandlerAdapter

DubboProtocol#ExchangeHandlerAdapter

Result result = invoker.invoke(inv);

这里的invoker是ProtocolFilterWrapper$CallbackRegistrationInvoker

ProtocolFilterWrapper$CallbackRegistrationInvoker

这个invoker主要负责执行过滤器链

过滤器链

EchoFilter >> ClassLoaderFilter >> GenericFilter >> ContextFilter >> TraceFilter >> TimeoutFilter >> MonitorFilter >> ExceptionFilter

AbstractProxyInvoker

这里会吧异常封装交给消费端进行异常抛出,在服务端不会进行异常抛出,为了保证和本地调用效果一样。

public Result invoke(Invocation invocation) throws RpcException {
     try {
         // 执行服务,得到一个接口,可能是一个CompletableFuture(表示异步调用),可能是一个正常的服务执行结果(同步调用)
         // 如果是同步调用会阻塞,如果是异步调用不会阻塞
         Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
// 将同步调用的服务执行结果封装为CompletableFuture类型
CompletableFuture<Object> future = wrapWithFuture(value);
         CompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> {
             // 如果是同步服务出异常了,封装异常
             AppResponse result = new AppResponse();
             if (t != null) {
                 if (t instanceof CompletionException) {
                     result.setException(t.getCause());
                 } else {
                     result.setException(t);
                 }
             } else {
                 result.setValue(obj);
             }
             return result;
         });
         return new AsyncRpcResult(appResponseFuture, invocation);
     } catch (InvocationTargetException e) {
         if (RpcContext.getContext().isAsyncStarted() && !RpcContext.getContext().stopAsync()) {
             logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e);
         }
         return AsyncRpcResult.newDefaultAsyncResult(null, e.getTargetException(), invocation);
     } catch (Throwable e) {
         throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
     }
 }

文章作者: dm
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 dm !
评论
 上一篇
ZooKeeper安装及常用命令 ZooKeeper安装及常用命令
发现zookeeper3.5.3之后出了许多新特性,就趁此时机安装一下较新版本zookeeper,顺便记录一下安装步骤,方便以后查验。 首先准备zookeeper,在这里我使用的是centos7,java环境是1.8.安装的zookeepe
2023-08-01
下一篇 
Dubbo服务导出源码 Dubbo服务导出源码
入口还记得springboot会注册DubboBootstrapApplicationListener监听事件,在这个事件中onContextRefreshedEvent调用了dubboBootstrap.start()方法。说明了在Spr
2023-07-01
  目录