2023-08-02  阅读(3)
原文作者:Ressmix 原文地址:https://www.tpvlog.com/article/346

异步回调并非Java网络编程这块的内容,事实上,我曾在《透彻理解Java并发编程》专栏中详细剖析过Future模式,Java中的异步回调实际上就是基于Future模式。

考虑到后续章节,我会剖析Netty源代码,而Netty源码中大量使用了异步回调技术,并且基于Java的异步回调,设计了自己的一整套异步回调接口和实现。所以,本章我先从Java Future异步回调技术入手,然后介绍比较常用的第三方异步回调技术——谷歌公司的Guava Future相关技术,最后介绍一下Netty的异步回调技术,为后续Netty源码剖析作铺垫。

一、Future模式

Java在1.5版本之后提供了一种新的多线程的创建方式——FutureTask方式。FutureTask方式包含了一系列的Java相关类,在java.util.concurrent包中。其中最为重要的是 FutureTask 类和 Callable 接口。

1.1 Callable接口

我们都知道,Runnable接口是在Java多线程中表示线程的业务代码的抽象接口。但是,Runnable有一个重要的问题:它的run方法是没有返回值的。正因为如此,Runnable不能用于需要有返回值的应用场景。

为了解决Runnable接口的问题,Java定义了一个新的和Runnable类似的接口——Callable接口,并且将其中的代表业务处理的方法命名为
call,call方法有返回值:

    package java.util.concurrent;
    
    @FunctionalInterface
    public interface Callable<V> {
        // call方法有返回值
        V call() throws Exception;
    }

Callable接口是一个泛型接口,也声明为了“函数式接口”。其唯一的抽象方法call有返回值,返回值的类型为泛型形参的实际类型。call抽象方法还有一个Exception的异常声明,容许方法内部的异常不经过捕获。

Callable接口与Runnable接口相比,还有一个很大的不同: Callable接口的实例不能作为Thread线程实例的target来使用 ;而Runnable接口实例可以作为Thread线程实例的target构造参数,开启一个Thread线程。

那么问题来了,Java中的线程类型,只有一个Thread类,没有其他的类型,那Callable实例该如何异步执行呢?事实上,Java提供了Callable实例和Thread的target成员之间一个搭桥的类——FutureTask类。

1.2 FutureTask类

FutureTask类代表一个未来执行的任务,也位于java.util.concurrent包中。FutureTask类的构造函数的参数为Callable类型,实际上是对Callable类型的二次封装,可以执行Callable的call方法。FutureTask类间接地继承了Runnable接口,从而可以作为Thread实例的target执行目标:

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW; // ensure visibility of callable
    }

总体来说,FutureTask类首先是一个搭桥类的角色,FutureTask类能当作Thread线程去执行目标target,被异步执行;其次,如果要获取异步执行的结果,需要通过FutureTask类的方法去获取,在FutureTask类的内部,会将Callable的call方法的真正结果保存起来,以供外部获取。

1.3 Future接口

在Java语言中,将FutureTask类的一系列操作,抽象出来作为一个重要的接口——Future接口。当然,FutureTask类也实现了此接口。Future接口不复杂,主要是对并发任务的执行及获取其结果的一些操作:

    package java.util.concurrent;
    public interface Future<V> {
        boolean cancel(boolean mayInterruptRunning);
        boolean isCancelled();
        boolean isDone();
        V get() throws InterruptedException,ExecutionException;
        V get(long timeout,TimeUnitunit) throws InterruptedException,ExecutionException,TimeoutException;
    }

Future主要提供了3大功能:

  1. 判断并发任务是否执行完成;
  2. 获取并发任务执行的结果;
  3. 取消并发执行中的任务。

