일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
- Java
- Linux
- elastic search
- hdfs
- Spring Boot
- 도메인주도설계
- Angular2
- nginx
- design pattern
- scala
- DDD
- Spring XD
- 스프링 배치
- hadoop
- spark
- 엘라스틱서치
- hibernate
- docker
- apache storm
- Hbase
- Spring
- elasticsearch
- Domain Driven Design
- Gradle
- Storm
- Spring Batch
- 제주
- intellij
- Clean Code
- SBT
- Today
- Total
욱'S 노트
Spring Batch Example - 분류해서 출력하기 본문
스프링 배치 매뉴얼을 60% 본 시점 즈음에 다음과 같은 배치 프로그램을 작성해달라는 요청이 생겼다.
요건 : 파일로 부터 IP 리스트를 읽어서 특정 IP 대역을 분류해주세요.
음 엄청 간단하다. 첫번쨰 방법은 멀티 스텝을 구성해서 파일에서 IP 리스트를 읽어서 특정 대역에 해당하는 내용을 한 파일에 쓴다. 그리고 난 다음 두번째 스텝에서 특정대역에 속하지 않는 파일을 쓰면 된다. 그러나 이러한 방법은 우아하지 못하다. 배치라고 하면 대용량일텐데 입력파일을 두번 읽는다면, 두배로 시간이 소요되기 떄문이다. 그래서 구글링을 해봤더니 역시나 방법이 있었다. 이번 예제는 한번 읽어서 분류해서 두개의 파일로 출력하는 방법을 구현해보겠다.
먼저 입력 파일을 살펴보자. 아래와 같이 아이피를 가진 입력 파일이다.
210.xxx.xxx.xx
210.xxx.xxx.xx
210.xxx.xxx.xx
210.xxx.xxx.xx
210.xxx.xxx.xx
210.xxx.xxx.xx
210.xxx.xxx.xx
210.xxx.xxx.xx
210.xxx.xxx.xx
219.xxx.xxx.xx
219.xxx.xxx.xx
219.xxx.xxx.xx
219.xxx.xxx.xx
219.xxx.xxx.xx
219.xxx.xxx.xx
219.xxx.xxx.xx
219.xxx.xxx.xx
먼저 입력파일을 읽어드리기 위한 reader를 선언해보자.
<bean id="ipFileReader" class="org.springframework.batch.item.file.FlatFileItemReader">
<property name="resource" value="classpath:example/iplist.txt"/>
<property name="lineMapper">
<bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
<property name="lineTokenizer">
<bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
<property name="names" value="ip"/>
</bean>
</property>
<property name="fieldSetMapper">
<bean class="net.daum.datahub.batch.app.classify.IpInfoMapper"/>
</property>
</bean>
</property>
</bean>
FieldSet을 도메인 객체를 변환하기 위해서 도메인 객체와 Mapper 클래스를 작성해보자.
도메인 객체 작성시 필터링 되었는지 여부를 판단할 수 있는 필드를 추가적으로 선언하자.
public class IpInfo {
private String ip;
private boolean filtered = false;
public IpInfo(String ip) {
this.ip = ip;
}
public void setFiltered(boolean filtered) {
this.filtered = filtered;
}
public boolean isFiltered() {
return filtered;
}
@Override
public String toString() {
return ip;
}
public String getIp() {
return ip;
}
}
public class IpInfoMapper implements FieldSetMapper<IpInfo> {
@Override
public IpInfo mapFieldSet(FieldSet fieldSet) throws BindException {
String ip = fieldSet.readString("ip");
return new IpInfo(ip);
}
}
다음은 처리로직을 담은 ItemProcessor를 하나 생성하겠다. 로직은 일단 단순하게 210번대로 시작하는 IP를 필터링 하는 예이다.
public class IpFilterProcessor implements ItemProcessor<IpInfo,IpInfo> {
@Override
public IpInfo process(IpInfo item) throws Exception {
if (item.getIp().startsWith("210.")) {
item.setFiltered(true);
}
return item;
}
}
그런 다음 두개의 ItemWriter를 선언하자. 하나는 필터링된 아이피를 작성할 writer이고, 다른 하나는 필터링되지 않은 아이피를 작성하는 것이다.
<bean id="filteredIpFileWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
<property name="resource" value="file://Users/devsun/output/filtered_ip.csv"/>
<property name="lineAggregator">
<bean class="org.springframework.batch.item.file.transform.PassThroughLineAggregator"/>
</property>
</bean>
<bean id="unfilteredIpFileWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
<property name="resource" value="file://Users/devsun/output/unfiltered_ip.csv"/>
<property name="lineAggregator">
<bean class="org.springframework.batch.item.file.transform.PassThroughLineAggregator"/>
</property>
</bean>
마지막으로 전달된 아이템에 따라 사용한 Writer를 지정하기 위한 Classifier를 작성해보자. ClassifiterCompositeItemWriter는 우리가 작성한 Classfier의 classify 메소드를 호출하여 writer를 선택한 후 해당 writer의 write 메소드를 호출할 것이다.
public class IpClassfier implements Classifier<IpInfo,ItemWriter<IpInfo>> {
private Map<Boolean, ItemWriter<IpInfo>> writerMap = new HashMap<Boolean, ItemWriter<IpInfo>>();
@Override
public ItemWriter<IpInfo> classify(IpInfo ipInfo) {
return writerMap.get(ipInfo.isFiltered());
}
public void setWriterMap(Map<Boolean, ItemWriter<IpInfo>> writerMap) {
this.writerMap = writerMap;
}
}
<bean id="ipClassifierFileWriter" class="org.springframework.batch.item.support.ClassifierCompositeItemWriter">
<property name="classifier">
<bean class="net.daum.datahub.batch.app.classify.IpClassfier">
<property name="writerMap">
<map>
<entry key="true" value-ref="filteredIpFileWriter"/>
<entry key="false" value-ref="unfilteredIpFileWriter"/>
</map>
</property>
</bean>
</property>
</bean>
최종적인 작업의 설정이다. 멀티 writer를 오픈하고 닫기 위해 stream으로 등록해주면 된다. 수행 결과는 성공적일 것이다.
<batch:job id="classifyJob">
<batch:step id="simpleStep">
<batch:tasklet>
<batch:chunk reader="ipFileReader" processor="ipProcessor"
writer="ipClassifierFileWriter" commit-interval="10">
<batch:streams>
<batch:stream ref="filteredIpFileWriter"/>
<batch:stream ref="unfilteredIpFileWriter"/>
</batch:streams>
</batch:chunk>
</batch:tasklet>
</batch:step>
</batch:job>
'Programming > Spring Batch' 카테고리의 다른 글
Spring Batch Tip - 작업 히스토리 테이블 삭제 스크립트 (0) | 2015.08.21 |
---|---|
Spring Batch - Scaling, Parallel Processing (2) | 2015.01.28 |
Spring Batch - ItemReader, ItemWriter, ItemProcessor (0) | 2015.01.23 |
Spring Batch - Step 정의 (0) | 2015.01.23 |
Spring Batch - 메타데이터 이용 (0) | 2015.01.22 |