욱'S 노트

Spark - RDD 본문

Programming/Spark

Spark - RDD

devsun 2017. 2. 1. 17:24

스파크에서 RDD는 가장 주요한 개념이다. RDD는 병렬로 수행될 수 있는 엘리먼트의 컬렉션이며, fault-tolerant하다. 앞에서 보았듯이 RDD를 생성하는 방법은 두가지이다. 첫번째는 내부의 컬렉션으로 부터 생성하는 방식이며 두번째는 외부의 리소스로부터 생성하는 방법이 있다.

내부컬렉션으로부터 생성

아래와 같이 간단한 프로그램을 작성해보자.

object RddCollectionTest {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("RddCollectionTest").setMaster("local"))
    val data = Array(1,2,3,4,5)
    val distData = sc.parallelize(data)

    distData.foreach(element => println(element))

    sc.stop();
  }
}

RDD 즉 병렬 컬렉션은 SparkContext 객체의 parallelize 메소드를 통해서 생성된다.
프로그램의 실행결과는 단순하다. 다음과 같다.

1
2
3
4
5

parallelize 메소드에서 주요한 파라미터는 파티션 수이다. 기본적으로 스파크에서는 클러스터에 기반하여 자동적으로 파티션의 수를 세팅한다. 그러나 다음과 같이 parallelize 메소드의 두번째 인자로 파티션 수를 결정할 수 있다.

외부데이터로부터 생성

object RddFileCollectionTest {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("RddFileCollectionTest").setMaster("local")
    val sc = new SparkContext(conf);

    val data = sc.textFile("data.txt");

    data.foreach(item => println(item))

    sc.stop()
  }
}

위와 같이 스파크에서는 다양한 외부 데이터 소스로부터 RDD를 생성할 수 있다. 그러나 위와 같이 로컬 파일 시스템으로부터 데이터를 생성할 때는 유의할 점이 있다. 해당 작업이 클러스터 모드에서 동작한다면 모든 워크노드에서 해당 경로의 파일에 접근 할 수 있어야 한다. 즉, 모든 워크노드로 파일이 복제되거나 네트워크 기반의 파일 시스템을 사용해야 한다는 뜻이다.

RDD 연산

RDD에서는 두가지 타입의 연산을 제공한다. transformation과 action이다. 예를들어 map transformation연산이며 주어진 데이터셋의 엘리먼트를 이용해 새로운 데이터셋의 엘리먼트를 생성한다. 또한 reduce가 대표적인 action의 예이며, 각 엘리먼트에 대한 집계를 수행하여 드라이버 프로그램에게 전달한다.

스파크의 모든 연산은 lazy하다. 이 뜻은 연산이 바로 수행되지 않고 드라이버 프로그램이 리턴을 요구할 때 수행된다는 것이다. lazy한 연산을 위해 자신이 사용한 base 데이터셋을 기억한다. 예를들어 엄청나게 큰 데이터셋이 존재한다고 할 때 그 결과를 다음 노드에 전달해서 수행하게 되므로, 스파크는 훨씬 효율적으로 자원을 활용하게 된다.

기본적으로 스파크는 항상 재연산을 수행하나, persist 또는 cache 메소드를 이용하여 데이터셋을 메모리 또는 디스크에 저장할 수 있다. 이렇게 할 경우 같은 쿼리에 대해 훨씬 빠른 응답을 줄 수 있다.

RDD 연산을 수행할 때는 변수와 메소드에 대한 스코프와 라이프사이클에 주의해야 한다. 만약 아래와 같은 코드가 있다면 로컬 모드일때와 클러스터 모드일때의 연산 결과는 달라질 것이다. 기본적으로 스파크는 클러스터상에서 수행되기 때문에 아래와 같이 범위 밖의 변수를 변경하는 실수를 하지 말아야 한다.

var counter = 0

var data = sc.parallelize(Array(1,2,3))

data.foreach(item => counter += item)

스파크는 RDD에 대한 다양한 연산을 제공한다. 각 연산에 대해서는 다음 장에서 알아본다.

RDD Persistence

스파크에서 제공하는 가장 중요한 기능 중에 하나는 연산중인 데이터셋을 저장하는 것이다. 저장된 RDD이 이후 연산에서 재활용되어 연산을 속도를 증대시킬 수 있다.

