programming language/Java

[Java] Stream(스트림) - 2탄

공대키메라 2025. 5. 8. 22:11

지난번에 Stream1에서 읽다가 말았는데, 

 

이번에는 stream을 Parallelism 부터 이어서 읽어보도록 하겠다.

 

이를 이해하는데 thread관련 글을 읽으면 좀 더 도움이 될 것이다.

 


 

잠시 기억 복기...

 

Steam이란?

자바에서의 스트림은 뭔가 데이터 요소들의 연속, 즉 데이터 파이프 라인을 만들어서 데이터를 다루도록 도와주는 도구

 

데이터 파이프라인?

데이터의 수집부터 처리, 변환, 저장에 이르기까지 일련의 단계를 거치는 과정

 

Stream의 특징?

No storage, Funtional in nature, Laziness-seeking, probably unbounded, consumable!

 

1. Parallelism(병렬성)

명확히 for-loop을 가지고 원소들을 처리하는건 순차적이다.

 

Stream은 각자 원소를 명령형 작업보다는 집계 작업의 파이프라인으로 계산을 재구성해 병렬 처리를 가능하게 한다.

 

이게 도대체 무슨 말이냐?

 

다음 예시를 보겠다.

 

예시 1

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
int sum = 0;
for (int number : numbers) {
    if (number % 2 == 0) {
        int doubled = number * 2;
        sum += doubled;
    }
}

 

위를 보면 for-loop을 사용하면 일일히 선언적으로, 명령형 작업을 직! 접! 쳐서 계산을 하도록 한다.

 

하지만 다음 코드라면 어떨까?

// 선언적 스트림 접근 방식
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
int sum = numbers.stream()
                .filter(n -> n % 2 == 0)    // 중간 연산
                .map(n -> n * 2)            // 중간 연산
                .reduce(0, Integer::sum);   // 종단 연산

 

 

이렇게 명확하게 선언하기 어떻게 할지만 스트림 API에게 주는 것이다. 

 

모든 streams 작업들은 순차적으로 혹은 병렬적으로 수행할 수 있다.

 

JDK 에서 stream 구현체들은 parallelism 이 명확히 요구되지 않는다면 순차적인 stream을 생성한다.

 

예를 들어, Collection에는 Collection.stream() 그리고 Collection.parallelStream()이 있는데 전자는 순차적인, 후자는 병렬적인 stream을 각각 생성한다. 

 

IntStream(int, int) 메소드를 통해서도 순차적인 stream을 생성할 수 있지만, BaseStream.parallel() 메소드를 사용해서 효율적으로 병렬 처리가 가능하다.

 

int sumOfWeights = widgets.parallelStream()
                               .filter(b -> b.getColor() == RED)
                               .mapToInt(b -> b.getWeight())
                               .sum();

 

순차적인것과 병렬적인 버전의 유일한 처이점은 stream()대신에 parallelStream()를 이용해서 초기 stream을 생성했다는 점이다. 최종 작업이 실행되면, stream pipeline은 호출된 stream이 무엇인지에 따라 순차적으로 혹은 병렬적으로 수행된다.

 

stream이 순차적으로 혹은 병렬적으로 실행될건지 아닌지는 isParallel() 메소드로 확인할 수 있으며 stream의 방향은 BaseStream.sequential() 그리고 BaseStream.paralle() 작업으로 수정될 수 있다.

 

최종 연산(terminal operation) 이 시작되면, stream pipeline이 호출된 작업에 따라서 실행된다. 

 

명확히 비결정적(non-deterministic)으로 식별된(findAny() 같은) 작업을 제외하고 stream이 순차적으로 작업하던지 병렬적으로 작업하던지는 계산의 결과를 바꿀 수 없다.

 

여기서 findAny() 메소드는 왜 제외하나 하면 메소드의 명칭에서 보듯이... 아무 것이나 찾으면 그것을 처리하는 메소드이니기 때문에 어떤 결과가 나올지 모르기 때문이다.

 

대부분 스트림 작업들은 자주 람다 표현식(lambda expressions) 같은 사용자에 의해 구체화된 행위들을 파라미터로 받는다. 

 

