욱'S 노트

Java - Stream 본문

JAVA/Pleasure

Java - Stream

devsun 2016. 1. 14. 16:43

How streams work


스트림은 엘리먼트의 처리순서를 표현하고 이러한 엘리먼트들에 다른 종류의 연산을 제공한다.

List<String> myList = Arrays.asList("a1", "a2", "b1", "c2", "c1");

myList
.stream()
.filter(source -> source.startsWith("c"))
.map(String::toUpperCase)
.sorted()
.forEach(System.out::println);

스트림 연산은 intemediate 또는 terminal이다. Intermediate 연산은 stream을 리턴하여 세미콜론 없이 메소드 체이닝형식으로 사용할 수 있다. Terminal 연산은 void이거나 stream이 아닌 값을 리턴한다. 위의 예에서 filter, map, sorted는 intermediate 연산인 반면 forEach는 terminal 연산이다. 스트림에서 사용할 수 있는 모든 연산이 궁금하다면 javadoc을 살펴보도록 하자.


대부분의 스트림 연산은 람다 표현식을 파라미터로 받을 수 있는데 이는 functional interface로 연산의 동작을 명시한다. 대부분의 연산은 non-interfering 하며 stateless해야 한다. 이것은 무슨 의미인가?


함수가 간섭받지 않아야 한다는 의미는 스트림에 주어진 데이터소스를 변경할 수 없다는 것이다. 위의 예에서 보이듯 컬렉션의 엘리먼트를 추가하거나 삭제하는 람다식을 제공할 수 없다는 것이다.


또한 함수가 상태가 없어야 한다. 위 예제에서는 변경가능한 변수가 없거나 실행중에 변경할 수 있는 외부 스코프의 상태가 없다는 의미를 한다.


Different kind of streams


스트림은 다양한 데이터 소스 특히 컬렉션으로 부터 생성될 수 있다. List나 Set은 새로운 메소드 stream과 parallelStream을 제공한다. 병렬 스트림은 멀티 스레드에서 해당 오퍼레이션을 실행할 수 있는 능력을 가지고 있는데 이러한 내용은 뒷부분에서 다루도록 하겠다.

Arrays.asList("a1", "a2", "a3")
.stream()
.findFirst()
.ifPresent(System.out::println);

list의 stream 메소드를 호출함으로써 정상적인 오브젝트 스트림을 전달받을 수 있다. 그러나 스트림을 만들기 위해서 반드시 컬렉션을 생성해야 하는 것은 아닌다. 다음 코드를 보자.

Stream.of("a1", "a2", "a3")
.findFirst()
.ifPresent(System.out::println);

Stream.of()를 이용해서 오브젝트 레퍼런스의 묶음으로부터 스트림을 생성할 수 있다.


게다가 자바 8에는 기본형 int, long, double을 위한 특별한 종류의 스트림이 제공되는데 IntStream, LongStream 그리고 DoubleStream이다.


IntStream은 일반적인 for-loop을 IntStream.range()로 대체 할 수 있다.


IntStream.range(0, 4)
.forEach(System.out::println);


프리미티브 스트림은 일반적인 스트림과 같지만 다음과 같은 차이 점이 있다.

프리미티브 스트림은 특별한 람다 표현식을 사용할 수 있다. 예를 들어 Function 대신에 IntFunction을 Predicate 대신에 IntPredicate를 사용할 수 있다. 그리고 프리미티브 스트림에서는 추가적인 터미널 어그리게이트 오퍼레이션을 지원하는데 sum과 average이다. 

Arrays.stream(new int[] {1, 2, 3})
.map(n -> 2 * n + 1)
.average()
.ifPresent(System.out::println);

때떄로 일반적인 스트림을 기본형 스트림으로 변경하는 것은 편리하다. 이러한 목적으로 오브젝트 스트림은 특별한 매핑 오브젝트 오퍼레이션인 mapToInt, mapToLong과 mapToDouble을 제공한다.

Arrays.asList("a1", "a2", "a3").stream()
.map(s -> s.substring(1))
.mapToInt(Integer::parseInt)
.max()
.ifPresent(System.out::println);

