욱'S 노트

Apache Storm - Tutorial 본문

Programming/Storm

Apache Storm - Tutorial

devsun 2015. 10. 29. 19:23

이번 튜토리얼에서는 어떻게 스톰 토폴로지를 만들고 스톰 클러스터에 배포하는지에 대해서 배울 것이다. 자바를 메인 언어로 사용할 것이지만 몇가지 예제는 파이썬을 사용할 것이다.


Preliminaries


이번 예제는 storm-starter 프로젝트에 있다. 예제를 따라하기 위해 프로젝트를 클론하는 것을 추천한다.  먼저 개발 환경 세팅하는 법을 일고 수행하고 새로운 스톰 프로젝트를 머신에 셋업하라. 


Components of a Storm cluster


스톰 클러스터는 표면적으로 보면 하둡 클러스터와 닮았다. 하둡은 맵리듀스 작업을 실행하는 반면에 스톰은 토폴로지를 실행한다. Job과 topology는 매우 다르다. 가장 주요하게 다른 점은 맵리듀스는 결국은 작업이 끝나는 반면, 토폴로지 영원히 메시지를 처리한다.(죽일때까지)


스톰 클러스터에는 두가지 종류의 노드가 있다. : 마스터 노드와 워커 노드이다. 마스터 노드는 Nimbus라고 불리는 데몬을 실행한다. 이는 하둡의 JobTracker와 유사하다. Nimbus는 클러스터에 코드를 분배하고 머신에 타스크를 할당하며 실패를 모니터링 한다. 


각 워커 노드는 Supervisor라고 불리우는 데몬을 실행한다. Supervisor는 작업을 대기하고 있다가 Nimbus에 의해 작업이 할당되면 필요에 의해 워커 프로세스를 실행하거나 중지한다. 각 워커 프로세는 토폴로지의 서브셋으로 실행된다. : 실행중인 토폴로지는 많은 머신에 분배된 많은 수의 워커 프로세스로 구성된다.




Nimbus와 Supervisor간의 모든 코디네이션은 Zookeeper 클러스터를 통해 이루어진다. 그리고 Nimbus 데몬과 Supvervisor 데몬은 fail-fast하며 stateless하다. 모든 상태는 주키퍼나 로컬 디스크에서 저장된다. 이 의미는 kill -9 로 Nimbus 나 Supervisor를 죽여도 된다는 것을 의미하며 다시 시작하면 아무 일이 없었던 것 처럼 백업이 실행된다. 이러한 설계는 스톰 클러스터는 거의 완벽하게 stable하게 만들어 준다.


Topologies


스톰에서 실시간 처리를 위해 우리는 토폴로지라고 불리는 것을 생성한다. 토폴로지는 계산의 그래프이다. 토폴로지의 각 노드는 처리 로직을 포함한다. 그리고 링크를 통해 노드간 데이터 전달을 지정할 수 있다.


토폴로지를 실행하는 것은 단순하다. 먼저 코드를 패키지하고 하나의 jar에 디펜던시를 넣어라. 그런 다음 다음과 같은 명령을 수행하라.


storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2


이렇게 하면 backtype.storm.MyTopology 클래스가 인자 arg1과 arg2와 함께 동작을 할 것이다. 클래스의 main 함수는 토폴로지를 정의하고 그것을 Nimbus로 보낸다. storm jar 부분은 Nimbus와의 연결을 관리하고 jar을 업로드 시킨다. 


토폴로지 정의는 Thrift 구조이기 떄문에 Nimbus는 Thrift 서비스이며, 당신은 어떤 프로그래밍 언어를 이용하더라도 토폴로지를 생성하고 서브밋할 수 있다. 위의 예제는 JVM 기반 언어로 부터 수행하는 가장 쉬운 방법이다.


Streams


스톰의 핵심 추상화는 stream 이다. stream은 tuple의 불규칙한 순서이다. 스톰은 스트림을 새로운 스트림으로 변경할 수 있는 신뢰할 수 있는 분산 환경을 기본적으로 제공한다. 예를 들어 트워터의 스트림을 트렌드 토픽의 스트림으로 변경할 수 있다.


가장 기본적으로 스톰이 스트림 변경을 이해 제공하는 것은 spouts와 bolts이다. Spouts와 bolts는 어플리케이션에 명시한 로직을 실행하기 위한 구현해야 하는 인터페이스이다.


