욱'S 노트

Apache Storm - Trident Tutorial 본문

Programming/Storm

Apache Storm - Trident Tutorial

devsun 2015. 12. 7. 18:21

Trident는 스톰 위에서 실시간 집계를 수행하기 위한 하이레벨 추상화이다. 이것은 끊김없이 대용량 처리를 가능하게 해준다.(초당 수백만의 메시지) 지연없는 분산쿼리를 통해 스테이트풀 스트림 프로세싱을 가능하게 한다. Pig나 Cascading과 같은 고수준의 배치 프로세싱 툴에 친숙하다면 Trident의 개념은 매우 유사하다. Trident는 joins, aggregations, groupings, functions, filters 연산을 수행할 수 있다. 트라이던트는 어떤 데이터베이스 또는 퍼시스턴트 스토어의 위에서 스테이풀하고 인크리멘탈 프로세싱을 주로 처리하기 위해 추가되었다. Trident는 일관성 있고, 정확하고 쉽게 수행된다.


Illustrative example


트라이던트에 관한 실증적인 예제를 살펴보자. 이번 예제는 두가지를 수행한다.


문장의 입력 스트림으로부터 단어의 갯수를 계산한다.

단어 리스트의 카운트를 합계하는 쿼리를 구현한다.

이 예제의 목적은 다음 소스와 같이 무한의 문장을 읽는 것이다.


FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,

               new Values("the cow jumped over the moon"),

               new Values("the man went to the store and bought some candy"),

               new Values("four score and seven years ago"),

               new Values("how many apples can you eat"));

spout.setCycle(true);


이 스파우트는 문장 집합을 순환하며 문장 스트림을 생성한다. 다음 코드는 스트림의 단어 부분을 카운팅하는 코드이다.


This spout cycles through that set of sentences over and over to produce the sentence stream. Here's the code to do the streaming word count part of the computation:


TridentTopology topology = new TridentTopology();        

TridentState wordCounts =

     topology.newStream("spout1", spout)

       .each(new Fields("sentence"), new Split(), new Fields("word"))

       .groupBy(new Fields("word"))

       .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))                

       .parallelismHint(6);


코드를 라인에 따라 읽어보자. 첫번째 TridentTopology 오브젝트가 생성되었다. 이는 트라이언트 계산을 생성하기 위한 인터페이스를 노출한다.


TridentTopology는 newStream이라고 불리우는 메소드를 가지고 있으며 입력 소스로부터 데이터를 읽어 토폴로지에 새로운 스트림 데이터를 생성한다. 이 경우 입력소스는 이전에 정의한 FixedBatchSpout이다. 입력 소스는 Kestrel이나 Kafka와 같은 큐 브로커일 수 있다. Trident는 각 입력 소스를 위한 작은 량의 상태를 주키퍼에 기록하고 있다. " "spout1"이라는 문자열은 주키퍼의 노드를 명시하며 해당 위치에 트라이언트는 메타데이터를 유지한다.


트라이언트는 투플의 작은 배치로서 스트림을 처리한다. 예를들어 문장의 입력된 스트림은 다음과 같이 작은 배치로 쪼개질 수 있다.



일반적으로 이러한 작은 배치 사이즈는 수천 또는 수백만의 투플일 수가 있는데 이는 입력된 처리량에 따른다.

트라이언트는 작은 배치를 처리하기 위한 완전히 독립적인 배치 프로세싱 API를 제공한다.  API는 Pig나 Cascading과 같은 하둡의 고수준의 추상화 API와 매우 비슷한다. : group by와 join 및 aggregation을 수행할 수 있고,  function을 실행할 수 있으며, filter를 실행할 수 있다. 물론 각각의 작은 배치 처리는 독립적이고 서로 관심을 갖지 않는다. 그래서 트라이언트는 배치를 전체적으로 aggregation을 수행할 수 있는 기능을 제공하고 이러한 aggregation을 저장할 수 있게 한다. - 메모리든, Memcached던 Cassandra든 또는 어떤 스토어든. 마침내 트라이언트는 실시간 상태를 질의할 수 있는 첫번째 기능을 가졌다. 상태는 트라이언트에 의해 업데이트 될 것이다. 그것은 상태의 독립된 소스일 수 있다.


public class Split extends BaseFunction {

   public void execute(TridentTuple tuple, TridentCollector collector) {

       String sentence = tuple.getString(0);

       for(String word: sentence.split(" ")) {

           collector.emit(new Values(word));                

       }

   }

}


보시다시피 매우 단순하다. 단순히 문장을 잡아서 whitespace로 분리를 하고 각각의 단어로 투플을 발행한다.


토폴로지의 남은 부분은 워드를 카운트 하고 결과를 저장한다.  첫번째 스트림은 word 필드를 group by 하고, 각 그룹은 Count aggregator를 사용하여 결합된다. persitentAggregate function은 어떻게 저장하고 aggregation의 경과를 상태의 소스에 업데이트할지를 안다. word count는 메모리에 유지되지만, 보통 Memcached, Cassandra 또는 다른 persistent store로 변경될 수 있다. 이번 토폴로지에서 Memcached로 변경하고 싶다면 serverLocations에 Memcached 클러스터를 위한 호스트/포트 리스트를 제공해야 한다.


