반응형

Trident는 스톰 위에서 실시간 집계를 수행하기 위한 하이레벨 추상화이다. 이것은 끊김없이 대용량 처리를 가능하게 해준다.(초당 수백만의 메시지) 지연없는 분산쿼리를 통해 스테이트풀 스트림 프로세싱을 가능하게 한다. Pig나 Cascading과 같은 고수준의 배치 프로세싱 툴에 친숙하다면 Trident의 개념은 매우 유사하다. Trident는 joins, aggregations, groupings, functions, filters 연산을 수행할 수 있다. 트라이던트는 어떤 데이터베이스 또는 퍼시스턴트 스토어의 위에서 스테이풀하고 인크리멘탈 프로세싱을 주로 처리하기 위해 추가되었다. Trident는 일관성 있고, 정확하고 쉽게 수행된다.


Illustrative example


트라이던트에 관한 실증적인 예제를 살펴보자. 이번 예제는 두가지를 수행한다.


문장의 입력 스트림으로부터 단어의 갯수를 계산한다.

단어 리스트의 카운트를 합계하는 쿼리를 구현한다.

이 예제의 목적은 다음 소스와 같이 무한의 문장을 읽는 것이다.


FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,

               new Values("the cow jumped over the moon"),

               new Values("the man went to the store and bought some candy"),

               new Values("four score and seven years ago"),

               new Values("how many apples can you eat"));

spout.setCycle(true);


이 스파우트는 문장 집합을 순환하며 문장 스트림을 생성한다. 다음 코드는 스트림의 단어 부분을 카운팅하는 코드이다.


This spout cycles through that set of sentences over and over to produce the sentence stream. Here's the code to do the streaming word count part of the computation:


TridentTopology topology = new TridentTopology();        

TridentState wordCounts =

     topology.newStream("spout1", spout)

       .each(new Fields("sentence"), new Split(), new Fields("word"))

       .groupBy(new Fields("word"))

       .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))                

       .parallelismHint(6);


코드를 라인에 따라 읽어보자. 첫번째 TridentTopology 오브젝트가 생성되었다. 이는 트라이언트 계산을 생성하기 위한 인터페이스를 노출한다.


TridentTopology는 newStream이라고 불리우는 메소드를 가지고 있으며 입력 소스로부터 데이터를 읽어 토폴로지에 새로운 스트림 데이터를 생성한다. 이 경우 입력소스는 이전에 정의한 FixedBatchSpout이다. 입력 소스는 Kestrel이나 Kafka와 같은 큐 브로커일 수 있다. Trident는 각 입력 소스를 위한 작은 량의 상태를 주키퍼에 기록하고 있다. " "spout1"이라는 문자열은 주키퍼의 노드를 명시하며 해당 위치에 트라이언트는 메타데이터를 유지한다.


트라이언트는 투플의 작은 배치로서 스트림을 처리한다. 예를들어 문장의 입력된 스트림은 다음과 같이 작은 배치로 쪼개질 수 있다.



일반적으로 이러한 작은 배치 사이즈는 수천 또는 수백만의 투플일 수가 있는데 이는 입력된 처리량에 따른다.

트라이언트는 작은 배치를 처리하기 위한 완전히 독립적인 배치 프로세싱 API를 제공한다.  API는 Pig나 Cascading과 같은 하둡의 고수준의 추상화 API와 매우 비슷한다. : group by와 join 및 aggregation을 수행할 수 있고,  function을 실행할 수 있으며, filter를 실행할 수 있다. 물론 각각의 작은 배치 처리는 독립적이고 서로 관심을 갖지 않는다. 그래서 트라이언트는 배치를 전체적으로 aggregation을 수행할 수 있는 기능을 제공하고 이러한 aggregation을 저장할 수 있게 한다. - 메모리든, Memcached던 Cassandra든 또는 어떤 스토어든. 마침내 트라이언트는 실시간 상태를 질의할 수 있는 첫번째 기능을 가졌다. 상태는 트라이언트에 의해 업데이트 될 것이다. 그것은 상태의 독립된 소스일 수 있다.


public class Split extends BaseFunction {

   public void execute(TridentTuple tuple, TridentCollector collector) {

       String sentence = tuple.getString(0);

       for(String word: sentence.split(" ")) {

           collector.emit(new Values(word));                

       }

   }

}


보시다시피 매우 단순하다. 단순히 문장을 잡아서 whitespace로 분리를 하고 각각의 단어로 투플을 발행한다.


토폴로지의 남은 부분은 워드를 카운트 하고 결과를 저장한다.  첫번째 스트림은 word 필드를 group by 하고, 각 그룹은 Count aggregator를 사용하여 결합된다. persitentAggregate function은 어떻게 저장하고 aggregation의 경과를 상태의 소스에 업데이트할지를 안다. word count는 메모리에 유지되지만, 보통 Memcached, Cassandra 또는 다른 persistent store로 변경될 수 있다. 이번 토폴로지에서 Memcached로 변경하고 싶다면 serverLocations에 Memcached 클러스터를 위한 호스트/포트 리스트를 제공해야 한다.


.persistentAggregate(MemcachedState.transactional(serverLocations), new Count(), new Fields("count"))        

MemcachedState.transactional()


persistentAggregate에 의해 저장된 값은 스트림으로 부터 발해된 모든 배치의 aggregation을 나타낸다.


Trident의 한가지 유용한 점은 완전히 실패에 관대하며 정확히 한번 수행된다는 것이다. 이것은 실시간 처리를 쉽게 만들어주는 이유가 된다. 트라이언트는 만약 실패가 발생하거나 재시도를 할 필요가 있기 때문에 상태를 저장한다. 그러므로 똑같은 소스 데이터를 여러번 데이터베이스에 업데이트 하지 않는다.


persistentAggregate 메소드는 Stream을 TridentState 오브젝트로 변경한다. 이 경우 TridentState 오브젝트는 모든 단어 갯수를 나타낸다. 우리는 계산의 분산된 쿼리의 몫으로서 사용할 수 있다.


토폴로지의 다음 부분은 지연없는 분산 쿼리를 구현하는 것이다. 쿼리는 빈칸으로 분리된 단어를 입력으로 해당 단어들의 카운트의 합을 리턴한다. 이러한 쿼리는 일반적인 RPC 호출로 실행될 수 잇다. 여기 이러한 쿼리를 호출할 수 있는 한가지 예가 있다. 


DRPCClient client = new DRPCClient("drpc.server.location", 3772);

System.out.println(client.execute("words", "cat dog the man");

// prints the JSON-encoded result, e.g.: "[[5078]]"


살펴보듯이 Storm 클러스터를 병렬로 실행한다는 것을 제외하면 일반적인 RPC로 보인다. 거의 지연없는 쿼리는 약 10ms 정도를 의미한다. 더 어려운 DRPC 쿼리는 더 많은 코스를 수행하며 계산을 위해 얼마나 많은 리소스를 할당했는지에 따라 지연은 커질수도 작아질수도 있다.


토폴로지의 분산 쿼리 부분의 구현은 다음과 같다.


topology.newDRPCStream("words")

       .each(new Fields("args"), new Split(), new Fields("word"))

       .groupBy(new Fields("word"))

       .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))

       .each(new Fields("count"), new FilterNull())

       .aggregate(new Fields("count"), new Sum(), new Fields("sum"));


