반응형

많은 배치 처리 문제는 single 스레드, single 프로세스 작업으로 해결할 수 있다. 그리고 복잡한 구현을 생각하기 전에 단순하게 처리할 수 있는지 체크하는 것은 좋은 생각이다. 실제작업이 성능을 측정하고 가장 단순한 방법으로 처리 할 수 있다는 그것이 베스트이다. 수백 메가바이트의 파일을 읽고 쓰는데도 1분이면 충분할 것이다.  우리는 이번에 작업을 병렬로 수행하는 방법에 대해 알아볼 것이다. 가장 크게 나누어 본다면 single process에서 멀티 스레드로 작업을 수행하는 것과 멀티 프로세스에서 처리하는 방법에 대해서 알아볼 것이다.


Multi-threaded step


병렬처리를 시작하는 가장 단순한 방법은 Step 설정에 TaskExecutor를 추가하는 것이다.


<step id="loading"> 

<tasklet task-executor="taskExecutor" throttle-limit="20">...</tasklet>

</step>


이 예제에서 TaskExceutor는 다른 빈 정의를 참조한다. TaskExecutor는 표준적인 스프링의 인터페이스이다. 가장 단순한 멀티스레드 TaskExecutor는SimpleAsyncTaskExecutor이다. 위 설정의 결론은 Step 실행시 Chunk(각 commit interval에 구성된 아이템리스트)를 읽고 처리하고 출력할 때, 개별의 스레드에서 수행된다는 것이다. 주의할 점은 이렇게 처리된 아이템들의 순서는 보장할 수 없다는 것이다. 게다가 TaskExecutor의 스레드풀의 한계에 도달할 수 있으므로, 스레드풀 설정을 잘 확인해야 할 것이다.

Multi-threaded step에는 몇가지 현실적인 제약이 있다. 스텝에는 여러가지 reader나 writer가 참가하게 되는데 이런 것들이 stateful하다. 만약 state가 스레드로 분리될 수 없다면 해당 컴포넌트들은 멀티 스레드 스텝에서 사용될 수 없다. 특히나 Spring Batch에서 제공하는 많은 reader나 writer가 멀티스레드를 고려하여 설계되지 않았다. 하지만 reader나 writer를 stateless하거나 thread safe하게 동작하기 위해 process indicator를 이용한 예제를 제공한다. 데이터베이스 입력 테이블에 아이템의 처리 상태를 기록하는 방식이다.

멀티 스레드 스텝을 사용하기 전에 꼭 javadoc을 확인하여 해당 reader나 writer가 thread safe한지 확인하도록 하자.

Parallel Steps

병렬 처리가 필요한 어플리케이션 로직은 특정한 응답으로 분류될 수 있어야 하며 개별적인 스텝에 할당 될 수 있다는 것이다. 병렬 스텝의 쉽게 정의되고 사용될 수 있다. 아래 예에서 보면 (step1,step2)와 step3는 동시에 수행되며, 해당 처리는 step4로 전달된다.

<job id="job1">
<split id="split1" task-executor="taskExecutor" next="step4">
<flow>
<step id="step1" parent="s1" next="step2"/> 
<step id="step2" parent="s2"/>
            </flow>
            <flow>
<step id="step3" parent="s3"/> 
</flow>
</split>
<step id="step4" parent="s4"/> 
</job>

<beans:bean id="taskExecutor" class="org.spr...SimpleAsyncTaskExecutor"/>


Remote Chunking


리모트 청크처리는 스텝처리를 다수의 프로세스로 분할하는것을 의미한다. 미들웨어를 통해서 커뮤니케이션을 수행한다.




마스터 컴포넌트는 하나의 프로세스로 슬레이브는 멀티 리모트 프로세스이다. 이러한 패턴이 잘 동작하기 위해서는 마스터는 bottlenack이 없어야 한다. 마스터는 스프링 배치의 스텝의 구현체이고 ItemWriter는 chunk를 미들웨어에 메시지로 전달한다. 슬레이브는 미들웨어로 부터 메시지를 받아서 ItemWriter나 ItemProcessor를 거쳐 chunk를 처리한다. 이러한 패턴을 썼을때 하나의 이점은 reader와 processor, writer 컴포넌트가 분리될 수 있다는 것이다. 아이템이 자동으로 분할되고 미들웨어를 통해 작업이 공유된다면 리스너는 컨슈머 역할을 하게 되므로, 자연스럽게 로드 밸런싱을 할 수 있다. 미들웨어는 견고해야 한다. 각 메시지의 전달과 하나의 컨슈머에 의해 사용되는 것을 보장해야 한다. JMS는 후보가 될 수 있고, 다른 옵션으로는 그리드나 메모리 공유 제품이 될 수 있다. 


Partitioning


스프링 배치에서는 또한 스텝 수행을 파티셔닝하고 리모트로 실행할 수 있는 SPI를 제공한다. 이 경우 리모트 참가자들은 단순한 스텝 인스턴스이다. 쉽게 정의될 수 있으며, 로컬 프로세싱으로 이용된다.



왼쪽에 있는 작업은 스텝의 순서에 따라 수행되며 그리고 Master라고 표시된 스텝은 현재 수행중이다. 슬레이브는 마스터의 결과를 가지고 동일한 출력을 작업에 전달하는 모두 스텝의 인스턴스들이다. 슬레이브는 일반적으로 리모트 서비스 일 수 있다. 그러나 프로세스의 스레드라도 무방하다. 마스터로부터 슬레이브로 전달된 메시지는 견고할 필요는 없으며 전송만 보장되면 된다. 스프링 배치의 메타데이터인 JobRepository가 각 작업 수행에 대해 각 슬레이브가 한번 실행 됨을 보장할 것이다.


스프링배치의 SPI는 step의 특별한 구현체(PartitionStep)과 두개의 전략적인 인터페이스로 구성된다. 전략 인터페이스는 PartionHandler와 StepExecutionSplitter이다. 



멀티 스레드 스텝의 throttle-limit 속성과 비슷하게 grid-size는 스텝으로부터 요청이 포화되는 것을 방지해준다. 심플한 설정의 예제는 Spring Batch Samples에 포함되어 있으니 참고하기 바란다. 스프링 배치는 "step1:partition0"으로 불리는 step execution을 생성하고, 마스터 스텝으로 언급된 "step1:master"를 일관성 유지를 위해 생성한다. 스프링 3.0부터는 step의 alias를 사용할 수 있다.


파티셔 핸들러는 리모팅이나 그리드 환경의 구조를 알고 있는 컴포넌트이다. 이것은 리모트 스텝으로 StepExecution을 전송할 수 있다. 이때 StepExecution은 DTO와 같은 구조에 특화된 포맷에 포함될 수 있다. 이것은 입력 데이터가 어떻게 분할 되는지는 모르고, 다수의 StepExecution의 결과가 어떻게 조합되는지만 알고 있다. 일반적으로 많은 경웅에 복원이나 failover에 대해 알 필요도 없다. 어쨌든 스프링 배치는 구조로부터 독립된 방식의 재시작을 지원한다. 실패한 작업은 항상 재시작할 수 있으며 실패한 스텝만 재시작 될 것이다.


파티션 핸들러 인터페이스는 다양한 형태의 특별한 구현이 될 수 있다. (RMI remoting, EJB remoting, custom web service, JMS, Java Spaces, shared memory grids (like Terracotta or Coherence), grid execution fabrics (like GridGain).) 스프링 배치는 그리드나 리모팅에 대한 구현체를 포함하고 있지는 않다.