.persistentAggregate(MemcachedState.transactional(serverLocations), new Count(), new Fields("count"))        

MemcachedState.transactional()


persistentAggregate에 의해 저장된 값은 스트림으로 부터 발해된 모든 배치의 aggregation을 나타낸다.


Trident의 한가지 유용한 점은 완전히 실패에 관대하며 정확히 한번 수행된다는 것이다. 이것은 실시간 처리를 쉽게 만들어주는 이유가 된다. 트라이언트는 만약 실패가 발생하거나 재시도를 할 필요가 있기 때문에 상태를 저장한다. 그러므로 똑같은 소스 데이터를 여러번 데이터베이스에 업데이트 하지 않는다.


persistentAggregate 메소드는 Stream을 TridentState 오브젝트로 변경한다. 이 경우 TridentState 오브젝트는 모든 단어 갯수를 나타낸다. 우리는 계산의 분산된 쿼리의 몫으로서 사용할 수 있다.


토폴로지의 다음 부분은 지연없는 분산 쿼리를 구현하는 것이다. 쿼리는 빈칸으로 분리된 단어를 입력으로 해당 단어들의 카운트의 합을 리턴한다. 이러한 쿼리는 일반적인 RPC 호출로 실행될 수 잇다. 여기 이러한 쿼리를 호출할 수 있는 한가지 예가 있다. 


DRPCClient client = new DRPCClient("drpc.server.location", 3772);

System.out.println(client.execute("words", "cat dog the man");

// prints the JSON-encoded result, e.g.: "[[5078]]"


살펴보듯이 Storm 클러스터를 병렬로 실행한다는 것을 제외하면 일반적인 RPC로 보인다. 거의 지연없는 쿼리는 약 10ms 정도를 의미한다. 더 어려운 DRPC 쿼리는 더 많은 코스를 수행하며 계산을 위해 얼마나 많은 리소스를 할당했는지에 따라 지연은 커질수도 작아질수도 있다.


토폴로지의 분산 쿼리 부분의 구현은 다음과 같다.


topology.newDRPCStream("words")

       .each(new Fields("args"), new Split(), new Fields("word"))

       .groupBy(new Fields("word"))

       .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))

       .each(new Fields("count"), new FilterNull())

       .aggregate(new Fields("count"), new Sum(), new Fields("sum"));


똑같은 TridentTopology 오브젝트를 DRPC stream을 생성하기 위해 사용하였다. function이 이름은 words이다. function 명은 DRPCClient를 사용하여 실행을 위해 주어진 첫번째 인자와 일치한다.


각 DRPC 요청은 하나의 작은 배치 프로세싱 작업으로 취급된다.  투플은 args라고 불리우는 하나의 필드를 포함하고, 클라이언트에 의해 제공된 인자를 포함한다. 이러한 경우 인자는 whitespace으로 불리된 단어의 리스트이다.


첫번째 스플릿 펑션은 인자를 스플릿하여 요청을 단어들의 구성으로 만든다. 스트림은 word에 의해 그룹핑되고, stateQuery 오퍼레이터는 TridentState 오브젝트에 쿼리를 하기 위해 사용된다. 이 경우 토폴로지의 다른 부분에 의해 계산된 워드 카운트이다. 이 경우 MapGet 펑션이 실행되는데 각 단어를 위한 카운트를 가지고 오는 것이다. DRPC 스트림은 word 필드에 의한 TridentState로 똑같은 방법으로 그룹핑되낟. 각 워드 쿼리는 TridentState에 의해 정확히 파티션된다. 


다음 워드의 카운트는 FilterNull 필터를 통해 필터가 되지 않고 Sum aggregator에 의해 결과로 합산된다. 그런 다음 Trident는 자동적으로 대기하고 있는 클라이언트로 콜백을 보낸다. 


Trident는 성능을 최대화해서 토폴로지를 실행하기 위한 지식이다. 이 토폴로지 내에서는 두가지 흥미로운 일들이 발생한다.


오퍼레이션은 상태를 읽거나 쓰는 것인데 자동적으로 배치로 수행된다. 만약 20번의 업데이트가 필요하다면 배치 프로세싱으로 만드는게 20번 읽고 쓰는것 보다 낫다. Trident는 자동적으로 읽기와 쓰기를 1번의 읽기 요청과 1번의 쓰기 요청의 배치로 만들어준다. 그래서 각 튜플에 대한 계산만 명시할 수 있어 쉽고, 또한 퍼포먼스도 증대할 수 있다.


Trident aggregator는 매우 효과적이다. 같은 머신에서 모든 투플을 그룹지어 전송하는 것 보다 aggreagtor를 수행하는 것보다 낫다. Trident는 부분적으로 aggregation을 수행한 다음 네트워크로 투플들을 전송한다. 예를 들어 Count aggregator는 각 부분을 카운트를 집계한다음 부분적인 카운트를 네트워크상에 전송한다. 그런 다음 각 부분적인 카운트를 합산하여 총 합을 얻는다. 이 기술은 MapReduce의 combiner로 비슷하다.


이제 Trident의 다른 예제를 살펴보자. 



Comments