多线程之CompletionService

当向Executor提交多个任务并且希望获得它们在完成之后的结果,如果用FutureTask,可以循环获取task,并调用get方法去获取task执行结果,但是如果task还未完成,获取结果的线程将阻塞直到task完成,由于不知道哪个task优先执行完毕,使用这种方式效率不会很高。在jdk5时候提出接口CompletionService,它整合了Executor和BlockingQueue的功能,可以更加方便在多个任务执行时获取到任务执行结果。

现实中的需求

完成某项任务需要满足10个前提。如果任何一个前提不满足。马上终止任务。10个前提分别用10个线程执行。

  • 使用FutureTask,效率肯定不会最高,最差的情况下不满足的前提被排在了最后。那只能等所有的线程出结果了。才能完成任务。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    int POOL_SIZE = 10;
    ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE);
    List<Future> tasks =new ArrayList<>();
    // 向里面扔任务
    for (int i = 0; i < POOL_SIZE; i++) {
    tasks.add(pool.submit(new Thread("Thread" + i)));
    }
    // 检查线程池任务执行结果
    tasks.forEach(task->task.get());
  • CompletionService。按前提执行结束的先后顺序拿到结果。如果不满足的前提早早的结束了。就可以不用等其它线程了。直接结束其它线程,完成任务。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
     int POOL_SIZE = 10;
    ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE);
    CompletionService<String> completionService = new ExecutorCompletionService<>(pool);
    List<Future> tasks =new ArrayList<>();
    // 向里面扔任务
    for (int i = 0; i < POOL_SIZE; i++) {
    completionService.submit(new Callable<String>() {
    @Override
    public String call() throws Exception {
    return "Thread";
    }
    });
    }
    // 任务数量必须是已知的
    for (int i = 0; i < POOL_SIZE; i++) {
    completionService.take().get();
    }

总结

  • FutureTask,自己创建一个集合来保存Future存根并循环调用其返回结果的时候,主线程并不能保证首先获得的是最先完成任务的线程返回值。它只是按加入线程池的顺序返回。因为take方法是阻塞方法,后面的任务完成了,前面的任务却没有完成,主程序就那样等待在那儿,只到前面的完成了,它才知道原来后面的也完成了。

  • CompletionService,使用CompletionService来维护处理线程不的返回结果时,主线程总是能够拿到最先完成的任务的返回值,而不管它们加入线程池的顺序。