Spring Batch에서 로직을 처리하다보면 첫 번째 Step에서 처리한 작업을 다음 Step에서 사용하고 싶은 요구가 생기게 된다. 보통 chunk-oriented 처리를 할 때 이런 요구가 생긴다. 쓰기 작업에서는 CompositeItemWriter와 같이 2개의 쓰기를 할 수 있는 객체가 존재하지만 Reader에는 그런 기능이 있는 객체가 없기 때문이다.
그런데 Job에서는 Step끼리 직접 호출하여 데이터를 주고 받을 수 있는 기능을 제공하지 않기 때문에 데이터를 공유하기 위해 우회하여 전달하는 방법을 사용해야 한다.
방법1. 스프링 공식 문서에서 추천하는 방법
첫 번째로 소개할 방법은 스프링 공식 문서에서 추천해주고 있는 방식이다. ExecutionContextPromotionListener 객체를 사용해 StepExecutionContext를 JobExecutionContext로 승격시키는 방법이다.
ExecutionContext
Spring Batch에서 ExecutionContext의 종류는 크게 2가지다. StepExecutionContext와 JobExecutionContext다. Step의 ExecutionContext는 하나의 Step에만 종속된다. Step이 끝나면 같이 사라진다는 얘기다. Job의 ExecutionContext 또한 Job과 수명을 함께 한다.
현재 진행하는 Step이 다음 진행할 Step에게 전달하고 싶다면 어떻게 하면 될까? Job ExecutionContext에 값을 저장하면 된다. Job ExecutionContext는 모든 Step이 실행될 때까지 살아있으니깐? (그러면 ExecutionContextPromotionListener을 사용할 게 아니라 JobExecutionContext를 사용하는 게 낫지 않냐고 얘기할 수도 있지만 그러지 않는 것이 좋다. 그 이유는 뒤에서 설명하겠다.)
ExecutionContextPromotionListener
- Spring Batch가 제공하는 객체로 Step이 완료되는 시점에 StepExecutionContext 중 ExecutionContextPromotionListener 설정한 키 값에 대해 Job ExecutionContext로 승격(promotion)시켜주는 구현체다.
@Bean
public ExecutionContextPromotionListener promotionListener() {
ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
listener.setKeys(new String[] {"member"});
return listener;
}
코드 예시
- chunk-oriented로 Step을 구성하고 itemReader와 itemWriter를 활용해 저장할 수도 있지만 Tasklet을 사용해 간단하게 구성해보겠다.
- @BeforeStep을 통해 Tasklet이 실행되기 전, Step Execution을 가져온다.
- Tasklet이 실행될 때, 필요한 값을 구성해 Step Execution에 저장하도록 하자.
@Bean("memberTasklet")
public Tasklet memberTasklet() {
return new Tasklet() {
private StepExecution stepExecution;
@Override
public RepeatStatus execute(
StepContribution contribution, ChunkContext chunkContext) throws Exception {
Member member = memberService.find();
ExecutionContext stepContext = this.stepExecution.getExecutionContext();
stepContext.put("member", member);
return RepeatStatus.FINISHED;
}
@BeforeStep
public void saveStepExecution(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
};
}
- Step을 등록할 때 주의할 점!
- @BeforeStep이나 @AfterStep 어노테이션을 사용하려면 Step의 listener() 메소드를 사용해 리스너로 등록해줘야 한다!
- 자세한 내용은 해당 링크 참조
@Bean("memberStep")
public Step memberStep(
final JobRepository jobRepository, final PlatformTransactionManager transactionManager) {
return new StepBuilder("memberStep", jobRepository)
.tasklet(memberTasklet(), transactionManager)
.listener(memberTasklet())
.listener(promotionListener())
.build();
}
- ExecutionContextPromotionListener는 다음 Step으로 전달하고 싶은 키 값을 저장해 넘겨주면 된다.
- Step의 listener로 ExecutionContextPromotionListener를 등록해주면 된다.
- 이제 Step이 종료되면 자동으로 Job Execution에 구성한 값이 저장된다.
@Bean
public ExecutionContextPromotionListener promotionListener() {
ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
listener.setKeys(new String[] {"member"});
return listener;
}
- 이제 다음 Step의 ItemWriter에서 JobExecution을 사용해 컨텍스트 값을 가져올 수 있다.
@Bean("orderWriter")
public ItemWriter<Order> orderWriter() {
return new ItemWriter<Order>() {
private Member member;
@Override
public void write(final Chunk<? extends Order> chunk) throws Exception {
orderService.registerOrder(this.member, chunk);
}
@BeforeStep
public void retrieveInterStepData(StepExecution stepExecution) {
final JobExecution jobExecution = stepExecution.getJobExecution();
final ExecutionContext jobContext = jobExecution.getExecutionContext();
this.member = (Member) jobContext.get("member");
}
};
}
방법2. JobExecution에 저장하기
간단하고 실용적인 방법이다.
사용하지 말아야 할 이유
하지만 이 방식은 위에서 간단히 얘기했다시피 추천하지 않는 방식이다. 그 이유는 아래와 같다.
- Step 실행 중 JobExecutionContext에 값을 저장하더라도 Step이 실패하는 경우 데이터가 유실될 수 있다.
- 이 방법은 Step이 JobExecutionContext에 강결합하게 된다. 따라서 다른 Job에서 Step 구현체를 재사용하기 어려워진다.
코드 예시
- 간단하다. 데이터를 저장할 Step에서 JobExecutionContext에 데이터를 저장하고 다음 스텝에서 JobExecutionContext에서 데이터를 가져와 사용하면 된다.
@Bean
public Step memberStep() {
return stepBuilderFactory.get("memberStep")
.tasklet((contribution, chunkContext) -> {
StepContext stepContext = chunkContext.getStepContext();
JobExecution jobExecution = stepContext.getStepExecution().getJobExecution();
Member member = memberService.getMember();
jobExecution.getExecutionContext().put("key", member);
return RepeatStatus.FINISHED;
})
.build();
}
@Bean
public Step orderStep() {
return stepBuilderFactory.get("orderStep")
.tasklet((contribution, chunkContext) -> {
StepContext stepContext = chunkContext.getStepContext();
JobExecution jobExecution = stepContext.getStepExecution().getJobExecution();
Member member = (Member) jobExecution.getExecutionContext().getString("dataKey");
...
return RepeatStatus.FINISHED;
})
.build();
}
방법3. Singleton Bean 사용하기
ExecutionContext와 Serialize의 압박
Step 간에 데이터를 공유할 때 ExecutionContext를 사용하면 발생하는 치명적인 문제가 하나있다. 바로 ExecutionContext에 저장할 객체는 직렬화된다는 것이다. 이게 왜 치명적인 문제냐? ExecutionContext에 데이터를 저장하면서 json string 형태로 변환하는데 이 비용이 생각보다 크다고 한다.
ExecutionContext에 담을 객체가 크지 않다면 별로 문제가 되지 않지만 크기가 너무 크다면 성능 이슈가 발생할 수 있다. 이 때 사용할 수 있는 것이 스레드 세이프한 자료구조를 가진 Singleton Bean이다.
코드 예시
@Component
public class SharedData<T> {
private final Map<String, T> values = new ConcurrentHashMap<>();
public void putData(String key, T data) {
values.put(key, data);
}
public T getData (String key) {
return values.get(key);
}
public int getSize () {
return values.size();
}
}
스레드 세이프한 자료구조를 사용하지 않을 경우
Chunk-Oriented 지향 처리 중 Singleton Bean의 데이터가 오염되어 작업이 망가질 가능성이 있다.
예를 들어 단체 문자를 발송하는 Schduler Job이 있다고 해보자.
이 작업이 실행되면 20분 이상 소요되지만 단체 문자를 발송하는 일이 자주 있지는 않다. 그래도 단체 문자를 등록하면 최대한 빨리 발송하는 것이 좋기 때문에 5분마다 작업이 있는지 확인한다.
첫 번째 요청이 들어왔다. Job은 단체 발송 작업이 들어온 것을 캐치해 발송 내용을 Singleton Bean에 담고 다음 스텝에서 chunk-oriented 지향 처리를 통해 순차적으로 문자를 발송하기 시작했다.
첫 번째 작업이 끝나지 않았는데 다음 단체 발송 문제 요청이 연달아 들어왔다. 두 번째 Job은 첫 번째 Job이 사용하고 있던 Singleton Bean에 새로운 발송 내용을 담게될 것이다. Singleton Bean이 오염되었기 때문에 첫 번째 작업은 그 뒤부터는 두 번째 작업과 동일한 문자를 보내게 된다.
참고자료
- 스프링 공식문서
- 스프링 배치(스프링 Boot 기반) 삽질기 3탄 - 싱글톤 빈을 이용한 step간 데이터 공유
- How to Pass Data Among Steps in a Spring Batch Application