并发:功能强大而简单的抽象,让编写正确的并发代码更加容易。
ListenableFuture:完成后回调的Future
Service:启动和关闭的服务,为你处理复杂的状态逻辑。
1.ListenableFuture
并发是一个困难的问题,但是通过使用功能强大且简单的抽象可以大大简化并发。为了简化问题,Guava使用ListenableFuture
扩展了JDK的Future
接口。
我们强烈建议你在所有代码中始终使用ListenableFuture
而不是Future
,因为:
- 大多数
Futures
方法都需要它。 - 比以后更改为
ListenableFuture
更容易。 - 工具方法的提供者无需提供其方法的
Future
和ListenableFuture
变体。
1.1接口
传统的Future
表示异步计算的结果:可能已经或可能尚未完成产生结果的计算。Future
可以作为正在进行的计算的句柄,是服务向我们提供结果的承诺。
ListenableFuture
允许你在计算完成后或在计算已经完成时立即注册要执行的回调。这个简单的附加功能使它可以有效地支持基本Future
接口无法支持的许多操作。
ListenableFuture
添加的基本操作是addListener(Runnable, Executor)
,它指定当此Future
表示的计算完成时,指定的Runnable
将在指定的Executor
上运行。
1.2添加回调
大多数用户更喜欢使用Futures.addCallback(ListenableFuture, FutureCallback, Executor)
。FutureCallback
实现两种方法:
onSuccess(V)
,如果Future成功,则根据其结果执行的操作onFailure(Throwable)
,如果Future失败,则根据失败执行的操作
1.3创建
对应于JDK的ExecutorService.submit(Callable)
方法来启动异步计算,Guava提供了ListeningExecutorService
接口,该接口在ExecutorService
返回正常Future
的任何地方都返回ListenableFuture
。要将ExecutorService
转换为ListeningExecutorService
,只需使用MoreExecutors.listeningDecorator(ExecutorService)
。
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
ListenableFuture<Explosion> explosion = service.submit(
new Callable<Explosion>() {
public Explosion call() {
return pushBigRedButton();
}
});
Futures.addCallback(
explosion,
new FutureCallback<Explosion>() {
// we want this handler to run immediately after we push the big red button!
public void onSuccess(Explosion explosion) {
walkAwayFrom(explosion);
}
public void onFailure(Throwable thrown) {
battleArchNemesis(); // escaped the explosion!
}
},
service);
另外,如果你要从基于FutureTask
的API进行转换,则Guava提供了ListenableFutureTask.create(Callable)
和ListenableFutureTask.create(Runnable, V)
。与JDK不同,ListenableFutureTask
不能直接扩展。
如果你更喜欢抽象的方式设置future值,而不是实现一种计算该值的方法,请考虑扩展AbstractFuture
或直接使用SettableFuture
。
如果必须将另一个API提供的Future
转换为ListenableFuture
,则别无选择,只能使用重量级的JdkFutureAdapters.listenInPoolThread(Future)
将Future
转换为ListenableFuture
。只要有可能,最好修改原始代码以返回ListenableFuture
。
1.4应用
使用ListenableFuture
的最重要原因是可以拥有复杂的异步操作链。
ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
AsyncFunction<RowKey, QueryResult> queryFunction =
new AsyncFunction<RowKey, QueryResult>() {
public ListenableFuture<QueryResult> apply(RowKey rowKey) {
return dataService.read(rowKey);
}
};
ListenableFuture<QueryResult> queryFuture =
Futures.transformAsync(rowKeyFuture, queryFunction, queryExecutor);
ListenableFuture
可以有效地支持许多其他操作,而单独的Future
不能支持。不同的执行者可以执行不同的操作,并且单个ListenableFuture
可以有多个操作在等待它。
当多个操作应该在另一个操作启动时立即开始时——“扇出”——ListenableFuture
只起作用:它触发所有请求的回调。稍微多做一些工作,我们可以“扇入”或触发一个ListenableFuture
,以便在其他几个future都完成后立即进行计算:有关示例,请参见Futures.allAsList
的实现。
方法 | 描述 | 参见 |
---|---|---|
transformAsync(ListenableFuture,AsyncFunction,Executor)* | 返回一个新的ListenableFuture,其结果是将给定的AsyncFunction应用于给定ListenableFuture的结果的产物。 | transformAsync(ListenableFuture,AsyncFunction) |
transform(ListenableFuture,Function,Executor) | 返回一个新的ListenableFuture,其结果是将给定的Function应用于给定ListenableFuture的结果的产物。 | transform(ListenableFuture,Function) |
allAsList(Iterable>) | 返回一个ListenableFuture,其值是按顺序包含每个输入future的值的列表。如果任何一个输入future失败或被取消,则该future失败或被取消。 | allAsList(ListenableFuture...) |
successfulAsList(Iterable>) | 返回一个ListenableFuture,其值是按顺序包含每个成功输入future的值的列表。与失败或取消的future相对应的值将替换为null。 | successfulAsList(ListenableFuture...) |
*
AsyncFunction<A, B>
提供一个方法ListenableFuture<B> apply(A input)
。它可以用于异步转换值。
List<ListenableFuture<QueryResult>> queries;
// The queries go to all different data centers, but we want to wait until they're all done or failed.
ListenableFuture<List<QueryResult>> successfulQueries = Futures.successfulAsList(queries);
Futures.addCallback(successfulQueries, callbackOnSuccessfulQueries);
1.5避免嵌套Future
在代码调用通用接口并返回Future
的情况下,可能会以嵌套的Future
s结尾。例如:
executorService.submit(new Callable<ListenableFuture<Foo>() {
@Override
public ListenableFuture<Foo> call() {
return otherExecutorService.submit(otherCallable);
}
});
将返回一个ListenableFuture<ListenableFuture<Foo>>
。这段代码是不正确的,因为如果外部future的取消与外部future的完成进行竞争,则该取消将不会传播到内部future。使用get()
或监听器检查另一个future是否失败也是常见的错误,但是除非特别小心,否则从otherCallable
抛出的异常将被抑制。为了避免这种情况,Guava的所有future处理方法(以及JDK中的某些方法)都具有*Async
版本,可以安全地解开此嵌套——transform(ListenableFuture, Function, Executor)
、transformAsync(ListenableFuture, AsyncFunction, Executor)
、ExecutorService.submit(Callable)
和submitAsync(AsyncCallable, Executor)
等等。
2.Service
Guava Service
接口表示一个具有操作状态的对象,并带有启动和停止的方法。例如,Web服务器,RPC服务器和计时器可以实现Service
接口。管理这些服务的状态(需要适当的启动和关闭管理)并非易事,特别是在涉及多线程或日程调度schedule的情况下。Guava提供了一些框架来为你管理状态逻辑和同步细节。
2.1使用Service
服务Service
的正常生命周期是
Service.State.NEW
到Service.State.STARTING
到Service.State.RUNNING
到Service.State.STOPPING
到Service.State.TERMINATED
已停止的服务无法重新启动。如果服务在启动、运行或停止的地方失败,它将进入Service.State.FAILED
状态。
如果服务是NEW
,则可以使用startAsync()
异步启动服务。因此,你应该将应用程序结构化为在每个服务启动时都有唯一的位置(统一)。
使用异步stopAsync()
方法来停止服务也是类似的。但是与startAsync()
不同,多次调用此方法是安全的。这使得处理关闭服务时可能发生的竞争成为可能。
服务还提供了几种方法来等待服务转换完成。
- 异步使用
addListener()
。addListener()
允许你添加一个Service.Listener
,它将在服务的每个状态转换时调用。注意:如果在添加监听器时服务不是NEW
新建的,那么任何已经发生的状态转换都不会在监听器上重新触发。 - 同步使用
awaitRunning()
。这是不中断的,不会抛出已检查的异常,并在服务启动完成后返回。如果服务启动失败,则会抛出IllegalStateException
。同样,awaitTerminated()
等待服务达到终端状态(TERMINATED
或FAILED
)。两种方法都具有重载的允许指定超时时间。
Service
接口是微妙而复杂的。我们不建议直接实现它。相反,请使用guava中的抽象基类之一作为实现的基础。每个基类都支持特定的线程模型。
2.2实现
2.2.1AbstractIdleService
AbstractIdleService
框架实现了Service
,该服务在处于“运行”状态时不需要执行任何操作——因此在运行时不需要线程——但具有要执行的启动和关闭操作。实现这样的服务与扩展AbstractIdleService
以及实现startUp()
和shutDown()
方法一样容易。
protected void startUp() {
servlets.add(new GcStatsServlet());
}
protected void shutDown() {}
请注意,对GcStatsServlet
的任何查询都已经有一个在运行的线程。在服务运行时,我们不需要该服务自行执行任何操作。
2.2.2AbstractExecutionThreadService
AbstractExecutionThreadService
在单个线程中执行启动、运行和关闭操作。你必须重写run()
方法,并且它必须响应停止请求。例如,你可以在工作循环中执行操作:
public void run() {
while (isRunning()) {
// perform a unit of work
}
}
或者,你可以以任何方式重写,从而使run()
返回。
重写startUp()
和shutDown()
是可选的,但是将为你管理服务状态。
protected void startUp() {
dispatcher.listenForConnections(port, queue);
}
protected void run() {
Connection connection;
while ((connection = queue.take() != POISON)) {
process(connection);
}
}
protected void triggerShutdown() {
dispatcher.stopListeningForConnections(queue);
queue.put(POISON);
}
请注意,start()
调用你的startUp()
方法,为你创建一个线程,并在该线程中调用run()
。stop()
调用triggerShutdown()
方法并等待线程死亡。
2.2.3AbstractScheduledService
AbstractScheduledService
在运行时执行一些周期性任务。子类实现runOneIteration()
来指定任务的一次迭代,以及熟悉的startUp()
和shutDown()
方法。
要描述执行日程调度schedule,你必须实现scheduler()
方法。通常,你将使用AbstractScheduledService.Scheduler
提供的日程schedule之一,newFixedRateSchedule(initialDelay, delay, TimeUnit)
或newFixedDelaySchedule(initialDelay, delay, TimeUnit)
,与ScheduledExecutorService
中熟悉的方法相对应。可以使用CustomScheduler
来实现自定义日程调度schedule;有关详细信息,请参见Javadoc。
2.2.4AbstractService
当你需要执行自己的手动线程管理时,请直接重写AbstractService
。通常,上述实现之一应该可以为你提供良好的服务,但是当你在建模某种提供自己的线程语义作为Service
时,建议你实现AbstractService
,因为你有自己特定的线程需求。
要实现AbstractService
,必须实现2个方法。
doStart()
:doStart()
是第一次调用startAsync()
直接调用的,你的doStart()
方法应执行所有的初始化,如果启动成功,则最终调用notifyStarted()
,如果启动失败,则最终调用notifyFailed()
。doStop()
:doStop()
是由第一次调用stopAsync()
直接调用的,你的doStop()
方法应关闭服务,如果关闭成功,则最终调用notifyStopped()
,如果关闭失败,则最终调用notifyFailed()
。
你的doStart
和doStop
方法应该是快速的。如果你需要进行昂贵的初始化,例如读取文件、打开网络连接或任何可能阻塞的操作,则应考虑将该工作移至另一个线程。
2.3使用ServiceManager
除了Service
框架实现之外,Guava还提供了ServiceManager
类,它使涉及多个服务实现的某些操作更加容易。使用Services
集合创建一个新的ServiceManager
。然后,你可以管理它们:
startAsync()
将启动管理下的所有服务。与Service#startAsync()
类似,如果所有服务都是NEW
,则只能调用此方法一次。stopAsync()
将停止管理下的所有服务。addListener
将添加一个ServiceManager.Listener
,它将在主要状态转换时调用。awaitHealthy()
将等待所有服务达到RUNNING
状态。awaitStopped()
将等待所有服务达到终端状态。
或检查它们:
- 如果所有服务都是
RUNNING
,则isHealthy()
返回true。 servicesByState()
返回按状态索引的所有服务的一致快照。startupTimes()
返回管理下的Service
到该服务启动所需的时间(以毫秒为单位)的映射。返回的映射保证按启动时间排序。
虽然建议通过ServiceManager
管理服务生命周期,但是通过其他机制启动的状态转换 不会影响其方法的正确性 。例如,如果服务是由startAsync()
之外的某种机制启动的,则监听器将在适当的时候被调用,而awaitHealthy()
仍将按预期工作。ServiceManager
强制执行的唯一要求是,在构造ServiceManager
时,所有Service
都必须是NEW
。
本文参考:
ListenableFutureExplained
ServiceExplained
guava-tests-concurrent