当前位置:首页 » 《随便一记》 » 正文

[Java 并发基础]多线程编程

15 人参与  2024年02月08日 18:01  分类 : 《随便一记》  评论

点击全文阅读


文章参考:
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Future.html
https://juejin.cn/post/6970558076642394142

文章目录

线程的创建方式继承 `Thread`实现 `Runnable` 接口实现 `Callable` 接口使用 `Lambda`使用线程池 线程创建相关的 `jdk`源码`Thread`类`Runnable`函数接口`Callable<V>`函数接口`executors` 线程扩展知识`future`_Method Summary_简单示例代码 `CompletableFuture``All MethodsStatic MethodsInstance MethodsConcrete Methods`简单示例代码 CompletableFuture使用场景创建异步任务supplyAsync方法runAsync方法 任务异步回调1. thenRun/thenRunAsync2.thenAccept/thenAcceptAsync3. thenApply/thenApplyAsync4. exceptionally5. whenComplete方法6. handle方法 多个任务组合处理AND组合关系OR 组合的关系 AllOfAnyOfthenCompose CompletableFuture使用有哪些注意点1. Future需要获取返回值,才能获取异常信息2. CompletableFuture的get()方法是阻塞的。3. 默认线程池的注意点4. 自定义线程池时,注意饱和策略

线程的创建方式

继承 Thread

创建一个继承 Thread 类的子类。重写 Thread 类的 run() 方法。在 run() 方法中编写线程要执行的任务。创建 Thread 子类的对象。调用 Thread 子类对象的 start() 方法来启动线程。
public class Demo {    static class MyThread extends Thread {        @Override        public void run() {            System.out.println(">>>> run ");        }    }    public static void main(String[] args) {        new MyThread().start();            }}

实现 Runnable 接口

创建一个实现 Runnable 接口的类。在 Runnable 接口的 run() 方法中编写线程要执行的任务。创建 Runnable 接口的实现类的对象。将 Runnable 接口的实现类的对象传递给 Thread 类的构造方法来创建 Thread 对象。调用 Thread 对象的 start() 方法来启动线程。
package org.example.create;public class Demo1 {    static class MyThread1 implements Runnable {        @Override        public void run() {            System.out.println(">>>>> 2. 实现Runnable接口");        }    }    public static void main(String[] args) throws Exception {        new MyThread1().run();        System.out.println("end");    }}

实现 Callable 接口

package org.example.create;import java.util.concurrent.Callable;public class Demo2 {    static class MyThread implements Callable<Void> {        @Override        public Void call() throws Exception {            System.out.println("3. 实现 Callable ");            return null;        }    }    public static void main(String[] args) throws Exception {        new MyThread().call();        System.out.println("end");    }}

使用 Lambda

package org.example.create;public class Demo3 {    public static void main(String[] args) throws Exception {        new Thread(()->{            System.out.println("4. 使用 lambda 表达式");        }).run();        System.out.println("end");    }}

使用线程池

