Spring/Spring Batch

[Spring Batch] 5장 : Step 설정하기 (Configuring a Step) 1

공대키메라 2023. 12. 5. 21:04

지난  시간에는 Job을 설정하고 실행하는 여러 방법에 대해 알아보았다.

(지난 내용이 궁금하면 여기 클릭!)

 

이번에는 Step을 어떻게 설정하는지에 대한 글을 읽어볼 것이다.

 

와우! 너무 기대되는걸?

 

꿀잼 ^^

출처 : https://event.leagueoflegends.co.kr/beemo-Teemo/

 


 

1. Configuring a Step

domain chapter에서 논의된 Step은 배치잡의 독립적으로 연속적인 절(phase)를 캡슐화한 도메인 객체로 실제 배치 처리를 정의하고 제어하는데 필요한 모든 정보를 포함한다. 모든 주어진 Step의 내용물은 Job을 쓰는(writing) 개발자의 재량에 달려있기 때문에 이것은 필연적으로 모호한 묘사다(개발자들이 각자 만들어 쓰는 것이라 Step에 대한 의미가 애매할 수 밖에 없다는 것 같다!). Step은 개발자가 요구하는것에 따라 간단할 수도 복잡할 수도 있다. 간단한 Step은 간단한 요구사항과 코드 없이 파일에서 데이터베이스로 데이터를 로드할 수 있다. 더 복잡한 Step은 다음 그림 처럼 처리 부분에 적용되는 복잡한 비즈니스 규칙을 가질 지도 모른다. 

 

Figure 1. Step

 

2. Chunk-oriented Processing

Spring Batch는 가장 흔한 구현체로 "chunk-oriented"(청크 기반) 처리 스타일을 사용한다. Chunk oriented 처리는 트랜잭션 경계 내에 쓰여진 'chunk'를 생성하고 한 번에 데이터 하나를 읽는것을 말한다. item read의 수가 commit 간격과 동일하다면 전체 chunk는 ItemWritter에 의해 쓰여지고 나서 transaction이 commit된다. 

 

* Spring Batch에서의 Chunk란 데이터 덩어리로 작업 할 때 각 커밋 사이에 처리되는 row 수를 얘기합니다.
즉, Chunk 지향 처리란 한 번에 하나씩 데이터를 읽어 Chunk라는 덩어리를 만든 뒤, Chunk 단위로 트랜잭션을 다루는 것을 의미합니다.

(출처 : https://tech-monster.tistory.com/manage/newpost/?type=post&returnURL=%2Fmanage%2Fposts%2F# )

 

 

다음 예시 코드가 간단한 형식의 같은 개념을 보여준다.

 

List items = new Arraylist();
for(int i = 0; i < commitInterval; i++){
    Object item = itemReader.read();
    if (item != null) {
        items.add(item);
    }
}
itemWriter.write(items);

 

코드를 보면 한번 읽어보면...

item read의 수가 commit 간격과 동일하다면 전체 chunk는 ItemWritter에 의해 쓰여지고 나서 transaction이 commit
한다 라는 말이 코드로 나와 있다.

나닛? 이렇게 직관적일 수가!?!?

 

또한 ItemWriter에 넘기기 전에 item들을 처리하는 ItemProcessor를 선택적으로 chunk-oritented step에 설정할 수 있다.

 

Figure 3. Chunk-oriented Processing with Item Processor

 

다음 예시 코드는 간단한 형식으로 어떻게 구현되는지 보여준다.

 

List items = new Arraylist();
for(int i = 0; i < commitInterval; i++){
    Object item = itemReader.read();
    if (item != null) {
        items.add(item);
    }
}

List processedItems = new Arraylist();
for(Object item: items){
    Object processedItem = itemProcessor.process(item);
    if (processedItem != null) {
        processedItems.add(processedItem);
    }
}

itemWriter.write(processedItems);

 

이전 코드와 다르게  ItemProcessor를 이용해서 itemWriter에 item들을 넘기기 전에 무언가 처리하는 코드이다. 
정말 말 그대로네!

 

ItemProcessor에 대한 더 자세한 내용은 Item processing을 보면 된다.

 

계속 chunk-oriented processing을 들여가보기 전에 정리를 하려고한다.
step을 구성하는 방법으로는 tasklet과 chunk-oriented processing 으로 구성하는 방법이 있다. 

Tasklet 은 단계 내에서 단일 태스크를 수행하기 위한 것으로 임의의 Step을 실행할 때 읽기/처리/쓰기를 하나의 작업으로 처리하는 방식이다.

chunk-oriented processing은 
한 번에 모든 행을 읽고 처리하고 쓰는 대신 한 번에 고정 된 양의 레코드(청크)를 읽고 처리하는 방식이다. 

말들이 좀 모호하고 이것을 정확하게 뭐는 뭐다 하는 식으로 정리를 안해주니 필자가 이해하기에 불편했다.

 

3. Configuring Step

Step은 Step에 요구되는 의존성들의 상대적으로 적은 리스트에도, 잠재적으로 많은 협력자를 가진 매우 복잡한 클래스다.

 

Java Configuration을 사용할 때, Spring Batch Builder를 사용할 수 있다. 다음 처럼 말이다.

 

/**
 * Note the JobRepository is typically autowired in and not needed to be explicitly
 * configured
 */
@Bean
public Job sampleJob(JobRepository jobRepository, Step sampleStep) {
    return new JobBuilder("sampleJob", jobRepository)
                .start(sampleStep)
                .build();
}

/**
 * Note the TransactionManager is typically autowired in and not needed to be explicitly
 * configured
 */
@Bean
public Step sampleStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("sampleStep", jobRepository)
				.<String, String>chunk(10, transactionManager)
				.reader(itemReader())
				.writer(itemWriter())
				.build();
}

 