기본형 스트림은 또한 mapToObj를 통해 객체 스트림으로 변경할 수 있다.

IntStream.range(1,4)
.mapToObj(n -> "a" + n)
.forEach(System.out::println);


Processing Order


이제 우리는 스트림을 어떻게 만들고 또 어떻게 동작하는지에 대해 배웠다. 이제 스트림 오퍼레이션이 어떻게 처리되는지 더욱 자세히 살펴보자.


인터미디어트 오퍼레이션의 중요한 특징은 laziness이다. 터미널 오퍼레이션이 없는 예제를 살펴보자.

Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> {
System.out.println(s);
return true;
});


위의 코드를 실행하면 아무런 것도 콘솔에 출력되지 않는 것을 알 수 있다. 이유는 인터미디어트 오퍼레이션은 터미널 오퍼레이션이 존재할 때 실행되기 때문이다.


이제 위의 예제에 forEach 터미널 오퍼레이션을 추가해보자. 

Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> {
System.out.println("filter : " + s);
return true;
})
.forEach(s -> System.out.println("forEach : " + s));


filter : d2

forEach : d2

filter : a2

forEach : a2

filter : b1

forEach : b1

filter : b3

forEach : b3

filter : c

forEach : c


결과의 순서에 놀랄것이다. 원시적인 접근은 스트림에서 모든 오퍼레이션은 스트림의 모든 엘리먼트는 수평적으로 실행된다는 것이다. 하지만 대신에 각 엘리먼트는 수직적으로 체인으로 이동했다. 첫번째 문자열 d2는 필터에 전달된 다음 forEach에 전달되고, 그런 다음에서 a2가 처리되었다.


이러한 특성은 각 엘리먼트에 대한 실제적인 오퍼레이션의 수를 줄일수 있다. 다음을 살펴보자.

Stream.of("d2", "a2", "b1", "b3", "c")
.map(s -> {
System.out.println("map : " + s);
return s.toUpperCase();
})
.anyMatch(s -> {
System.out.println("anyMatch : " + s);
return s.startsWith("A");
});


map : d2

anyMatch : D2

map : a2

anyMatch : A2


anyMacth 연산이 true를 리턴하자마자 predicate가 주어진 입력 엘레먼트에 적용된다. 이것은 두번째 element "A2"가 전달되었을 때이다. 스트림 체인은 버티컬하게 수행되기 때문에 이 경우 맵은 단지 두번만 실행된다. 그래서 스트림은 모든 엘레먼트를 매핑하는 대신에 최소한의 매핑만 수행하게 된다.


Why order matters


다음 예제는 두개의 인터미디어트 연산 map과 filter 그리고 forEach 터미널 오퍼레이션으로 구성되어 있다. 

Stream.of("d2", "a2", "b1", "b3", "c")
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase();
})
.filter(s -> {
System.out.println("filter: " + s);
return s.startsWith("A");
})
.forEach(s -> System.out.println("forEach: " + s));

map: d2

filter: D2

map: a2

filter: A2

forEach: A2

map: b1

filter: B1

map: b3

filter: B3

map: c

filter: C



모든 문자열에 대해서 map과 filter가 수행되었으며 forEach는 한번 호출된 것을 확인할 수 있다.


만약 수행횟수를 줄이고 싶다면 필터를 체인의 시작으로 옮기면 된다.

Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> {
System.out.println("filter: " + s);
return s.startsWith("a");
})
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase();
})
.forEach(s -> System.out.println("forEach: " + s));

filter: d2

filter: a2

map: a2

forEach: A2

filter: b1

filter: b3

filter: c



이제 맵은 한번만 수행될 것이며 파이프라인의 성능은 입력 엘레먼트 수에 비례하여 훨씬 빨라질 것이다. 복잡한 체인 연산을 수행할 시 명심해두자.


이제 위의 예제를 확장해보자. 정렬을 추가하였다.

Stream.of("d2", "a2", "b1", "b3", "c")
.sorted((s1, s2) -> {
System.out.printf("sort : %s, %s\n", s1, s2);
return s1.compareTo(s2);
})
.filter(s -> {
System.out.println("filter: " + s);
return s.startsWith("a");
})
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase();
})
.forEach(s -> System.out.println("forEach: " + s));