Spout은 스트림의 소스이다. Spout은 Kestrel 큐로부터 튜플을 읽어서 스트림으로 발행할 수 있다. 또는 spout은 트위터 API와 연결하여 트윗을 스트림으로 발행할 수 있다.


Bolt는 입력 스트림을 소비하고 몇가지 프로세싱을 수행하고 새로운 스트림을 발행한다. 트윗 스트림으로부터 트렌드 토픽으로 변환과 같은 복잡한 스트림 변경은 다수의 스텝이 필요하며 그러므로 다수의 bolt가 처리한다. Bolts는 기능을 수행하거나 tuple을 필터링하거나 스트림을 합치거나 데이터베이스와 통신하거나 어떤것이든 수행할 수 있다.


Spout과 bolt의 네트워크는 topology로 패키지 된다. 토폴로지는 탑레벨 추상화이며 실행을 위해 스톰 클러스터에 서브밋된다. 토폴로지는 각 노드 spout이나 bolt로 구성된 스트림 처리 그래프이다. 그래프의 엣지는 스트림은 어떤 bolt가 subscribe할지를 나타낸다. Spout이나 bolt가 tuple을 stream으로 발행하면 스트림을 구독하는 모든 볼트에게 투플이 전송된다.



링크는 토폴로지내에서 노드간 어떻게 투플이 전달되는지를 나타낸다. 예를들어 Spout A로 부터 Bolt B로의 링크가 있고 Bolt B로부터 Bolt C로의 링ㅋ가 있다면 Spout A가 튜플을 발행할때마다 튜플은 Bolt B 그리고 Bolt C로 전송될 것이다. 


스톰 토폴로지에서 각 노드는 병렬로 실행된다. 토폴로지에 각 노드가 얼마나 많은 병렬수를 처리할 지를 명시할 수 있다. 그러면 스톰은 실행을 위한 스레드들을 클러스터에 할당할 것이다.


토폴로지는 항상 실행된다. 죽기 전까지. 스톰은 자동으로 실패한 타스크를 재할당한다. 또한 스톰은 데이터 유실이 없도록 보장한다. 머신이 다운되거나 메시지가 drop되더라도


Data model


스톰은 투플을 데이터 모델로써 이용한다. 투플은 값의 명명된 리스트이다. 투플의 필드는 어떤 타입이 오브젝트든 가능하다. 스톰은 기본으로 모든 프리미티브 타입, 스트링 바이트 배열을 투플의 필드값으로 지원한다. 다른 타입의 오브젝트를 사용하기 위해서는 해당 타입을 위한 serializer를 구현하면 된다.


토폴로지의 모든 노드는 발행한 투플을 위한 출력 필드를 정의해야 된다. 예를 들어 아래의 볼트는 double과 triple을 필드로 가지는 2개의 투플을 발행할 것이다.


public class DoubleAndTripleBolt extends BaseRichBolt {

    private OutputCollectorBase _collector;


    @Override

    public void prepare(Map conf, TopologyContext context, OutputCollectorBase collector) {

        _collector = collector;

    }


    @Override

    public void execute(Tuple input) {

        int val = input.getInteger(0);        

        _collector.emit(input, new Values(val*2, val*3));

        _collector.ack(input);

    }


    @Override

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

        declarer.declare(new Fields("double", "triple"));

    }    

}


declareOutputFields 함수는 컴포넌트를 위한 출력 필드 ["double", "triple"]을 정의한다. 볼트의 나머지 부분은 다음 섹션에서 설명한다.


A simple topology


개념을 탐색하고 코드가 어떻게 구성되는지를 살펴보기 위해 단순한 토폴로지를 살펴보겠다. storm-starter의 ExclamationTopology를 살펴보자.


TopologyBuilder builder = new TopologyBuilder();        

builder.setSpout("words", new TestWordSpout(), 10);        

builder.setBolt("exclaim1", new ExclamationBolt(), 3)

        .shuffleGrouping("words");

builder.setBolt("exclaim2", new ExclamationBolt(), 2)

        .shuffleGrouping("exclaim1");


위의 토폴폴지는 하나의 spout과 두개의 볼트로 구성된다. Spout은 words를 발행하고 각 볼트는 입력에 "!!!"을 붙인다. 각 노드는 라인상에 위치한다. : spout이 첫번째 볼트로 발행하면 첫번째 볼트가 두번째 볼트로 발행한다. 만약 spout이 "bob"과 "john"이라는 투플을 발하면 두번째 볼트는 "bob!!!!!!"과 "john!!!!!!"을 발행할 것이다.


