최근에 스레드에 대해 공부를 진행하고 있다.
그런데 Future, FutureTask, ComplatableFuture 관련해서 필자가 잘 이해를 못하고 있다.
그래서 이를 정리하고자 한다.
1. Future?
Oracle에서는 공식 문서에서 future에 대해 다음과 설명하고 있다.
Future는 비동기 계산의 결과를 나타냅니다.
계산이 완료되었는지 확인하고, 완료될 때까지 기다리고, 계산 결과를 검색하는 메서드가 제공됩니다.
결과는 계산이 완료되었을 때 get 메소드를 사용해서만 검색할 수 있으며 필요한 경우 준비될 때까지 차단됩니다.
취소는 cancel 메소드로 수행됩니다. 작업이 정상적으로 완료되었는지 또는 취소되었는지 확인하기 위한 추가 메서드가 제공됩니다. 계산이 완료되면 계산을 취소할 수 없습니다. 취소 가능성을 위해 Future를 사용하고 싶지만 사용 가능한 결과를 제공하지 않으려는 경우 Future<?> 형식의 유형을 선언하고 기본 작업의 결과로 null을 반환할 수 있습니다.
그런데 Future가 왜 필요한지 솔직히 필자는 잘 몰랐었다. (그대가 이미 안다면? 당신은 천재 ㅎ)
Future를 사용해서 얻을 수 있는 장점은 다음과 같다.
Future를 사용한 이점은 엄청 거~대한데 다른 유용한 작업을 할 수 있는 반면, 현재 스레드를 블로킹하지 않으면서도 비동기적으로 매우 중요환 계산을 할 수 있기 때문이다.
레스토랑에 가는 경우를 생각할 수 있다. 쉐프가 우리 저녁을 준비하는 동안 와인을 마신다던가, 친구와 이야기를 하는것 같은 일을 할 수 있는데 쉐프가 준비가 되면 마침내 음식을 먹을 수 있다.
다른 이점은 Future interface를 사용하면 thread로 직접 작업하는 것 보다 훨씬 더 개발자 친화적이다.
그렇다는데... Future 가 가지고 있는 method들을 살펴볼 것이다.
SquareCalculator.java
class SquareCalculator {
private ExecutorService executors = Executors.newSingleThreadExecutor();
public Future<Integer> calculate(Integer input) {
return executors.submit(() -> {
Thread.sleep(1000);
return input * input;
});
}
public void shutdown() {
executors.shutdown();
}
}
여기서 ExecutorService는 비동기 모드로 작업을 실행하는것을 쉽게 해주는 JDK API이다.
위의 코드에서는 메소드 이름에서 유추할 수 있듯이 새로운 하나의 thread를 사용해서 비동기 작업을 실행한다는 것을 보여준다.
FutureBasic.java
public class FutureBasic {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
SquareCalculator squareCalculator = new SquareCalculator();
Future<Integer> calculate = squareCalculator.calculate(100);
System.out.println(calculate.get());
Future<Integer> future = new SquareCalculator().calculate(10);
while(!future.isDone()) {
System.out.println("Calculating...");
Thread.sleep(300);
}
// get => Waits if necessary for the computation to complete, and then retrieves its result.
// join vs get? join은 반환값 없이 작업이 끝낼 때 까지 기다린다.
Integer result = future.get();
System.out.println("result :: " + result);
// 500 ms안에 응답이 없으면 TimeoutException을 던짐.
Integer result1 = future.get(500, TimeUnit.MILLISECONDS);
System.out.println("result1 :: " + result1);
Future<Integer> cancelFuture = new SquareCalculator().calculate(4);
boolean isCanceled = cancelFuture.cancel(true);
//cancelFuture.get(); 취소 후 get을 통해 결과값을 가져오려 하면 CancellationException을 던짐.
}
}
/** 출력 결과
10000
Calculating...
Calculating...
Calculating...
Calculating...
result :: 100
result1 :: 100
**/
2. Future를 활용한 MultiThreading
다음은 future를 여러개 사용하였다.
FutureSingle.java
public class FutureSingle {
public static void main(String[] args) throws InterruptedException, ExecutionException {
SquareCalculator squareCalculator = new SquareCalculator();
Future<Integer> future1 = squareCalculator.calculate(10);
Future<Integer> future2 = squareCalculator.calculate(100);
while (!(future1.isDone() && future2.isDone())) {
System.out.println(
String.format(
"future1 is %s and future2 is %s",
future1.isDone() ? "done" : "not done",
future2.isDone() ? "done" : "not done"
)
);
Thread.sleep(300);
}
Integer result1 = future1.get();
Integer result2 = future2.get();
System.out.println(result1 + " and " + result2);
squareCalculator.shutdown();
}
}
/**
calculating square for :: 10
calculating square for :: 100
future1 is not done and future2 is not done
future1 is not done and future2 is not done
future1 is not done and future2 is not done
future1 is not done and future2 is not done
future1 is done and future2 is not done
future1 is done and future2 is not done
future1 is done and future2 is not done
100 and 10000
**/
여기서 우리는 single thread로 작업을 했기 때문에 아무리 비동기라도 작업을 한 번에 한번씩 처리할 수 밖에 없어서
결과가 위처럼 나온 것이다.
그러면 Thread를 더 만들면 되겠지?
Thread Pool의 갯수를 2개로 늘려보자.
SquareCalculator.java (Thread Pool 을 1개에서 2개로)
class SquareCalculator {
private ExecutorService executors = Executors.newFixedThreadPool(2);
public Future<Integer> calculate(Integer input) {
System.out.println("calculating square for :: " + input);
return executors.submit(() -> {
Thread.sleep(1000);
return input * input;
});
}
public void shutdown() {
executors.shutdown();
}
}
그 다음 ThreadSingle.java를 실행하면 동시에 두 개의 task가 실행되는 것을 볼 수 있다.
public class FutureSingle {
public static void main(String[] args) throws InterruptedException, ExecutionException {
SquareCalculator squareCalculator = new SquareCalculator();
Future<Integer> future1 = squareCalculator.calculate(10);
Future<Integer> future2 = squareCalculator.calculate(100);
while (!(future1.isDone() && future2.isDone())) {
System.out.println(
String.format(
"future1 is %s and future2 is %s",
future1.isDone() ? "done" : "not done",
future2.isDone() ? "done" : "not done"
)
);
Thread.sleep(300);
}
Integer result1 = future1.get();
Integer result2 = future2.get();
System.out.println(result1 + " and " + result2);
squareCalculator.shutdown();
}
}
/**
calculating square for :: 10
calculating square for :: 100
future1 is not done and future2 is not done
future1 is not done and future2 is not done
future1 is not done and future2 is not done
future1 is not done and future2 is not done
future1 is done and future2 is not done
future1 is done and future2 is not done
future1 is done and future2 is not done
100 and 10000
**/
3. FutureTask?
FutureTask는 Future 인터페이스의 구현 클래스이며, Runnable 인터페이스도 구현한다. 이는 Runnable과 Future의 기능을 모두 갖추고 있다.
Future는 비동기 연산의 결과에 대한 참조를 제공하는 인터페이스이며,
FutureTask는 이 Future 인터페이스를 구현하는 실행 가능한 태스크다.
FutureTask는 Future의 모든 기능을 제공하면서, 동시에 Runnable을 구현하여 스레드에서 직접 실행될 수 있는 기능을 추가한다.
MyCallable.java
public class MyCallable implements Callable<String> {
private long waitTime;
public MyCallable(int timeInMillis){
this.waitTime=timeInMillis;
}
@Override
public String call() throws Exception {
Thread.sleep(waitTime);
//return the thread name executing this callable task
return Thread.currentThread().getName();
}
}
FutureTaskExample.java
public class FutureTaskExample {
public static void main(String[] args) {
MyCallable callable1 = new MyCallable(1000);
MyCallable callable2 = new MyCallable(2000);
FutureTask<String> futureTask1 = new FutureTask<String>(callable1);
FutureTask<String> futureTask2 = new FutureTask<String>(callable2);
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.execute(futureTask1);
executor.execute(futureTask2);
while (true) {
try {
if(futureTask1.isDone() && futureTask2.isDone()){
System.out.println("Done");
//shut down executor service
executor.shutdown();
return;
}
if(!futureTask1.isDone()){
//wait indefinitely for future task to complete
System.out.println("FutureTask1 output="+futureTask1.get());
}
System.out.println("Waiting for FutureTask2 to complete");
String s = futureTask2.get(200L, TimeUnit.MILLISECONDS);
if(s !=null){
System.out.println("FutureTask2 output="+s);
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}catch(TimeoutException e){
//do nothing
}
}
}
}
Future도 좋지만 FutureTask는 직접적이로 단순한 접근 방식을 제공한다고 한다.
또한, 각 태스크를 별도의 스레드에서 실행하고, 이를 통해 각 태스크의 완료 상태와 결과를 독립적으로 관리할 수 있다.
이렇게 되면 태스크 실행에 대한 더 큰 유연성과 제어를 제공한다.
4. ForkJoin과 ForkJoinPool
그러면 ForkJoin은 무엇인가?
Java 7에서부터 도입된 fork/join 프레임워크는 모든 사용가능한 처리 코어를 사용해서 병렬 처리 속도를 높이는데 돕는 도구를 제공한다. 이를 분할과 정복 접근을 통해서 해결한다.
실제로 이는 프레임워크가 먼저 "포크"되어 비동기식으로 실행될 수 있을 만큼 단순해질 때까지 작업을 더 작은 독립 하위 작업으로 반복적으로 나누는 것을 의미한다.
그 후 "참여" 부분이 시작된다. 모든 하위 작업의 결과는 단일 결과로 재귀적으로 결합된다. void를 반환하는 작업의 경우 프로그램은 모든 하위 작업이 실행될 때까지 기다린다.
효과적으로 병렬 실행을 제공하기 위해 fork/join 프레임워크는 ForkJoinPool이라고 불리는 thread의 pool을 사용한다. 이 pool은 ForkJoinWorkerThread 타입의 worker thread를 관리한다.
ForkJoinPool은 또한 ExecutorService의 구현체다.
ForkJoinPool에 대해서는 추후에 공부할 예정이다.
5. CompletableFuture?
completableFuture는 Java 8에서 도입된 Future의 확장으로, 초기 작업의 결과에 기능을 연쇄적으로 추가할 수 있는 옵션을 제공한다.
CompletableFuture는 여러 비동기 연산을 연결하거나, 여러 Future의 완료를 기다리는 데 사용할 수 있다.
우선 간단하게 CompletableFuture를 Future로 사용할 수 있다.
CompletableFutureBasic.java
public class CompletableFutureBasic {
public static void main(String[] args) throws InterruptedException, ExecutionException {
Future<String> completableFuture = calculateAsync();
String result = completableFuture.get();
System.out.println("result = " + result);
}
public static Future<String> calculateAsync() throws InterruptedException {
CompletableFuture<String> completableFuture = new CompletableFuture<>();
Executors.newCachedThreadPool().submit(() -> {
Thread.sleep(500);
completableFuture.complete("Hello");
return null;
});
return completableFuture;
}
}
// 결과 : result = Hello
6. CompletableFuture의 다양한 Method
내가 method를 사용해보고 느낀 점은 다음과 같다.
보면 작업을 처리하고, 이어서 처리하고, 에러가 나면 에러로 처리하고! 그리고 결과값을 가공하고!
이게 뭐 어디서 본거 같은 직관적인 흐름을 따라가고 있다.
우리 stream api 처럼 쉽게 작업을 직관적으로 추가해서 볼 수 있다.
많은 메소드를 지원하는데 정리한 것은 궁금하면 다음을 펴서 보면 된다.
기본 메서드
- thenApply / thenApplyAsync:
- thenApply: 현재 CompletableFuture의 결과에 함수를 적용합니다. 이 메서드는 동일한 스레드에서 실행됩니다.
- thenApplyAsync: thenApply와 같지만, 별도의 스레드(기본적으로는 ForkJoinPool의 스레드)에서 비동기적으로 실행됩니다.
- thenAccept / thenAcceptAsync:
- thenAccept: 현재 CompletableFuture의 결과를 소비합니다(소비만 하고 다른 값을 반환하지 않음). 동일한 스레드에서 실행됩니다.
- thenAcceptAsync: thenAccept와 같지만, 비동기적으로 실행됩니다.
- thenRun / thenRunAsync:
- thenRun: 현재 CompletableFuture의 결과와 무관하게 작업을 실행합니다(리턴 값이 없음). 동일한 스레드에서 실행됩니다.
- thenRunAsync: thenRun과 같지만, 비동기적으로 실행됩니다.
조합 메서드
- thenCompose / thenComposeAsync:
- thenCompose: 현재 CompletableFuture의 결과를 사용하여 새로운 CompletableFuture를 생성하고, 그 결과를 반환합니다.
- thenComposeAsync: thenCompose와 같지만, 비동기적으로 실행됩니다.
- thenCombine / thenCombineAsync:
- thenCombine: 두 개의 CompletableFuture를 결합하고, 두 연산의 결과에 함수를 적용합니다.
- thenCombineAsync: thenCombine과 같지만, 비동기적으로 실행됩니다.
- allOf:
- 여러 CompletableFuture를 모두 완료될 때까지 기다리는 새로운 CompletableFuture를 반환합니다.
- anyOf:
- 여러 CompletableFuture 중 하나라도 완료되면 결과를 반환하는 새로운 CompletableFuture를 반환합니다.
예외 처리 메서드
- exceptionally:
- CompletableFuture의 연산 중 예외가 발생했을 때 해당 예외를 처리합니다.
- handle / handleAsync:
- handle: 연산의 결과 또는 발생한 예외를 처리합니다.
- handleAsync: handle과 같지만, 비동기적으로 실행됩니다.
그러면 어떻게 사용하는지 간단히 보려고 한다.
CompletableFutureBasic.java
public class CompletableFutureBasic {
public static void main(String[] args) throws InterruptedException, ExecutionException, UnsupportedEncodingException {
Future<String> completableFuture = calculateAsync();
String result = completableFuture.get();
System.out.println("result = " + result);
CompletableFuture<String> completableFuture1
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> result2 = completableFuture1
.thenApply(s -> s + " World");
System.out.println("result2 :: " + result2.get());
//value를 반환하기 싫으면 consumer function도 사용 가능
CompletableFuture<Void> result3 = completableFuture1
.thenAccept(s -> System.out.println("consumer test :: " + s));
System.out.println("result3 = " + result3);
//해당 코드같이 이전의 결과값을 가지고 작업을 추가할 수 있다.
CompletableFuture<String> completableFuture3
= CompletableFuture.supplyAsync(() -> "Hello")
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
System.out.println("completableFuture3 = " + completableFuture3);
//thenCombine은 두 개의 CompletableFuture를 결합하고, 두 연산의 결과에 함수를 적용한다.
CompletableFuture<String> completableFuture4
= CompletableFuture.supplyAsync(() -> "Hello")
.thenCombine(CompletableFuture.supplyAsync(
() -> " World"), (s1, s2) -> s1 + s2);
System.out.println("completableFuture4.get() = " + completableFuture4.get());
//
}
public static Future<String> calculateAsync() throws InterruptedException {
CompletableFuture<String> completableFuture = new CompletableFuture<>();
Executors.newCachedThreadPool().submit(() -> {
Thread.sleep(500);
completableFuture.complete("Hello");
return null;
});
return completableFuture;
}
}
public static Future<String> calculateAsync() throws InterruptedException {
CompletableFuture<String> completableFuture = new CompletableFuture<>();
Executors.newCachedThreadPool().submit(() -> {
Thread.sleep(500);
completableFuture.complete("Hello");
return null;
});
return completableFuture;
}
}
thenApply롸 themCompose가 비슷한 것 같아서 찾아보니
다음과 같은 설명을 들을 수 있었다.
간단히 말해서, thenApply는 현재 CompletableFuture의 결과를 바탕으로 단순한 값을 계산하는 데 사용되며,
thenCompose는 이 결과를 사용하여 새로운CompletableFuture를 시작하는 데 사용됩니다.
thenCompose는 중첩된CompletableFuture 구조를 피하고자 할 때 유용합니다.
이렇게 이 글에서 Future, FutureTask, CompletableFuture에 대해 알아보았다.
출처
https://www.baeldung.com/java-future
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Future.html
https://reflectoring.io/java-completablefuture/
https://www.baeldung.com/java-executor-service-tutorial
https://www.baeldung.com/java-fork-join
https://www.baeldung.com/java-completablefuture
https://www.digitalocean.com/community/tutorials/java-futuretask-example-program
'programming language > Java' 카테고리의 다른 글
[Java] ForkJoinPool 알아보기 (0) | 2024.01.28 |
---|---|
[Java] @annotation을 통한 Response 데이터 변경하기 (feat : jackson) (0) | 2023.12.01 |
[Java] 불변객체를 사용해야 하는 이유 + record 사용하기 (0) | 2023.07.05 |
[Java] 배열 - Arrays 정리하기 : 2탄 - copyOfRange, equals, fill, deepEquals (0) | 2023.02.09 |
[Java] 배열 - Arrays 정리하기 : 1탄 - copyOf, asList, binarySearch (0) | 2023.02.09 |