정렬은 특별한 종류의 인터미디어트 연산이다. 정렬동안 state를 유지해야 함으로 이러한 연산을 stateful operation이라고 부른다.


이러한 예제의 결과로 다음과 같은 콘솔 출력을 나타나는 것을 확인할 수 있다.


sort:    a2; d2

sort:    b1; a2

sort:    b1; d2

sort:    b1; a2

sort:    b3; b1

sort:    b3; d2

sort:    c; b3

sort:    c; d2

filter:  a2

map:     a2

forEach: A2

filter:  b1

filter:  b3

filter:  c

filter:  d2



첫번째 정렬 오퍼레이션은 전체 입력 컬렉션에 대해서 수행된다. 다시 말하자면 sorted는 horizontablly하게 실행된다. 그래서 이 경우 다수의 조합에 따라 8번 호출된다.


다시 한번 정렬 체인의 성능을 옵티마이즈 해보자.

Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> {
System.out.println("filter: " + s);
return s.startsWith("a");
})
.sorted((s1, s2) -> {
System.out.printf("sort : %s, %s\n", s1, s2);
return s1.compareTo(s2);
})
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase();
})
.forEach(s -> System.out.println("forEach: " + s));

filter: d2

filter: a2

filter: b1

filter: b3

filter: c

map: a2

forEach: A2


이 예제는 sorted가 호출되지 않는다. 이유는 필터가 입력 엘레먼트를 하나의 엘레먼트로 줄였기 때문이다. 성능은 굉장히 개선될 것이다.


Reusing Streams


자바 8 스트림은 재활용할 수 없다. 어떤 터미널 오퍼레이션이 호출되면 스트림은 닫힌다.

Stream<String> stream =
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> s.startsWith("a"));

stream.anyMatch(s -> true);
stream.noneMatch(s -> true);


같은 스트림의 결과로 anyMatch를 호출한 후에 다시 nonMatch를 호출하면 오류가 발생한다.


Exception in thread "main" java.lang.IllegalStateException: stream has already been operated upon or closed

at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229)

at java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459)

at com.kakao.chub.stream.TestStream.main(TestStream.java:15)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:483)

at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)


이러한 한계를 극복하기 위해 모든 터미널 오퍼레이션을 위한 새로운 스트림을 생성해야 한다.

그러나 우리는 stream supplier를 만들어서 이미 세팅된 인터미디어트 연산을 새로운 스트림으로 만들 수 있다.

Supplier<Stream<String>> supplier =
() -> Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> s.startsWith("a"));

supplier.get().anyMatch(s -> true);
supplier.get().noneMatch(s -> true);

각 get 메소드를 호출할 때마다 새로운 스트림이 생성되고 원하는 터미널 연산에 전달 할 수 있다.


Advanced Operations


스트림은 많은 수의 다른 오퍼레이션들을 제공한다. 우리는 이미 filter, map과 같은 매우 중요한 스트림 연산에 대해서 배웠다. 이제 더 복잡한 오퍼레이션 collect, flatMap 및 reduce에 대해서 살펴보자.


이번 섹션의 대부분의 코드 샘플은 사람에 대한 목록을 이용해서 진행해보겠다.