setSpout 과 setBolt 메소드를 이용하여 노드들을 정의할 수 있다. 이러한 메소드는 유저가 명시한 id를 입력으로 프로세싱 로직의 객체로 포함한다. 각 노드를 위한 최대 병렬수를 지정할 수 있다. 예를들면 spout은 "words"라는 아이디를 가지고 각 볼트들은 "excliam1"과 "excliam2"를 아이디로 가진다.


프로세싱 로직에 포함되는 오브젝트들은 spout는 IRichSpout, bolts는 IRichBolt를 구현해야 한다.


마지막 파라미터는 노드가 얼마나 많은 병렬처리를 수행할 수 있는지이다. 이것을 지정하면 클러스터내에서 해당 수에 해당하는 스레드가 실행될 것이다. 이것을 생략하면 스톰은 각 노드를 위해 하나의 스레드만 할당 할 것이다.


setBolt 메소드는 InputDeclarer 오브젝트를 리턴한다. 이것은 bolt의 입력을 정의하기 위해 이용된다. 여기서는 컴포넌트 "exclaim1"은 컴포넌트 "words"로 부터 발생하는 투플을 suffle grouping으로 읽겠다고 정의하고 있으며, "exclaim2"는 "exclaim1"으로 부터 shuffle grouping을 통해서 읽겠다고 정의하고 있다. shuffle grouping의 의미는 입력 task로부터 투플을 랜덤하게 bolt task로 분배하겠다는 것을 의미한다. 컴포넌트 간의 데이터를 그룹핑하는 방법은 여러가지가 있다. 이는 다음에 살펴보자.


만약 "excliam2"가 "words"와 "excliam2" 컴포넌트로 부터 발행한 모든 투플을 읽겠다면 다음과 같이 정의할 수 있다.


builder.setBolt("exclaim2", new ExclamationBolt(), 5)

            .shuffleGrouping("words")

            .shuffleGrouping("exclaim1");



As you can see, input declarations can be chained to specify multiple sources for the Bolt.


토폴로지에 있는 spout과 bolts의 구현에 대해서 더 살펴보자. 토폴로지에 있는 TestWordSpout 리스트에 있는 단어 ["nathan", "mike", "jackson", "golda", "bertels"] 중에 하나를 100ms마다 발핸한다. nextTuple의 구현은 다음과 같다.


public void nextTuple() {

    Utils.sleep(100);

    final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};

    final Random rand = new Random();

    final String word = words[rand.nextInt(words.length)];

    _collector.emit(new Values(word));

}


ExclamationBolt는 입력에 "!!!" 문자열을 붙인다. ExclamationBolt의 구현은 다음과 같다. 


public static class ExclamationBolt implements IRichBolt {

    OutputCollector _collector;


    @Override

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {

        _collector = collector;

    }


    @Override

    public void execute(Tuple tuple) {

        _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));

        _collector.ack(tuple);

    }


    @Override

    public void cleanup() {

    }


    @Override

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

        declarer.declare(new Fields("word"));

    }


    @Override

    public Map getComponentConfiguration() {

        return null;

    }

}


prepare 메소드는 이 볼트로부터 투플을 발행하기 위해 사용하는 OutputCollector를 볼트에 제공한다. 투플은 언제든 볼트로부터 발행될 수 있다. -- prepare, execute, cleanup 또는 다른 스레드의 비동기 이벤트로도 발행할 수 있다. 예제의 prepare 구현체는 단순히 나중에 execute 메소드에서 OutputCollector를 사용할 수 있도록 인스턴스로 저장한다.


execute는 볼트의 입력으로부터 하나의 투플을 받는다. ExclamationBolt는 투플의 첫번쨰 필드를 꺼내서 "!!!"를 붙인다음 발행한다. 만약 다수의 입력소스로부터 구독을하도록 bolt를 구현하였다면 투플이 어디로부터 전달되었는지는 getSourceComponent 메소드를 통해서 알 수 있다.


execute 메소드에서 수행하는 또다른 것이 있다. 첫번째 라인에서는 투플을 발행하고 마지막라인에서는 입력 투플에 ack를 한다. 이것은 Storm의 신뢰성 API이 한 부분으로 데이터 손실이 없도록 보장한다. 더 자세한 내용은 마지막에 설명하겠다.