package org.example.create;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class Demo4 {    public static void main(String[] args) {        ExecutorService executor = Executors.newFixedThreadPool(5);        // 创建Runnable对象        Runnable runnable = () -> {            // 线程的执行逻辑            System.out.println(Thread.currentThread().getId() + "线程执行逻辑");        };        // 提交任务给线程池        for (int i = 0; i < 50; i++) {            executor.submit(runnable);        }        // 关闭线程池        executor.shutdown();    }}

线程创建相关的 jdk源码

Thread

package java.lang;public class Thread implements Runnable {    /* Make sure registerNatives is the first thing <clinit> does. */    private static native void registerNatives();    static {        registerNatives();    }}

Runnable函数接口

package java.lang;/**The Runnable interface should be implemented by any class whose instances are intended to be executed by a thread. The class must define a method of no arguments called run.This interface is designed to provide a common protocol for objects that wish to execute code while they are active. For example, Runnable is implemented by class Thread. Being active simply means that a thread has been started and has not yet been stopped.In addition, Runnable provides the means for a class to be active while not subclassing Thread. A class that implements Runnable can run without subclassing Thread by instantiating a Thread instance and passing itself in as the target. In most cases, the Runnable interface should be used if you are only planning to override the run() method and no other Thread methods. This is important because classes should not be subclassed unless the programmer intends on modifying or enhancing the fundamental behavior of the class.Since:1.0See Also:Thread, java.util.concurrent.Callable**/@FunctionalInterfacepublic interface Runnable {    /**     * When an object implementing interface {@code Runnable} is used     * to create a thread, starting the thread causes the object's     * {@code run} method to be called in that separately executing     * thread.     * <p>     * The general contract of the method {@code run} is that it may     * take any action whatsoever.     *     * @see     java.lang.Thread#run()     */    public abstract void run();}

Callable<V>函数接口

package java.util.concurrent;/**A task that returns a result and may throw an exception. Implementors define a single method with no arguments called call.The Callable interface is similar to Runnable, in that both are designed for classes whose instances are potentially executed by another thread. A Runnable, however, does not return a result and cannot throw a checked exception.The Executors class contains utility methods to convert from other common forms to Callable classes.Since:1.5See Also:ExecutorAuthor:Doug LeaType parameters:<V> – the result type of method call**/@FunctionalInterfacepublic interface Callable<V> {    /**     * Computes a result, or throws an exception if unable to do so.     *     * @return computed result     * @throws Exception if unable to compute a result     */    V call() throws Exception;}

executors

package java.util.concurrent;/**Factory and utility methods for Executor, ExecutorService, ScheduledExecutorService, ThreadFactory, and Callable classes defined in this package. This class supports the following kinds of methods:Methods that create and return an ExecutorService set up with commonly useful configuration settings.Methods that create and return a ScheduledExecutorService set up with commonly useful configuration settings.Methods that create and return a "wrapped" ExecutorService, that disables reconfiguration by making implementation-specific methods inaccessible.Methods that create and return a ThreadFactory that sets newly created threads to a known state.Methods that create and return a Callable out of other closure-like forms, so they can be used in execution methods requiring Callable.Since:1.5Author:Doug Lea**/public class Executors {}

线程扩展知识

future

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Future.html

java.util.concurrentInterface Future<V>● Type Parameters:V - The result type returned by this Future's get methodAll Known Subinterfaces:  ○ Response<T>,   ○ RunnableFuture<V>,   ○ RunnableScheduledFuture<V>,   ○ ScheduledFuture<V>● All Known Implementing Classes:CompletableFuture, CountedCompleter, ForkJoinTask, FutureTask, RecursiveAction, RecursiveTask, SwingWorker

public interface Future<V>

A Future represents the result of an asynchronous computation. Methods are provided to check if the computation is complete, to wait for its completion, and to retrieve the result of the computation. The result can only be retrieved using method get when the computation has completed, blocking if necessary until it is ready. Cancellation is performed by the cancel method. Additional methods are provided to determine if the task completed normally or was cancelled. Once a computation has completed, the computation cannot be cancelled. If you would like to use a Future for the sake of cancellability but not provide a usable result, you can declare types of the form Future<?> and return null as a result of the underlying task.
Sample Usage (Note that the following classes are all made-up.)

interface ArchiveSearcher { String search(String target); }class App {    ExecutorService executor = ...    ArchiveSearcher searcher = ...    void showSearch(final String target)    throws InterruptedException {        Future<String> future        = executor.submit(new Callable<String>() {            public String call() {                return searcher.search(target);            }});        displayOtherThings(); // do other things while searching        try {            displayText(future.get()); // use future        } catch (ExecutionException ex) { cleanup(); return; }    }}

The FutureTask class is an implementation of Future that implements Runnable, and so may be executed by an Executor. For example, the above construction with submit could be replaced by:

FutureTask<String> future =new FutureTask<String>(new Callable<String>() {    public String call() {        return searcher.search(target);    }});executor.execute(future);

Memory consistency effects: Actions taken by the asynchronous computation happen-before actions following the corresponding Future.get() in another thread.

内存一致性影响:异步计算所采取的操作发生在另一个线程中对应的Future.get()之后的操作之前。

Method Summary

Modifier and TypeMethod and Description
booleancancel(boolean mayInterruptIfRunning)
Attempts to cancel execution of this task.
Vget()
Waits if necessary for the computation to complete, and then retrieves its result.
Vget(long timeout, TimeUnit unit)
Waits if necessary for at most the given time for the computation to complete, and then retrieves its result, if available.
booleanisCancelled()
Returns true if this task was cancelled before it completed normally.
booleanisDone()
Returns true if this task completed.

简单示例代码

package org.example.future;import org.example.service.MedalService;import org.example.service.UserInfoService;import java.util.concurrent.*;public class FutureTest {    public static void main(String[] args) throws ExecutionException, InterruptedException {        ExecutorService executorService = Executors.newFixedThreadPool(10);        UserInfoService userInfoService = new UserInfoService();        MedalService medalService = new MedalService();        long userId = 666L;        long startTime = System.currentTimeMillis();        //调用用户服务获取用户基本信息        FutureTask<UserInfoService.UserInfo> userInfoFutureTask = new FutureTask<>(new Callable<UserInfoService.UserInfo>() {            @Override            public UserInfoService.UserInfo call() throws Exception {                return userInfoService.getUserInfo(userId);            }        });        executorService.submit(userInfoFutureTask);        Thread.sleep(300); //模拟主线程其它操作耗时        FutureTask<MedalService.MedalInfo> medalInfoFutureTask = new FutureTask<>(new Callable<MedalService.MedalInfo>() {            @Override            public MedalService.MedalInfo call() throws Exception {                return medalService.getMedalInfo(userId);            }        });        executorService.submit(medalInfoFutureTask);        UserInfoService.UserInfo userInfo = userInfoFutureTask.get();//获取个人信息结果        MedalService.MedalInfo medalInfo = medalInfoFutureTask.get();//获取勋章信息结果        System.out.println(userInfo);        System.out.println(medalInfo);        System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");        executorService.shutdown();    }}

CompletableFuture

异步编程利器:CompletableFuture详解 |Java 开发实战 - 掘金
image.png

java.lang.Objectjava.util.concurrent.CompletableFuture<T>All Implemented Interfaces:CompletionStage<T>, Future<T>public class CompletableFuture<T>extends Objectimplements Future<T>, CompletionStage<T>

A Future that may be explicitly completed (setting its value and status), and may be used as a CompletionStage, supporting dependent functions and actions that trigger upon its completion.
When two or more threads attempt to complete, completeExceptionally, or cancel a CompletableFuture, only one of them succeeds.

可以显式完成的Future(设置其值和状态),可以用作CompletionStage,支持在其完成时触发的依赖函数和操作。
当两个或多个线程尝试完成、completeException或取消一个CompletableFuture时,只有一个线程成功。

In addition to these and related methods for directly manipulating status and results, CompletableFuture implements interface CompletionStage with the following policies:

Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.

为非异步方法的依赖完成提供的操作可以由完成当前CompletableFuture的线程执行,也可以由完成方法的任何其他调用者执行。

All async methods without an explicit Executor argument are performed using the ForkJoinPool.commonPool() (unless it does not support a parallelism level of at least two, in which case, a new Thread is created to run each task). To simplify monitoring, debugging, and tracking, all generated asynchronous tasks are instances of the marker interface CompletableFuture.AsynchronousCompletionTask.

所有没有显式Executor参数的异步方法都使用 ForkJoinPool.commonPool() 执行(除非它不支持至少两个并行级别,在这种情况下,创建一个新线程来运行每个任务)。为了简化监视、调试和跟踪,所有生成的异步任务都是标记接口 CompletableFuture.AsynchronousCompletionTask 的实例。

All CompletionStage methods are implemented independently of other public methods, so the behavior of one method is not impacted by overrides of others in subclasses.

所有CompletionStage方法的实现都独立于其他公共方法,因此一个方法的行为不受子类中其他方法的覆盖的影响。

CompletableFuture also implements Future with the following policies:

Since (unlike FutureTask) this class has no direct control over the computation that causes it to be completed, cancellation is treated as just another form of exceptional completion. Method cancel has the same effect as completeExceptionally(new CancellationException()). Method isCompletedExceptionally() can be used to determine if a CompletableFuture completed in any exceptional fashion.

由于(与FutureTask不同)该类对导致其完成的计算没有直接控制,因此取消被视为另一种形式的异常完成。方法 cancelcompleteException (new CancellationException()) 具有相同的效果。方法 isCompletedException() 可用于确定 CompletableFuture 是否以任何异常方式完成。

In case of exceptional completion with a CompletionException, methods get() and get(long, TimeUnit) throw an ExecutionException with the same cause as held in the corresponding CompletionException. To simplify usage in most contexts, this class also defines methods join() and getNow(T) that instead throw the CompletionException directly in these cases.

在使用 CompletionException 异常完成的情况下,方法 get()get(long, TimeUnit) 抛出一个 ExecutionException,其原因与对应的 CompletionException 中持有的原因相同。
为了简化大多数上下文中的使用,这个类还定义了 join()getNow(T) 方法,在这些情况下直接抛出 CompletionException

All MethodsStatic MethodsInstance MethodsConcrete Methods

Modifier and TypeMethod and DescriptionCompletableFuture<Void>acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)Returns a new CompletionStage that, when either this or the other given stage complete normally, is executed with the corresponding result as argument to the supplied action.CompletableFuture<Void>acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)Returns a new CompletionStage that, when either this or the other given stage complete normally, is executed using this stage's default asynchronous execution facility, with the corresponding result as argument to the supplied action.CompletableFuture<Void>acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)Returns a new CompletionStage that, when either this or the other given stage complete normally, is executed using the supplied executor, with the corresponding result as argument to the supplied function.static CompletableFuture<Void>allOf(CompletableFuture<?>... cfs)Returns a new CompletableFuture that is completed when all of the given CompletableFutures complete.static CompletableFuture<Object>anyOf(CompletableFuture<?>... cfs)Returns a new CompletableFuture that is completed when any of the given CompletableFutures complete, with the same result.<U> CompletableFuture<U>applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn)Returns a new CompletionStage that, when either this or the other given stage complete normally, is executed with the corresponding result as argument to the supplied function.<U> CompletableFuture<U>applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn)Returns a new CompletionStage that, when either this or the other given stage complete normally, is executed using this stage's default asynchronous execution facility, with the corresponding result as argument to the supplied function.<U> CompletableFuture<U>applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor)Returns a new CompletionStage that, when either this or the other given stage complete normally, is executed using the supplied executor, with the corresponding result as argument to the supplied function.booleancancel(boolean mayInterruptIfRunning)If not already completed, completes this CompletableFuture with a CancellationException.booleancomplete(T value)If not already completed, sets the value returned by get() and related methods to the given value.static <U> CompletableFuture<U>completedFuture(U value)Returns a new CompletableFuture that is already completed with the given value.booleancompleteExceptionally(Throwable ex)If not already completed, causes invocations of get() and related methods to throw the given exception.CompletableFuture<T>exceptionally(Function<Throwable,? extends T> fn)Returns a new CompletableFuture that is completed when this CompletableFuture completes, with the result of the given function of the exception triggering this CompletableFuture's completion when it completes exceptionally; otherwise, if this CompletableFuture completes normally, then the returned CompletableFuture also completes normally with the same value.Tget()Waits if necessary for this future to complete, and then returns its result.Tget(long timeout, TimeUnit unit)Waits if necessary for at most the given time for this future to complete, and then returns its result, if available.TgetNow(T valueIfAbsent)Returns the result value (or throws any encountered exception) if completed, else returns the given valueIfAbsent.intgetNumberOfDependents()Returns the estimated number of CompletableFutures whose completions are awaiting completion of this CompletableFuture.<U> CompletableFuture<U>handle(BiFunction<? super T,Throwable,? extends U> fn)Returns a new CompletionStage that, when this stage completes either normally or exceptionally, is executed with this stage's result and exception as arguments to the supplied function.<U> CompletableFuture<U>handleAsync(BiFunction<? super T,Throwable,? extends U> fn)Returns a new CompletionStage that, when this stage completes either normally or exceptionally, is executed using this stage's default asynchronous execution facility, with this stage's result and exception as arguments to the supplied function.<U> CompletableFuture<U>handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)Returns a new CompletionStage that, when this stage completes either normally or exceptionally, is executed using the supplied executor, with this stage's result and exception as arguments to the supplied function.booleanisCancelled()Returns true if this CompletableFuture was cancelled before it completed normally.booleanisCompletedExceptionally()Returns true if this CompletableFuture completed exceptionally, in any way.booleanisDone()Returns true if completed in any fashion: normally, exceptionally, or via cancellation.Tjoin()Returns the result value when complete, or throws an (unchecked) exception if completed exceptionally.voidobtrudeException(Throwable ex)Forcibly causes subsequent invocations of method get() and related methods to throw the given exception, whether or not already completed.voidobtrudeValue(T value)Forcibly sets or resets the value subsequently returned by method get() and related methods, whether or not already completed.CompletableFuture<Void>runAfterBoth(CompletionStage<?> other, Runnable action)Returns a new CompletionStage that, when this and the other given stage both complete normally, executes the given action.CompletableFuture<Void>runAfterBothAsync(CompletionStage<?> other, Runnable action)Returns a new CompletionStage that, when this and the other given stage complete normally, executes the given action using this stage's default asynchronous execution facility.CompletableFuture<Void>runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor)Returns a new CompletionStage that, when this and the other given stage complete normally, executes the given action using the supplied executor.CompletableFuture<Void>runAfterEither(CompletionStage<?> other, Runnable action)Returns a new CompletionStage that, when either this or the other given stage complete normally, executes the given action.CompletableFuture<Void>runAfterEitherAsync(CompletionStage<?> other, Runnable action)Returns a new CompletionStage that, when either this or the other given stage complete normally, executes the given action using this stage's default asynchronous execution facility.CompletableFuture<Void>runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor)Returns a new CompletionStage that, when either this or the other given stage complete normally, executes the given action using the supplied executor.static CompletableFuture<Void>runAsync(Runnable runnable)Returns a new CompletableFuture that is asynchronously completed by a task running in the ForkJoinPool.commonPool() after it runs the given action.static CompletableFuture<Void>runAsync(Runnable runnable, Executor executor)Returns a new CompletableFuture that is asynchronously completed by a task running in the given executor after it runs the given action.static <U> CompletableFuture<U>supplyAsync(Supplier<U> supplier)Returns a new CompletableFuture that is asynchronously completed by a task running in the ForkJoinPool.commonPool() with the value obtained by calling the given Supplier.static <U> CompletableFuture<U>supplyAsync(Supplier<U> supplier, Executor executor)Returns a new CompletableFuture that is asynchronously completed by a task running in the given executor with the value obtained by calling the given Supplier.CompletableFuture<Void>thenAccept(Consumer<? super T> action)Returns a new CompletionStage that, when this stage completes normally, is executed with this stage's result as the argument to the supplied action.CompletableFuture<Void>thenAcceptAsync(Consumer<? super T> action)Returns a new CompletionStage that, when this stage completes normally, is executed using this stage's default asynchronous execution facility, with this stage's result as the argument to the supplied action.CompletableFuture<Void>thenAcceptAsync(Consumer<? super T> action, Executor executor)Returns a new CompletionStage that, when this stage completes normally, is executed using the supplied Executor, with this stage's result as the argument to the supplied action.<U> CompletableFuture<Void>thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)Returns a new CompletionStage that, when this and the other given stage both complete normally, is executed with the two results as arguments to the supplied action.<U> CompletableFuture<Void>thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)Returns a new CompletionStage that, when this and the other given stage complete normally, is executed using this stage's default asynchronous execution facility, with the two results as arguments to the supplied action.<U> CompletableFuture<Void>thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor)Returns a new CompletionStage that, when this and the other given stage complete normally, is executed using the supplied executor, with the two results as arguments to the supplied function.<U> CompletableFuture<U>thenApply(Function<? super T,? extends U> fn)Returns a new CompletionStage that, when this stage completes normally, is executed with this stage's result as the argument to the supplied function.<U> CompletableFuture<U>thenApplyAsync(Function<? super T,? extends U> fn)Returns a new CompletionStage that, when this stage completes normally, is executed using this stage's default asynchronous execution facility, with this stage's result as the argument to the supplied function.<U> CompletableFuture<U>thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)Returns a new CompletionStage that, when this stage completes normally, is executed using the supplied Executor, with this stage's result as the argument to the supplied function.<U,V> CompletableFuture<V>thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)Returns a new CompletionStage that, when this and the other given stage both complete normally, is executed with the two results as arguments to the supplied function.<U,V> CompletableFuture<V>thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)Returns a new CompletionStage that, when this and the other given stage complete normally, is executed using this stage's default asynchronous execution facility, with the two results as arguments to the supplied function.<U,V> CompletableFuture<V>thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)Returns a new CompletionStage that, when this and the other given stage complete normally, is executed using the supplied executor, with the two results as arguments to the supplied function.<U> CompletableFuture<U>thenCompose(Function<? super T,? extends CompletionStage<U>> fn)Returns a new CompletionStage that, when this stage completes normally, is executed with this stage as the argument to the supplied function.<U> CompletableFuture<U>thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn)Returns a new CompletionStage that, when this stage completes normally, is executed using this stage's default asynchronous execution facility, with this stage as the argument to the supplied function.<U> CompletableFuture<U>thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn, Executor executor)Returns a new CompletionStage that, when this stage completes normally, is executed using the supplied Executor, with this stage's result as the argument to the supplied function.CompletableFuture<Void>thenRun(Runnable action)Returns a new CompletionStage that, when this stage completes normally, executes the given action.CompletableFuture<Void>thenRunAsync(Runnable action)Returns a new CompletionStage that, when this stage completes normally, executes the given action using this stage's default asynchronous execution facility.CompletableFuture<Void>thenRunAsync(Runnable action, Executor executor)Returns a new CompletionStage that, when this stage completes normally, executes the given action using the supplied Executor.CompletableFuture<T>toCompletableFuture()Returns this CompletableFuture.StringtoString()Returns a string identifying this CompletableFuture, as well as its completion state.CompletableFuture<T>whenComplete(BiConsumer<? super T,? super Throwable> action)Returns a new CompletionStage with the same result or exception as this stage, that executes the given action when this stage completes.CompletableFuture<T>whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)Returns a new CompletionStage with the same result or exception as this stage, that executes the given action using this stage's default asynchronous execution facility when this stage completes.CompletableFuture<T>whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)Returns a new CompletionStage with the same result or exception as this stage, that executes the given action using the supplied Executor when this stage completes.