public class Person {
private String name;
private int age;

public Person(String name, int age) {
this.name = name;
this.age = age;
}

@Override
public String toString() {
return "Person{" +
"name='" + name + '\'' +
", age=" + age +
'}';
}    

List<Person> persons = Arrays.asList(
new Person("Max", 18),
new Person("Peter", 23),
new Person("Pamela", 23),
new Person("David", 12)
);


Collect


Collect는 명백하게 유용한 터미널 오퍼레이션이다. 스트림의 각 엘리먼트를 다른 종류의 결과로 만들기 위해 트랜스폼하기 위해 사용된다. Collect는 4개의 다른 오퍼레이션으로 구성된 Collector를 적용할 수 있다. : supplier, accumulator, combiner 그리고 finisher. 처음에는 매우 복잡하게 들릴지 모르나 자바 8의 지원하는 매우 좋은 기능중의 하나이다. 


매우 단순한 유스케이스로 시작해보자.

List<Person> filteredPersons = persons.stream()
.filter(person -> person.name.startsWith("P"))
.collect(Collectors.toList());

System.out.println(filteredPersons);

[Person{name='Peter', age=23}, Person{name='Pamela', age=23}]


스트림의 엘레먼트를 리스트로 구성하는 것은 매우 쉽다는 것을 알 수 있다. list 대신에 set으로 구성하고 싶다면 Collectors.toSet()을 사용하면 된다.


다음 예제는 나이로 모든 사람들을 그룹핑하는 예제이다.

Map<Integer, List<Person>> personsByAge = persons.stream()
.collect(Collectors.groupingBy(person -> person.age));

personsByAge.forEach((age, personGroup) -> {
System.out.println(String.format("age : %d, persons : %s", age, personGroup));
});

age : 18, persons : [Person{name='Max', age=18}]

age : 23, persons : [Person{name='Peter', age=23}, Person{name='Pamela', age=23}]

age : 12, persons : [Person{name='David', age=12}]


Collectors는 정말 다양하다. 또한 스트림에 각 엘레먼트에 대한 어그리게이션 연산도 수행할 수 있는데 예를 들면 평균도 낼 수 있다.

Double average = persons.stream()
.collect(Collectors.averagingInt(person -> person.age));

System.out.println(average);


만약 더 포괄적인 통계에 관심이 있다면 summarizing collector는 빌트인 summary statistics 오브젝트를 반환한다. 이를 통해 우리는 쉽게 최소, 최대 평균, 카운트, 합계를 구할 수 있다.

IntSummaryStatistics statistics = persons.stream()
.collect(Collectors.summarizingInt(person -> person.age));

System.out.println(statistics);

IntSummaryStatistics{count=4, sum=76, min=12, average=19.000000, max=23}


다음 예제는 모든 사람을 하나의 문자열로 조인하는 것이다.

String phrase = persons.stream()
.filter(person -> person.age > 18)
.map(person -> person.name)
.collect(Collectors.joining(" and ", "In Germany ", " are of legal age."));

System.out.println(phrase);

In Germany Peter and Pamela are of legal age.


조인 컬렉터에는 구분자와 옵션으로 prefix와 suffix를 적용할 수 있다.


스트림 엘레먼트를 맵으로 변경하기 위해서 우리는 키와 값을 명시해줘야 한다. 키는 유니크해야 하며 그렇지 않을 경우 IllegalStateException이 발생한다. 또한 오류 발생시 머지를 하기 위해 추가적인 연산을 전달할 수 있다.

Map<Integer, String> map = persons.stream()
.collect(
Collectors.toMap(
person -> person.age,
person -> person.name,
(name1, name2) -> name1 + ":" + name2
));

System.out.println(map);

{18=Max, 23=Peter:Pamela, 12=David}


이제 우리는 매우 강력한 빌트인 컬렉터에 대해서 알게 되었다. 이제 우리 자신만의 컬렉터를 만들어 보자. 우리는 | 파이프 문자열로 구분된 대문자로된 하나의 문자열로 만들어 보고 싶다. 우리는 Collector.of를 통해 새로운 컬렉터를 만들 수 있다. 컬렉터에 전달할 수 있는 재료는 다음과 같다. : a supplier, an accumulator, a combiner and a finisher.

Collector<Person, StringJoiner, String> personCollector =
Collector.of(
() -> new StringJoiner(" | "),
((stringJoiner, person) -> stringJoiner.add(person.name.toUpperCase())),
((stringJoiner1, stringJoiner2) -> stringJoiner1.merge(stringJoiner2)),
StringJoiner::toString
);

String names = persons.stream().collect(personCollector);

System.out.println(names);

MAX | PETER | PAMELA | DAVID


자바에서 문자열은 immutable 하기 때문에 StringJoiner와 같은 헬퍼 클래스가 필요하다. supplier는 적당한 구분자와 StringJoiner을 초기화하고, accumulator는 StringJoiner에 각 사람의 대문자 이름을 추가하기 위해 사용된다. combiner는 어떻게 두개의 StringJoiner를 하나로 머지할지를 알고 있다. 마지막 스텝은 finisher는 StringJoiner로부터 원하는 문자열을 만들어 낸다.


FlatMap


우리는 map 오퍼레이션을 통해 스트림의 객체가 어떻게 다른 타입의 객체로 변경할 수 있는지에 대해서 배웠다. Map은 한계가 있는데 모든 오브젝트가 정확히 하나의 다른 오브젝트에 매핑되어야 한다는 것이다. 그러나 만약 하나의 오브젝트가 다른 둘이상의 오브젝트로 변경하기를 원한다면 어떻게 할 것인가? 이러한 경우를 위해 flatMap이 있다.


FlatMap은 스트림의 각 엘레먼트를 다른 오브젝트의 스트림으로 변경한다. 그래서 각 오브젝트는 0,1 또는 둘 이상의 다른 오브젝트로 변경될 수 있다. 이러한 스트림들은 flatMap 오퍼레이션에서 리턴된 스트림에 포함된다.


FlatMap의 동작을 살펴보기 전에 우리는 적절한 type hierarchy가 필요하다.

public class Foo {
String name;

List<Bar> bars = new ArrayList<>();

Foo(String name) {
this.name = name;
}
}
public class Bar {
String name;

Bar(String name) {
this.name = name;
}
}


다음은 스트림에 가진 지식을 이용하여 두개의 오브젝트를 초기화해보자.


List<Foo> foos = new ArrayList<>();


// create foos

IntStream