필자는 사실 개념이 훨씬 어렵고 실제 사용하는것은 그렇게 어렵지 않다고 생각이 들었던게 해당 부분 때문이다.
개념은 뭐가 이러쿵 저러쿵, 실패하면 뭐 생기는둥 안생기는둥~ 하는데 결국 코드의 틀은 매우 심플하다.
대부분 비즈니스 로직을 스스로 작성하는 것이지 무언가 customizing해서 구현체들을 새로이 만들지는 않을 것이다.

 

 

앞선 설정은 item-oriented step을 생성하기 위해 유일하게 필요한 의존성을 포함한다.

 

  1. reader : item들에게 처리를 제공하는 itemReader
  2. writer : ItemReader에게 제공되는 아이템을 처리하는 ItemWriter
  3. transactionManager : 처리동안 트랜잭션을 시작하고 처리하는 스프링의 PlatformTransactionManager
  4. repository : commit 전에 처리동안 StepExecution과 ExecutionContext를 주기적으로 저장하는 JobRepository의 자바 특화 이름
  5. chunk : 아이템 기반 단계(item-based step)임을 나타내는 종속성의 자바 특정 이름과, 트랜잭션이 커밋되기 전에 처리될 아이템의 수를 나타냄

repository의 기본값은 jobRepository로  transactionManager는 transactionManger의 기본값이다. 또한, ItemProcessor는 선택적인데 reader로 부터 writer로 바로 전달될 수 있다.

 

4. The Commit Interval

 

이전에 언급했듯이, 주기적으로 제공되는 PlatformTransactionManager를 사용해서 commit함으로  step은 item을 읽고 적는다. 1의 commit-interval에서는 각각 개인의 아이템을 쓴 후에 커밋한다.  이것은 많은 상황에서 이상적이지는 않은데 시작과 커밋 트랜잭션이 비싸기 때문이다. 이상적으로, 각각의 트랜잭션에서 가능한한 많은 아이템들을 처리하는것을 선호하는데, 완벽하게 처리되는 데이터의 형태와 step과 상호작용하는 resource에 의존적이다. 이러한 이유로, 커밋시에 처리되는 아이템들의 수를 설정할 수 있다.

 

@Bean
public Job sampleJob(JobRepository jobRepository) {
    return new JobBuilder("sampleJob", jobRepository)
                     .start(step1())
                     .build();
}

@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("step1", jobRepository)
				.<String, String>chunk(10, transactionManager)
				.reader(itemReader())
				.writer(itemWriter())
				.build();
}

 

앞선 예시에서, 10개의 item들이 각각의 트랜잭션에서 처리된다. 처리의 시작에서, 트랜잭션이 시작된다. 또한 매번 read 가 ItemReader에 의해 호출되고 숫자가 증가한다. 10에 도달하면, 모든 아이템 리스트은 ItemWriter에게 넘기고 트랜잭션이 커밋된다.

 

갑자기 필자는 chunk size가 궁금했다. 도대체 사이즈는 몇으로 해야 하는가?

그래서 chatGPT에게 물어보니 다음과 같은 답변이 돌아왔다.

Me : spring batch 에서 chunk size 는 몇으로 하는게 좋아?