하지만 스프링 배치는 PartitionHandler의 유용한 구현체를 제공한다. 스텝을 TaskExecutor를 이용하여 여러개의 스레드로 스텝을 분할 시켜서 수행한다. 이 구현체의 이름은 TaskExecutorPartitionHandler이며 xmlnamespace에 의해 스텝에 기본적으로 정의된다. 상세한 정의는 아래와 같다.


<step id="step1.master">

<partition step="step1" handler="handler"/>

</step>

<bean class="org.spr...TaskExecutorPartitionHandler"> 

<property name="taskExecutor" ref="taskExecutor"/> 

<property name="step" ref="step1" />

<property name="gridSize" value="10" />

</bean>


그리드 사이드는 생성될 step execution의 수를 결정한다. 이것은 thread pool의 사이즈와 매치될 수 있다. TaskExecutorPartionHandler는 IO가 중요한 스텝에 꽤 유용하다. 또한 리모트 수행을 위해 이용될 수 있다. 리모트 실행에 대한 프록시로서 step의 구현체로 제공될 수 있다.


파티셔너는 더 단순한 의무를 가진다. 새로운 StepExecution을 위한 ExecutionContext를 생성하기 위한 입력 파라미터로서의 의무만 가진다.


public interface Partitioner {

Map<String, ExecutionContext> partition(int gridSize);

}


반응형
반응형

스프링 배치 매뉴얼을 60% 본 시점 즈음에 다음과 같은 배치 프로그램을 작성해달라는 요청이 생겼다.


요건 : 파일로 부터 IP 리스트를 읽어서 특정 IP 대역을 분류해주세요.


음 엄청 간단하다. 첫번쨰 방법은 멀티 스텝을 구성해서 파일에서 IP 리스트를 읽어서 특정 대역에 해당하는 내용을 한 파일에 쓴다. 그리고 난 다음 두번째 스텝에서 특정대역에 속하지 않는 파일을 쓰면 된다. 그러나 이러한 방법은 우아하지 못하다. 배치라고 하면 대용량일텐데 입력파일을 두번 읽는다면, 두배로 시간이 소요되기 떄문이다. 그래서 구글링을 해봤더니 역시나 방법이 있었다. 이번 예제는 한번 읽어서 분류해서 두개의 파일로 출력하는 방법을 구현해보겠다.

 

먼저 입력 파일을 살펴보자. 아래와 같이 아이피를 가진 입력 파일이다.

210.xxx.xxx.xx
210.xxx.xxx.xx
210.xxx.xxx.xx
210.xxx.xxx.xx
210.xxx.xxx.xx
210.xxx.xxx.xx
210.xxx.xxx.xx
210.xxx.xxx.xx
210.xxx.xxx.xx
219.xxx.xxx.xx
219.xxx.xxx.xx
219.xxx.xxx.xx
219.xxx.xxx.xx
219.xxx.xxx.xx
219.xxx.xxx.xx
219.xxx.xxx.xx
219.xxx.xxx.xx


먼저 입력파일을 읽어드리기 위한 reader를 선언해보자.

<bean id="ipFileReader" class="org.springframework.batch.item.file.FlatFileItemReader">
<property name="resource" value="classpath:example/iplist.txt"/>
<property name="lineMapper">
<bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
<property name="lineTokenizer">
<bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
<property name="names" value="ip"/>
</bean>
</property>
<property name="fieldSetMapper">
<bean class="net.daum.datahub.batch.app.classify.IpInfoMapper"/>
</property>
</bean>
</property>
</bean>

FieldSet을 도메인 객체를 변환하기 위해서 도메인 객체와 Mapper 클래스를 작성해보자.

도메인 객체 작성시 필터링 되었는지 여부를 판단할 수 있는 필드를 추가적으로 선언하자.

public class IpInfo {
private String ip;

private boolean filtered = false;

public IpInfo(String ip) {
this.ip = ip;
}

public void setFiltered(boolean filtered) {
this.filtered = filtered;
}

public boolean isFiltered() {
return filtered;
}

@Override
public String toString() {
return ip;
}

public String getIp() {
return ip;
}
}
public class IpInfoMapper implements FieldSetMapper<IpInfo> {
@Override
public IpInfo mapFieldSet(FieldSet fieldSet) throws BindException {
String ip = fieldSet.readString("ip");

return new IpInfo(ip);
}
}


다음은 처리로직을 담은 ItemProcessor를 하나 생성하겠다. 로직은 일단 단순하게 210번대로 시작하는 IP를 필터링 하는 예이다.

public class IpFilterProcessor implements ItemProcessor<IpInfo,IpInfo> {
@Override
public IpInfo process(IpInfo item) throws Exception {
if (item.getIp().startsWith("210.")) {
item.setFiltered(true);
}

return item;
}
}


그런 다음 두개의 ItemWriter를 선언하자. 하나는 필터링된 아이피를 작성할 writer이고, 다른 하나는 필터링되지 않은 아이피를 작성하는 것이다. 

<bean id="filteredIpFileWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
<property name="resource" value="file://Users/devsun/output/filtered_ip.csv"/>
<property name="lineAggregator">
<bean class="org.springframework.batch.item.file.transform.PassThroughLineAggregator"/>
</property>
</bean>

<bean id="unfilteredIpFileWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
<property name="resource" value="file://Users/devsun/output/unfiltered_ip.csv"/>
<property name="lineAggregator">
<bean class="org.springframework.batch.item.file.transform.PassThroughLineAggregator"/>
</property>
</bean>


마지막으로 전달된 아이템에 따라 사용한 Writer를 지정하기 위한 Classifier를 작성해보자. ClassifiterCompositeItemWriter는 우리가 작성한 Classfier의 classify 메소드를 호출하여 writer를 선택한 후 해당 writer의 write 메소드를 호출할 것이다.

public class IpClassfier implements Classifier<IpInfo,ItemWriter<IpInfo>> {
private Map<Boolean, ItemWriter<IpInfo>> writerMap = new HashMap<Boolean, ItemWriter<IpInfo>>();

@Override
public ItemWriter<IpInfo> classify(IpInfo ipInfo) {
return writerMap.get(ipInfo.isFiltered());
}

public void setWriterMap(Map<Boolean, ItemWriter<IpInfo>> writerMap) {
this.writerMap = writerMap;
}
}
<bean id="ipClassifierFileWriter" class="org.springframework.batch.item.support.ClassifierCompositeItemWriter">
<property name="classifier">
<bean class="net.daum.datahub.batch.app.classify.IpClassfier">
<property name="writerMap">
<map>
<entry key="true" value-ref="filteredIpFileWriter"/>
<entry key="false" value-ref="unfilteredIpFileWriter"/>
</map>
</property>
</bean>
</property>
</bean>


최종적인 작업의 설정이다. 멀티 writer를 오픈하고 닫기 위해 stream으로 등록해주면 된다. 수행 결과는 성공적일 것이다.

<batch:job id="classifyJob">
<batch:step id="simpleStep">
<batch:tasklet>
<batch:chunk reader="ipFileReader" processor="ipProcessor"
writer="ipClassifierFileWriter" commit-interval="10">
<batch:streams>
<batch:stream ref="filteredIpFileWriter"/>
<batch:stream ref="unfilteredIpFileWriter"/>
</batch:streams>
</batch:chunk>
</batch:tasklet>
</batch:step>
</batch:job>


반응형
반응형

모든 배치처리는 대량의 데이터를 읽고 계산하거나 변경하고 결과를 쓰는것으로 묘사될 수 있다. 스프링 배치는 bulk 처리를 위한 세가지 주요한 ItemReader, ItemProcessor 및 ItemWriter를 제공한다.


ItemReader


비록 단순한 컨셉이지만 ItemReader는 다양한 타입의 입력 데이터를 제공하기 위한 것이다. 가장 일반적인 예는 다음과 같다.