    .range(1, 4)

    .forEach(i -> foos.add(new Foo("Foo" + i)));


// create bars

foos.forEach(f ->

    IntStream

        .range(1, 4)

        .forEach(i -> f.bars.add(new Bar("Bar" + i + " <- " + f.name))));


이제 우리는 3개의 bar로 구성된 3개의 foo리스트를 가지게 되었다.


FlatMap에는 스트림을 리턴하는 함수를 적용할 수 있다. 그리서 각 foo의 bar 객체를 접근하기 위해 적당한 함수를 전달해보자.


foos.stream()
.flatMap(foo -> foo.bars.stream())
.forEach(bar -> System.out.println(bar.name));

Bar1<-Foo1

Bar2<-Foo1

Bar3<-Foo1

Bar1<-Foo2

Bar2<-Foo2

Bar3<-Foo2

Bar1<-Foo3

Bar2<-Foo3

Bar3<-Foo3


보시다시피 3개의 foo 오브젝트 스트림은 7개의 바 오브젝트 스트림으로 변경되었다.


결론적으로 위의 코드 예제는 스트림 단일 파이프라인을 단순화 할수도 있다.

IntStream.range(1,4)
.mapToObj(i -> new Foo("Foo" + i))
.peek(foo -> IntStream.range(1,4).forEach(i ->new Bar("Bar" + i + "<-" + foo.name)))
.flatMap(foo -> foo.bars.stream())
.forEach(bar -> System.out.println(bar.name));


FlatMap은 자바 8에서 소개된 Optional 클래스에서도 매우 유용하다. Optionals flatMap 연산은 다른 타입의 optional 객체를 리턴한다. 그래서 다루기 힘든 null 체크를 방지하기 위해 유용하게 사용할 수 있다.


다음과 같은 계층적인 구조를 생각해보자.

public class Outer {
Nested nested;
}
public class Nested {
Inner inner;
}
public class Inner {
String foo;
}


NullPointerException을 방지 하기 위해 outer 인스턴스에 inner foo 문자열에 널체크를 한다면 다음과 같이 될 것이다.

Outer outer = new Outer();

if (outer != null && outer.nested != null && outer.nested.inner != null) {
System.out.println(outer.nested.inner.foo);
}


같은 동작을 optionals flatMap을 사용하면 다음과 같다.


Optional.of(outer)
.flatMap(o -> Optional.ofNullable(o.nested))
.flatMap(n -> Optional.ofNullable(n.inner))
.flatMap(i -> Optional.ofNullable(i.foo))
.ifPresent(System.out::println);

각 flatMap 호출은 원하는 객체를 래핑한 Optional이 리턴된다. Optional에는 원하는 오브젝트가 있거나 null이 들어있을 것이다.


Reduce


Reduce 오퍼레이션은 스트림의 모든 엘레민트들을 조합하여 하나의 결과로 리턴한다. 자바 8에서는 3가지 reduce 오퍼레이션을 제공하는데 첫번째는 스트림의 엘레먼트를 스트림이 존재하는 하나의 엘레먼트로 변경하는 것이다. 앞의 예제에서 어떻게 가장 나이가 많은 사람을 추출하는지 알아보자.

persons.stream()
.reduce(((person1, person2) -> person1.age > person2.age ? person1 : person2))
.ifPresent(System.out::println);


리듀스 메소드는 BinaryOperator accumulator 함수를 적용할 수 있다. 실제로 BiFunction은 같은 타입간의 연산이며 이번 케이스는 Person이다. BiFunction이 Function과 다르게 두개의 인자를 적용한다. 이번 예제에서는 두 사람의 나이를 비교해서 나이가 가장 많은 사람을 리턴한다.


두번째 reduce 메소드는 id값과 BinaryOperator acculmulator를 적용할 수 있다. 이 메소드는 새로운 Person객체를 만드는데 이 객체는 스트림에 존재하는 모든 사람의 이름과 나이를 조합한 결과이다.


Person total = persons.stream()
.reduce(new Person("Total", 0), (person1, person2) -> {
person1.age += person2.age;
person1.name += person2.name;
return person1;
});

System.out.println(total);


Person{name='TotalMaxPeterPamelaDavid', age=76}


세번째 리듀스 메소드는 세 개의 파라미터를 적용할 수 있다 : id값, BiFunction accumulator 그리고 BinaryOperator의 combiner 함수이다. id값은 Person 타입으로 한정되지 않으므로, 모든 사람의 나이의 합을 내는 것은 아래와 같이 구현하는 것이 가능하다.


Integer total = persons.stream()
.reduce(0, (sum, person1) -> sum += person1.age, (sum1, sum2) -> sum1 + sum2);

System.out.println(total);


결과로 76이 리턴되는 것을 알 수 있다. 하지만 내부적으로 어떤일이 벌어질까? 위의 코드의 몇가지 디버그 코드를 입력해보자.

Integer total = persons.stream()
.reduce(0, (sum, person1) -> {
System.out.println(String.format("accumulator. sum=%d, person=%s", sum, person1));
return sum += person1.age;
}, (sum1, sum2) -> {
System.out.println(String.format("combiner. sum1=%d, sum2=%d", sum1, sum2));
return sum1 + sum2;
});

accumulator. sum=0, person=Person{name='Max', age=18}

accumulator. sum=18, person=Person{name='Peter', age=23}

accumulator. sum=41, person=Person{name='Pamela', age=23}

accumulator. sum=64, person=Person{name='David', age=12}


보시다시피 accumulator 함수는 모든 경우 실행되는 것을 알 수 있다. 처음 호출이 되면 id 값은 0이며 나머지 세 스텝이 호출되면 마지막 스텝에서는 나이가 증가하여 76이 됨을 알 수 있다.


그런데 combiner는 호출되지 않았다. 같은 스트림을 병렬로 돌려보자.

Integer total = persons.parallelStream()
.reduce(0, (sum, person1) -> {
System.out.println(String.format("accumulator. sum=%d, person=%s", sum, person1));
return sum += person1.age;
}, (sum1, sum2) -> {
System.out.println(String.format("combiner. sum1=%d, sum2=%d", sum1, sum2));
return sum1 + sum2;
});

accumulator. sum=0, person=Person{name='Pamela', age=23}

accumulator. sum=0, person=Person{name='David', age=12}

accumulator. sum=0, person=Person{name='Max', age=18}

accumulator. sum=0, person=Person{name='Peter', age=23}

combiner. sum1=23, sum2=12

combiner. sum1=18, sum2=23

combiner. sum1=41, sum2=35


이 스트림을 병렬로 실행하면 전체적인 실행이 달라지는 것을 알 수 있다. 그리고 병렬로 accumulator가 실행되기 때문에 combiner는 각 분리 계산된 값을 합계를 내기 위해 필요하다.


Parallel Streams


스트림은 대용량처리시 실행성능을 증가하기 위해 병렬로 수행될 수 있다. 병렬 스트림은 ForkJoinPool.commonPool 메소드를 이용하여 ForkJoinPool을 이용한다. 스레드풀의 사이즈는 5개까지 사용할 수 있으며 이것은 물리적인 CPU의 코어수에 비례한다.

ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();

System.out.println(forkJoinPool.getParallelism());


나의 머신에서는 디폴트는 7이었다. 이 값은 다음과 같은 JVM 파라미터에 의해 증가하거나 줄어들수 있다.

-Djava.util.concurrent.ForkJoinPool.common.parallelism=5


컬렉션은 parallelStream이라는 메소드를 지원하는데 이는 엘레먼트의 스트림의 병렬 스트림을 생성하기 위해 사용된다. 대안으로는 인터미디어트 메소드 parallel을 호출할 수있는데 이는 직렬 스트림을 병렬 스트림으로 변경한다.


이해를 돕기 위해서 현재 스레드를 출력해보자.

Arrays.asList("a1", "a2", "b1", "c2", "c1")
.parallelStream()
.filter(s -> {
System.out.println(String.format("Current Thread : %s, filter : %s", Thread.currentThread(), s));
return true;
})
.map(s -> {
System.out.println(String.format("Current Thread : %s, map : %s", Thread.currentThread(), s));
return s.toUpperCase();
})
.forEach(s -> {
System.out.println(String.format("Current Thread : %s, forEach : %s", Thread.currentThread(), s));
});


Current Thread : Thread[main,5,main], filter : b1

Current Thread : Thread[ForkJoinPool.commonPool-worker-2,5,main], filter : c1

Current Thread : Thread[ForkJoinPool.commonPool-worker-4,5,main], filter : c2

Current Thread : Thread[main,5,main], map : b1

Current Thread : Thread[ForkJoinPool.commonPool-worker-4,5,main], map : c2

Current Thread : Thread[ForkJoinPool.commonPool-worker-3,5,main], filter : a1

Current Thread : Thread[ForkJoinPool.commonPool-worker-1,5,main], filter : a2

Current Thread : Thread[main,5,main], forEach : B1

Current Thread : Thread[ForkJoinPool.commonPool-worker-3,5,main], map : a1

Current Thread : Thread[ForkJoinPool.commonPool-worker-1,5,main], map : a2

Current Thread : Thread[ForkJoinPool.commonPool-worker-4,5,main], forEach : C2

Current Thread : Thread[ForkJoinPool.commonPool-worker-3,5,main], forEach : A1

Current Thread : Thread[ForkJoinPool.commonPool-worker-2,5,main], map : c1

Current Thread : Thread[ForkJoinPool.commonPool-worker-1,5,main], forEach : A2

Current Thread : Thread[ForkJoinPool.commonPool-worker-2,5,main], forEach : C1


위와 같이 각 스트림 오퍼레이션은 common ForkJoinPool을 이용하는 것을 알 수 있다. 


위의 예제에 추가적으로 소팅 작업을 수행해보자.

Arrays.asList("a1", "a2", "b1", "c2", "c1")
.parallelStream()
.filter(s -> {
System.out.println(String.format("Current Thread : %s, filter : %s", Thread.currentThread(), s));
return true;
})
.map(s -> {
System.out.println(String.format("Current Thread : %s, map : %s", Thread.currentThread(), s));
return s.toUpperCase();
})
.sorted((s1, s2) -> {
System.out.println(String.format("Current Thread : %s, sorted : %s, %s", Thread.currentThread(), s1, s2));
return s1.compareTo(s2);
})
.forEach(s -> {
System.out.println(String.format("Current Thread : %s, forEach : %s", Thread.currentThread(), s));
});

Current Thread : Thread[ForkJoinPool.commonPool-worker-1,5,main], filter : a2

Current Thread : Thread[ForkJoinPool.commonPool-worker-4,5,main], filter : c2

Current Thread : Thread[ForkJoinPool.commonPool-worker-2,5,main], filter : c1

Current Thread : Thread[main,5,main], filter : b1

Current Thread : Thread[ForkJoinPool.commonPool-worker-4,5,main], map : c2

Current Thread : Thread[ForkJoinPool.commonPool-worker-3,5,main], filter : a1

Current Thread : Thread[ForkJoinPool.commonPool-worker-2,5,main], map : c1

Current Thread : Thread[main,5,main], map : b1

Current Thread : Thread[ForkJoinPool.commonPool-worker-3,5,main], map : a1

Current Thread : Thread[ForkJoinPool.commonPool-worker-1,5,main], map : a2

Current Thread : Thread[main,5,main], sorted : A2, A1

Current Thread : Thread[main,5,main], sorted : B1, A2

Current Thread : Thread[main,5,main], sorted : C2, B1

Current Thread : Thread[main,5,main], sorted : C1, C2

Current Thread : Thread[main,5,main], sorted : C1, B1

Current Thread : Thread[main,5,main], sorted : C1, C2

Current Thread : Thread[main,5,main], forEach : B1

Current Thread : Thread[ForkJoinPool.commonPool-worker-2,5,main], forEach : C1

Current Thread : Thread[ForkJoinPool.commonPool-worker-3,5,main], forEach : A1

Current Thread : Thread[ForkJoinPool.commonPool-worker-5,5,main], forEach : A2

Current Thread : Thread[ForkJoinPool.commonPool-worker-1,5,main], forEach : C2


정렬작업은 메인 스레드에서 직렬로 실행된 것 처럼 보인다. 실제로 병렬 스트림에서는 Arrays.parallelSort()를 사용할 수 있다. 


만약 명시된 배열의 길이가 minimum granularity보다 작다면 적당한 Arrays.sort 메소드를 사용할 수 있다

마지막 섹션에 reduce메소드로 돌아가자. 우리는 이미 직렬 스트림에서 combiner 메소드를 실행해보았다. 이제 어떤 스레드가 실제적으로 참여하는지를 살펴보자.


List<Person> persons = Arrays.asList(

    new Person("Max", 18),

    new Person("Peter", 23),

    new Person("Pamela", 23),

    new Person("David", 12));


Integer total = persons.parallelStream()
.reduce(0, (sum, person1) -> {
System.out.println(String.format("accumulator. sum=%d, person=%s [%s]", sum, person1, Thread.currentThread().getName()));
return sum += person1.age;
}, (sum1, sum2) -> {
System.out.println(String.format("combiner. sum1=%d, sum2=%d [%s]", sum1, sum2, Thread.currentThread().getName()));
return sum1 + sum2;
});

System.out.println(total);

accumulator. sum=0, person=Person{name='Pamela', age=23} [main]

accumulator. sum=0, person=Person{name='Max', age=18} [ForkJoinPool.commonPool-worker-3]

accumulator. sum=0, person=Person{name='Peter', age=23} [ForkJoinPool.commonPool-worker-1]

accumulator. sum=0, person=Person{name='David', age=12} [ForkJoinPool.commonPool-worker-2]

combiner. sum1=23, sum2=12 [ForkJoinPool.commonPool-worker-2]

combiner. sum1=18, sum2=23 [ForkJoinPool.commonPool-worker-1]

combiner. sum1=41, sum2=35 [ForkJoinPool.commonPool-worker-1]

76


요약하자면 병렬 스트림은 대용량 처리에 좋은 성능 개선을 가질 수 있다. 그러나 명심해야 할 것은 reduce나 collect(예를 들어 combiner) 오퍼레이션은 추가적은 컴퓨팅 자원을 사용한다는 것이다.


더욱이 모든 병렬 스트림은 JVM 공통 ForkJoinPool을 사용한다. 그래서 무거운 병렬 스트림작업 떄문에 우리의 어플리케이션은 잠재적으로 더 느려질 수도 있다.


Happy coding!



출처 : http://winterbe.com/posts/2014/07/31/java8-stream-tutorial-examples/

'JAVA > Pleasure' 카테고리의 다른 글

Java - Nashorn  (1) 2016.03.10
Java - Lamda expression  (0) 2016.01.13
NIO란?  (0) 2015.04.17
Stack과 heap의 차이점  (0) 2015.01.12
Comments