简单示例代码

package org.example.future;import org.example.service.MedalService;import org.example.service.UserInfoService;import java.util.concurrent.*;import static org.example.service.MedalService.MedalInfo;import static org.example.service.UserInfoService.UserInfo;public class CompletableFutureTest {    public static void main(String[] args) throws ExecutionException, TimeoutException, InterruptedException {        ExecutorService executorService = Executors.newFixedThreadPool(10);        UserInfoService userInfoService = new UserInfoService();        MedalService medalService = new MedalService();        long userId = 666L;        long startTime = System.currentTimeMillis();        //调用用户服务获取用户基本信息        CompletableFuture<UserInfo> completableUserInfoFuture = CompletableFuture            .supplyAsync(() -> userInfoService.getUserInfo(userId), executorService);        Thread.sleep(300); //模拟主线程其它操作耗时        CompletableFuture<MedalInfo> completableMedalInfoFuture = CompletableFuture            .supplyAsync(() -> medalService.getMedalInfo(userId), executorService);        UserInfo userInfo = completableUserInfoFuture.get(2, TimeUnit.SECONDS);//获取个人信息结果        MedalInfo medalInfo = completableMedalInfoFuture.get();//获取勋章信息结果        System.out.println(userInfo);        System.out.println(medalInfo);        System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");        executorService.shutdown();    }}

CompletableFuture使用场景

创建异步任务

CompletableFuture创建异步任务,一般有supplyAsync和runAsync两个方法

supplyAsync执行CompletableFuture任务,支持返回值runAsync执行CompletableFuture任务,没有返回值。
supplyAsync方法
//使用默认内置线程池ForkJoinPool.commonPool(),根据supplier构建执行任务public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)//自定义线程,根据supplier构建执行任务public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
runAsync方法
//使用默认内置线程池ForkJoinPool.commonPool(),根据runnable构建执行任务public static CompletableFuture<Void> runAsync(Runnable runnable) //自定义线程,根据runnable构建执行任务public static CompletableFuture<Void> runAsync(Runnable runnable,  Executor executor)

实例代码如下:

public class FutureTest {    public static void main(String[] args) {        //可以自定义线程池        ExecutorService executor = Executors.newCachedThreadPool();        //runAsync的使用        CompletableFuture<Void> runFuture = CompletableFuture.runAsync(() -> System.out.println("run,关注公众号:捡田螺的小男孩"), executor);        //supplyAsync的使用        CompletableFuture<String> supplyFuture = CompletableFuture.supplyAsync(() -> {            System.out.print("supply,关注公众号:捡田螺的小男孩");            return "捡田螺的小男孩"; }, executor);        //runAsync的future没有返回值,输出null        System.out.println(runFuture.join());        //supplyAsync的future,有返回值        System.out.println(supplyFuture.join());        executor.shutdown(); // 线程池需要关闭    }}//输出run,关注公众号:捡田螺的小男孩nullsupply,关注公众号:捡田螺的小男孩捡田螺的小男孩

任务异步回调

1. thenRun/thenRunAsync
public CompletableFuture<Void> thenRun(Runnable action);public CompletableFuture<Void> thenRunAsync(Runnable action);

CompletableFuture的thenRun方法,通俗点讲就是,做完第一个任务后,再做第二个任务。某个任务执行完成后,执行回调方法;但是前后两个任务没有参数传递,第二个任务也没有返回值

public class FutureThenRunTest {    public static void main(String[] args) throws ExecutionException, InterruptedException {        CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(            ()->{                System.out.println("先执行第一个CompletableFuture方法任务");                return "捡田螺的小男孩";            }        );        CompletableFuture thenRunFuture = orgFuture.thenRun(() -> {            System.out.println("接着执行第二个任务");        });        System.out.println(thenRunFuture.get());    }}//输出先执行第一个CompletableFuture方法任务接着执行第二个任务null

thenRun 和thenRunAsync有什么区别呢?可以看下源码哈:

java复制代码   private static final Executor asyncPool = useCommonPool ?ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();public CompletableFuture<Void> thenRun(Runnable action) {return uniRunStage(null, action);}public CompletableFuture<Void> thenRunAsync(Runnable action) {return uniRunStage(asyncPool, action);}

如果你执行第一个任务的时候,传入了一个自定义线程池:

调用thenRun方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池。调用thenRunAsync执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池

TIPS: 后面介绍的thenAccept和thenAcceptAsync,thenApply和thenApplyAsync等,它们之间的区别也是这个哈。

2.thenAccept/thenAcceptAsync

CompletableFuture的thenAccept方法表示,第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,但是回调方法是没有返回值的。

csharp复制代码public class FutureThenAcceptTest {    public static void main(String[] args) throws ExecutionException, InterruptedException {        CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(                ()->{                    System.out.println("原始CompletableFuture方法任务");                    return "捡田螺的小男孩";                }        );        CompletableFuture thenAcceptFuture = orgFuture.thenAccept((a) -> {            if ("捡田螺的小男孩".equals(a)) {                System.out.println("关注了");            }            System.out.println("先考虑考虑");        });        System.out.println(thenAcceptFuture.get());    }}
3. thenApply/thenApplyAsync

CompletableFuture的thenApply方法表示,第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,并且回调方法是有返回值的。

csharp复制代码public class FutureThenApplyTest {    public static void main(String[] args) throws ExecutionException, InterruptedException {        CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(                ()->{                    System.out.println("原始CompletableFuture方法任务");                    return "捡田螺的小男孩";                }        );        CompletableFuture<String> thenApplyFuture = orgFuture.thenApply((a) -> {            if ("捡田螺的小男孩".equals(a)) {                return "关注了";            }            return "先考虑考虑";        });        System.out.println(thenApplyFuture.get());    }}//输出原始CompletableFuture方法任务关注了
4. exceptionally

CompletableFuture的exceptionally方法表示,某个任务执行异常时,执行的回调方法;并且有抛出异常作为参数,传递到回调方法。

php复制代码public class FutureExceptionTest {    public static void main(String[] args) throws ExecutionException, InterruptedException {        CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(                ()->{                    System.out.println("当前线程名称:" + Thread.currentThread().getName());                    throw new RuntimeException();                }        );        CompletableFuture<String> exceptionFuture = orgFuture.exceptionally((e) -> {            e.printStackTrace();            return "你的程序异常啦";        });        System.out.println(exceptionFuture.get());    }}//输出当前线程名称:ForkJoinPool.commonPool-worker-1java.util.concurrent.CompletionException: java.lang.RuntimeExceptionat java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582)at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)Caused by: java.lang.RuntimeExceptionat cn.eovie.future.FutureWhenTest.lambda$main$0(FutureWhenTest.java:13)at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)... 5 more你的程序异常啦
5. whenComplete方法

CompletableFuture的whenComplete方法表示,某个任务执行完成后,执行的回调方法,无返回值;并且whenComplete方法返回的CompletableFuture的result是上个任务的结果

csharp复制代码public class FutureWhenTest {    public static void main(String[] args) throws ExecutionException, InterruptedException {        CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(                ()->{                    System.out.println("当前线程名称:" + Thread.currentThread().getName());                    try {                        Thread.sleep(2000L);                    } catch (InterruptedException e) {                        e.printStackTrace();                    }                    return "捡田螺的小男孩";                }        );        CompletableFuture<String> rstFuture = orgFuture.whenComplete((a, throwable) -> {            System.out.println("当前线程名称:" + Thread.currentThread().getName());            System.out.println("上个任务执行完啦,还把" + a + "传过来");            if ("捡田螺的小男孩".equals(a)) {                System.out.println("666");            }            System.out.println("233333");        });        System.out.println(rstFuture.get());    }}//输出当前线程名称:ForkJoinPool.commonPool-worker-1当前线程名称:ForkJoinPool.commonPool-worker-1上个任务执行完啦,还把捡田螺的小男孩传过来666233333捡田螺的小男孩
6. handle方法

CompletableFuture的handle方法表示,某个任务执行完成后,执行回调方法,并且是有返回值的;并且handle方法返回的CompletableFuture的result是回调方法执行的结果。

csharp复制代码public class FutureHandlerTest {    public static void main(String[] args) throws ExecutionException, InterruptedException {        CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(                ()->{                    System.out.println("当前线程名称:" + Thread.currentThread().getName());                    try {                        Thread.sleep(2000L);                    } catch (InterruptedException e) {                        e.printStackTrace();                    }                    return "捡田螺的小男孩";                }        );        CompletableFuture<String> rstFuture = orgFuture.handle((a, throwable) -> {            System.out.println("上个任务执行完啦,还把" + a + "传过来");            if ("捡田螺的小男孩".equals(a)) {                System.out.println("666");                return "关注了";            }            System.out.println("233333");            return null;        });        System.out.println(rstFuture.get());    }}//输出当前线程名称:ForkJoinPool.commonPool-worker-1上个任务执行完啦,还把捡田螺的小男孩传过来666关注了

多个任务组合处理

AND组合关系


thenCombine / thenAcceptBoth / runAfterBoth都表示:将两个CompletableFuture组合起来,只有这两个都正常执行完了,才会执行某个任务
区别在于:

thenCombine:会将两个任务的执行结果作为方法入参,传递到指定方法中,且有返回值thenAcceptBoth: 会将两个任务的执行结果作为方法入参,传递到指定方法中,且无返回值runAfterBoth 不会把执行结果当做方法入参,且没有返回值。
arduino复制代码public class ThenCombineTest {    public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {        CompletableFuture<String> first = CompletableFuture.completedFuture("第一个异步任务");        ExecutorService executor = Executors.newFixedThreadPool(10);        CompletableFuture<String> future = CompletableFuture                //第二个异步任务                .supplyAsync(() -> "第二个异步任务", executor)                // (w, s) -> System.out.println(s) 是第三个任务                .thenCombineAsync(first, (s, w) -> {                    System.out.println(w);                    System.out.println(s);                    return "两个异步任务的组合";                }, executor);        System.out.println(future.join());        executor.shutdown();    }}//输出第一个异步任务第二个异步任务两个异步任务的组合
OR 组合的关系


applyToEither / acceptEither / runAfterEither 都表示:将两个CompletableFuture组合起来,只要其中一个执行完了,就会执行某个任务。
区别在于:

applyToEither:会将已经执行完成的任务,作为方法入参,传递到指定方法中,且有返回值acceptEither: 会将已经执行完成的任务,作为方法入参,传递到指定方法中,且无返回值runAfterEither: 不会把执行结果当做方法入参,且没有返回值。
kotlin复制代码public class AcceptEitherTest {    public static void main(String[] args) {        //第一个异步任务,休眠2秒,保证它执行晚点        CompletableFuture<String> first = CompletableFuture.supplyAsync(()->{            try{                Thread.sleep(2000L);                System.out.println("执行完第一个异步任务");}                catch (Exception e){                    return "第一个任务异常";                }            return "第一个异步任务";        });        ExecutorService executor = Executors.newSingleThreadExecutor();        CompletableFuture<Void> future = CompletableFuture                //第二个异步任务                .supplyAsync(() -> {                            System.out.println("执行完第二个任务");                            return "第二个任务";}                , executor)                //第三个任务                .acceptEitherAsync(first, System.out::println, executor);        executor.shutdown();    }}//输出执行完第二个任务第二个任务

AllOf

所有任务都执行完成后,才执行 allOf返回的CompletableFuture。如果任意一个任务异常,allOf的CompletableFuture,执行get方法,会抛出异常

kotlin复制代码public class allOfFutureTest {    public static void main(String[] args) throws ExecutionException, InterruptedException {        CompletableFuture<Void> a = CompletableFuture.runAsync(()->{            System.out.println("我执行完了");        });        CompletableFuture<Void> b = CompletableFuture.runAsync(() -> {            System.out.println("我也执行完了");        });        CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(a, b).whenComplete((m,k)->{            System.out.println("finish");        });    }}//输出我执行完了我也执行完了finish

AnyOf

任意一个任务执行完,就执行anyOf返回的CompletableFuture。如果执行的任务异常,anyOf的CompletableFuture,执行get方法,会抛出异常

csharp复制代码public class AnyOfFutureTest {    public static void main(String[] args) throws ExecutionException, InterruptedException {        CompletableFuture<Void> a = CompletableFuture.runAsync(()->{            try {                Thread.sleep(3000L);            } catch (InterruptedException e) {                e.printStackTrace();            }            System.out.println("我执行完了");        });        CompletableFuture<Void> b = CompletableFuture.runAsync(() -> {            System.out.println("我也执行完了");        });        CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(a, b).whenComplete((m,k)->{            System.out.println("finish");//            return "捡田螺的小男孩";        });        anyOfFuture.join();    }}//输出我也执行完了finish

thenCompose

thenCompose方法会在某个任务执行完成后,将该任务的执行结果,作为方法入参,去执行指定的方法。该方法会返回一个新的CompletableFuture实例

如果该CompletableFuture实例的result不为null,则返回一个基于该result新的CompletableFuture实例;如果该CompletableFuture实例为null,然后就执行这个新任务
arduino复制代码public class ThenComposeTest {    public static void main(String[] args) throws ExecutionException, InterruptedException {        CompletableFuture<String> f = CompletableFuture.completedFuture("第一个任务");        //第二个异步任务        ExecutorService executor = Executors.newSingleThreadExecutor();        CompletableFuture<String> future = CompletableFuture                .supplyAsync(() -> "第二个任务", executor)                .thenComposeAsync(data -> {                    System.out.println(data); return f; //使用第一个任务作为返回                }, executor);        System.out.println(future.join());        executor.shutdown();    }}//输出第二个任务第一个任务

CompletableFuture使用有哪些注意点

CompletableFuture 使我们的异步编程更加便利的、代码更加优雅的同时,我们也要关注下它,使用的一些注意点。

1. Future需要获取返回值,才能获取异常信息

ini复制代码ExecutorService executorService = new ThreadPoolExecutor(5, 10, 5L,    TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {      int a = 0;      int b = 666;      int c = b / a;      return true;   },executorService).thenAccept(System.out::println);    //如果不加 get()方法这一行,看不到异常信息 //future.get();

Future需要获取返回值,才能获取到异常信息。如果不加 get()/join()方法,看不到异常信息。小伙伴们使用的时候,注意一下哈,考虑是否加try…catch…或者使用exceptionally方法。

2. CompletableFuture的get()方法是阻塞的。

CompletableFuture的get()方法是阻塞的,如果使用它来获取异步调用的返回值,需要添加超时时间~

csharp复制代码//反例 CompletableFuture.get();//正例CompletableFuture.get(5, TimeUnit.SECONDS);

3. 默认线程池的注意点

CompletableFuture代码中又使用了默认的线程池,处理的线程个数是电脑CPU核数-1。在大量请求过来的时候,处理逻辑复杂的话,响应会很慢。一般建议使用自定义线程池,优化线程池配置参数。

4. 自定义线程池时,注意饱和策略

CompletableFuture的get()方法是阻塞的,我们一般建议使用future.get(3, TimeUnit.SECONDS)。并且一般建议使用自定义线程池。
但是如果线程池拒绝策略是DiscardPolicy或者DiscardOldestPolicy,当线程池饱和时,会直接丢弃任务,不会抛弃异常。因此建议,CompletableFuture线程池策略最好使用AbortPolicy,然后耗时的异步线程,做好线程池隔离哈。


点击全文阅读


本文链接:http://zhangshiyu.com/post/67859.html

<< 上一篇 下一篇 >>

  • 评论(0)
  • 赞助本站

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

关于我们 | 我要投稿 | 免责申明

Copyright © 2020-2022 ZhangShiYu.com Rights Reserved.豫ICP备2022013469号-1