똑같은 TridentTopology 오브젝트를 DRPC stream을 생성하기 위해 사용하였다. function이 이름은 words이다. function 명은 DRPCClient를 사용하여 실행을 위해 주어진 첫번째 인자와 일치한다.


각 DRPC 요청은 하나의 작은 배치 프로세싱 작업으로 취급된다.  투플은 args라고 불리우는 하나의 필드를 포함하고, 클라이언트에 의해 제공된 인자를 포함한다. 이러한 경우 인자는 whitespace으로 불리된 단어의 리스트이다.


첫번째 스플릿 펑션은 인자를 스플릿하여 요청을 단어들의 구성으로 만든다. 스트림은 word에 의해 그룹핑되고, stateQuery 오퍼레이터는 TridentState 오브젝트에 쿼리를 하기 위해 사용된다. 이 경우 토폴로지의 다른 부분에 의해 계산된 워드 카운트이다. 이 경우 MapGet 펑션이 실행되는데 각 단어를 위한 카운트를 가지고 오는 것이다. DRPC 스트림은 word 필드에 의한 TridentState로 똑같은 방법으로 그룹핑되낟. 각 워드 쿼리는 TridentState에 의해 정확히 파티션된다. 


다음 워드의 카운트는 FilterNull 필터를 통해 필터가 되지 않고 Sum aggregator에 의해 결과로 합산된다. 그런 다음 Trident는 자동적으로 대기하고 있는 클라이언트로 콜백을 보낸다. 


Trident는 성능을 최대화해서 토폴로지를 실행하기 위한 지식이다. 이 토폴로지 내에서는 두가지 흥미로운 일들이 발생한다.


오퍼레이션은 상태를 읽거나 쓰는 것인데 자동적으로 배치로 수행된다. 만약 20번의 업데이트가 필요하다면 배치 프로세싱으로 만드는게 20번 읽고 쓰는것 보다 낫다. Trident는 자동적으로 읽기와 쓰기를 1번의 읽기 요청과 1번의 쓰기 요청의 배치로 만들어준다. 그래서 각 튜플에 대한 계산만 명시할 수 있어 쉽고, 또한 퍼포먼스도 증대할 수 있다.


Trident aggregator는 매우 효과적이다. 같은 머신에서 모든 투플을 그룹지어 전송하는 것 보다 aggreagtor를 수행하는 것보다 낫다. Trident는 부분적으로 aggregation을 수행한 다음 네트워크로 투플들을 전송한다. 예를 들어 Count aggregator는 각 부분을 카운트를 집계한다음 부분적인 카운트를 네트워크상에 전송한다. 그런 다음 각 부분적인 카운트를 합산하여 총 합을 얻는다. 이 기술은 MapReduce의 combiner로 비슷하다.


이제 Trident의 다른 예제를 살펴보자. 



반응형
반응형

프로덕션 클러스터에서 토폴로지를 실행하는 것은 로컬 모드와 유사하다. 다음과 같은 스텝을 따른다.


1) 토폴로지를 정의한다.


2) 클러스터에 토폴로지를 서브밋하기 위해 StormSubmitter를 사용한다. StormSubmitter는 토폴로지의 이름, 토폴로지 설정 그리고 토폴로지를 입력받는다. 예제는 다음과 같다.


Config conf = new Config();

conf.setNumWorkers(20);

conf.setMaxSpoutPending(5000);

StormSubmitter.submitTopology("mytopology", conf, topology);


3) 코드와 코드에 모든 디펜던시를 포함한 jar를 만든다. (storm 관련 jars는 워커 노드에 의해 제공되니 제거한다.)


만약 메이븐을 사용한다면 Assembly plugin이 도움이 될 것이다. 다음과 같이 pom.xml에 기술하면 된다.


  <plugin>

    <artifactId>maven-assembly-plugin</artifactId>

    <configuration>

      <descriptorRefs>  

        <descriptorRef>jar-with-dependencies</descriptorRef>

      </descriptorRefs>

      <archive>

        <manifest>

          <mainClass>com.path.to.main.Class</mainClass>

        </manifest>

      </archive>

    </configuration>

  </plugin>


mvn assembly:assembly를 실행하면 적절히 패키지된 jar를 얻을 수 있다. 


4) 스톰 클라이언트를 이용해 topology를 클러스터에 서브밋하자. 각 인자를 사용할수도 있다.


storm jar path/to/allmycode.jar org.me.MyTopology arg1 arg2 arg3


스톰 jar는 클러스터에 submit되고 설정된 StormSubmitter 클래스는 즉시 클러스터와 통신을 할것이다. 예를 들어 스톰 jar 업로드 된 후  MyTopology의 "arg1", "arg2", "arg3"의 인자로 메인 함수를 호출할 것이다.


Common configurations


토폴로지를 설정할 수 있는 다양한 설정들이 존재한다. TOPOLOGY 라는 접두어를 가진 것들은 TOPOLOGY 기반 설정으로 오버라이드될 수 있다. 다음은 토폴로지를 위해 세팅할 수 있는 내용들이다.


  • Config.TOPOLOGY_WORKERS: 토폴로지를 실행할때 사용할 워커 프로세스의 수. 예를들어 25로 세팅하면 클러스터내에 25개의 자바 프로세스가 뜬다. 만약 150 parallelism을 수행하면 각 프로세스는 6개의 타스크를 스레드로 수행시킨다.
  • Config.TOPOLOGY_ACKER_EXECUTORS: 이 세팅은 투플 트리를 추적하고 분출된 투플이 완전히 수행되었는지를 검사하기 위함이다. Acker는 스톰의 reliability를 위한 필수적인 요소이다. 자세한 내용은 Guaranteeing message processing에서 알아볼 것이다. 세팅을 하지 않거나 null로 세팅을 하면 storm은 acker executor를 수를 토폴로지의 워커 프로세스의 수와 동일하게 할 것이다. 0으로 세팅하면 스톰은 spout에서 분출한 즉시 ack를 수행하여 reliability를 비활성화 할 것이다.
  • Config.TOPOLOGY_MAX_SPOUT_PENDING:  이것은 하나의 spout task에서 분출한 투플의 최대 펜딩 숫자이다.(펜딩은 투플이 아직 ack하지 않았거나 실패를 하지 않았다는 것을 의미한다). 큐가 폭발하는 것을 방지하기 위해 꼭 설정할 것을 권고한다.
  • Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS: 분출한 투플이 완전히 처리되는데 소요되는 최대 시간이다. 이 시간동안 완전히 처리되지 않았다면 실패로 간주한다. 기본값은 30초이며 대부분의 토폴로지에서 충분하다.
  • Config.TOPOLOGY_SERIALIZATIONS: 투플내에 커스텀 타입을 사용하기 위해 이 설정을 이용해서 더 많은 serializer를 등록할 수 있다. 


Killing a topology


토폴로지를 중지하기 위해선 단순히 다음을 실행하면 된다.


storm kill {stormname}


토폴로지를 서브밋할때 사용한 이름을 똑같이 입력하자.


스톰은 즉시 토폴로지를 중단시키지 않는다. 대신에 모든 spout들을 비활성화시키고, 스톰은 Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS에 설정된 시간만큼 대기한 다음에 모든 워커를 중단시킨다. 이것은 토폴로지에게 수행중인 투플을 완전히 처리할 수 있도록 충분한 시간을 제공하기 위함이다.


Updating a running topology


실행중인 토폴로지를 업데이트하기 위해 유일한 옵션은 현재 토폴로지를 중단하고 새로운 것을 다시 서브밋하는 것이다. 


