我正在用java做一些多线程工作,我有一个关于 future 的问题get方法。

我有一个ExecutorSevice它提交 n Callable。提交返回 Future它被添加到列表中(稍后在一些代码中)。可调用的 call方法循环遍历列表,并对列表中的每个项目 someMethod被调用并返回 CompletableFuture<T> 。然后,Completable future 被添加到 map 中。

循环结束call返回包含一堆 CompletableFuture 的映射。

因此,初始列表 ( uglyList ) 包含 n 个 <String, CompletableFuture<String>> 的映射。

我的问题是,当我调用get时在 uglyList 的元素上执行将被阻止,直到整个列表(作为参数传递给 Callable)被访问并且所有 CompletableFutures已经插入到 map 中了,对吧?

因为,我有一个疑问,有没有可能get在此示例中,还等待 CompletableFutures 的完成在 map 上?我不知道如何测试这个

 
public class A { 
 
    private class CallableC implements Callable<Map<String, CompletableFuture<String>>> { 
        private List<String> listString; 
 
        Map<String, CompletableFuture<String>> futureMap; 
 
        public CallableC(List<String> listString) throws Exception {         
            this.listString = listString; 
            this.futureMap = new HashMap<>(); 
        } 
 
        @Override 
        public Map<String, CompletableFuture<String>> call() throws Exception { 
 
            for (int j = 0; j < listString.size(); ++j) { 
                CompletableFuture<String> future = someMethod();         
                futureMap.put(listString.get(i), future); 
            } 
 
            return futureMap; 
        } 
    } 
 
 
    public void multiThreadMethod(List<String> items) throws Exception { 
 
        List<Future<Map<String, CompletableFuture<String>>>> uglyList = new ArrayList<>(); 
 
        int coresNumber = (Runtime.getRuntime().availableProcessors()); 
 
                // This method split a List in n subList 
        List<List<String>> splittedList = split(items, coresNumber); 
 
        ExecutorService executor = Executors.newFixedThreadPool(coresNumber); 
 
        try { 
 
            for (int i = 0; i < coresNumber; ++i) { 
                threads.add(executor.submit(new CallableC(splittedList.get(i),))); 
            } 
 
            for (int i = 0; i < coresNumber; ++i) { 
                uglyList.get(i).get(); //here 
            } 
 
 
        } catch (Exception e) { 
                     // YAY errors 
        } 
        finally { 
            executor.shutdown(); 
        } 
    } 
} 
 

请您参考如下方法:

when i call get on a element of uglyList the execution is blocked until the entire list (passed as parameter to the Callable) is visited and all CompletableFutures have been inserted in the map, right?

是的,这是正确的。 get() 将等待 Callable 的完成,在您的情况下,这意味着 CallableC#call() 方法的结束,当 CompletableFutures 全部创建并插入到 futureMap 中时,将在 for 循环结束时到达该方法,无论这些 CompletableFuture 的状态/完成情况如何。

it is possible that the get in this example, also waits the completion of the CompletableFutures in the map?

可以实现这一点,但需要额外的代码。

在代码中执行此操作的最低方法是在任务内的 CompletableFuture 上添加 join 调用。

@Override 
    public Map<String, CompletableFuture<String>> call() throws Exception { 
 
        for (int j = 0; j < listString.size(); ++j) { 
            CompletableFuture<String> future = someMethod();         
            futureMap.put(listString.get(j), future); 
        } 
        // This will wait for all the futures inside the map to terminate 
        try { 
            CompletableFuture.allOf(futureMap.values().toArray(new CompletableFuture[0])).join(); 
        } catch (CompletionException e) { 
            // ignored - the status will be checked later on 
        } 
 
        return futureMap; 
    } 

另一种方法是在最后等待(为了简洁起见,我跳过异常处理):

for (int i = 0; i < coresNumber; ++i) { 
    threads.add(executor.submit(new CallableC(splittedList.get(i),))); 
} 
// This will wait for all the futures to terminate (thanks to f.get()) 
// And then, all the completable futures will be added to an array of futures that will also be waited upon 
CompletableFuture.allOf( 
    uglyList.stream().flatMap(f -> f.get().values().stream()).toArray(CompletableFuture[]::new) 
).join(); 

但是如果对你来说可能的话,我会仅使用可完成的 future 来简化整个代码,除非你有充分的理由以你的方式中断任务(使用列表列表、专用执行器等......)。这可能看起来像这样(可能有一些错别字)

List<String> items = ... 
ConcurrentMap<String, String> resultsByItem = new ConcurrentHashMap<>(); 
 
Future<Void> allWork = CompletableFuture.allOf( 
    items.stream() 
        .map(item -> someWork(item) // Get a completable future for this item 
                     .andThen(result -> resultsByItem.put(item, result)) // Append the result to the map 
        ).toArray(CompletableFuture[]::new) 
); 
allWork.join(); 
// Your futureMap is completed. 

这将替换问题中的整个代码。


评论关闭
IT源码网

微信公众号号:IT虾米 (左侧二维码扫一扫)欢迎添加!