RDD는 persiste()나 cache() 메소드를 이용해 저장될 수 있다. 스파크에서 캐쉬는 fault-tolerant하다. 만약 캐쉬 데이터를 잃어 버릴 경우 재연산에서 똑같은 결과를 캐슁한다.

또한 RDD는 저장시 다양한 스토리지 레벨을 제공하며, 이에따라 메모리 혹은 디스크에 저장할 수 있는 옵션을 제공한다.

마지막으로 스파크의 캐쉬는 LRU알고리즘에 의해 자동으로 삭제되지만 unpersist() 메소드를 이용해 직접 삭제하는 기능도 제공한다.

공유변수

일반적인 스파크 연산이 원격 클러스터에서 실행될 때 함수에서 사용되는 변수는 각각의 클러스터에 복제되고 각 노드에서의 연산에서는 독립적이다. 일반적으로 클러스터상에서 변수를 공유하는 것은 비효율적이다. 하지만 스파크에서는 일반적인 연산 패턴을 지원하기 위해 broadcast 변수와 accumulator를 제공한다.

브로드캐스트 변수

브로드캐스트 변수는 읽기전용 변수이다. 각 머신에 캐쉬를 유지하고 각 타스크에 복제본을 제공한다. 모든 노드에 큰 입력 데이터셋을 제공할 때 좋은 방법이다. 스파크는 브로드캐스트 변수를 제공하는 효율적인 알고리즘을 제공한다.

스파크 연산은 일련의 단계로 수행된다. 스파크는 각 단계의 타스크에서 공통 데이터가 필요할 때 자동으로 브로드캐스트 한다. 즉 타스크에서 해당 데이터가 필요할 때만 캐쉬된 데이터가 역직렬화되서 제공된다.

브로드캐스트 변수는 broadcast() 메소드를 호출하여 생성할 수 있으며 value 메소드를 통해 해당 값을 이용할 수 있다.

val broadcast = sc.broadcast(Array(1, 2, 3))

broadcast.value.foreach(item => {
  println(item)
})

Accumulator

Accumulator는 병렬환경에서 효율적으로 집계 연산을 수행하기 위해 제공된다. 스파크에서는 숫자 타입의 accumulator를 기본 제공하며 프로그래머는 새로운 타입을 지원하는 Accumulator를 작성할 수 있다. 숫자 형식의 accumulator는 longAccumulator() 또는 doubleAccumulator()를 메소드를 통해 생성할 수 있다.

val conf = new SparkConf().setAppName("AccumulatorTest").setMaster("local")

val sc = new SparkContext(conf)

val accumulator = sc.longAccumulator("My Accumulator")

val distData = sc.parallelize(Array(1, 2, 3))

distData.foreach(item => accumulator.add(item))

println(accumulator.value)

sc.stop()

또한 프로그래머가 직접 Accumulator를 작성할 수도 있는데 이럴 경우 AccumulatorV2를 상속받아 구현하면 된다. AccumulatorV2는 추상클래스이며 여러개의 메소드를 가지고 있다. 대표적인 것인 reset과 add인데 reset에는 초기화시의 동작을 add에는 추가시의 동작을 작성하면 된다.

class MyAccumulator extends AccumulatorV2[Long, Long] {
  private var sum : Long = 0

  override def reset(): Unit = sum = 0

  override def add(v: Long): Unit = sum += v
}

AccumulatorV2를 구현한 사용자의 Accumulator는 스파크에서 호출시 한번씩만 동작하도록 보장한다. 커스텀 Accumulator는 아래와 같이 등록해서 사용하면 된다.

sc.register(new MyAccumulator, "My Accumulator")

출처 : http://spark.apache.org/docs/latest/programming-guide.html

'Programming > Spark' 카테고리의 다른 글

Spark - 어플리케이션 서브밋  (1) 2017.03.06
Spark - Stand alone 클러스터  (0) 2017.02.17
Spark - 클러스터 개요  (0) 2017.02.16
Spark - 개요 및 시작하기  (0) 2017.02.01
Spark + IntelliJ 프로젝트 구성하기  (0) 2017.02.01
Comments