Monitoring topologies


토폴로지를 모니터링 하기 위한 가장 좋은 방법은 Storm UI를 활용하는 것이다. Storm UI는 task내에서 발생한 에러에 대한 정보를 제공하고 처리량 및 각 수행중인 토폴로지의 컴포넌트에 대한 지연등을 잘 정리된 통계로 제공한다.


또한 클러스터 머신에 대한 worker 로그를 살펴볼 수 있다.

반응형

'Programming > Storm' 카테고리의 다른 글

Apache Storm - Trident Tutorial  (0) 2015.12.07
Apache Storm - Concepts  (0) 2015.11.19
Apache Storm - Setting Up a Development Environment  (0) 2015.10.30
Apache Storm - Tutorial  (0) 2015.10.29
Apache Storm - Setting up a Storm Cluster  (1) 2015.10.27
반응형

이번 페이지에서는 스톰의 메인 컨셉을 나열해보고 더 자세한 정보를 제공에 대한 링크를 제공할 것이다. 논의될 컨셉은 다음과 같다.


  • Topologies
  • Streams
  • Spouts
  • Bolts
  • Stream groupings
  • Reliability
  • Tasks
  • Workers

Topologies


실시간 어플리케이션을 위한 로직은 스톰 토폴로지로 패키징 된다. 스톰 토폴로지는 맵리듀스 작업과 유사하다. 하나의 주요한 차이점은 맵리듀스 작업은 결국에는 종료되는 반면 토폴로지는 항상 수행중이다는 것이다. 토폴로지는 spout과 bolt의 stream grouping으로 연결된 그래프이다. 


Streams


스트림은 스톰의 핵심 추상화이다. 스트림은 투플의 일관되지 않는 순서이다. 투플은 분산환경에서 병렬로 처리되고 생성된다. 스트림은 스키마로 정의되어지는데 스키마는 스트림의 투플들의 필드명을 의미한다. 기본적으로 투플은 integers, longs, shorts, bytes, strings, doubles, floats, booleans, and byte arrays을 포함할 수 있다. 또한 커스텀한 serializer를 구현해서 사용할 수 있다.


모든 스트림은 정의 시 주어진 아이디를 가지고 있다. 싱글 스트림 spout과 bolt는 일반적이기 때문에 OutputFieldsDeclarer는 특정한 아이디없이 싱글 스트림을 정의하기 위한 편리한 메소드를 제공한다. 아 경우 스트림은 default라는 default id를 가진다.


Resources:


Tuple: 스트림은 투플들로 구성된다.

OutputFieldsDeclarer: 스트림과 그의 스키마를 정의하기 위해 사용된다.

Serialization: 투플의 다이나믹 타이핑 및 커스텀 serialization 정의에 대한 정보ISerialization: 커스텀 serializer는 반드시 이 인터페이스를 구현해야 한다. 

CONFIG.TOPOLOGY_SERIALIZATIONS: 커스텀 serializer는 이 설정을 이용해서 등록해야 한다. 


Spouts


Spout은 토폴로지 상의 스트림의 소스이다.  일반적으로 spout은 외부 소스로부터 투플을 읽어서 토폴로지 상에 투플을 발행한다. Spout은 reliable일수도 있고 unreliable일 수 있다. reliable일 경우 spout은 스톰에 의해 처리에 실패할 경우 다시 실행할 수 있는 기능을 가지게 되고, 반면에 unreliabe일 경우에는 다시 발행하는 순간 투플에 관한 내용을 잊어버린다.


Spout은 하나의 스트림 이상에 투플을 발행할 수 있다. 이렇게 하기 위해서는 OutputFieldsDeclarer의 declareStream 메소드를 이용하여 다수의 스트림을 정의하고, SpoutOutputCollector의 emit 메소드를 이용할 때 스트림을 명시해주면 된다.


Spout의 가장 주요한 메소드는 nextTuple이다. nextTuple은 새로운 투플을 발행할 수도 있고 만약 발행할 투플이 없다고 하면 단순히 리턴할수도 있다. 


다른 spout 구현을 위해 nextTuble은 절대 블락하지 않아야 한다. 기 이유는 스톰은 모든 spout 메소드를 하나의 스레드에서 호출하기 때문이다. 


또 다른 spout의 주요한 메소드는 ack와 fail이다. Spout으로부터 발행된 투플이 토폴로지를 완전히 완료하거나 완료에 실패했을때 이 메서드가 호출된다. ack와 fail은 reliable spout에서만 호출된다.


Resources:


IRichSpout: spout은 해당 인터페이스를 구현해야 한다.


Bolts


토폴로지의 모든 처리는 볼트내에서 완료된다. 볼트는 다음과 같은 모든것을 수행할 수 있다. (filtering, functions, aggregations, joins, talking to databases, and more.)


볼트는 단순한 스트림의 변경을 수행할 수 있다. 또한 복잡한 스트림 변경을 위해서는 종종 다수의 스텝 즉 다수의 볼트가 필요하기도 한다. 예를들어 tweet의 스트림을 trending images로 변경하기 위해서는 최소한 두개의 스텝이 필요하다. 


볼트 또한 하나 이상의 스트림에 발행할 수 있다. 이렇게 하기 위해서는 OutputFieldsDeclarer의 declareStream 메소드를 이용하여 다수의 스트림을 정의하고, SpoutOutputCollector의 emit 메소드를 이용할 때 스트림을 명시 해주면 된다.


볼트의 입력 스트림을 정의할 때는 항상 다른 컴포넌트의 특정한 스트림을 subscribe해야 한다. 만약 다른 컴포넌트로 부터 모든 스트림을 subscribe하기를 원한다면 개별적으로 각각 하나씩 subscribe를 해야 한다. InputDeclarer는 기본 stream id로 정의된 스트림을 명시하기 위한 syntatic sugar를 가지고 있다. declarer.shffleGrouping("1")은 컴포넌트 "1"의 기본 스트림을 구독하고 declarer.shuffleGrouping("1", DEFAULT_STREAM_ID)과 같은 의미이다.


볼트의 주요 메소드는 execute이다. 볼트는 OutputCollect 오브젝트를 사용하여 새로운 투플을 발행할 수 있고, 볼트는 그들이 처리한 모든 투플을 위해 OutputCollector의 ack 메소드를 반드시 호출해야 하는데, 그래야 스톰이 투플이 완료되었는지를 알 수 있기 때문이다. 입력한 투플을 처리하는 일반적인 경우 0개나 그 이상의 투플을 발행하게 되는데 그때 입력 투플을 acking해야 한다. Storm IBasicBolt 인터페이스를 제공하는데 이 인터페이스를 구현하면 자동으로 acking이 수행된다.


OutputCollector는 스레드 세이프하지 않다. 모든 emits, acks, fails 하나의 스레드에서 발생한다.


Resources:


IRichBolt: 볼트를 위한 일반적인 인터페이스

IBasicBolt: 필터링 또는 단순한 기능을 수행하기 위해 편리하게 사용할 수 있는 인터페이스 

OutputCollector: 볼트가 그들의 출력 스트림으로 투플을 발행할 때 사용하는 클래스


Stream groupings


토폴로지 정의의 한 부분으로 각 볼트를 위해 그들의 받을 입력을 명시할 수 있다. Stream grouping은 볼트의 작업들 사이에 스트림을 어떻게 분할할지를 명시한다.