- Flat File, XML, Database 


public interface ItemReader<T> {
T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;
}


Read 메소드는 ItemReader가 가장 주요한 계약이다. 메소드가 호출되면 하나의 아이템이 리턴되거나 null이 리턴되어야 한다. Item은 file의 라인, 데이터베이스의 로우 혹은 XML 파일의 element일 수 있다. 아이템은 일반적으로 사용가능한 도메인 오브젝트로 매핑될 수 있을 것이다. ItemReader의 일반적인 구현체들은 forward only이다. 이것은 아이템의 부족으로 아무것도 하지 않는 것이 예외를 발생하는 것보다 더 좋기 때문이다. 예를 들어 데이터베이스를 읽었을때 결과가 없으면 null을 리턴하고 작업은 종료하는 것이다. 하지만 예외적으로 트랜잭션에 연관된 특별한 리소스의 경우에는 rollback 시나리오에 호출된 동일한 아이템을 리턴해야 할 수도 있다.


ItemWriter


ItemWriter는 ItermReader와 기능적으로 닮았지만, 반대의 연산을 수행한다. 리소스는 적절한 위치에서 열리고 닫혀야 한다. 데이터 베이스 혹은 큐에서 해당하는 데이터로 포맷팅되어 전송되거나 입력되거나 업데이트 되어야 한다.

public interface ItemWriter<T> {
void write(List<? extends T> items) throws Exception;
}

ItermReader에서는 read처럼 ItemWriter는 write는 가장 기본적인 연산이다. 이 메소드는 아이템리스트를 전달받아 리소스가 열려있는한 쓰기를 시도할 것이다. 아이템 리스트들은 작업되고 chunk로 처리가 수행된다. 리스트를 다 쓴 다음 write 메소드로 돌아오기 전에 flush가 수행된다. 예를 들어 Hibernate DAO로 작성한다면 개별 아이템으로 부터 다중 호출이 만들어져서 처리가 될 것이며,  write 메소드로부터 돌아오기전에 session은 close가 될 것이다.


ItemProcessor


ItemReader와 ItemWriter는 그들의 특정한 단위작업을 수행하는데 매우 유용하다. 그러나 쓰기 전에 비즈니스 로직을 넣고 싶다면? 이러한 경우를 위해 스프링 배치에서는 ItemProcessor 인터페이스를 제공한다.

public interface ItemProcessor<I, O> {
O process(I item) throws Exception;
}

ItemProcessor는 매우 단순하다. 하나의 오브젝트가 전달되면 그것을 변환하여 다른 것을 리턴한다. 입력 오브젝트와 출력 오브젝트는 같을수도 있고, 다를수도 있다. process 메소드에 개발자가 원하는 비즈니스 로직이 들어갈 수 있다.


다양한 ItemProcessors를 chain 할 수 있다. 앞의 ItemPrcoessor의 출력이 다음 ItemProcessor의 입력으로 제공된다.

<bean id="compositeItemProcessor" class="org.springframework.batch.item.support.CompositeItemProcessor">
<property name="delegates">
<list>
<bean class="..FooProcessor"/>
<bean class="..BarProcessor"/>
</list>
</property>
</bean>

ItemProcessor에서 레코드를 필터링 하는 방법은 굉장히 단순한다. null을 리턴함으로써 필터링을 할 수 있다.


ItemStream


ItemReader, ItemWriter 모두 개별적인 목적에 잘 맞게 제공되어 있다. 그러나 다른 인터페이스에 의해 참조될 경우도 있을 것이다. 일반적인 배치작업에 범위에서 reader와 writer는 열고, 닫고 해당 상태가 저장되어져야 한다.

public interface ItemStream {
void open(ExecutionContext executionContext) throws ItemStreamException;

void update(ExecutionContext executionContext) throws ItemStreamException;

void close() throws ItemStreamException;
}

ItemReader의 클라이언트들은 ItemStream을 구현해하는데 파일과 같은 특정 커넥션을 획득하기 위해 read메소드를 호출하기 전에 open을 수행하여야 한다. 비슷한 제약사항은 ItemWriter에도 적용된다. ExecutionContext에 있는 데이터를 이용하여 ItemReader나 ItemWriter에 위치를 지정할 수도 있으며, 반대로 커넥션을 닫기 위해 해당 정보를 사용하기도 한다. Update가 호출되면 제공된 ExecutionContext의 현재 상태를 유지 하기 위해 저장하기도 한다. 이러한 정보는 재시작시 사용되기도 한다.


그러나 ItemReader나 ItemWriter를 step 범위내에서 사용할 수 있도록 delegate 패턴을 제공한다. 아래는 다음과 같이 해석이 된다. CustomCompositeItemWriter는 ItemStream을 구현하고 있지 않지만, step의 생명주기와 같이 barWriter는 열리고, 닫히고 업데이트 될 것이다.

<job id="ioSampleJob">
<step name="step1">
<tasklet>
<chunk reader="fooReader" processor="fooProcessor"
writer="compositeItemWriter" commit-interval="2">
<streams>
<stream ref="barWriter"/>
</streams>
</chunk>
</tasklet>
</step>
</job>

<bean id="compositeItemWriter" class="...CustomCompositeItemWriter">
<property name="delegate" ref="barWriter"/>
</bean>

<bean id="barWriter" class="...BarWriter"/>

Reusing Existing Services


만약 기존재하는 서비스가 있다면 배치에서 간단하게 재활용할 수 있다. 아래와 같이 Adapter를 이용하면 해당 시점에 해당 서비스를 호출해준다.

<bean id="itemReader" class="org.springframework.batch.item.adapter.ItemReaderAdapter">
<property name="targetObject" ref="fooService"/>
<property name="targetMethod" value="generateFoo"/>
</bean>
<bean id="fooService" class="org.springframework.batch.item.sample.FooService"/>


Validating Input


입력을 검증하기 위해서 스프링 배치에서는 ValidatingItemProcessor를 제공한다. 간단하게 validator를 구현해서 주입시키면 입력에 대한 validation을 수행할 수 있다.

public interface Validator<T> {
void validate(T value) throws ValidationException;
}


반응형
반응형



JobLauncher는 새로운 JobExecution을 생성하기 위해 JobRepository를 이용한다. 작업 수행중에 같은 실행을 업데이트하기 위해 같은 JobRepository를 이용한다. 단순한 시나리오는 위와 같지만,  수백 개의 배치 작업과 스케쥴링 요구 사항을 충족하기 위해선 다음과 같이 요구된다.



JobExplorer와 JobOperator는 메타데이터에 대한 쿼리와 컨트롤을 추가한 인터페이스이다.


JobExplorer


기존의 실행을 리파지토리로부터 쿼리를 하는 것이 가장 기본적인 기능일 것이다. 이러한 기능은 JobExplorer 인터페이스에 의해 제공된다. JobExplorer는 JobRepository의 read-only 버젼이다. 팩토리빈에 의해 쉽게 정의할 수 있다.