GPT : 
작업의 특성
작업의 성격에 따라 chunk size를 조정해야 합니다. 예를 들어, 데이터베이스에서 대량의 레코드를 읽어와야 하는 경우, chunk size를 크게 설정하여 레코드를 효율적으로 처리할 수 있습니다. 그러나 CPU 집중적인 작업이나 네트워크 호출과 같은 작업의 경우 chunk size를 작게 설정하여 더 빠른 반응성을 얻을 수 있습니다.

메모리 사용량
chunk size를 크게 설정하면 한 번에 메모리에 로드되는 아이템 수가 늘어나므로 메모리 사용량이 증가할 수 있습니다. 메모리 부족 문제를 피하기 위해 메모리 사용량을 모니터링하고 적절한 chunk size를 선택해야 합니다.

트랜잭션 관리
chunk size는 트랜잭션 커밋 주기를 결정합니다. 더 큰 chunk size는 더 적은 트랜잭션 커밋을 의미하며, 더 작은 chunk size는 더 자주 커밋을 의미합니다. 이에 따라 트랜잭션 롤백 및 복구 전략을 고려해야 합니다.

성능 테스트
어떤 chunk size가 작업의 최상의 성능을 제공하는지 확인하기 위해 성능 테스트를 수행해야 합니다. 다양한 chunk size 값을 실험하여 어떤 값이 가장 효율적인 결과를 내는지 확인할 수 있습니다.


일반적으로 chunk size는 작업의 성격과 하드웨어 환경에 따라 다릅니다. 초기에는 작은 값을 선택하고 성능 테스트를 통해 최적의 chunk size를 찾아내는 것이 좋습니다.

그렇다네...

 

5. Configuring a Step for Restart

configuring and running a job 섹션에서, Job을 재시작 하는것에 대해 논의했다. restart는 step에 많은 영향을 미치며 결과적으로 몇개의 구체적인 설정을 요구한다.

 

5-1 : Setting a Start Limit(시작 제한수 세팅)

Step이 시작 될 수 있는 횟수의 수를 제어해야하는 시나리오가 있다. 예를 들어, 특정 Step이 단 한번만 실행하도록 설정하필요가 있는데 다시 시작 되기 전에 고정되야 하는 몇몇 resource를 입증해야 하기 때문이다. 이것은 step level에서 설정할 수 있는데 다른 step들이 다른 요구사항을 가지기 때문이다. 한 번만 실행될 수 있는 Step은 무한히 실행될 수 있는 Step와 동일한 작업의 일부로 존재할 수 있다.

 

@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("step1", jobRepository)
				.<String, String>chunk(10, transactionManager)
				.reader(itemReader())
				.writer(itemWriter())
				.startLimit(1)
				.build();
}

 

앞선 예시에서 보인 스텝은 오직 한번 실행한다. 다시 시작하려는 시도는 StartLimitExceededException을 던진다. start-limit의 기본값은 Integer.MAX_VALUE라는것을 유의하라.

 

5-2 : Restarting a Completed Step

처음 시도에 성공적인지 여부와는 상관없이 재시작할 수 있는 경우에 항상 실행해야만 하는 하나 혹은 더 많은 step이 있을 수 있다. 예시는 validation step이거나 처리 전에 자원을 치우는 Step일 수도 있다. 재시작된 job의 일반적인 처리동안 COMPLETED 상태릐 모든 step은 생략한다. allow-start-if-complete 을 true로 세팅하는 것은 step이 항상 실행하도록 오버라이드 한다. 

 

@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("step1", jobRepository)
				.<String, String>chunk(10, transactionManager)
				.reader(itemReader())
				.writer(itemWriter())
				.allowStartIfComplete(true)
				.build();
}

 

 

5-3 : Step Restart Configuration Example (Step 재시작 설정 예시)

다음 자바 예시는 어떻게 job이 재시작 할 수 있는 step을 설정하는지 보여준다.

 

잠시 재시작을 더 읽어 보기 전에 Spring batch에서 job 내에서 실패시 어떻게 흘러가는지 보려고 한다.

예를 들어, 하나의 Job에 Step A, B, C가 있다고 해보자. 실행은 A -> B -> C 순으로 진행된다.

A를 실행한 후 B가 실패한다고 했을 때 job이 멈출 것이고, 다음 실행시에는 B부터 다시 실행할 것이다. 

그런데 A를 항상 실행하도록 설정하면 A -> B -> C 이대로 실행할 것이다. 

여태 재실행에 대해서 이야기를 했는데 너무 설정법만 보다보니 헷갈려서 막간을 이용해 적어보았다.

다음 코드가 길어보여서 이해를 돕기 위해 말이다! (내 자신 화이팅!)

 

@Bean
public Job footballJob(JobRepository jobRepository) {
	return new JobBuilder("footballJob", jobRepository)
				.start(playerLoad())
				.next(gameLoad())
				.next(playerSummarization())
				.build();
}