스톰에는 8개의 빌트인 stream grouping이 있다. 커스텀 stream grouping을 구현하고자 한다면 CustomStreamGrouping 인터페이스를 구현하면 된다.


Shuffle grouping: 투플이 랜덤하게 볼트의 작업으로 분배된다. 이 경우 각 볼트는 같은 수의 투플을 처리하게 된다.

Fields grouping: 스트림은 그룹핑에 명시된 필드로 분할된다. 예를 들어 스트림이 user-id라는 필드로 그룹핑되어 있다면, 똑같은 user-id를 가진 투플은 항상 똑같은 작업으로 전달될 것이다. user-id가 다른 값을 가지면 다른 타스크로 전달된다.

Partial Key grouping: 스트림은 그룹핑에 명시된 필드에 의해 파티션된다. Fields 그룹핑과 유사하지만 두개의 downstream 볼트 사이에 로드 밸런스가 수행된다. 더 좋은 리소스 활용이 제공되어 한쪽으로 쏠려서 데이터가 몰리는 것이 방지된다. 

All grouping : 스트림은 모든 볼트 타스크에 복제된다. 이 그룹핑을 사용할때는 특별히 유의하자.

Global groping : 전체 스트림이 하나의 볼트 타스크로 전달된다. 구체적으로 말하자면 lowest id의 타스크로 전달된다.

None grouping : 이 그룹핑은 스트림이 어떻게 그룹핑되는지 신경쓰지 않는다는 것이다. 현재 none groupnig은 shuffle grouping과 동일하다.

Direct grouping : 이것은 특별한 타입의 그룹핑이다. 이 방식에서 그룹핑된 스트림은 투플의 producer가 투플의 consumer를 지정한다. Direct grouping에서는 direct stream으로 명시된 stream에서만 정의될 수 있다. Direct stream이 발행된 투플은 반드시 emitDirect method를 사용해서 발행되어야만 한다. 볼트는 TopogyContext로 부터 제공되어진 consumer들의 task 아이디를 조회할 수 있고, OutputCollector의 emit메소드의 출력을 추적할 수 잇다.

Local or shuffle grouping: 타겟 볼트가 하나 또는 그 이상의 타스크를 같은 워커 프로세스에서 가지고 있다면 투플은 인프로세스 파스크로 전달된다.


Resources:


TopologyBuilder: 토폴로지를 정의하기 위해 이 클래스를 사용한다.

InputDeclarer: TopologyBuilder의 setBolt메소드를 호출하면 이 오브젝트가 리턴된다. 볼트의 input stream을 정의하기 위해 사용되며 스트림을 어떻게 그룹핑할지를 명시한다.

CoordinatedBolt: 분산 RPC 토폴로지에 유용한 볼트이다. Direct stream과 direct grouping을 heavy하게 사용할 수 있게 해준다.


Reliability


스톰은 모든 spout 투플이 토폴로지에 의해 완전하게 처리되는 것을 보장한다. 이것은 모든 spout 투플에 의해 트리거된 투플 트리 추적에 의해 달성된다. 투플 트리가 성공적으로 완료되었는지를 판단한다. 모든 토폴로지는 message timeout을 가진다. 만약 타임아웃내에 완료되지 않으면 스톰은 투플의 실패로 감지하고 이후 투플을 다시 실행시킨다. 


스톰의 reliablity의 이점을 누릴려면 투플 트리에 새로운 엣지가 생기면 반드시 스톰에게 알려야 한다. 이것은 볼트가 투플을 발행하기 위해 사용하는 OutputCollect 오브젝트를 이용하여 수행할 수 있다. Anchoring이 완료되면 명시적으로 투플이 완료되었다고 ack 메소드를 사용해야 한다.


Tasks


각 spout과 볼트는 클러스터상의 많은 작업을 수행한다. 각 작업은 하나의 스레드의 수행과 일치하며 stream 그룹핑은 하나의 타스크를 또 다른 파스크로 어떻게 전달하는지를 명시한다. 각 spout과  bolt의 parallelism을 정의할 수 있다. 이 때 TopologyBuilder의 setSpout과 setbolt 함수를 이용한다.


Workers


토폴로지는 하나 또는 그 이상의 worker 프로세스를 실행한다. 각 워커 프로세스는 하나의 물리적인 JVM이며 토폴로지를 위한 모든 타스크의 서브셋을 실행한다. 예를 들어 만약 토폴로지가 300개의 병렬 타스크로 구성되어 있고 50개의 워커 스레드가 있다면 각 워커는 6개의 타스크가 할당된다.


Resources:


Config.TOPOLOGY_WORKERS: 토폴로지를 실행하기 위한 워커의 수는 다음의 설정에 명시할 수 있다.

반응형
반응형

이번 페이지는 어떻게 스톰 개발환경을 세팅하는 지에 대해 설명한다. 요약하면 스텝은 다음과 같다.


스톰 릴리이즈를 다운로드하고 압축을 해제한다. PATH 환경변수에 bin 디렉토리를 등록한다.

리모트 클러스터에 있는 토폴로지를 시작하고 종료하기 위해서 클러스터 정보를 ~/.storm/storm.yaml를 넣는다.

각 스텝의 자세한 내용은 아래와 같다.


What is a development environment?


스톰은 두가지 모드를 가지고 있다. : 로컬 모드와 리모트 모드. 로컬 모드에서는 로컬머신에서 토폴로지를 개발하고 완전히 테스트하는 것을 가능하게 한다. 리모트 모드에서는 클러스터 머신에서 토폴로지를 보내 실행할 수 있게 한다.


스톰 개발 환경은 로컬 모드에서 토폴리지를 개발하고 테스트할 수 있는 모든 것을 설치할 수 있다. 또한 리모트 서버에서 실행을 위해 토폴로지를 패키지할 수 있고, 리모트 클러스터의 토폴로지를 서브밋하고 종료할 수 있다.


빨리 로컬 머신과 리모트 클러스터간의 관계를 살펴보자. 스톰 클러스터는 Nimbus라고 불리우는 마스터 노드에 의해 관리된다. 당신의 머신은 Nimus와 코드를 보내기 위해 커뮤니케이션을 하고 클러스터에서 토폴로지를 실행하기 위해 커뮤니케이션을 한다. Nimbus는 클러스터에 코드를 분산하고 토폴로지를 실행하기 위한 worker를 할당한다. 로컬 모신에서 storm이라고 불리우는 커맨드라인 클라이언트를 사용해서 님저스와 커뮤니케이션을 할 수 있다. 스톰 클라이언트는 리모트 모드를 위해서만 사용된다. : 로컬 모드에서 토폴로지를 개발하고 테스트하는데는 필요가 없다는 말이다.


Installing a Storm release locally


로컬 머신으로 부터 리모트 클러스터로 토폴로지를 서브밋하기 위해서는 스톰 릴리이지를 로컬에 설치해야 된다. 스톰 릴리이즈를 설치하면 스톰 클라이언트를 활용해 리모트 클러스터와 통신을 할 수 있다. 스톰을 로컬에 설치하기 위해서는 릴리이즈를 다운받고 컴퓨터 어딘가에 압축을 해제해야 한다. 그런 다음 PATH 환경 변수에 bin 디렉토리를 추가해서 storm 스크립트가 실행되는지 확인해 보자.