<bean id="jobExplorer" class="org.springframework.batch.core.explore.support.JobExplorerFactoryBean"
p:dataSource-ref="dataSource" p:tablePrefix="BATCH_"/>`


JobRegistry


JobRegistry는 반드시 정의해야 되는 것은 아니다. 그러나 해당 컨텍스트에서 이용가능한 작업목록을 유지하고 싶을때 유용하다. 프레임웍은 오직 하나의 구현체만을 제공하고 단순한 맵 형태로 제공된다. 다음과 같이 정의할 수 있다.

<bean id="jobRegistry" class="org.springframework.batch.core.configuration.support.MapJobRegistry"/>

JobRegistry를 자동적으로 저장시키는 방법은 두가지가 있다. 하나는 bean post processor를 이용하는 것이고, 다른 하나는 registrar lifecycle component를 이용하는 것이다.


모든 작업을 등록하는 bean post-processor의 정의는 다음과 같다.

<bean id="jobRegistryBeanPostProcessor" class="org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor">
<property name="jobRegistry" ref="jobRegistry"/>
</bean>

AutomaticJobRegistrar는 child context를 생성하고 작업을 생성하는 lifecycle component이다. 이렇게 할 경우 하나의 이점은 child 컨텍스트에 작업명은 전체 registry 유일하게 식별할 수 있어야 하지만 디펜던시 들은 자연스러운 이름을 사용할 수 있다는 것이다. 예를 들어 작업마다 xml configuration을 만들었다고 가정을 해보자. 각 작업마다 ItemReader를 가질 것이다. 거기에  "reader"라는 같은 이름은 빈들이 정의되어 있다고 할 때, 이들이 같은 context에 있다면 crash가 나거나 override가 되지만 이 경우에는 자동적으로 그러한 상황을 피할 수 있게 된다.

<bean id="JobRegistrar" class="org.springframework.batch.core.configuration.support.AutomaticJobRegistrar">
<property name="applicationContextFactories">
<bean class="org.springframework.batch.core.configuration.support.ClasspathXmlApplicationContextsFactoryBean">
<property name="resources" value="classpath*:/config/job*.xml"/>
</bean>
</property>
<property name="jobLoader">
<bean class="org.springframework.batch.core.configuration.support.DefaultJobLoader">
<property name="jobRegistry" ref="jobRegistry"/>
</bean>
</property>
</bean>

Registrar는 두가지 필수적인 프로퍼티를 가지는데, 하나는 ApplicationContextFactory의 배열이고 다른 하나는 JobLoader이다. JobLoader는 child 컨텍스트의 라이프사이클을 관리하고 JobRegistry에 작업을 등록한다.


JobOperator


JobRepository는 메타데이터에 대한 CRUD 기능을 제공하고, JobExplorer는 메타데이터에 대한 read-only 기능을 제공한다고 하였다. 하지만 이러한 기능들은 공통적인 모니터링 task는 즉, 중지, 재시작 또는 작업 요약등과 같은 배치 수행과 같이 사용되었을 때, 더욱 유용하다. JobOperator에 그러한 기능들을 스프링 배치에서 제공한다. 간단한 정의는 다음과 같다.

<bean id="jobOperator" class="org.springframework.batch.core.launch.support.SimpleJobOperator">
<property name="jobRegistry" ref="jobRegistry"/>
<property name="jobRepository" ref="jobRepository"/>
<property name="jobExplorer" ref="jobExplorer"/>
<property name="jobLauncher" ref="jobLauncher"/>
</bean>

JobParametersIncrementer


대부분은 JobOperator 메소드는 명백하고 javadoc를 보면 자세한 설명을 찾을수 있다. 하지만 startNextInstance메소드는 설명을 할 필요가 있다. 이 메소드는 작업의 새로운 작업 인스턴스를 시작한다. JobExecution에 심각한 문제가 있다면 이러한 기능은 꼭 필요하다. 이러한 경우 작업은 처음부터 다시 실행되어야 할 필요가 있을 것이다. 새로운 JobInstance를 수행하기 위해서 새로운 JobParameter가 필요하다. 이러한 경우 startNextInstance 메소드는 작업에 대해서 강제로 JobInstance를 새로 생성하기 위해서 JobParametersIncrementer를 사용한다. 다음은 간단한 JobParamtersIncrementer를 작성하고 설정한 예이다.

public class SimpleJobParametersIncrementer implements JobParametersIncrementer {
@Override
public JobParameters getNext(JobParameters parameters) {
if (parameters==null || parameters.isEmpty()) {
return new JobParametersBuilder().addLong("run.id", 1L).toJobParameters(); }
long id = parameters.getLong("run.id",1L) + 1;
return new JobParametersBuilder().addLong("run.id", id).toJobParameters();
}
}

<batch:job id="job1" parent="baseJob" incrementer="sampleIncrementer">
<step id="step1" parent="standaloneStep"/>
<batch:listeners merge="true">
<batch:listener ref="listenerTwo"/>
</batch:listeners>
</batch:job>

Stopping a job


JobOperator에 작업 중지를 요청했을때, 즉시 강제로 shutdown하는 방법은 없기 때문에 shutdown은 바로 일어나지 않는다. 특히 현재 비즈니스 로직과 같은 개발자의 코드를 수행하고 있을 경우에는 더 그렇다. 이러한 경우 프레임워크가 컨트롤할 수 있는 영역이 아니기 때문이다. 하지만 컨트롤 할 수 있는 최대한 빠른 시점에 현재 StepExecution을 STOPPED 시킬 것이고 JobExecution에서 해당 상황을 전달 할 것이다.


Aborting a job


작업 수행이 실패하였다면 재시작될 수 있다. 그러나 작업의 상태가 ABANDONED라면 작업은 재시작할 수 없다. ABANDONED 상태는 작업 재시작을 스킵하기 위해서 사용할 수도 있다. 만약 프로세스가 죽어버렸다면 JobRepository는 해당 상태가 FAILED인지 ABANDONED인지 알 수가 없다. 이러한 경우에는 비즈니스 적인 의사 결정이 필요하며, 강제로 상태를 세팅해야 된다. 스프링 배치 어드민 JobService에 해당 기능이 제공된다.

반응형
반응형

JobLauncher 정의하기


JobLauncher의 가장 기본적인 구현체는 SimpleJobLauncher이다. 실행을 획득하기 위해 JobRepository에만 의존성을 가진다.

<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository"/>
</bean>


JobExecution을 획득하면 Job의 execute 메소드로 전달된다.



이러한 과정은 스케쥴러로부터 구동되어 잘 수행되었을 경우이다. 하지만 Http 요청으로부터 작업이 구동될 경우에는 SimpleJobLauncher는 비동기로 작업을 수행하고 결과는 요청 즉시 클라이언트로 전달할 필요가 있다. 배치와 같은 많은 시간 수행되는 처리동안 HTTP 요청을 유지하는 것은 좋은 방식이 아니기 때문이다.




이러한 경우에 JobLauncher의 정의는 다음과 같이 표현된다.

<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository"/>
<property name="taskExecutor">
<bean class="org.springframework.core.task.SimpleAsyncTaskExecutor"/>
</property>
</bean>


Running a Job


최소한 배치 작업을 구동하기 위해서는 두 가지가 필요하다. 구동될 작업과 JobLauncher이다. 둘다 같은 context에 포함 될수도 있고 다른 context에 포함 될수도 있다.

만약 커맨드라인으로부터 작업이 구동된다면 각 작업은 새로운 JVM에서 초기화될 것이다. 그러나 web container에서 만약 구동을 한다면 일반적으로 하나의 JobLauncher가 있을 것이며 작업을 비동기로 구동하게 될 것이다.


Running Jobs from the Command Line


엔터프라이즈 스케쥴러로부터 작업을 수행시키기를 원하는 유저들은 커맨드 라인이 주요한 인터페이스이다.

대부분의 스케쥴러들이 OS의 쉘 스크립트를 수행하여 작업을 수행하기 때문이다.

또한 shell script, Perl, Ruby 혹은 maven, ant와 같은 빌드툴에서도 자바 프로세스를 구동하는 방법을 제공한다.

이러한 이유에서 Spring Batch에서는 CommandLineJobRunner을 제공한다.

다음은 CommandLineJobRunner의 주요기능이다.


- 적절한 ApplicationContext 로딩

- 커맨드 라인 arguments를 파싱하여 JobParameters로 변환

- Arguments에 지정한 적절한 작업 수행

- 작업 수행을 위해 ApplicationContext에서 제공한 적절한 JobLauncher 사용


커맨드라인에서 작업을 수행하기 위해서 다음과 같은 arguments를 전달해야 한다. 첫번째는 작업이 정의된 xml의 jobPath이며 두번째는 jobName이다. 이후의 모든 arguments는 JobParameters로 고려되며 'name=value' 포맷이다.


bash$ java CommandLineJobRunner endOfDayJob.xml endOfDay schedule.date(date)=2007/05/05


커맨드라인으로 부터 작업이 구동되면 엔터프라이즈 스케쥴러는 종종 ExitCodes를 이용한다. 대부분의 스케쥴러들은 프로세스 레벨에서만 동작한다. 이 의미는 그들이 실행한 쉘스크립트의 시스템 프로세스만 알고 있다는 것이다. 이러한 시나리오에서 작업의 성공 및 실패 여부를 전달할 수 있는 유일한 방법이 코드를 리턴하는 것이다. 리턴 코드는 숫자이며 스케쥴러는 프로세스로 부터 그 숫자를 전달받고, 실행의 결과를 인지한다. 가장 단순한 케이스는 0은 성공이며 1은 실패이다. 그러나 좀 더 복잡한 케이스를 위해 ExitCodeMapper 인터페이스가 제공된다. 기본적으로 SimpleJVMExitCodeMapper를 제공한다.


Running Jobs from within a Web Container


사적으로 배치 작업은 커맨드 라인으로 오프라인 프로세싱으로 처리되어 왔다. 하지만 많은 경우에서는 HttpRequest에 의해 구동되는 것이 좋은 옵션이 될 수도 있다. 그러한 사례는 리포팅, 애드훅 작업 수행, 웹 어플리케이션 지원 들이다. 배치 작업은 롱텀이기 떄문에 이러한 경우 작업은 비동기로 수행되게 된다. 아래는 스프링 MVC와 결합해 작업을 수행한 예이다.




@Controller
public class JobLauncherController {
@Autowired
private Job job;

@Autowired
private JobLauncher jobLauncher;

@RequestMapping("/launchJob")
public void handle() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
jobLauncher.run(job, new JobParameters());
}
}





반응형

'Programming > Spring Batch' 카테고리의 다른 글

Spring Batch - Step 정의  (1) 2015.01.23
Spring Batch - 메타데이터 이용  (0) 2015.01.22
Spring Batch - JobRepository 구성하기  (0) 2015.01.22
Spring Batch - 작업정의  (1) 2015.01.22
Spring Batch - 도메인  (1) 2015.01.21
반응형

JobRepository 구성하기


JobRepository는 spring batch의 jobExecution과 stepExecution과 같은 domain 오브젝트를 저장하기 위한 CRUD 기능을 위해 사용된다고 언급했다.

JobRepository는 JobLauncher, Job, Step과 같이 프레임워크는 주요한 특징이다. 


<batch:job-repository id="jobRepository"
data-source="dataSource"
transaction-manager="transactionManager"
isolation-level-for-create="SERIALIZABLE" table-prefix="BATCH_"
max-varchar-length="1000"/>


Transaction Configuration for the JobRepository


네임스페이스를 이용하면, transaction 어드바이스는 repository에 자동적으로 적용된다. 실패후 재시작되었을 때 상태를 포함한 메타데이터를 정확하게 하기 위함이다.

작업이 구동되었을 때 create 작업을 수행하는 메소드에 대한 isolation level은 기본적으로 SERIALIZABLE이다. 이는 두개의 프로세서가 동시에 같은 작업을 구동했을때, 오직 하나만 성공할 수 있도록 하기 위함이다. 그러나 다른 isolation level 또한 지정할 수 있는데, READ_COMMITTED의 경우 잘 동작할 것이나, READ_UNCOMMITED의 경우에는 두개의 프로세스가 동시에 작업을 구동할 경우를 제외하곤 잘 동작할 것이다. 네임스페이스에는 아래와 같은 내용을 포함하고 있다고 생각하면 된다.


<aop:config>
<aop:advisor pointcut="execution(* org.springframework.batch.core..*Repository+.*(..))"
advice-ref="txAdvice"/>
</aop:config>
<tx:advice id="txAdvice" transaction-manager="transactionManager">
<tx:attributes>
<tx:method name="*"/>
</tx:attributes>
</tx:advice>


Changing Table prefix


기존 비즈니스적으로 사용하는 테이블과 구분하기 위해서, table prefix도 변경할 수 있다.


In-Memory Repository


굳이 데이터베이스에 메타데이터를 기록하고 싶지 않다면, 메모리에 메타데이터를 저장할 수도 있다. 설정은 아래와 같이 수행하면 된다.

<bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
<property name="transactionManager" ref="transactionManager"/>
</bean>


만약 RDBMS를 사용하지 않더라도 SimpleJobRepository에 다양한 Dao를 구현하여 구성한다면 다양한 방식으로 커스터마이징할 수 있다.


Configuring a JobRepository


일단 In-Memory의 경우는 별도의 설치과정이 필요 없지만, RDBMS를 JobRepository로 사용하기 위해서는 테이블을 생성해야 한다. 생성 테이블 스크립트는 spring-batch-core-<version>.jar에 압축하여 같이 제공되고 있다. schema-<db제품명>.sql으로 제공된 파일을 이용하여 스키마를 생성하자.

실제적으로 수행해보면 아래와 같이 실행이력들이 잘 저장되는 것을 확인할 수 있다.



반응형

'Programming > Spring Batch' 카테고리의 다른 글

Spring Batch - 메타데이터 이용  (0) 2015.01.22
Spring Batch - 작업실행  (1) 2015.01.22
Spring Batch - 작업정의  (1) 2015.01.22
Spring Batch - 도메인  (1) 2015.01.21
Spring Batch - 소개  (0) 2015.01.16
반응형





Job은 step의 단순한 컨테이너처럼 보이는 반면에 개발자들이 반드시 알아야 할 많은 옵션들이 있다.

또한 작업을 어떻게 실행되는지 실행시 메타데이터가 어떻게 저장되는지에 대한 고려도 필요하다.

이번에는 job에 관련된 다양한 옵션과 실행시 고려사항에 대해서 설명한다.


작업 정의하기


Job 인터페이스에는 다양한 구현체들이 존재한다. 그러나 네임스페이스 추상화를 통해 정의의 차이점을 없애준다.

작업을 정의하기 위해서 단 세가지의 필수 요소(이름, jobRepository, step들)만이 필요하다.

다음 예제는 parent 정의를 사용하여 step들을 생성하기 위한 예제이다.

job-repository는 필수요소이지만, 암묵적으로 명시하지 않으면 jobReposity라는 id를 가진 빈을 참조한다.


<batch:job id="footballJob" job-repository="jobRepository">
<batch:step id="playerload" parent="s1" next="gameLoad"/>
<batch:step id="gameLoad" parent="s2" next="playerSummarization"/>
<batch:step id="playerSummarization" parent="s3"/>
</batch:job>


Restartablility

Restartablibilty에서 하나의 이슈는 배치 작업이 재시작이 되었을 때가 고려되어 있는지이다.  만약에 jobInstance에 대한 jobExecution이 이미 존재를 한다면 작업은 구동될 때 재시작을 고려한다. 이상적으로는 모든 작업은 재시작 될 수 있어야 하지만, 모든 시나리오에서 가능한 것은 아니다. 이러한 경우에서는 jobInstance가 새로 생성된다는 보는게 일반적일 것이다. 스프링 배치에서는 작업이 재시작 처리를 할 수 있도록 몇가지 도움을 준다. 그러나 만약 작업이 절대로 재시작 되어서는 안되고, 새로운 jobInstance가 생성되어야 한다면, restartable 속성을 false로 세팅하여야 한다.

<batch:job id="footballJob" job-repository="jobRepository" restartable="false">

Intecepting Job Exectuion


작업이 실행되는 과정에 있어서 라이프사이클에 대한 이벤트를 통지받을 수 있다면 매우 유용한다.

SimpleJob은 적절한 시점에 jobListener을 호출하여 이러한 작업을 가능하게 한다.


/*
* Copyright 2006-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.core;

/**
* Provide callbacks at specific points in the lifecycle of a {@link Job}.
* Implementations can be stateful if they are careful to either ensure thread
* safety, or to use one instance of a listener per job, assuming that job
* instances themselves are not used by more than one thread.
*
* @author Dave Syer
*
*/
public interface JobExecutionListener {

/**
* Callback before a job executes.
*
* @param jobExecution the current {@link JobExecution}
*/
void beforeJob(JobExecution jobExecution);

/**
* Callback after completion of a job. Called after both both successful and
* failed executions. To perform logic on a particular status, use
* "if (jobExecution.getStatus() == BatchStatus.X)".
*
* @param jobExecution the current {@link JobExecution}
*/
void afterJob(JobExecution jobExecution);

}

<batch:job id="footballJob" job-repository="jobRepository" restartable="false">
<batch:step id="playerload" parent="s1" next="gameLoad"/>
<batch:step id="gameLoad" parent="s2" next="playerSummarization"/>
<batch:step id="playerSummarization" parent="s3"/>
<batch:listeners>
<batch:listener ref="sampleListener"/>
</batch:listeners>
</batch:job>


Inheriting from a Parent Job


작업의 그룹에서 비슷하게 공유할 수 있는 설정이 있다면 작업 정의에 대한 설정을 상속받아 사용할 수 있다.

상속받는 방식은 일반적인 스프링에서의 설정 상속과 동일하다.


<batch:job id="baseJob" abstract="true">
<batch:listeners>
<batch:listener ref="listenerOne"/>
</batch:listeners>
</batch:job>

<batch:job id="job1" parent="baseJob">
<step id="step1" parent="standaloneStep"/>
<batch:listeners merge="true">
<batch:listener ref="listenerTwo"/>
</batch:listeners>
</batch:job>


반응형

'Programming > Spring Batch' 카테고리의 다른 글

Spring Batch - 작업실행  (1) 2015.01.22
Spring Batch - JobRepository 구성하기  (0) 2015.01.22
Spring Batch - 도메인  (1) 2015.01.21
Spring Batch - 소개  (0) 2015.01.16
Spring Batch - 시작하기  (0) 2015.01.16
반응형


아래 그림은 배치의 도메인을 가장 중요한 개념들만 나열한 것이다.

Job은 다양한 Step으로 구성되며, 하나의 ItemReader, ItemProcessor 및 ItemWriter를 가진다.

Job에 대한 수행 및 메타데이터를 관리하기 위해 JobLauncher와 JobRepository과 필요하다.



Job






스프링 배치에서 job이란 step을 위한 컨테이너이다. job는 플로우에 논리적으로 함께 위치한 다수의 step으로 구성된다.

재시작과 같은 모든 step에 공통적인 프로퍼티 설정을 적용할 수 있다. job 정의는 다음을 포함한다.


- 작업의 이름

- 스텝의 정의 및 순서

- 작업의 재시작 가능 여부


스프링에서 제공하는 job 인터페이스의 가장 단순한 구현체로서 가장 표준적인 기능으로 SimpleJob 클래스를 제공한다.

하지만 배치 네임스페이스 추상화를 통해 직접적으로 초기화할 필요가 있다. <job> 태그로 정의 할 수 있다.


JobInstance


Jobinstance은 job의 논리적인 실행이다. 일마감시 한번 수행하는 배치 작업을 생각해보다.

위의 다이어그램에서 EndOfDay job은 하나이지만 작업에 대한 개별적인 실행은 분리해서 추적이 되어야 한다.

이러한 경우에 매일 하나의 논리적인 jobInstance가 생성된다.

예를 들어 1월1일 실행, 1월 2일 실행은 각각 jobInstance가 된다. 

만약 1월1일 실행이 첫번째 실패하고, 다음날 다시 실행했다면 그건 1월 1일 실행이다.

그러므로 각 jobInstance는 다수의 executions를 가질 수 있으며, 하나의 jobInstance는 특정한 job에 일치하며, 

실행 시점에 주어진 jobParamters에 의해 식별될 수 있다.


JobParameters





Job과 jobInstance의 다른 점을 논할 때 근본적인 질문은 다음과 같다.

"하나의 jobInstance는 다른 것들과 어떻게 구분되어지나?"

그 답이 jobParameters이다. JobParamters는 배치 작업이 시작할 때 사용되는 파라미터의 집합이다.

그들은 실행 시 참조 데이터 혹은 식별자로 활용된다.


주의 - 모든 job 파라미터가 jobInstance를 식별하는데 필요하진 않다.


JobExecution


JobExecution은 작업 실행을 위한 단일 시도에 대한 기술적인 컨셉이다. 실행은 성공이나 실패가 될 수 있다.

1월 1일 작업을 위한 JobInstance를 고려해 보면 첫 수행에서 실패하였고, 만약 동일한 job parameters로 다시 실행을 한다면 새로운 jobExecution이 생성되게 된다. 이때도 jobInstance는 하나이다. 작업은 어떻게 수행되는지를 명시하고, jobInstance는 함께 수행 및 재수행에 사용될 오브젝트들의 조합으로 명시된다. 그러나 jobExecution은 실제적인 실행시에 생성되고 저장된다. jobExecution을 식별하기 위해서는 특별한 프로퍼티 status, startTime, endTime, exitStatus, createTime, lastUpdated, executionContext, failureExceptions 등을 가진다.



Step


Step은 배치 작업의 독립적이고 순서가 있는 수행을 캡슐화한 도메인 오브젝트이다. 그러므로 모든 작업은 하나 또는 그 이상의 step으로 구성된다.

Step은 실제적인 배치 수행이 정의된 모든 정보를 포함하고 있다. Step은 개발자의 요구에 따라 간단할수도 복잡할 수도 있다.

파일에서 데이터를 읽어서 데이터베이스로 적재하는 단순한 작업은 경우 코드가 거의 없거나 아에 없을수도 있다.



StepExecution





StepExecution은 하나의 step을 수행하기 위한 단일 시도를 표현한다. 새로운 stepExecution은 jobExecution과 마찬가지로 step이 실행될 때마다 생성된다.

하지만 이전 step이 실패한다면, 해당 step은 저장되지 않을 것이다. StepExecution은 실제적으로 step이 시작할 때에만 생성된다.

각 execution은 해당하는 step과 jobExecution에 references를 가지고 transaction 데이터에 대한 정보도 가진다.

추가적으로 각 stepExecution은 배치 작업 전반에 저장되는 정보를 유지하는 executionContext를 정보를 참조한다.


ExecutionContext


ExecutionContext는 프레임웍에 의해 저장되고 컨트롤되는 key/value상의 collection을 대표한다.

ExecutionContext는 stepExecution이나 jobExecution상에서 개발자에 제공되기 위해 존재한다.

Quartz에서 제공되는 jobDataMap과 굉장히 유사하다. 가장 유용한 예는 리스타트 기능이다.

file 입력을 사용하는 예에서 수행중인 개별 라인을 commit point마다 저장을 한다면 다음 수행시 itemReader는 해당 부분을 스킵할 수 있을 것이다.


JobRepository


JobRepository는 위에 언급된 모든 스테레오타입의 저장 매커니즘이다. JobLauncher, job, step의 구현체들에서 CRUD 기능을 제공한다. Job이 처음 구동될 때, jobExecution은 리파지토리로 부터 생성되며, stepExecution과 jobExecution의 수행하는 과정이 repository로 전달된다.


JobLauncher 


JobLauncher는 주어진 jobParamters로 작업을 구동하는 단순한 인터페이스 기능을 한다. JobRepository로부터 유효한 jobExecution을 획득하고 작업을 실행한다.


ItemReader


ItemReader는 step을 위한 input를 검색하는 기능을 담당한다. ItemReader는 제공할 item을 다 소진했을때, null을 리턴함으로서 이 상태를 알릴 수 있다.


ItemWriter

ItemWriter는 step에 추력에 대한 추상화된 표현. 일반적으로 itemWriter는 다음 입력될 데이터에 대한 내용을 알지 못하며, 전달된 아이템에 대한 처리를 수행한다.

ItemProcessor

ItemProcessor는 아이템 하나의 비즈니스 처리에 대한 추상화된 표현이다. ItemReader는 하나의 아이템을 읽고 itemWriter는 쓰지만, itemProcessor는 아이템을 변형하고 다른 비즈니스 처리를 수행한다. 만약 item 처리 중 아이템이 적절하지 않아서 null를 리턴한다면 itemWriter에 그것을 쓰지 않을 것이다. 



반응형

'Programming > Spring Batch' 카테고리의 다른 글

Spring Batch - 작업실행  (1) 2015.01.22
Spring Batch - JobRepository 구성하기  (0) 2015.01.22
Spring Batch - 작업정의  (1) 2015.01.22
Spring Batch - 소개  (0) 2015.01.16
Spring Batch - 시작하기  (0) 2015.01.16
반응형

스프링 배치 소개


엔터프라이즈 환경에서는 많은 어플리케이션은 미션 크리티컬한 비즈니스 수행을 위한 bulk 처리를 요구한다. 대용량 데이터의 자동화되고 복잡한 비즈니스 수행은 유저 상호작용 없는 처리되는 것이 매우 효율적이다.  이러한 작업은 시간 기반의 이벤트(월말 결산, 공지 및 보고), 복잡한 비즈니스 룰을 주기적으로 수행하는 작업(보험 이율 결산, 이율 조정) 또는 내부/외부 시스템으로의 대량 데이터의 송수신 등을 목적으로 한다. 배치 처리는 엔터프라이즈 환경에서 매일 수백만건의 트랜잭션의 처리를 수행한다. 스프링 배치는 엔터프라이즈 시스템에서 강력한 배치 어플리케이션을 개발할 수 있게 디자인된 가볍고 이해하기 쉬운 배치 프레임워크 이다. 스프링 배치는 POJO 기반의 생산성이 뛰어나며 스프링 프레임워크의 지식을 가진 사람들이 쉽게 이해할 수 있다. 또한 진화된 엔터프라이즈 서비스에서 필요한 다양한 요소들에 대한 접근을 쉽도록 제공한다. 스프링 배치는 스케쥴링 프레임워크가 아니다. 좋은 엔터프라이즈 스케쥴러(Quartz, Tivoli, Control-M)가 존재한다.  스케쥴러와 연계하여 수행하지 스케쥴러가 아니라는 의미이다. 스프링 배치는 대용량의 데이터를 처리하는 필수적이고 재사용한 기능(로깅/트레이싱, 트랜잭션 관리, 작업 처리 통계, 작업 재시작, 스킵 및 리소스 관리)을 제공한다. 또한 극한의 대량과 성능을 목표로 배치작업의 최적화 및 분산처리를 위해 진화된 기술 서비스와 아키텍처를 제공한다. 대용량의 정보 처리를 위한 확장 환경의 복잡하고 대량의 배치 작업을 단순하게 하기 위해서는 프레임워크가 효과적일 수 있다.



스프링 배치 아키텍처


스프링 배치는 확정성 있고 다양한 사용자 그룹을 고려하여 설계 되었다. 아래의 그림은 레이어 아키텍처의 스케치이다.






이 레이어 아키텍처는 세가지 중요한 요소는 어픝리케이션, 코어와 인프라스트럭쳐이다. 어플리케이션은 스프링 배치를 사용하는 개발자에 의해 작성된

모든 배치 작업 및 커스텀 코드들을 포함한다. 배치 코어는 배치 작업을 컨트롤하고 실행하는데 필요한 코어 런타임 클래스들을 포함한다.

이것은 JobLauncher, Job, Step의 실제 구현체를 포함한다. 어플리케이션과 코어는 공통 인프라스트럭쳐 기반위에 생성되었다.

인프라스트럭쳐는 공통적인 리더와 라이터 그리고 RetryTemeplate과 같은 서비스들을 포함한다.


일반적인 배치 원칙 및 가이드라인

배치 아키텍처는 온라인 아키텍처에 영향을 주고 반대로 받기도 한다. 두 아키텍처와 환경은 가능한 공통 기능을 염두해두어야 한다.

가능한 단순하게 복잡한 논리적인 구조를 지양해야 한다.

특히 IO와 같은 시스템 리소스의 사용을 최소화하다. 가능한 많은 오퍼레이션을 내부 메모리에서 수행하도록 하자.

불필요한 물리적인 IO를 피하기 위해 어플리케이션 IO를 확인하자. 특히 다음 4가지 공통 플로우에 대한 확인이 필요하다.
- 모든 트랜잭션에서 읽어드린 데이터는 동작에 따라 캐슁을 고려하자.
- 같은 트랜잭션에서 
- 불필요한 테이블이나 인덱스 스캔을 고려하다.
- SQL WHERE절에 키 값을 명시하지 말자.

배치 실행시 두번 일을 하지 말라. 예를 들어 리포팅 목적으로 데이터를 요약하고, 합계를 증가시켜서 저장해야 한다고 하면, 같은 데이터를 두번 처리하지 않게 하자.

충분한 메모리를 할당하자. 메모리 재할당으로 쓸데없는 수행시간 감소를 피하도록하자.

항상 데이터 정합성을 고려해라. 적당한 시점에 입력 체크와 레코드 검증을 수행하라.

내부적인 검증을 위해 checksums를 구현해라. 예를 들어 SAM 파일에 최종건수등을 trailer에 명시하라.

운영환경과 비슷한 환경에서 실제 데이터량에 준하는 부하 테스트를 수행하고 계획하라.

백업도 도전이다. 특히 시스템이 24시간 온라인으로 동작하고 있다면, 데이터베이스 백업은 특히 온라인 설계를 고려하여야 한다. 



출처 : 스프링 배치 레퍼런스 매뉴얼

반응형

'Programming > Spring Batch' 카테고리의 다른 글

Spring Batch - 작업실행  (1) 2015.01.22
Spring Batch - JobRepository 구성하기  (0) 2015.01.22
Spring Batch - 작업정의  (1) 2015.01.22
Spring Batch - 도메인  (1) 2015.01.21
Spring Batch - 시작하기  (0) 2015.01.16
반응형

오늘은 스프링 배치를 처음으로 알아보도록 하겠다. 흠 요즘 일은 안하고 너무 리서치만 하는게 아닌가 싶다. 근데 지금이 딱 좋은게 아닐까 스킬업을 하기엔..ㅋ 일단 다음과 같이 pom파일을 만들어 보자. 아래와 같은 디펜던시를 가지고 진행을 하겠다.


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.springframework</groupId>
<artifactId>spring-batch-test</artifactId>
<version>0.1.0-SNAPSHOT</version>
<properties>
<jdk.version>1.6</jdk.version>
<spring.version>4.0.5.RELEASE</spring.version>
<spring.batch.version>3.0.2.RELEASE</spring.batch.version>
<junit.version>4.11</junit.version>
</properties>
<dependencies>
<!-- Spring XML to/back object -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-oxm</artifactId>
<version>${spring.version}</version>
</dependency>

<!-- Spring Batch dependencies -->
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
<version>${spring.batch.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-infrastructure</artifactId>
<version>${spring.batch.version}</version>
</dependency>

<!-- Spring Batch unit test -->
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-test</artifactId>
<version>${spring.batch.version}</version>
</dependency>

<!-- Junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

일반적으로 배치 작업 프로세스를 간단하게 생각해보면 입력->프로세스->출력 이다.

아래와 같이 spring bean definition 파일을 작성하자.


<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:batch="http://www.springframework.org/schema/batch"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd">

<batch:job id="sampleJob">
<batch:step id="step01">
<batch:tasklet>
<batch:chunk reader="cvsItemReader" writer="xmlItemWriter"
processor="itemProcesser" commit-interval="1"/>
</batch:tasklet>
</batch:step>
</batch:job>

<bean id="cvsItemReader" class="org.springframework.batch.item.file.FlatFileItemReader">
<property name="resource" value="classpath:cvs/input/report.csv"/>
<property name="lineMapper" ref="lineMapper"/>
</bean>

<bean id="lineMapper" class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
<property name="lineTokenizer">
<bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
<property name="names" value="id,sales,qty,staffName,date"/>
</bean>
</property>
<property name="fieldSetMapper">
<bean class="hello.ReportFieldSetMapper"/>
</property>
</bean>

<bean id="itemProcesser" class="hello.CustomItemProcessor"/>

<bean id="xmlItemWriter" class="org.springframework.batch.item.xml.StaxEventItemWriter">
<property name="resource" value="file:xml/output/report.xml" />
<property name="marshaller" ref="reportMarshaller" />
<property name="rootTagName" value="report" />
</bean>

<bean id="reportMarshaller" class="org.springframework.oxm.jaxb.Jaxb2Marshaller">
<property name="classesToBeBound">
<list>
<value>hello.Report</value>
</list>
</property>
</bean>

<bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
<property name="transactionManager" ref="transactionManager"/>
</bean>

<bean id="transactionManager"
class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />

<bean class="org.springframework.batch.test.JobLauncherTestUtils"/>

<bean id="jobLauncher"
class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository" />
</bean>
</beans>


먼저 적정한 위치에 입력파일을 만든다. sample.csv의 내용은 다음과 같다.


1001,"213,100",980,"mkyong", 29/7/2013
1002,"320,200",1080,"staff 1", 30/7/2013
1003,"342,197",1200,"staff 2", 31/7/2013


cvsItemReader를 통해서 읽어들이게 되고 해당 파일을 조작하기 쉽도록 도메인 객체를 하나 만들자. 그리고 읽어들인 fieldSet을 도메인 객체로 변환하는 Mapper클래스도 하나 만들어 보자.


package hello;


import javax.xml.bind.annotation.XmlAttribute;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
import java.math.BigDecimal;
import java.util.Date;

@XmlRootElement(name = "record")
public class Report {

private int id;
private BigDecimal sales;
private int qty;
private String staffName;
private Date date;

@XmlAttribute(name = "id")
public int getId() {
return id;
}

public void setId(int id) {
this.id = id;
}

@XmlElement(name = "sales")
public BigDecimal getSales() {
return sales;
}

public void setSales(BigDecimal sales) {
this.sales = sales;
}

@XmlElement(name = "qty")
public int getQty() {
return qty;
}

public void setQty(int qty) {
this.qty = qty;
}

@XmlElement(name = "staffName")
public String getStaffName() {
return staffName;
}

public void setStaffName(String staffName) {
this.staffName = staffName;
}

public Date getDate() {
return date;
}

public void setDate(Date date) {
this.date = date;
}

@Override
public String toString() {
return "Report [id=" + id + ", sales=" + sales
+ ", qty=" + qty + ", staffName=" + staffName + "]";
}

}


package hello;

import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;

import java.text.ParseException;
import java.text.SimpleDateFormat;


/**
* Created by devsun on 15. 1. 15..
*/
public class ReportFieldSetMapper implements FieldSetMapper<Report> {
private SimpleDateFormat dateFormat = new SimpleDateFormat("dd/MM/yyyy");

@Override
public Report mapFieldSet(FieldSet fieldSet) throws BindException {

Report report = new Report();
report.setId(fieldSet.readInt("id"));
report.setSales(fieldSet.readBigDecimal("sales"));
report.setQty(fieldSet.readInt("qty"));
report.setStaffName(fieldSet.readString("staffName"));

//default format yyyy-MM-dd
//fieldSet.readDate(4);
String date = fieldSet.readString("date");

try {
report.setDate(dateFormat.parse(date));
} catch (ParseException e) {
e.printStackTrace();
}

return report;

}
}


다음으로 만들 클래스를 프로세서 클래스이다. 입력된 데이터를 출력할 데이터로 가공하는 부분이다. 일단 여기에서는 특별히 수행할 내용이 없으므로 아무런 기능없는 클래스를 만들겠다.


package hello;

import org.springframework.batch.item.ItemProcessor;

/**
* Created by devsun on 15. 1. 15..
*/
public class CustomItemProcessor implements ItemProcessor<Report, Report> {
@Override
public Report process(Report report) throws Exception {
return report;
}
}


출력에 사용될 빈은 xmlItemWriter이다. 해당 빈은 xml 어노테이션 정보를 이용하여, 도에민 객체를 xml로 변환하는 역할을 수행한다. 위에 설정이 참고하기 바란다.


마지막으로 테스트를 수행하기 위한 런쳐 클래스를 생성하도록 하겠다. 어플리케이션을 로딩하고 


package hello;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
* Created by devsun on 15. 1. 15..
*/
public class Application {
public static void main(String[] args) {
String[] springConfig =
{
"classpath:spring-context.xml"
};

ApplicationContext context =
new ClassPathXmlApplicationContext(springConfig);

JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");
Job job = (Job) context.getBean("sampleJob");

try {
JobExecution execution = jobLauncher.run(job, new JobParameters());
System.out.println("Exit Status : " + execution.getStatus());
} catch (Exception e) {
e.printStackTrace();
}

System.out.println("Done");
}
}


수행결과는 다음과 같다.


<?xml version="1.0" encoding="UTF-8"?><report><record id="1001"><date>2013-07-29T00:00:00+09:00</date><qty>980</qty><sales>213100</sales><staffName>mkyong</staffName></record><record id="1002"><date>2013-07-30T00:00:00+09:00</date><qty>1080</qty><sales>320200</sales><staffName>staff 1</staffName></record><record id="1003"><date>2013-07-31T00:00:00+09:00</date><qty>1200</qty><sales>342197</sales><staffName>staff 2</staffName></record></report>



참고자료 : http://www.mkyong.com/



반응형

'Programming > Spring Batch' 카테고리의 다른 글

Spring Batch - 작업실행  (1) 2015.01.22
Spring Batch - JobRepository 구성하기  (0) 2015.01.22
Spring Batch - 작업정의  (1) 2015.01.22
Spring Batch - 도메인  (1) 2015.01.21
Spring Batch - 소개  (0) 2015.01.16

+ Recent posts