정확한 행위를 유지하기 위해 이러한 행위 파라미터(behavioral parameters)들은 non-interfering(비간섭) 이어야 하고 대부분의 경우에 무상태(stateless)여야 한다. 그러한 파라미터들은 항상 Function 같은 함수형 인터페이스의 인스턴스이고 자주 람다 표현식이거나 메소드 레퍼런스다.

 

 

이제... non-interfering과 stateless라는 표현이 나오는데 이 뒤를 읽으면 알 수 있다.

 

바로 뒤에 이어서 설명을 하고 있다.

 

여기서 잠깐... 정말 parallel을 사용하면 성능이 좋은지 안좋은지 말이 많았던것으로 기억하고, 그렇게 성능이 사실 좋은거 같지 않다고 기억하는데... 이에 대해 다시 thread관련해서 내용들은 추후에 또 정리할 예정이다.

 

 

2. Non-inferference(비간섭)

비간섭? 뭘 간섭하지 말라는거지?

 

streams는 다양한 데이터 소스를 통해 병렬처리 가능한 집계 작업들을 수행할 수 있게 도와준다.

ArrayList같은 non-thread-safe한 collections들도 포함해서 말이다.

 

* 여기서... thread-safe하다는 말은?

여러 스레드가 동시에 같은 객체나 메서드에 접근하더라도 프로그램이 정상적으로 동작하는 상태

스레드간 race condition(경쟁 조건)이나 데이터 불일치가 발생하지 않는다.

Java 에서는 이를 구현하기 위해 synchronized, lock 인터페이스, ConcurrentHashMap같은 concurrent collections 혹은 Atomic 변수를 사용해서 구현이 가능하다.

 

비간섭은 우리가 stream pipeline의 실행동안 데이터 소스로의 간섭을 막는 경우에만 가능하다.

 

escape-hatch 작업들인 iterator() 그리고 spliterator()를 제외하고 최종 작업(terminal operation)이 호출될 떄만 실행을 시작하고 최종 작업이 끝나면 종료한다. 

 

대부분의 데이터 소스를 위해 간섭을 방지하는건 데이터가 스트림 파이프라인 실행동안 수정되지 않는다는걸 확실히 하는 수단이다.

 

주목할 만한 예외는 동시 수정을 처리하도록 특별히 설계된 동시성 컬렉션에서 나오는 스트림이다.

 

동시성 스트림 소스는 Spliterator가 CONCURRENT 특성을 보고하는 것들입니다.

 

 

따라서, 소스가 동시적이지 않은 스트림 파이프라인에 행위 파라미터들은 절대로 스트림의 데이터 소스를 수정해서는 안된다.

 

물론 이런거는 가능하다.

 

 

예시

List<String> l = new ArrayList(Arrays.asList("one", "two"));
     Stream<String> sl = l.stream();
     l.add("three");
     String s = sl.collect(joining(" "));

 

 

정리하면 비간섭 원칙이 필요한데, 이는 안전하고 예측 가능한 스트림 파이프라인을 위해 중요하다.

 

 

3. Stateless behaviors(무상태 행위)

갑자기 무슨 stateless? stateful? 이런 말이 나온다.

 

이게 무슨 말인고 하니... 상태를 가지고 있으면(stateful) 이전의 상태가 현재의 계산이나 연산에 영향을 주는 것이고,

 

상태를 가지고 있지 않으면(stateless) 오직 현재 연산에만 의존하고 외부와 독립적다고 생각하고 읽어보려고 한다.

 

 

스트림 파이프라인 결과는 스트림 작업에서 행위 파라미터들이 stateful 하다면 비결정적거나 틀릴 수 도 있다.

 

stateful한 람다(또는 적절한 함수형 인터페이스를 구현하는 다른 객체)는 결과가 스트림 파이프라인의 실행 동안 변할 수도 있는 어떤 상태에 의존한다.

 

이게 도대체 무슨 말이야? 예시를 쭉 읽어보겠다.

 

예시

Set<Integer> seen = Collections.synchronizedSet(new HashSet<>());
 stream.parallel().map(e -> { if (seen.add(e)) return 0; else return e; })...

 

 

만약 매핑 작업이 병렬로 수행되면, 같은 입력의 결과가 쓰레드 스케중일의 사이점 때문에 작동할 때 마다 다를 수 있다. 

 