설치된 스톰 릴리이즈는 단지 리모트 클러스터와 통신하기 위해서만 이용된다. 로컬 모드에서 토폴로지를 개발 및 테스트하기 위해서는 프로젝트에 storm을 개발 디펜던시에 추가하면 된다. 자세한 내용은 Using Maven 글을 참고하라.


Starting and stopping topologies on a remote cluster


이전 스텝에서 리모트 클러스터와 커뮤니케이션을 위한 스톰 클라이언트를 설치하였다. 이제 클라이언트를 이용해 리모트 클러스터와 커뮤니케이션을 해보자.  이를 위해 마스터의 호스트 주소를 ~/.storm/storm.yaml 파일에 적어야 한다. 


nimbus.host: "123.45.678.890"


대안으로 AWS 스톰 클러스터를 위해 준비된 storm-deploy 프로젝트를 사용한다면 자동으로 ~/.storm/storm.yaml 파일을 세팅할 수 있다. 


반응형

'Programming > Storm' 카테고리의 다른 글

Apache Storm - Trident Tutorial  (0) 2015.12.07
Apache Storm - Running Topologies on a Production Cluster  (0) 2015.11.20
Apache Storm - Concepts  (0) 2015.11.19
Apache Storm - Tutorial  (0) 2015.10.29
Apache Storm - Setting up a Storm Cluster  (1) 2015.10.27
반응형

이번 튜토리얼에서는 어떻게 스톰 토폴로지를 만들고 스톰 클러스터에 배포하는지에 대해서 배울 것이다. 자바를 메인 언어로 사용할 것이지만 몇가지 예제는 파이썬을 사용할 것이다.


Preliminaries


이번 예제는 storm-starter 프로젝트에 있다. 예제를 따라하기 위해 프로젝트를 클론하는 것을 추천한다.  먼저 개발 환경 세팅하는 법을 일고 수행하고 새로운 스톰 프로젝트를 머신에 셋업하라. 


Components of a Storm cluster


스톰 클러스터는 표면적으로 보면 하둡 클러스터와 닮았다. 하둡은 맵리듀스 작업을 실행하는 반면에 스톰은 토폴로지를 실행한다. Job과 topology는 매우 다르다. 가장 주요하게 다른 점은 맵리듀스는 결국은 작업이 끝나는 반면, 토폴로지 영원히 메시지를 처리한다.(죽일때까지)


스톰 클러스터에는 두가지 종류의 노드가 있다. : 마스터 노드와 워커 노드이다. 마스터 노드는 Nimbus라고 불리는 데몬을 실행한다. 이는 하둡의 JobTracker와 유사하다. Nimbus는 클러스터에 코드를 분배하고 머신에 타스크를 할당하며 실패를 모니터링 한다. 


각 워커 노드는 Supervisor라고 불리우는 데몬을 실행한다. Supervisor는 작업을 대기하고 있다가 Nimbus에 의해 작업이 할당되면 필요에 의해 워커 프로세스를 실행하거나 중지한다. 각 워커 프로세는 토폴로지의 서브셋으로 실행된다. : 실행중인 토폴로지는 많은 머신에 분배된 많은 수의 워커 프로세스로 구성된다.




Nimbus와 Supervisor간의 모든 코디네이션은 Zookeeper 클러스터를 통해 이루어진다. 그리고 Nimbus 데몬과 Supvervisor 데몬은 fail-fast하며 stateless하다. 모든 상태는 주키퍼나 로컬 디스크에서 저장된다. 이 의미는 kill -9 로 Nimbus 나 Supervisor를 죽여도 된다는 것을 의미하며 다시 시작하면 아무 일이 없었던 것 처럼 백업이 실행된다. 이러한 설계는 스톰 클러스터는 거의 완벽하게 stable하게 만들어 준다.


Topologies


스톰에서 실시간 처리를 위해 우리는 토폴로지라고 불리는 것을 생성한다. 토폴로지는 계산의 그래프이다. 토폴로지의 각 노드는 처리 로직을 포함한다. 그리고 링크를 통해 노드간 데이터 전달을 지정할 수 있다.


토폴로지를 실행하는 것은 단순하다. 먼저 코드를 패키지하고 하나의 jar에 디펜던시를 넣어라. 그런 다음 다음과 같은 명령을 수행하라.


storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2


이렇게 하면 backtype.storm.MyTopology 클래스가 인자 arg1과 arg2와 함께 동작을 할 것이다. 클래스의 main 함수는 토폴로지를 정의하고 그것을 Nimbus로 보낸다. storm jar 부분은 Nimbus와의 연결을 관리하고 jar을 업로드 시킨다. 


토폴로지 정의는 Thrift 구조이기 떄문에 Nimbus는 Thrift 서비스이며, 당신은 어떤 프로그래밍 언어를 이용하더라도 토폴로지를 생성하고 서브밋할 수 있다. 위의 예제는 JVM 기반 언어로 부터 수행하는 가장 쉬운 방법이다.


Streams


스톰의 핵심 추상화는 stream 이다. stream은 tuple의 불규칙한 순서이다. 스톰은 스트림을 새로운 스트림으로 변경할 수 있는 신뢰할 수 있는 분산 환경을 기본적으로 제공한다. 예를 들어 트워터의 스트림을 트렌드 토픽의 스트림으로 변경할 수 있다.


가장 기본적으로 스톰이 스트림 변경을 이해 제공하는 것은 spouts와 bolts이다. Spouts와 bolts는 어플리케이션에 명시한 로직을 실행하기 위한 구현해야 하는 인터페이스이다.


Spout은 스트림의 소스이다. Spout은 Kestrel 큐로부터 튜플을 읽어서 스트림으로 발행할 수 있다. 또는 spout은 트위터 API와 연결하여 트윗을 스트림으로 발행할 수 있다.


Bolt는 입력 스트림을 소비하고 몇가지 프로세싱을 수행하고 새로운 스트림을 발행한다. 트윗 스트림으로부터 트렌드 토픽으로 변환과 같은 복잡한 스트림 변경은 다수의 스텝이 필요하며 그러므로 다수의 bolt가 처리한다. Bolts는 기능을 수행하거나 tuple을 필터링하거나 스트림을 합치거나 데이터베이스와 통신하거나 어떤것이든 수행할 수 있다.


Spout과 bolt의 네트워크는 topology로 패키지 된다. 토폴로지는 탑레벨 추상화이며 실행을 위해 스톰 클러스터에 서브밋된다. 토폴로지는 각 노드 spout이나 bolt로 구성된 스트림 처리 그래프이다. 그래프의 엣지는 스트림은 어떤 bolt가 subscribe할지를 나타낸다. Spout이나 bolt가 tuple을 stream으로 발행하면 스트림을 구독하는 모든 볼트에게 투플이 전송된다.



링크는 토폴로지내에서 노드간 어떻게 투플이 전달되는지를 나타낸다. 예를들어 Spout A로 부터 Bolt B로의 링크가 있고 Bolt B로부터 Bolt C로의 링ㅋ가 있다면 Spout A가 튜플을 발행할때마다 튜플은 Bolt B 그리고 Bolt C로 전송될 것이다. 


스톰 토폴로지에서 각 노드는 병렬로 실행된다. 토폴로지에 각 노드가 얼마나 많은 병렬수를 처리할 지를 명시할 수 있다. 그러면 스톰은 실행을 위한 스레드들을 클러스터에 할당할 것이다.


