programming language/Java

[Java] ForkJoinPool 알아보기

공대키메라 2024. 1. 28. 23:21

전에 Future랑 CompletableFuture에 대해 간단히 알아보았다.

 

이번에는 ForkJoinPool에 대해 알아보고자 한다.

 

지난 글에서 Future, FutureTask, CompletableFuture 에 대해 공부를 사알~짝 했었는데

 

이 내용을 보고 해당 글을 보면 더 이해가 잘 될 것이다.

 

(지난 내용 클릭)


 

1. Fork? Join? ExecutorSerivce? ForkJoinPool?

Java 7에서부터 Fork/Join 프레임워크를 도입했다.

사용 가능한 모든 프로세서 코어를 사용하여 병렬 처리 속도를 높이는 데 도움이 되는 도구를 제공하는데

이는 분할 및 정복 접근 방식을 통해 달성된다.

 

Fork

비동기식으로 실행될 수 있을 만큼 단순해질 때 까지 작업을 더 작은 독립 하위 작업으로 반복적으로 나눈다. 그리고 이 각각의 작은 태스크는 별도의 스레드에서 병렬로 실행될 수 있다.


Join

Join은 모든 하위의 작업 결과를 단일 결과로 결합하는 것이다. 즉, fork된 태스크가 완료될 때 까지 기다려서 태스트의 결과를 합치는데 사용된다. 모든 하위 태스크들이 완료된 후에만 호출된다.

 

Executor

동시에 여러 요청을 처리해야 하는 경우 매번 새로운 쓰레드를 만드는 것은 비효율적입니다.
그래서 쓰레드를 미리 만들어두고 재사용하기 위한 쓰레드 풀(Thread Pool)이 등장하게 되었는데,
Executor 인터페이스는 쓰레드 풀의 구현을 위한 인터페이스입니다.

 

ExecutorService

비동기 모드에서 작업 실행을 단순화하는 JDK API다. 일반적으로 ExecutorService는 자동으로 스레드 풀과 작업 할당을 위한 API를 제공한다.

 

ForkJoinPool

forkJoinPool은 그러면 어떤 놈인가?

worker thread를 관리하고 스레드 풀 상태 및 성능에 대한 정보를 얻을 수 있는 도구를 제공하는 ExecutorService 의 구현체이다.

 

worker thread는 한 번에 하나의 작업만 실행할 수 있는데 ForkJoinPool은 하나의 하위 task마다 분리된 thread를 생성하지 않는다. 대신에, pool안에 각각의 thread는 작업을 저장하는 자신만의 deque를 가진다.

 

이 아키텍처는 work-stealing 알고리즘의 도움으로 thread의 업무량을 분배하는데 필수적이다.

 

2. Work-Stealing 알고리즘?

이것은 또 무엇인가...?

그대로 읽어보면 일을 훔치는 알고리즘 이라는데... 

 

찾아보니 다음과 같다.

알고리즘은 병렬 컴퓨팅에서 사용되는 효율적인 태스크 스케줄링 방법입니다. 이 알고리즘의 기본 아이디어는 멀티코어 프로세서에서 각 코어가 자체적인 태스크 큐를 가지고 있으며, 일부 코어가 다른 코어보다 더 많은 작업을 가지고 있을 때, 덜 바쁜 코어들이 바쁜 코어의 큐에서 작업을 '도둑질'(steal)하여 처리하는 것입니다.

 

그런데 ForkJoinPool이 해당 알고리즘과 무슨 연관이 있는지 찾아보았다.

 

Java의 ForkJoinPool은 Work-Stealing 알고리즘을 기반으로 하는 특별한 유형의 스레드 풀입니다.
ForkJoinPool은 특히 재귀적 태스크 분할(recursive task splitting)을 처리하는 데 최적화되어 있습니다.

ForkJoinPool에서는 각 스레드(일반적으로 CPU 코어에 해당)가 자신의 작업 큐를 가지고 있으며, fork() 메서드를 통해 생성된 하위 태스크들을 이 큐에 넣습니다.
스레드가 자신의 큐에 있는 태스크를 모두 처리하고 남은 태스크가 없을 때, 다른 스레드의 큐에서 남은 작업을 가져와(steal) 처리합니다.

 