cleanup 메소드는 볼트가 셧다운 될떄 호출되며 열린 리소스를 정리하기 위해 사용된다. 이 메소드는 클러스터 상에서 호출되는 것을 보장하지 못한다. : 예를들어 머신에 분산되어서 수행중일때 이러한 메소드를 수행할 수 있는 방법이 없다. cleanup 메소드는 토폴로지를 로컬모드로 실행할때 의미가 있다. 그리고 토폴로지를 실행하거나 죽일때 리소스 릭을 제거하기 원할때 사용할 수 잇다. 


declareOutputFields  메소드는 ExclamationBolt가 word 불리는 하나의 필드를 가진 투플을 반환한다고 명시하고 있다.



getComponentConfiguration 메소드는 컴포넌트를 실행할때 다양한 영향을 설정할 수 있다. 이것에 대한 주제는 다음에 자세히 다루겠다.


cleanup과 getComponentConfiguration 메소드는 bolt 구현에 종종 필요가 없다. 우리는 일반적으로 base class를 이용해 볼트를 더욱 간결하게 작성할 수 있다. BaseRichBolt를 상속해 더욱 간결하게 작성한 ExclamationBolt는 아래와 같다.


public static class ExclamationBolt extends BaseRichBolt {

    OutputCollector _collector;


    @Override

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {

        _collector = collector;

    }


    @Override

    public void execute(Tuple tuple) {

        _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));

        _collector.ack(tuple);

    }


    @Override

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

        declarer.declare(new Fields("word"));

    }    

}


Running ExclamationTopology in local mode


ExclamationTopology이 로컬모드에서 어떻게 실행하고 어떻게 동작하는지 살펴보자.


스톰은 두가지 오퍼레이션 모드가 있다. : 로컬 모드와 분산 모드이다. 로컬 모드에서는 스톰은 하나의 프로세스 내에서 실행되며 워커 노드들은 스레드로 시뮬레이션 된다. 로컬 모드는 토폴로지를 개발하거나 테스트할때 유용하다. storm-starter에 토폴로지를 실행시키면 로컬모드로 실행될 것이며, 우리는 각 컴포넌트가 어떤 메시지를 발행하는지 알 수 있다. 


분산 모드에서는 스톰은 클러스터에서 수행된다. 토폴로지를 마스터에 전송하면 토폴로지를 실행하는데 필요한 모든 코드가 전송될 것이다. 마스터는 코드를 분배하고 토폴로지를 실행한 워커를 할당한다. 만약 워커가 다운되면 마스터는 그들은 다른곳에 재할당할 것이다.


다음 코드는 ExclamationTopology를 로컬모드에서 실행하기 위한 코드이다.


Config conf = new Config();

conf.setDebug(true);

conf.setNumWorkers(2);


LocalCluster cluster = new LocalCluster();

cluster.submitTopology("test", conf, builder.createTopology());

Utils.sleep(10000);

cluster.killTopology("test");

cluster.shutdown();


먼저 LocalCluster 객체를 생성함으로써 in-process 클러스터를 생성한다. 가상 클러스터에 토폴로지를 서브밋합니다. 이때는 submitTopology를 호출하여 수행한다. 


"test"라는 이름은 나중에 kill할 수 있도록 토폴로지를 식별하기 위해서 사용된다. 


configuration은 토폴로지 실행시 다양한 영향을 조정하기 위해서 이용됩니다. 가장 일반적인 두가지 설정은 다음과 같다.


TOPOLOGY_WORKERS - 클러스터에 토폴로지를 실행하기 위해 얼마나 많은 수의 프로세스를 할당할 지를 의미한다. 토폴로지의 각 컴포넌트는 매우 많은 수의 스레드에서 실행될 것이다. 스레드의 숫자는 주어진 컴포넌트의 setBolt, setSpout 메소드에 의해 할당된다.  이러한 스레드들은 워커 프로세스에 존재한다. 각 워커 프로세스는 내부에 몇몇의 컴포넌트를 위한 몇몇의 스레드를 포함한다. 예를 들어 컴포넌트들에 300개의 스레드를 명시했고, 설정에 50개의 worker 프로세스를 명시했다면 각 워커 프로세스는 각 컴포넌트당 6개의 스레드를 실행할 것이다. 


TOPOLOGY_DEBUG - 이 속성을 true로 세팅하면 스톰은 각 컴포넌트에 의해 발행되는 모든 메시지를 로깅할 것이다. 이것은 로컬 모드에서 토폴로지를 테스트할 때 유용하다. 