토폴로지는 항상 실행된다. 죽기 전까지. 스톰은 자동으로 실패한 타스크를 재할당한다. 또한 스톰은 데이터 유실이 없도록 보장한다. 머신이 다운되거나 메시지가 drop되더라도


Data model


스톰은 투플을 데이터 모델로써 이용한다. 투플은 값의 명명된 리스트이다. 투플의 필드는 어떤 타입이 오브젝트든 가능하다. 스톰은 기본으로 모든 프리미티브 타입, 스트링 바이트 배열을 투플의 필드값으로 지원한다. 다른 타입의 오브젝트를 사용하기 위해서는 해당 타입을 위한 serializer를 구현하면 된다.


토폴로지의 모든 노드는 발행한 투플을 위한 출력 필드를 정의해야 된다. 예를 들어 아래의 볼트는 double과 triple을 필드로 가지는 2개의 투플을 발행할 것이다.


public class DoubleAndTripleBolt extends BaseRichBolt {

    private OutputCollectorBase _collector;


    @Override

    public void prepare(Map conf, TopologyContext context, OutputCollectorBase collector) {

        _collector = collector;

    }


    @Override

    public void execute(Tuple input) {

        int val = input.getInteger(0);        

        _collector.emit(input, new Values(val*2, val*3));

        _collector.ack(input);

    }


    @Override

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

        declarer.declare(new Fields("double", "triple"));

    }    

}


declareOutputFields 함수는 컴포넌트를 위한 출력 필드 ["double", "triple"]을 정의한다. 볼트의 나머지 부분은 다음 섹션에서 설명한다.


A simple topology


개념을 탐색하고 코드가 어떻게 구성되는지를 살펴보기 위해 단순한 토폴로지를 살펴보겠다. storm-starter의 ExclamationTopology를 살펴보자.


TopologyBuilder builder = new TopologyBuilder();        

builder.setSpout("words", new TestWordSpout(), 10);        

builder.setBolt("exclaim1", new ExclamationBolt(), 3)

        .shuffleGrouping("words");

builder.setBolt("exclaim2", new ExclamationBolt(), 2)

        .shuffleGrouping("exclaim1");


위의 토폴폴지는 하나의 spout과 두개의 볼트로 구성된다. Spout은 words를 발행하고 각 볼트는 입력에 "!!!"을 붙인다. 각 노드는 라인상에 위치한다. : spout이 첫번째 볼트로 발행하면 첫번째 볼트가 두번째 볼트로 발행한다. 만약 spout이 "bob"과 "john"이라는 투플을 발하면 두번째 볼트는 "bob!!!!!!"과 "john!!!!!!"을 발행할 것이다.


setSpout 과 setBolt 메소드를 이용하여 노드들을 정의할 수 있다. 이러한 메소드는 유저가 명시한 id를 입력으로 프로세싱 로직의 객체로 포함한다. 각 노드를 위한 최대 병렬수를 지정할 수 있다. 예를들면 spout은 "words"라는 아이디를 가지고 각 볼트들은 "excliam1"과 "excliam2"를 아이디로 가진다.


프로세싱 로직에 포함되는 오브젝트들은 spout는 IRichSpout, bolts는 IRichBolt를 구현해야 한다.


마지막 파라미터는 노드가 얼마나 많은 병렬처리를 수행할 수 있는지이다. 이것을 지정하면 클러스터내에서 해당 수에 해당하는 스레드가 실행될 것이다. 이것을 생략하면 스톰은 각 노드를 위해 하나의 스레드만 할당 할 것이다.


setBolt 메소드는 InputDeclarer 오브젝트를 리턴한다. 이것은 bolt의 입력을 정의하기 위해 이용된다. 여기서는 컴포넌트 "exclaim1"은 컴포넌트 "words"로 부터 발생하는 투플을 suffle grouping으로 읽겠다고 정의하고 있으며, "exclaim2"는 "exclaim1"으로 부터 shuffle grouping을 통해서 읽겠다고 정의하고 있다. shuffle grouping의 의미는 입력 task로부터 투플을 랜덤하게 bolt task로 분배하겠다는 것을 의미한다. 컴포넌트 간의 데이터를 그룹핑하는 방법은 여러가지가 있다. 이는 다음에 살펴보자.


만약 "excliam2"가 "words"와 "excliam2" 컴포넌트로 부터 발행한 모든 투플을 읽겠다면 다음과 같이 정의할 수 있다.


builder.setBolt("exclaim2", new ExclamationBolt(), 5)

            .shuffleGrouping("words")

            .shuffleGrouping("exclaim1");



As you can see, input declarations can be chained to specify multiple sources for the Bolt.


토폴로지에 있는 spout과 bolts의 구현에 대해서 더 살펴보자. 토폴로지에 있는 TestWordSpout 리스트에 있는 단어 ["nathan", "mike", "jackson", "golda", "bertels"] 중에 하나를 100ms마다 발핸한다. nextTuple의 구현은 다음과 같다.


public void nextTuple() {

    Utils.sleep(100);

    final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};

    final Random rand = new Random();

    final String word = words[rand.nextInt(words.length)];

    _collector.emit(new Values(word));

}


ExclamationBolt는 입력에 "!!!" 문자열을 붙인다. ExclamationBolt의 구현은 다음과 같다. 


public static class ExclamationBolt implements IRichBolt {

    OutputCollector _collector;


    @Override

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {

        _collector = collector;

    }


    @Override

    public void execute(Tuple tuple) {

        _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));

        _collector.ack(tuple);

    }


    @Override

    public void cleanup() {

    }


    @Override

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

        declarer.declare(new Fields("word"));

    }


    @Override

    public Map getComponentConfiguration() {

        return null;

    }

}


prepare 메소드는 이 볼트로부터 투플을 발행하기 위해 사용하는 OutputCollector를 볼트에 제공한다. 투플은 언제든 볼트로부터 발행될 수 있다. -- prepare, execute, cleanup 또는 다른 스레드의 비동기 이벤트로도 발행할 수 있다. 예제의 prepare 구현체는 단순히 나중에 execute 메소드에서 OutputCollector를 사용할 수 있도록 인스턴스로 저장한다.


execute는 볼트의 입력으로부터 하나의 투플을 받는다. ExclamationBolt는 투플의 첫번쨰 필드를 꺼내서 "!!!"를 붙인다음 발행한다. 만약 다수의 입력소스로부터 구독을하도록 bolt를 구현하였다면 투플이 어디로부터 전달되었는지는 getSourceComponent 메소드를 통해서 알 수 있다.


execute 메소드에서 수행하는 또다른 것이 있다. 첫번째 라인에서는 투플을 발행하고 마지막라인에서는 입력 투플에 ack를 한다. 이것은 Storm의 신뢰성 API이 한 부분으로 데이터 손실이 없도록 보장한다. 더 자세한 내용은 마지막에 설명하겠다.


cleanup 메소드는 볼트가 셧다운 될떄 호출되며 열린 리소스를 정리하기 위해 사용된다. 이 메소드는 클러스터 상에서 호출되는 것을 보장하지 못한다. : 예를들어 머신에 분산되어서 수행중일때 이러한 메소드를 수행할 수 있는 방법이 없다. cleanup 메소드는 토폴로지를 로컬모드로 실행할때 의미가 있다. 그리고 토폴로지를 실행하거나 죽일때 리소스 릭을 제거하기 원할때 사용할 수 잇다. 


declareOutputFields  메소드는 ExclamationBolt가 word 불리는 하나의 필드를 가진 투플을 반환한다고 명시하고 있다.