Work-Stealing 알고리즘의 주요 특징은 다음과 같다.

 

  1. 분산 태스크 관리: 각 코어가 자신의 로컬 태스크 큐를 가집니다.
  2. 부하 균형: 한 코어가 처리할 태스크가 없을 경우, 다른 코어의 태스크 큐에서 작업을 가져옵니다.
  3. 효율적인 자원 활용: 모든 코어가 지속적으로 작업을 처리하도록 하여 자원을 최대한 활용합니다.
  4. 동적 조정: 작업 부하가 시간에 따라 변함에 따라 태스크 분배도 동적으로 조정됩니다.

 

3. ForkJoinPool vs ExecutorService

 

필자는 공부를 하는 와중에 굳이 ForkJoinPool이 왜 필요한지, 어느 상황에서 사용하면 좋은지 GPT에게 물어보았다.

 

ForkJoinPool은 ExecutorService의 구현체 중 하나이며, 특정 유형의 작업에 매우 적합하게 설계되었습니다. ForkJoinPool을 사용하는 것이 ExecutorService의 다른 구현체를 사용하는 것보다 더 유리한 몇 가지 상황을 아래에 설명드리겠습니다.

ForkJoinPool을 사용하면 좋은 상황:
분할 정복(Divide-and-Conquer) 알고리즘: ForkJoinPool은 재귀적인 작업 분할에 최적화되어 있습니다. 예를 들어, 퀵소트, 병합 정렬, 트리 탐색과 같은 분할 정복 알고리즘은 ForkJoinPool을 사용하여 효율적으로 처리할 수 있습니다.대량의 데이터 처리: 큰 데이터 세트를 병렬로 처리해야 하는 경우, 예를 들어 대규모 배열의 처리나 복잡한 계산을 수행해야 하는 경우 ForkJoinPool이 적합합니다.재귀적 태스크 처리: 알고리즘이 재귀적으로 태스크를 생성하는 경우, ForkJoinPool은 이러한 타입의 작업을 효율적으로 관리할 수 있습니다.Work-Stealing 알고리즘 활용: ForkJoinPool은 Work-Stealing 알고리즘을 사용하여, 작업 스레드 간에 부하를 동적으로 균형잡아 줍니다. 이는 다른 ExecutorService 구현체보다 스레드 활용도를 높여줍니다.

일반적인 ExecutorService 사용이 더 좋은 상황:
단순한 작업 큐: 작업이 간단하고 독립적일 경우, 예를 들어 단순한 HTTP 요청 처리나 배치 작업 처리 같은 경우에는 일반적인 ExecutorService (예: ThreadPoolExecutor)가 더 적합할 수 있습니다.IO 집중적 작업: IO 작업(예: 파일 읽기/쓰기, 네트워크 요청)이 주를 이루는 경우, ForkJoinPool보다는 전통적인 스레드 풀이 더 적합할 수 있습니다. IO 작업은 CPU 작업보다 대기 시간이 많기 때문에, ForkJoinPool의 Work-Stealing 메커니즘이 큰 이점을 제공하지 못할 수 있습니다.

결론
ForkJoinPool은 복잡하고 계산 집중적인 작업, 특히 재귀적 태스크 분할이 필요한 경우에 효율적입니다. 반면, IO 집중적 작업이나 간단한 병렬 태스크를 처리하는 경우에는 ThreadPoolExecutor와 같은 전통적인 ExecutorService 구현체를 사용하는 것이 더 나을 수 있습니다. 상황에 따라 가장 적합한 도구를 선택하는 것이 중요합니다.

 

그러면 이제 어떻게 사용해야 하는지 알아보려고 한다.

 

3. ForkJoinPool 사용

ForkJoinTask

ForkJoinTask는 ForkJoinPool 내에서 실행되는 작업의 기본 유형이다. 실제로는 두 하위 클래스 중 하나, 즉 void작업을 위한 RecursiveAction  값을 반환하는 작업을 위한 RecursiveTask<V> 중 하나를 확장해야 합니다. 둘 다 작업의 논리가 정의되는 추상 메서드 Compute()를 가지고 있습니다.

 

RecursiveAction 과  RecursiveTask

 

CustomResersiveAction.java

public class CustomRecursiveAction extends RecursiveAction {

    private String workload = "";
    private static final int THRESHOLD = 4;

    private static Logger logger = 
      Logger.getAnonymousLogger();

    public CustomRecursiveAction(String workload) {
        this.workload = workload;
    }