关于Future接口的方法,详细说明如下:

  • V get() :获取并发任务执行的结果。这个方法是阻塞性的,如果并发任务没有执行完成,调用此方法的线程会一直阻塞,直到并发任务执行完成;
  • V get(Long timeout,TimeUnit unit) :获取并发任务执行的结果。这个方法也是阻塞性的,但是会有阻塞时间限制,如果阻塞时间超过设定的timeout时间,该方法将抛出异常;
  • boolean isDone(): 获取并发任务的执行状态。如果任务执行结束,则返回true;
  • boolean isCancelled() :获取并发任务的取消状态。如果任务完成前被取消,则返回true;
  • boolean cancel(boolean mayInterruptRunning) :取消并发任务的执行。

1.4 使用示例

我们来看一个使用FutureTask完成异步回调的示例。定义两个异步任务,主线程通过FutureTask获取异步任务的执行结果:

    /**
     * 烧水任务
     */
    class HotWarterJob implements Callable<Boolean>
    {
        @Override
        public Boolean call() throws Exception
        {
            try {
                Logger.info("洗好水壶");
                Logger.info("灌上凉水");
                Logger.info("放在火上");
                //线程睡眠一段时间,代表烧水中
                Thread.sleep(500);
                Logger.info("水开了");
            } catch (InterruptedException e) {
                Logger.info(" 发生异常被中断.");
                return false;
            }
            Logger.info(" 运行结束.");
            return true;
        }
    }
    /**
     * 洗茶具任务类
     */
    class WashJob implements Callable<Boolean> {
        @Override
        public Boolean call() throws Exception {
            try {
                Logger.info("洗茶壶");
                Logger.info("洗茶杯");
                Logger.info("拿茶叶");
                //线程睡眠一段时间,代表清洗中
                Thread.sleep(500);
                Logger.info("洗完了");
            } catch (InterruptedException e) {
                Logger.info(" 清洗工作发生异常被中断.");
                return false;
            }
            Logger.info(" 清洗工作运行结束.");
            return true;
        }
    }
    /**
     * 喝茶类
     */
    public class JavaFutureDemo {
        public static void drinkTea(boolean warterOk, boolean cupOk) {
            if (warterOk && cupOk) {
                Logger.info("泡茶喝");
            } else if (!warterOk) {
                Logger.info("烧水失败,没有茶喝了");
            } else if (!cupOk) {
                Logger.info("杯子洗不了,没有茶喝了");
            }
        }
    
        public static void main(String args[]) {
            // 1.创建烧水异步任务
            Callable<Boolean> hJob = new HotWarterJob();
            FutureTask<Boolean> hTask = new FutureTask<>(hJob);
            Thread hThread = new Thread(hTask, "** 烧水-Thread");
    
            // 2.创建洗茶具异步任务
            Callable<Boolean> wJob = new WashJob();
            FutureTask<Boolean> wTask = new FutureTask<>(wJob);
            Thread wThread = new Thread(wTask, "$$ 清洗-Thread");
    
            // 3.启动异步任务
            hThread.start();
            wThread.start();
    
            // 4.获取任务执行结果
            try {
                // 通过FutureTask类的get方法,获取异步结果时,主线程会被阻塞
                boolean warterOk = hTask.get();
                boolean cupOk = wTask.get();
                drinkTea(warterOk, cupOk);
            } catch (InterruptedException e) {
                Logger.info("发生异常被中断.");
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }

二、Guava回调模式

上述使用Future模式的过程中,主线程在获取异步任务的执行结果时,如果调用Future.get()方法,会被阻塞住,所以Future模式本质还是一种异步阻塞模式。被阻塞的主线程不能干任何事情,唯一能干的,就是在傻傻地等待。原生Java API,除了阻塞的获取结果外,并没有实现非阻塞的异步结果获取方法。如果需要用到获取异步的结果,则需要引入一些额外的框架,我这里介绍Google Guava这个常用框架。

2.1 Google Guava

何为Guava?它是谷歌公司提供的Java扩展包,提供了一种异步回调的解决方案。相关的源代码在com.google.common.util.concurrent包中。Guava中的很多类,都是对java.util.concurrent能力的扩展和增强。例如,Guava的异步任务接口ListenableFuture扩展了Java的Future接口,实现了 非阻塞获取异步结果 的功能。

Guava对Java的异步回调机制,做了以下的增强:

  1. 引入了一个新的接口ListenableFuture(继承自Java的Future接口),使得能以异步非阻塞的方式获取任务执行结果;
  2. 引入了一个新的接口FutureCallback(这是一个独立接口),该接口可以在异步任务执行完成后,根据任务结果完成不同的回调处理。

2.2 FutureCallback

FutureCallback是一个新增的接口,用来实现异步任务执行完后的回调逻辑:

    package com.google.common.util.concurrent;
    public interface FutureCallback<V> {
        void onSuccess(@Nullable V var1);
        void onFailure(Throwable var1);
    }

可以看到,FutureCallback拥有两个回调方法:

  • onSuccess: 在异步任务执行成功后被回调。调用时,异步任务的执行结果作为onSuccess方法的参数被传入;
  • onFailure: 在异步任务执行抛出异常时被回调。调用时,异步任务所抛出的异常作为onFailure方法的参数被传入。

2.3 ListenableFuture

Guava的ListenableFuture接口是对Java的Future接口的扩展,可以理解为异步任务的实例:

    package com.google.common.util.concurrent;
    import java.util.concurrent.Executor;
    import java.util.concurrent.Future;
    
    public interface ListenableFuture<V> extends Future<V> {
    // 此方法由Guava内部调用
    void addListener(Runnable r, Executor e);
    }

可以看到,ListenableFuture仅仅增加了一个方法——addListener方法。它的作用就是将FutureCallback封装成一个内部的Runnable异步回调任务,在Callable异步任务完成后,回调FutureCallback进行善后处理。

注意,ListenableFuture的addListener方法只在Guava内部调用,如果对它感兴趣,可以查看Guava源代码。在实际编程中,我们不会调用addListener。

在实际编程中,我们可以使用Guava的 Futures 工具类,将FutureCallback回调逻辑绑定到异步的ListenableFuture任务上:

    Futures.addCallback(listenableFuture, new FutureCallback<Boolean>() {
        public void onSuccess(Boolean r) {
            // listenableFuture内部的Callable 成功时回调此方法
        }
        public void onFailure(Throwable t) {
            // listenableFuture内部的Callable异常时回调此方法
        }
    });

2.4 Guava线程池

还剩下一个问题,我们如何创建ListenableFuture实例呢?事实上,Google Guava提供了自己的线程池,往里面提交Callable任务,就会返回ListenableFuture实例。Guava线程池,本质是对Java线程池的一种装饰,创建Guava线程池的方法如下:

    // 1.首先创建Java线程池
    ExecutorService jPool= Executors.newFixedThreadPool(10);
    // 2.构造一个Guava线程池
    ListeningExecutorService gPool= MoreExecutors.listeningDecorator(jPool);

有了Guava的线程池之后,就可以通过submit方法来提交任务了。任务提交之后的返回结果,就是我们所要的ListenableFuture异步任务实例了:

    // 调用submit方法来提交任务,返回异步任务实例
    ListenableFuture<Boolean> hFuture = gPool.submit(hJob);

2.5 使用示例

最后,我们通过Google Guava来完成第一节中的异步任务示例:

    public class GuavaFutureDemo {
    
        public static void main(String args[]) {
            // 创建一个泡茶主线程
            MainJob mainJob = new MainJob();
            Thread mainThread = new Thread(mainJob);
            mainThread.setName("主线程");
            mainThread.start();
    
            // 烧水的业务逻辑实例
            Callable<Boolean> hotJob = new HotWarterJob();
            Callable<Boolean> washJob = new WashJob();
            ExecutorService jPool = Executors.newFixedThreadPool(10);
            ListeningExecutorService gPool = MoreExecutors.listeningDecorator(jPool);
    
            // 烧水异步任务
            ListenableFuture<Boolean> hotFuture = gPool.submit(hotJob);
            Futures.addCallback(hotFuture, new FutureCallback<Boolean>() {
                public void onSuccess(Boolean r) {
                    if (r) {
                        mainJob.warterOk = true;
                    }
                }
                public void onFailure(Throwable t) {
                    Logger.info("烧水失败,没有茶喝了");
                }
            });
    
            // 洗茶具异步任务
            ListenableFuture<Boolean> washFuture = gPool.submit(washJob);
            Futures.addCallback(washFuture, new FutureCallback<Boolean>() {
                public void onSuccess(Boolean r) {
                    if (r) {
                        mainJob.cupOk = true;
                    }
                }
                public void onFailure(Throwable t) {
                    Logger.info("杯子洗不了,没有茶喝了");
                }
            });
        }
    
        // 泡茶喝主线程类
        static class MainJob implements Runnable {
            boolean warterOk = false;
            boolean cupOk = false;
            @Override
            public void run() {
                while (true) {
                    try {
                        Thread.sleep(50);
                        Logger.info("读书中......");
                    } catch (InterruptedException e) {
                        Logger.info(getCurThreadName() + "发生异常被中断.");
                    }
                    if (warterOk && cupOk) {
                        drinkTea(warterOk, cupOk);
                    }
                }
            }
    
            public void drinkTea(Boolean wOk, Boolean cOK) {
                if (wOk && cOK) {
                    Logger.info("泡茶喝,茶喝完");
                } else if (!wOk) {
                    Logger.info("烧水失败,没有茶喝了");
                } else if (!cOK) {
                    Logger.info("杯子洗不了,没有茶喝了");
                }
            }
        }
    }

通过上述的示例,可以看到,Guava异步回调和Java的FutureTask异步回调,本质的不同在于:

  • Guava是 非阻塞 的异步回调,调用线程是不阻塞的,可以继续执行自己的业务逻辑;
  • FutureTask是 阻塞 的异步回调,调用线程是阻塞的,在获取异步结果的过程中,一直阻塞,等待异步线程返回结果。

三、Netty回调模式

Netty官方文档中指出Netty的网络操作都是异步的。在Netty源代码中,大量使用了异步回调处理模式。在Netty的业务开发层面,Netty应用的Handler处理器中的业务处理代码,也都是异步执行的。所以,了解Netty的异步回调,无论是Netty应用级的开发还是源代码级的开发,都是十分重要的。

Netty和Guava一样,实现了自己的异步回调体系:Netty继承和扩展了JDK Future的API,定义了自身的Future系列接口和类,实现了异步任务的监控、异步执行结果的获取。总体来说,Netty对Java Future异步任务的扩展如下:

  1. 继承Java的Future接口,得到了一个新的属于Netty自己的Future异步任务接口。该接口对原有的接口进行了增强,使得Netty异步任务能够以非阻塞的方式处理回调的结果;
  2. 引入了一个新接口——GenericFutureListener,表示异步执行完成的监听器。这个接口和Guava的FutureCallbak回调接口不同。Netty使用了监听器模式,将异步任务执行完成后的回调逻辑抽象成了Listener监听器接口;
  3. Netty的GenericFutureListener监听器接口实现类,可以加入Netty异步任务Future中,实现对异步任务执行状态的事件监听。

总体上说,在异步非阻塞回调的设计思路上,Netty和Guava的思路是一致的。对应关系为:

  • Netty的Future接口,可以对应到Guava的ListenableFuture接口;
  • Netty的GenericFutureListener接口,可以对应到Guava的FutureCallback接口。

3.1 GenericFutureListener

GenericFutureListener接口,用来封装异步回调的逻辑:

    package io.netty.util.concurrent;
    import java.util.EventListener;
    
    public interface GenericFutureListener<F extends Future<?>> extends EventListener {
        //监听器的回调方法
        void operationComplete(F var1) throws Exception;
    }

在大多数情况下,Netty的异步回调的代码编写在GenericFutureListener.operationComplete()方法中,operationComplete方法会在Future异步任务执行完成后回调。

3.2 Future接口

Netty自定义了自己的Future接口,位于io.netty.util.concurrent包中:

    public interface Future<V> extendsjava.util.concurrent.Future<V> {
        // 判断是否执行成功
        boolean isSuccess();
        // 判断是否取消
        boolean isCancellable(); 
        // 获取异步任务异常的原因
        Throwable cause();
        // 增加监听器Listener
        Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
        // 移除监听器Listener
        Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
    }

实际使用时,一般使用Future的子接口,代表了不同类型的异步任务,比如常用的ChannelFuture接口。

3.3 ChannelFuture

在Netty的网络编程中,网络连接通道的输入和输出处理都是异步进行的,都会返回一个ChannelFuture接口的实例。通过返回的Future实例,可以为它增加异步回调的监听器,在异步任务真正完成后,执行回调:

    // connect是异步的,仅提交异步任务
    ChannelFuture future = bootstrap.connect(new InetSocketAddress("www.manning.com",80));
    // 添加监听器
    future.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture channelFuture) throws Exception {
            if(channelFuture.isSuccess()){
                System.out.println("Connection established");
            } else {
                System.err.println("Connection attempt failed");
                channelFuture.cause().printStackTrace();
            }
        }
    });