getComponentConfiguration 메소드는 컴포넌트를 실행할때 다양한 영향을 설정할 수 있다. 이것에 대한 주제는 다음에 자세히 다루겠다.


cleanup과 getComponentConfiguration 메소드는 bolt 구현에 종종 필요가 없다. 우리는 일반적으로 base class를 이용해 볼트를 더욱 간결하게 작성할 수 있다. BaseRichBolt를 상속해 더욱 간결하게 작성한 ExclamationBolt는 아래와 같다.


public static class ExclamationBolt extends BaseRichBolt {

    OutputCollector _collector;


    @Override

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {

        _collector = collector;

    }


    @Override

    public void execute(Tuple tuple) {

        _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));

        _collector.ack(tuple);

    }


    @Override

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

        declarer.declare(new Fields("word"));

    }    

}


Running ExclamationTopology in local mode


ExclamationTopology이 로컬모드에서 어떻게 실행하고 어떻게 동작하는지 살펴보자.


스톰은 두가지 오퍼레이션 모드가 있다. : 로컬 모드와 분산 모드이다. 로컬 모드에서는 스톰은 하나의 프로세스 내에서 실행되며 워커 노드들은 스레드로 시뮬레이션 된다. 로컬 모드는 토폴로지를 개발하거나 테스트할때 유용하다. storm-starter에 토폴로지를 실행시키면 로컬모드로 실행될 것이며, 우리는 각 컴포넌트가 어떤 메시지를 발행하는지 알 수 있다. 


분산 모드에서는 스톰은 클러스터에서 수행된다. 토폴로지를 마스터에 전송하면 토폴로지를 실행하는데 필요한 모든 코드가 전송될 것이다. 마스터는 코드를 분배하고 토폴로지를 실행한 워커를 할당한다. 만약 워커가 다운되면 마스터는 그들은 다른곳에 재할당할 것이다.


다음 코드는 ExclamationTopology를 로컬모드에서 실행하기 위한 코드이다.


Config conf = new Config();

conf.setDebug(true);

conf.setNumWorkers(2);


LocalCluster cluster = new LocalCluster();

cluster.submitTopology("test", conf, builder.createTopology());

Utils.sleep(10000);

cluster.killTopology("test");

cluster.shutdown();


먼저 LocalCluster 객체를 생성함으로써 in-process 클러스터를 생성한다. 가상 클러스터에 토폴로지를 서브밋합니다. 이때는 submitTopology를 호출하여 수행한다. 


"test"라는 이름은 나중에 kill할 수 있도록 토폴로지를 식별하기 위해서 사용된다. 


configuration은 토폴로지 실행시 다양한 영향을 조정하기 위해서 이용됩니다. 가장 일반적인 두가지 설정은 다음과 같다.


TOPOLOGY_WORKERS - 클러스터에 토폴로지를 실행하기 위해 얼마나 많은 수의 프로세스를 할당할 지를 의미한다. 토폴로지의 각 컴포넌트는 매우 많은 수의 스레드에서 실행될 것이다. 스레드의 숫자는 주어진 컴포넌트의 setBolt, setSpout 메소드에 의해 할당된다.  이러한 스레드들은 워커 프로세스에 존재한다. 각 워커 프로세스는 내부에 몇몇의 컴포넌트를 위한 몇몇의 스레드를 포함한다. 예를 들어 컴포넌트들에 300개의 스레드를 명시했고, 설정에 50개의 worker 프로세스를 명시했다면 각 워커 프로세스는 각 컴포넌트당 6개의 스레드를 실행할 것이다. 


TOPOLOGY_DEBUG - 이 속성을 true로 세팅하면 스톰은 각 컴포넌트에 의해 발행되는 모든 메시지를 로깅할 것이다. 이것은 로컬 모드에서 토폴로지를 테스트할 때 유용하다. 


이외에 많은 설정들이 토폴로지를 위해 존재한다. 자세한 사항은 Javadoc을 참고하기 바란다.


Stream groupings


스트림 그룹핑은 토폴로지에 두 컴포넌트 간에 어떻게 투플을 전송할지를 명시한다. 




Stream grouping은 taks의 집합간에 투플을 어떻게 전송해야되는지를 의미한다. 스트림 그룹핑의 종류를 자세히 살펴보기 전에 스톰 클러스터의 또다른 토폴로지를 살펴보자. WorldCountTopology는 문장을 읽어서 WorldCountBolt에 스트림을 전달한다. 


TopologyBuilder builder = new TopologyBuilder();


builder.setSpout("sentences", new RandomSentenceSpout(), 5);        

builder.setBolt("split", new SplitSentence(), 8)

        .shuffleGrouping("sentences");

builder.setBolt("count", new WordCount(), 12)

        .fieldsGrouping("split", new Fields("word"));


SplitSentence는 전달받은 각 문장의 단어를 투플로 발행하고 WordCount는 카운트를 하기 위한 단어를 메모리에 유지한다. WordCount가 단어를 전달 받을때 마다 상태를 업데이트하고 새로운 단어 카운트를 발행한다.


스트림 그룹핑에는 몇가지 종류가 있다.


"shuffle grouping"은 가장 단순한 종류의 그룹핑이다. shuffle grouping은 RandomSentenceSpout으로부터 SplitSentence 볼트로 투플을 전송하기 위해 사용된다. 이 경우 클러스터에 분산된 어떠한 SplitSentence 볼트의 작업에도 분배가 된다.


"fields grouping"은 더욱 흥미로운 종류의 그룹핑이다. fields grouping은 SplitSentence 볼트와 WordCount 볼트 사이에 투플을 전송하기 위해 이용된다. 이 경우 WordCount 볼트는 항상 같은 타스크로 같은 단어가 전송된다. 그렇지 않다면 하나 이상의 타스크가 같은 단어를 볼것인데 그럴 경우 잘못된 카운트 정보가 발행될 것이다. fields grouping은 각 필드의 부분집합을 스트림으로 그룹핑한다. 이러한 결과로 같은 값은 값은 타스크에 할당된다. WordCount는 SplitSentence의 출력 스트림을 word라는 필드의 그룹핑을 사용한다. 이 경우 똑같은 단어들은 항상 똑같은 타스크들로 전달되어 볼트는 정확한 출력을 생산할 수 있을 것이다.


필드 그룹핑은 스트림의 조인 또는 조합의 기본적인 구현이다.내부적으로 fields grouping은 mod hashing을 사용해서 구현되었다.


몇가지 다른 종류의 스트림 그룹핑도 있다. 자세한 내용은 Concepts 챕터를 살펴보자.


Guaranteeing message processing


이번 튜토리얼의 처음에 어떻게 투플이 발행되는지에 대해서 묵과하였다. 이러한 측명은 스톰 reliabilty API의 하나의 부분이다. : 어떻게 스톰은 spout으로 부터 들어온 메시지가 완전히 처리되는것을 보장하는 지에 대한 것이다. 이러한 정보는 Guaranteeing message processing을 살펴보자.


Transactional topologies


스톰은 모든 메시지가 적어도 한번 토폴로지를 통해 실행될 것을 보장한다. 공통적인 질문은 어떻게 스톰의 위에서 카운트를 할 수 있는가 이다. 오버카운트를 원하지는 않지 않는가? 스톰은 transactional topologies라고 불리우는 특성을 가지고 있다. 대부분의 계산을 위해 정확한 한번의 메시징 의미를 획득할 수 있다. 이러한 자세한 내용은 다음에 살펴보겠다.


