我正在用java做一些多线程工作,我有一个关于 future 的问题get
方法。
我有一个ExecutorSevice
它提交 n Callable。提交返回 Future
它被添加到列表中(稍后在一些代码中)。可调用的 call
方法循环遍历列表,并对列表中的每个项目 someMethod
被调用并返回 CompletableFuture<T>
。然后,Completable future 被添加到 map 中。
循环结束call
返回包含一堆 CompletableFuture 的映射。
因此,初始列表 ( uglyList
) 包含 n 个 <String, CompletableFuture<String>>
的映射。
我的问题是,当我调用get
时在 uglyList
的元素上执行将被阻止,直到整个列表(作为参数传递给 Callable)被访问并且所有 CompletableFutures
已经插入到 map 中了,对吧?
因为,我有一个疑问,有没有可能get
在此示例中,还等待 CompletableFutures
的完成在 map 上?我不知道如何测试这个
public class A {
private class CallableC implements Callable<Map<String, CompletableFuture<String>>> {
private List<String> listString;
Map<String, CompletableFuture<String>> futureMap;
public CallableC(List<String> listString) throws Exception {
this.listString = listString;
this.futureMap = new HashMap<>();
}
@Override
public Map<String, CompletableFuture<String>> call() throws Exception {
for (int j = 0; j < listString.size(); ++j) {
CompletableFuture<String> future = someMethod();
futureMap.put(listString.get(i), future);
}
return futureMap;
}
}
public void multiThreadMethod(List<String> items) throws Exception {
List<Future<Map<String, CompletableFuture<String>>>> uglyList = new ArrayList<>();
int coresNumber = (Runtime.getRuntime().availableProcessors());
// This method split a List in n subList
List<List<String>> splittedList = split(items, coresNumber);
ExecutorService executor = Executors.newFixedThreadPool(coresNumber);
try {
for (int i = 0; i < coresNumber; ++i) {
threads.add(executor.submit(new CallableC(splittedList.get(i),)));
}
for (int i = 0; i < coresNumber; ++i) {
uglyList.get(i).get(); //here
}
} catch (Exception e) {
// YAY errors
}
finally {
executor.shutdown();
}
}
}
请您参考如下方法:
when i call get on a element of uglyList the execution is blocked until the entire list (passed as parameter to the Callable) is visited and all CompletableFutures have been inserted in the map, right?
是的,这是正确的。 get()
将等待 Callable
的完成,在您的情况下,这意味着 CallableC#call()
方法的结束,当 CompletableFutures
全部创建并插入到 futureMap
中时,将在 for 循环结束时到达该方法,无论这些 CompletableFuture
的状态/完成情况如何。
it is possible that the get in this example, also waits the completion of the CompletableFutures in the map?
可以实现这一点,但需要额外的代码。
在代码中执行此操作的最低方法是在任务内的 CompletableFuture
上添加 join 调用。
@Override
public Map<String, CompletableFuture<String>> call() throws Exception {
for (int j = 0; j < listString.size(); ++j) {
CompletableFuture<String> future = someMethod();
futureMap.put(listString.get(j), future);
}
// This will wait for all the futures inside the map to terminate
try {
CompletableFuture.allOf(futureMap.values().toArray(new CompletableFuture[0])).join();
} catch (CompletionException e) {
// ignored - the status will be checked later on
}
return futureMap;
}
另一种方法是在最后等待(为了简洁起见,我跳过异常处理):
for (int i = 0; i < coresNumber; ++i) {
threads.add(executor.submit(new CallableC(splittedList.get(i),)));
}
// This will wait for all the futures to terminate (thanks to f.get())
// And then, all the completable futures will be added to an array of futures that will also be waited upon
CompletableFuture.allOf(
uglyList.stream().flatMap(f -> f.get().values().stream()).toArray(CompletableFuture[]::new)
).join();
但是如果对你来说可能的话,我会仅使用可完成的 future 来简化整个代码,除非你有充分的理由以你的方式中断任务(使用列表列表、专用执行器等......)。这可能看起来像这样(可能有一些错别字)
List<String> items = ...
ConcurrentMap<String, String> resultsByItem = new ConcurrentHashMap<>();
Future<Void> allWork = CompletableFuture.allOf(
items.stream()
.map(item -> someWork(item) // Get a completable future for this item
.andThen(result -> resultsByItem.put(item, result)) // Append the result to the map
).toArray(CompletableFuture[]::new)
);
allWork.join();
// Your futureMap is completed.
这将替换问题中的整个代码。