반면에, 무상태 람다 표현을 사용하면 결과는 항상 같다.

 

코드를 보면 당연 그럴 수 밖에 없다. 

 

병렬로 수행이 되면, set에서 언제 뭐가 추가가 되서 return 을 할 지 모른다.

 

하지만 무상태 람다로 외부에 의존하지 않는 코드(우리가 일반적으로 가장 쉽게 떠올리는 stream을 활용한 map들)

 

즉, stateless 람다를 사용하면 결과는 늘 같다.

 

 

예시 코드에서는 data race 를 막기 위해서 synchronizedSet을 사용해서 병렬 처리와 stateless 에 대해 보여주고 있다.

 

 

4. Side-Effects

스트림 연산에 사용되는 행동 파라미터(behavioral parameters)에서의 부작용(side-effects)은 일반적으로 권장되지 않는다.

 

이는 무상태(statelessness) 요구사항을 의도치 않게 위반하거나 다른 스레드 안전성 위험을 초래할 수 있기 때문입니다.

 

여기서 부작용은 스트림 연산의 람다식이나 메서드 참조가 외부 변수를 수정하거나 외부 상태에 영향을 주는 것을 의미한다.

 

 

말이 딱딱하니 예시로 바로 넘어가겠다.

 

 ArrayList<String> results = new ArrayList<>();
     stream.filter(s -> pattern.matcher(s).matches())
           .forEach(s -> results.add(s));  // Unnecessary use of side-effects!

 

마지막에 stream사용하면서 non-interfering 을 지키지 않았다. 이거 때문에 결과가 계속 다르게 나올 것이다. 즉, 부작용이 생긴다!

 

non-thread-safe한데 병렬 처리를 하고 있다! 총체적 난국이다! 그런데 마지막은 또 결과를 계속 add 해서 외부 결과에 영향을 주고 있다.

 

이 코드는 다음과 같이 변경되어야 한다.

 

     List<String>results =
         stream.filter(s -> pattern.matcher(s).matches())
               .collect(Collectors.toList());  // No side-effects!

 

코드 사이사이에 외부에 영향을 주게 되면 이게 예측이 힘들고 유지보수가 힘들건 말안해도 알 것 같다.

 

5. 뒤에 그 외 것들

글이 엄청 긴데, 이것들은 stream이 가진 특징 혹은 개념이 아닌 지원해주는 여러 기능들에 대해 설명하고 있어서 소개만 하려고 한다.

 

궁금하면 읽어보자. 헥헥...

 

Ordering

스트림은 encounter order(스트림 요소가 처리되는 순서) 를 가지거나 안가진다. 순서에 대해 설명

 

Reduction operations

입력 원소들을 하나의 요약처럼 결합하는 작업을 수행하는 방법에 대해 설명

Mutable reduction

가변 결과 컨테이너를 사용해서 스트림 요소를 결합함에 대해 설명

 

Reduction, concurrency, and ordering

리더션, 동기화, 순서 맞춤 여러개 적용이 가능함을 설명

Associativity

작업 혹은 함수들이 다음과 같이 결합법칙적이다. (a op b) op c == a op (b op c) 등등~

 

Low-Level stream construction

저레벨에서 스트림 구조를 어떻게 사용하는지 설명

 

 


 

 

내가 멍청한건지 아닌지 모르겟는데, 열심히 정리해서 뒤돌아서면 기억이 잘 안나기도 한다.

 

반복 학습과 이해가 이렇게 중요한 듯 하다.

 

이해하지 않고 그냥 보는건... 까먹기 쉽고, 또 보지 않는다면 금방 잊어버린다.

 

마지막 것들은 그냥 사용법에 좀 가까운 부분들 같아서 과감히 글에서 생락했다.

 

궁금한 분들은 읽어보면 아 그냥 이렇게 쓰는거네... 하면서 넘기리라 생각한다.

 

스트림 이렇게 복잡했어? 참놔~ 처음엔 재미있다가... 이게 영어로 읽으니 와닿지가 않아서 알아먹기 힘들었다.

 

그래도 한 번 쯤 읽어볼만 했다고 느낀다옹...

 

출처

https://docs.oracle.com/javase/8/docs/api/java/util/stream/package-summary.html