Distributed RPC


스톰의 대략적인 관점에서 이번 튜토리얼은 스트림 프로세싱을 보여주었다. 스톰의 기본적으로 제공하는 더 많은 것들이 있다. 스톰 어플리케이션의 가장 흥미로운 것 중 하나는 Distributed RPC이다. 이것은 빠르게 극도의 연산을 병렬로 처리할 수 있다. 자세한 내용은 나중에 살펴보겠다.


Conclusion


이번 튜토리얼은 스톰 토폴로지를 개발, 테스트, 배포하기 위해 대략적으로 살펴보았다.



반응형
반응형

이번 페이지는 스톰 클러스터를 세팅하고 실행시키는 스텝에 대한 개요이다. 만약 AWS환경이라면 storm-deploy 프로젝트를 체크아웃을 해야한다. storm-deploy를 완전히 EC2에 스톰 클러스터를 자동적으로 설정하고 세팅할 수 있다. 또한 당신이 CPU, disk, 네트워크 사용량을 모니터할 수 있도록 Ganglia도 세팅해줄 것이다. 


만약 스톰 클러스터를 세팅하다가 어려움에 빠진다면 트러블슈팅 페이지를 참고하라. 그래도 안되다면 메일링 리스트로 메일하라.


스톰 클러스터를 세팅하는 스텝을 요약해보면 다음과 같다.

  • 주키퍼 클러스터를 셋업하라.
  • Nimbus와 worker 머신에 디펜던시를 인스톨하라.
  • 스톰릴리즈를 다운로드해서 nimbus와 worker 머신에 압축해제하라.
  • storm.yml에 필수 설정을 채워라.
  • 스톰 스크립트을 이용해 데몬을 실행시키고 슈퍼바이저를 선택하라.

Set up a Zookeeper cluster


스톰은 주키퍼를 클러스터 코디네이터로 이용한다. 주키퍼는 메시지를 전송하는데 이용하지는 않는다. 따라서 스톰 클러스터에 이용되는 주키퍼는 매우 조용하다. 싱글 노드 주키퍼 클러스터는 대부분의 경우 충분하나 대량의 스톰 클러스터를 failover 하거나 배포를 한다면 더 큰 주키퍼 클러스터가 필요하다.


주키퍼 배포에 대한 몇가지 주의사항은 다음과 같다.


주키퍼는 fail-fast하고 에러를 만나는 순간 종료하므로 supvervision하에서 Zookeeper를 실행하여야 한다.

주키퍼의 데이터와 트랜잭션 로그를 compact하는 cron을 설정해야한다. 주키퍼 데몬 스스로 이러한 일을 수행하지 않으므로, cron을 설정하지 않는다면 빠르게 디스크 용량이 소진될 것이다.


Install dependencies on Nimbus and worker machines


다음은 nimus와 worker 머신에 스톰이 가지고 있는 디펜던시 인스톨하라.


Java 6

Python 2.6.6


위에 있는 디펜던시의 버젼이 스톰을 테스트한 버젼이다. 다른 버젼의 자바나 파이썬에서는 스톰이 잘 동작할수도 아닐수도 있다.


Download and extract a Storm release to Nimbus and worker machines


다음 스톰 릴리이즈를 다운로드하고 nimus와 각각의 worker 머신에 압축을 풀어라. 


Fill in mandatory configurations into storm.yaml


스톰 릴리이즈는 스톰 데몬을 설정하기 위한 conf/storm.yml 파일을 포함하고 있다. 당신은 여기서 설정의 기본 값들을 살펴볼 수 있다. storm.yaml은 defaults.yaml의 내용을 오버라이드한다. 동작하는 클러스터를 구성하기 위한 몇가지 필수로 설정해야할 요소들이 있다. 


1) storm.zookeeper.servers: 스톰 클러스터를 위한 주키퍼 클러스터의 호스트 리스트이다. 


storm.zookeeper.servers:

  - "111.222.333.444"

  - "555.666.777.888"


만약 주키퍼의 포트가 디폴트 포트가 아니라면 storm.zookeeper.port를 세팅해야 한다.


2) storm.local.dir: Nimbus와 Supervisor 데몬은 jars나 설정들을 저장하기 위한 로컬 디스크가 필요하다. 각 머신에 디렉토리를 생성하고 적절한 퍼미션을 준 다음 아래와 같이 설정을 해야 한다.


storm.local.dir: "/mnt/storm"


3) nimbus.host: Worker 노드는 topology jars나 설정을 마스터로 부터 다운로드 받기 위해서 다음과 같은 설정이 필요하다.


nimbus.host: "111.222.333.44"


4) supervisor.slots.ports: 각 worker 머신을 위해 얼마나 많은 worker가 동작할지를 지정해야 한다. 각 worker는 메시지를 수신하기 위해 하나의 포트를 이용하고 해당 포트를 사용하기 위해 포트를 열어야한다. 만약 여기에 다섯개의 포트를 정의한다면 이 머신에 다섯개의 worker를 할당하겠다는 것을 의미한다. 기본적으로 6700, 6701, 6702, 6703의 네개의 worker를 위해 할당되어 있다.


supervisor.slots.ports:

    - 6700

    - 6701

    - 6702

    - 6703


Configure external libraries and environmental variables (optional)


만약 외부 라이브러리 또는 커스텀 플러그인의 지원을 받을 필요가 있다면 그러한 jars는 extlib/ 그리고 extlib-daemon/ 디렉토리에 위치시키면 된다. extlib-daemon/ 디렉토리는 데몬(Nimbus, Supervisor, DRPC, UI, Logviewer)에 의해서만 사용되는 jars를 저장하는 곳임을 주의하자. 또한 두가지 환경변수 STORM_EXT_CLASSPATH 그리고 STORM_EXT_CLASSPATH_DAEMON에 의해 외부 클래스 패스를 포함시킬 수 있다.


Launch daemons under supervision using "storm" script and a supervisor of your choice


마지막 스템은 모든 스톰 데몬을 시작하는 것이다. 감독하에 이러한 데몬을 실행하는 것이 매우 크리티컬하다. 스톰은 fail-fast 시스템이며 이것은 예상외의 에러에 직면했을때 프로세스가 종료된다는 것을 의미한다. 스톰은 그러한 경우 안전하게 종료하고 프로세스가 재실행되었을 떄 정확하게 복구할 수 있게 설계되었다. 이것이 스톰이 수행중 상태를 유지하지 않는 이유이다. 만약 Nimbus나 Supervisor를 재시작한다면 수행중인 토폴로지는 어떠한 영향도 없을 것이다. 다음은 스톰 데몬을 실행하는 방법이다.


Nimbus: "bin/storm nimbus" 커맨드를 마스터 머신에서 수행하라.

Supervisor: "bin/storm supervisor" 커맨드를 각 워커 머신에서 실행하라. Supervisor 데몬은 각 머신의 프로세스를 시작하고 종료하는 책임을 가지고 있다.

UI: "bin/storm ui" 커맨드를 수행함으로써 스톰 UI를 실행할 수 있다. UI는 웹브라우저를 통해 http://{nimbus host}:8080으로 접속할 수 있다.


정상적으로 데몬이 수행된다면 데몬은 logs/ 디렉토리에 storm 릴리이즈가 어디에 압축해제 되었는지가 로깅될 것이다. 

반응형

+ Recent posts