    @Override
    protected void compute() {
    //workload의 길이가 THRESHOLD보다 크면 작업을 분할하고, 그렇지 않으면 직접 처리
        if (workload.length() > THRESHOLD) {
            ForkJoinTask.invokeAll(createSubtasks());
        } else {
           processing(workload);
        }
    }

//workload를 두 부분으로 나누어 각각에 대한 새로운 CustomRecursiveAction 인스턴스를 생성
    private List<CustomRecursiveAction> createSubtasks() {
        List<CustomRecursiveAction> subtasks = new ArrayList<>();

        String partOne = workload.substring(0, workload.length() / 2);
        String partTwo = workload.substring(workload.length() / 2, workload.length());

        subtasks.add(new CustomRecursiveAction(partOne));
        subtasks.add(new CustomRecursiveAction(partTwo));

        return subtasks;
    }

    private void processing(String work) {
        String result = work.toUpperCase();
        logger.info("This result - (" + result + ") - was processed by " 
          + Thread.currentThread().getName());
    }
}

 

 

위의 예시 코드는 우리가 처리할 업무를 (여기서는 String 자르기) 부하(workload)를 임계점(THRESHOLD)를 기준으로 업무들을 작은 단위들로 자르고 있다.

 

CustomRecursiveTask.java

public class CustomRecursiveTask extends RecursiveTask<Integer> {
    private int[] arr;

    private static final int THRESHOLD = 20;

    public CustomRecursiveTask(int[] arr) {
        this.arr = arr;
    }

//실제 작업을 수행하는 메소드
//배열의 크기가 THRESHOLD보다 크면 작업을 분할하고, 그렇지 않으면 배열을 직접 처리
    @Override
    protected Integer compute() {
        if (arr.length > THRESHOLD) {
            return ForkJoinTask.invokeAll(createSubtasks())
              .stream()
              .mapToInt(ForkJoinTask::join)
              .sum();
        } else {
            return processing(arr);
        }
    }
    
// 배열을 두 부분으로 나누어 각각에 대한 새로운 CustomRecursiveTask 인스턴스를 생성
    private Collection<CustomRecursiveTask> createSubtasks() {
        List<CustomRecursiveTask> dividedTasks = new ArrayList<>();
        dividedTasks.add(new CustomRecursiveTask(
          Arrays.copyOfRange(arr, 0, arr.length / 2)));
        dividedTasks.add(new CustomRecursiveTask(
          Arrays.copyOfRange(arr, arr.length / 2, arr.length)));
        return dividedTasks;
    }

//실제 배열 처리를 수행하는 메소드
//배열에서 특정 조건(10보다 크고 27보다 작은 요소)에 맞는 요소를 필터링하고, 
//각 요소에 10을 곱한 후 합계를 계산
    private Integer processing(int[] arr) {
        return Arrays.stream(arr)
          .filter(a -> a > 10 && a < 27)
          .map(a -> a * 10)
          .sum();
    }
}

 

CustomRecursiveTask.java의 경우 CustomResersiveAction.java와 다른 점은 기본적인 작업이 살짝 다르긴 하지만 앞서 말했듯이 return 값이 있느냐 없느냐인 것이다.

 

 

ForkJoinPoolExample.java

public class ForkJoinPoolExample {
    public static void main(String[] args) {
        /**
         * ForkJoinPool 생성법
         * 1. commonPool()
         * 2. new ForkJoinPool();
         * 3. PoolUtil.forkJoinPool (org.apache.commons.pool2 사용);
         */
        ForkJoinPool forkJoinPoolAction = ForkJoinPool.commonPool();
        CustomRecursiveAction customRecursiveAction = new CustomRecursiveAction("abcde qwerkkfgj dsoqomvm qolwlkjdlaskdlaklsd asdfkqwer");
        forkJoinPoolAction.invoke(customRecursiveAction);

        ForkJoinPool forkJoinPoolTask = ForkJoinPool.commonPool();
        int[] arr = {1,0,5,48,4,8,158,18,165,1,58,1,52,5,84,23,1,567,84,21,5,498,6};
        CustomRecursiveTask customRecursiveTask = new CustomRecursiveTask(arr);
        Integer invoke = forkJoinPoolTask.invoke(customRecursiveTask);
        System.out.println("invoke = " + invoke);
    }
}

 


 

사실 코드는 뭐 인터넷 찾으면 다 볼 수 있는 것이고 필자도 해외 사이트를 참고로 글을 정리하는 정도이다.

 

그런데 중요한 점은 이것이 왜 필요하고, 어느 상황에 좋은지 알아둬서

 

적재적소에 원하는 작업을 빠르게 하는것이 중요하다 생각한다.

 

 

출처:

https://velog.io/@ssssujini99/Java-Callable-Executors-ExecutorService-의-이해-및-사용법

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html

https://www.baeldung.com/java-fork-join