上述的ChannelFutureListener是GenericFutureListener的子接口。

3.4 使用实例

Netty的出站和入站操作都是异步的。异步回调的方法,和上面Netty建立连接的异步回调是一样的。以最为经典的NIO出站操作——write为例,说明一下ChannelFuture的使用。

在调用write操作后,Netty并没有完成对Java NIO底层连接的写入操作,因为是异步执行的:

    // write方法,返回一个异步任务
    ChannelFuture future = ctx.channel().write(msg);
    // 为异步任务加上监听器
    future.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) {
            // write操作完成后的回调代码
        }
    });

四、总结

本章,我对Java编程中的异步回调模式进行了讲解,主要对Java原生异步模式,Guaua异步模式,Netty异步模式进行了介绍和比较,读者需要根据实际的业务场景来选择合适的模式。


Java 面试宝典是大明哥全力打造的 Java 精品面试题,它是一份靠谱、强大、详细、经典的 Java 后端面试宝典。它不仅仅只是一道道面试题,而是一套完整的 Java 知识体系,一套你 Java 知识点的扫盲贴。

它的内容包括:

  • 大厂真题:Java 面试宝典里面的题目都是最近几年的高频的大厂面试真题。
  • 原创内容:Java 面试宝典内容全部都是大明哥原创,内容全面且通俗易懂,回答部分可以直接作为面试回答内容。
  • 持续更新:一次购买,永久有效。大明哥会持续更新 3+ 年,累计更新 1000+,宝典会不断迭代更新,保证最新、最全面。
  • 覆盖全面:本宝典累计更新 1000+,从 Java 入门到 Java 架构的高频面试题,实现 360° 全覆盖。
  • 不止面试:内容包含面试题解析、内容详解、知识扩展,它不仅仅只是一份面试题,更是一套完整的 Java 知识体系。
  • 宝典详情:https://www.yuque.com/chenssy/sike-java/xvlo920axlp7sf4k
  • 宝典总览:https://www.yuque.com/chenssy/sike-java/yogsehzntzgp4ly1
  • 宝典进展:https://www.yuque.com/chenssy/sike-java/en9ned7loo47z5aw

目前 Java 面试宝典累计更新 400+ 道,总字数 42w+。大明哥还在持续更新中,下图是大明哥在 2024-12 月份的更新情况:

想了解详情的小伙伴,扫描下面二维码加大明哥微信【daming091】咨询

同时,大明哥也整理一套目前市面最常见的热点面试题。微信搜[大明哥聊 Java]或扫描下方二维码关注大明哥的原创公众号[大明哥聊 Java] ,回复【面试题】 即可免费领取。

阅读全文