이외에 많은 설정들이 토폴로지를 위해 존재한다. 자세한 사항은 Javadoc을 참고하기 바란다.


Stream groupings


스트림 그룹핑은 토폴로지에 두 컴포넌트 간에 어떻게 투플을 전송할지를 명시한다. 




Stream grouping은 taks의 집합간에 투플을 어떻게 전송해야되는지를 의미한다. 스트림 그룹핑의 종류를 자세히 살펴보기 전에 스톰 클러스터의 또다른 토폴로지를 살펴보자. WorldCountTopology는 문장을 읽어서 WorldCountBolt에 스트림을 전달한다. 


TopologyBuilder builder = new TopologyBuilder();


builder.setSpout("sentences", new RandomSentenceSpout(), 5);        

builder.setBolt("split", new SplitSentence(), 8)

        .shuffleGrouping("sentences");

builder.setBolt("count", new WordCount(), 12)

        .fieldsGrouping("split", new Fields("word"));


SplitSentence는 전달받은 각 문장의 단어를 투플로 발행하고 WordCount는 카운트를 하기 위한 단어를 메모리에 유지한다. WordCount가 단어를 전달 받을때 마다 상태를 업데이트하고 새로운 단어 카운트를 발행한다.


스트림 그룹핑에는 몇가지 종류가 있다.


"shuffle grouping"은 가장 단순한 종류의 그룹핑이다. shuffle grouping은 RandomSentenceSpout으로부터 SplitSentence 볼트로 투플을 전송하기 위해 사용된다. 이 경우 클러스터에 분산된 어떠한 SplitSentence 볼트의 작업에도 분배가 된다.


"fields grouping"은 더욱 흥미로운 종류의 그룹핑이다. fields grouping은 SplitSentence 볼트와 WordCount 볼트 사이에 투플을 전송하기 위해 이용된다. 이 경우 WordCount 볼트는 항상 같은 타스크로 같은 단어가 전송된다. 그렇지 않다면 하나 이상의 타스크가 같은 단어를 볼것인데 그럴 경우 잘못된 카운트 정보가 발행될 것이다. fields grouping은 각 필드의 부분집합을 스트림으로 그룹핑한다. 이러한 결과로 같은 값은 값은 타스크에 할당된다. WordCount는 SplitSentence의 출력 스트림을 word라는 필드의 그룹핑을 사용한다. 이 경우 똑같은 단어들은 항상 똑같은 타스크들로 전달되어 볼트는 정확한 출력을 생산할 수 있을 것이다.


필드 그룹핑은 스트림의 조인 또는 조합의 기본적인 구현이다.내부적으로 fields grouping은 mod hashing을 사용해서 구현되었다.


몇가지 다른 종류의 스트림 그룹핑도 있다. 자세한 내용은 Concepts 챕터를 살펴보자.


Guaranteeing message processing


이번 튜토리얼의 처음에 어떻게 투플이 발행되는지에 대해서 묵과하였다. 이러한 측명은 스톰 reliabilty API의 하나의 부분이다. : 어떻게 스톰은 spout으로 부터 들어온 메시지가 완전히 처리되는것을 보장하는 지에 대한 것이다. 이러한 정보는 Guaranteeing message processing을 살펴보자.


Transactional topologies


스톰은 모든 메시지가 적어도 한번 토폴로지를 통해 실행될 것을 보장한다. 공통적인 질문은 어떻게 스톰의 위에서 카운트를 할 수 있는가 이다. 오버카운트를 원하지는 않지 않는가? 스톰은 transactional topologies라고 불리우는 특성을 가지고 있다. 대부분의 계산을 위해 정확한 한번의 메시징 의미를 획득할 수 있다. 이러한 자세한 내용은 다음에 살펴보겠다.


Distributed RPC


스톰의 대략적인 관점에서 이번 튜토리얼은 스트림 프로세싱을 보여주었다. 스톰의 기본적으로 제공하는 더 많은 것들이 있다. 스톰 어플리케이션의 가장 흥미로운 것 중 하나는 Distributed RPC이다. 이것은 빠르게 극도의 연산을 병렬로 처리할 수 있다. 자세한 내용은 나중에 살펴보겠다.


Conclusion


이번 튜토리얼은 스톰 토폴로지를 개발, 테스트, 배포하기 위해 대략적으로 살펴보았다.



Comments