욱'S 노트

Spring Batch - ItemReader, ItemWriter, ItemProcessor 본문

Programming/Spring Batch

Spring Batch - ItemReader, ItemWriter, ItemProcessor

devsun 2015. 1. 23. 17:11

모든 배치처리는 대량의 데이터를 읽고 계산하거나 변경하고 결과를 쓰는것으로 묘사될 수 있다. 스프링 배치는 bulk 처리를 위한 세가지 주요한 ItemReader, ItemProcessor 및 ItemWriter를 제공한다.


ItemReader


비록 단순한 컨셉이지만 ItemReader는 다양한 타입의 입력 데이터를 제공하기 위한 것이다. 가장 일반적인 예는 다음과 같다.


- Flat File, XML, Database 


public interface ItemReader<T> {
T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;
}


Read 메소드는 ItemReader가 가장 주요한 계약이다. 메소드가 호출되면 하나의 아이템이 리턴되거나 null이 리턴되어야 한다. Item은 file의 라인, 데이터베이스의 로우 혹은 XML 파일의 element일 수 있다. 아이템은 일반적으로 사용가능한 도메인 오브젝트로 매핑될 수 있을 것이다. ItemReader의 일반적인 구현체들은 forward only이다. 이것은 아이템의 부족으로 아무것도 하지 않는 것이 예외를 발생하는 것보다 더 좋기 때문이다. 예를 들어 데이터베이스를 읽었을때 결과가 없으면 null을 리턴하고 작업은 종료하는 것이다. 하지만 예외적으로 트랜잭션에 연관된 특별한 리소스의 경우에는 rollback 시나리오에 호출된 동일한 아이템을 리턴해야 할 수도 있다.


ItemWriter


ItemWriter는 ItermReader와 기능적으로 닮았지만, 반대의 연산을 수행한다. 리소스는 적절한 위치에서 열리고 닫혀야 한다. 데이터 베이스 혹은 큐에서 해당하는 데이터로 포맷팅되어 전송되거나 입력되거나 업데이트 되어야 한다.

public interface ItemWriter<T> {
void write(List<? extends T> items) throws Exception;
}

ItermReader에서는 read처럼 ItemWriter는 write는 가장 기본적인 연산이다. 이 메소드는 아이템리스트를 전달받아 리소스가 열려있는한 쓰기를 시도할 것이다. 아이템 리스트들은 작업되고 chunk로 처리가 수행된다. 리스트를 다 쓴 다음 write 메소드로 돌아오기 전에 flush가 수행된다. 예를 들어 Hibernate DAO로 작성한다면 개별 아이템으로 부터 다중 호출이 만들어져서 처리가 될 것이며,  write 메소드로부터 돌아오기전에 session은 close가 될 것이다.


ItemProcessor


ItemReader와 ItemWriter는 그들의 특정한 단위작업을 수행하는데 매우 유용하다. 그러나 쓰기 전에 비즈니스 로직을 넣고 싶다면? 이러한 경우를 위해 스프링 배치에서는 ItemProcessor 인터페이스를 제공한다.

public interface ItemProcessor<I, O> {
O process(I item) throws Exception;
}

ItemProcessor는 매우 단순하다. 하나의 오브젝트가 전달되면 그것을 변환하여 다른 것을 리턴한다. 입력 오브젝트와 출력 오브젝트는 같을수도 있고, 다를수도 있다. process 메소드에 개발자가 원하는 비즈니스 로직이 들어갈 수 있다.


다양한 ItemProcessors를 chain 할 수 있다. 앞의 ItemPrcoessor의 출력이 다음 ItemProcessor의 입력으로 제공된다.

<bean id="compositeItemProcessor" class="org.springframework.batch.item.support.CompositeItemProcessor">
<property name="delegates">
<list>
<bean class="..FooProcessor"/>
<bean class="..BarProcessor"/>
</list>
</property>
</bean>

ItemProcessor에서 레코드를 필터링 하는 방법은 굉장히 단순한다. null을 리턴함으로써 필터링을 할 수 있다.


ItemStream


ItemReader, ItemWriter 모두 개별적인 목적에 잘 맞게 제공되어 있다. 그러나 다른 인터페이스에 의해 참조될 경우도 있을 것이다. 일반적인 배치작업에 범위에서 reader와 writer는 열고, 닫고 해당 상태가 저장되어져야 한다.

public interface ItemStream {
void open(ExecutionContext executionContext) throws ItemStreamException;

void update(ExecutionContext executionContext) throws ItemStreamException;

void close() throws ItemStreamException;
}

ItemReader의 클라이언트들은 ItemStream을 구현해하는데 파일과 같은 특정 커넥션을 획득하기 위해 read메소드를 호출하기 전에 open을 수행하여야 한다. 비슷한 제약사항은 ItemWriter에도 적용된다. ExecutionContext에 있는 데이터를 이용하여 ItemReader나 ItemWriter에 위치를 지정할 수도 있으며, 반대로 커넥션을 닫기 위해 해당 정보를 사용하기도 한다. Update가 호출되면 제공된 ExecutionContext의 현재 상태를 유지 하기 위해 저장하기도 한다. 이러한 정보는 재시작시 사용되기도 한다.


그러나 ItemReader나 ItemWriter를 step 범위내에서 사용할 수 있도록 delegate 패턴을 제공한다. 아래는 다음과 같이 해석이 된다. CustomCompositeItemWriter는 ItemStream을 구현하고 있지 않지만, step의 생명주기와 같이 barWriter는 열리고, 닫히고 업데이트 될 것이다.

<job id="ioSampleJob">
<step name="step1">
<tasklet>
<chunk reader="fooReader" processor="fooProcessor"
writer="compositeItemWriter" commit-interval="2">
<streams>
<stream ref="barWriter"/>
</streams>
</chunk>
</tasklet>
</step>
</job>

<bean id="compositeItemWriter" class="...CustomCompositeItemWriter">
<property name="delegate" ref="barWriter"/>
</bean>

<bean id="barWriter" class="...BarWriter"/>

Reusing Existing Services


만약 기존재하는 서비스가 있다면 배치에서 간단하게 재활용할 수 있다. 아래와 같이 Adapter를 이용하면 해당 시점에 해당 서비스를 호출해준다.

<bean id="itemReader" class="org.springframework.batch.item.adapter.ItemReaderAdapter">
<property name="targetObject" ref="fooService"/>
<property name="targetMethod" value="generateFoo"/>
</bean>
<bean id="fooService" class="org.springframework.batch.item.sample.FooService"/>


Validating Input


입력을 검증하기 위해서 스프링 배치에서는 ValidatingItemProcessor를 제공한다. 간단하게 validator를 구현해서 주입시키면 입력에 대한 validation을 수행할 수 있다.

public interface Validator<T> {
void validate(T value) throws ValidationException;
}


Comments