욱'S 노트

MapReduce 시작하기 본문

Programming/Hadoop

MapReduce 시작하기

devsun 2014. 12. 22. 13:25

자 이제 MapReduce로 첫번째 프로그램을 만들어보자. 우리의 목표는 공식사이트의 첫번째 예제인 WordCount이다. 무작정 베끼기만 하면 이해가 어려울 수 있으니 목표는 똑같으나 스타일은 내 스타일대로 가겠다. 목표는 Hadoop의 README.txt 파일의 단어 갯수를 세는 프로그램이다. 


먼저 MapReduce의 mapper 역할을 하는 클래스를 만들어 보자.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import java.io.IOException;
import java.util.StringTokenizer;
 
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
public class SimpleWordMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final Logger logger = LoggerFactory.getLogger(SimpleWordMapper.class);
    
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        logger.info("Key : {} , Value : {}", key, value);
        
        StringTokenizer tokenizer = new StringTokenizer(value.toString());
        
        while (tokenizer.hasMoreTokens()) {
            String token = tokenizer.nextToken();
            
            context.write(new Text(token), new IntWritable(1));
        }
    }
 
}
 
cs


Mapper 클래스를 보면 generic으로 4개의 타입을 명시하도록 요구한다. 각 타입은 <입력키, 입력값, 출력키, 출력값>에 대한 타입이다. 개요에서 봤듯이 자체 직렬화 및 정렬을 위해 Hadoop에서는 특정 타입을 구현하도록 유도한다. 위의 LongWritable, IntWritable, Text는 쉽게 보면 long, int, String에 대응되는 타입들이라고 보면 되겠다. 해당 클래스는 쉽게 보면 특정 long과 String을 입력으로 받아서 그것의 출력으로 단어 하나와 숫자 1을 context에 쓰는 일을 한다. 보충하자면 입력키와 값이 long과 String인 건 일반 텍스트 파일을 입력으로 활용할 경우 해당 타입의 값으로 입력이 생성된다. 


다음은 reduce 역할을 수행하는 클래스이다. 


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import java.io.IOException;
 
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
public class SimpleWordReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final Logger logger = LoggerFactory.getLogger(SimpleWordReducer.class);
    
    @Override
    public void reduce(Text key, Iterable<IntWritable> value, Context context) throws IOException, InterruptedException {
        int sum = 0;
        
        for (IntWritable intWritable : value) {
            logger.info("Key : {} , Value : {}", key, intWritable);
            
            sum += intWritable.get();
        }
        
        context.write(key, new IntWritable(sum));
    }
}
 
cs


전에 말했다시피 reducer의 입력은 mapper의 출력이다. 그러나 특이한 점은 value가 iterator형태로 전달되고 있음을 알 수 있다. 먼가 당연하지 않겠는가? 그렇지 않다면 mapper의 출력 결과를 unique하게 만들어야 하는데, 그렇다면 너무 힘들뿐더라 할 수 있는 일이 너무 제약이 될 것이다.


로컬에서 수행을 할려면 hadoop-site.xml 파일을 아래와 같이 수정하면 된다. 클러스터에 작업을 의뢰 할려면  fs.default.name만 수정하면 된다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
 
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
 
<configuration>
    <property>
        <name>fs.default.name</name>
        <value>file:///</value>
    </property>
    <property>
        <name>mapred.job.tracker</name>
        <value>local</value>
    </property>
</configuration>
 
cs


구동하는 소스이다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
public class SimpleWordCounter {
    private final static String CONFIG_PATH = "/Users/devsun/dev/workspace/test/sample-project/src/test/resources/hadoop-local.xml";
 
    private final static Logger logger = LoggerFactory.getLogger(SimpleWordCounter.class);
    
    private final static String SOURCE_FILE_PATH = "/Users/devsun/dev/workspace/test/sample-project/src/test/resources/README.txt";
    private final static String TARGET_FILE_PATH = "/Users/devsun/dev/workspace/test/sample-project/src/test/resources/output";
 
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.addResource(new Path(CONFIG_PATH));
 
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(SimpleWordCounter.class);
        job.setMapperClass(SimpleWordMapper.class);
        job.setCombinerClass(SimpleWordReducer.class);
        job.setReducerClass(SimpleWordReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        
        Path sourcePath = new Path(SOURCE_FILE_PATH);
        
        logger.info(sourcePath.getFileSystem(job.getConfiguration()).getHomeDirectory().toString());
                
        FileInputFormat.addInputPath(job, sourcePath);
        FileOutputFormat.setOutputPath(job, new Path(TARGET_FILE_PATH));
        
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
 
cs


작업을 하나 정의하게 되고, 파일의 이력과 출력을 명시한다. 최종 결과를 확인해보자. 역시 잘된다.




'Programming > Hadoop' 카테고리의 다른 글

Hadoop - 클러스터 세팅  (0) 2015.03.20
MapReduce 개요  (0) 2014.12.22
HDFS - FileSystem API 맛보기  (0) 2014.12.18
HDFS 개발 시작하기(java)  (0) 2014.12.18
HDFS 개요  (0) 2014.12.17
Comments