@Bean
public Step playerLoad(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("playerLoad", jobRepository)
			.<String, String>chunk(10, transactionManager)
			.reader(playerFileItemReader())
			.writer(playerWriter())
			.build();
}

@Bean
public Step gameLoad(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("gameLoad", jobRepository)
			.allowStartIfComplete(true)
			.<String, String>chunk(10, transactionManager)
			.reader(gameFileItemReader())
			.writer(gameWriter())
			.build();
}

@Bean
public Step playerSummarization(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("playerSummarization", jobRepository)
			.startLimit(2)
			.<String, String>chunk(10, transactionManager)
			.reader(playerSummarizationSource())
			.writer(summaryWriter())
			.build();
}

 

앞선 예시 설정은 football 게임에 대한 정보를 로드하고 그것을 요약하는 job에 관한 것이다. 이것은 세 개의 step을 포함한다.playerLoad, gameLoad 그리고 playerSummarizaion이다.

 

playerLoad Step은 flat file로 부터 player 정보를 로드하며 gameLoad step은 게임에게서 같은 정보를 받아온다.

그리고나서 마지막 스텝인 playSummarization은 제공된 게임을 바탕으로 각각 플레이에 대한 통계를 요약한다.

 

playLoad에 의해 로드된 파일이 한 번만 로드되어야 하지만 gameLoad는 데이터페이스에 성공적으로 로드된 것들을 지우면서 특정 디렉토리 내에서 발견된 어떤 게임도 로드할 수 있다.

그 결과 playerLoad step은 추가적인 설정을 포함하지 않는다. 완료되면 건너뛰면서 여러 번 시작할 수 있다.

 

하지만 gameLoad step은 여분의 파일이 마지막 실행 후에 추가된 경우 매번 실행할 필요가 있다. allow-start-if-complete를 true로 설정해 항상 시작되도록 해야한다. 

 

job에서 가장 중요한 summarization step은 2번의 시작 제한을 가지도록 설정되어있다. 만약 step이 계속적으로 실패한다면 새로운 exit code가 잡 실행을 제어하는 작업자에게 반환되고 개입이 있기 전까지 다시 시작하지 않기에 유용하다.

 

* flat file : 텍스트 파일을 말함. 주로 데이터의 구분이 컴마로 되어 있다.

 

 footballJob예시의 각가 세 개의 실행 시에 무슨 일이 일어나는지 보자.

 

Run1 : 

1. playerLoad가 실행하고 PLAYERS 테이블에 400명의 선수를 추가하고 성공적으로 완료한다.

2. gameLoad는 11개의 게임 데이터 상당 파일을 실행하고 처리하여 해당 콘텐츠를 GAMES 테이블에 로드

3. playerSummarization가 처리를 시작하고 5분 후에 실패한다.

 

Run2 :

1. playerLoad가 실행하지 않는데 이미 성공적으로 완료되었고 allow-start-if-complete 이 false이기 때문이다.

2. gameLoad가 다시 실행하고 GAME 테이블 또한 그들의 컨텐츠로 로딩하면서 다른 2개의 파일을 처리한다.

3. playerSummarization는 모든 남아있는 게임 데이터를 처리하기 시작하고 30분 뒤에 다시 실패한다.

 

Run3 :

1. playerLoad가 실행하지 않는데 이미 성공적으로 완료되었고 allow-start-if-complete 이 false이기 때문이다.

2. gameLoad가 다시 실행하고 GAME 테이블 또한 그들의 컨텐츠로 로딩하면서 다른 2개의 파일을 처리한다

3. playerSummarization는 시작하지 않았고 잡은 즉시 멈추는데 playerSummarization의 세번째 실행이기 때문인데 제한이 2회이기 때문이다. 그래서 제한이 높아져야 하거나 Job은 새로운 JobInstance를 실행해야만 한다.

 

먼가 가상의 시나리오를 통해서 이런 설정이 있으면 이렇게 흘러가는지 알려주기 위해서 이렇게 예시를 들은 것 같다. 코드와 그 흐름을 보기 전에 앞서 말했던 간단한 흐름이 이것과 크게 다르지는 않았다.

 


 

이번 시간에는 chunk-oriented processing이 무엇이고 Step을 어떻게 설정하는지, 그리고 commit interval과 재시작 설정에 대해서 알아보았다.

 

다음에는 Configuring Skip Logic부터 다시 읽어볼 것이다.

 

출처:

https://docs.spring.io/spring-batch/docs/5.0.4/reference/html/step.html