Spring/Spring Batch

[Spring Batch] 5장 : Configuring a Step - 3 : StepExecutionListener ~ Example Tasklet Implementation

공대키메라 2023. 12. 13. 22:30

지난 시간에는 스킵 로직을 설정하는 것에서부터 Step 실행에 중도에 끼어드는 방법까지 알아 보았다.

(Configuring Skip Logic ~ Intercepting Step Execution)

지난 내용은 여기 클릭!

 

이번 시간에는 StepExecutionListener에서부터 다시 이어서 읽어보도록 하겠다.

 


 

Listener?

리스너는 배치 흐름 중에 Job, Step, Chunk 실행 전후에 어떤 일을 하도록 하는 Interceptor 개념의 클래스다. 스프링 MVC의 Interceptor가 실제로 HandlerAdapater 호출 전후로 Interceptor가 호출되는 것처럼 Job, Step, Chunk 전후로 오버라이딩된 메서드가 호출된다. 

 

주로 각 단계별로 로그 기록을 남기거나, 진행된 시간, 실행 상태 정보들을 참조 및 조회하는데 도움이 된다. SpringBatch에서 Listener를 사용하기 위해서는 API 리스너에 리스너를 지정해주면 된다. 

 

1. StepExecutionListener

StepExecutionListener는 Step 실행에서 가장 일반적인 listener이다. 정상적으로 끝나던지 실패하던지step이 시작되기 전와 끝난 후에 알림을 허용하는데, 이는 다음 예시에 나와 있다.

 

public interface StepExecutionListener extends StepListener {

    void beforeStep(StepExecution stepExecution);

    ExitStatus afterStep(StepExecution stepExecution);

}

 

ExitStatus가 afterStep의 반환 형식을 가지는데, listener에게 Step의 완료에 의해 반환된 exit code를 수정할 기회를 제공한다. 이 인퍼테이스에 해당하는 주석은 

- @BeforeStep

- @AfterStep

 

2. ChunkListener

"chunk"는 트랜잭션 범위 안에서 처리된 아이템으로 정의된다. 각각의 커밋 간격에서 트랜잭션을 커밋하는것은 chunk를 커밋한다. chunk가 처리를 시작하기 전에 혹은 chunk가 성공적으로 완료된 후에 로직을 실행하기 위해 ChunkListener를 사용할 수 있다. 

 

public interface ChunkListener extends StepListener {

    void beforeChunk(ChunkContext context);
    void afterChunk(ChunkContext context);
    void afterChunkError(ChunkContext context);

}

 

beforeChunk 메소드는 트랜잭션이 시작된 후에, 하지만 ItemReader를 시작해서 읽기 전에 호출된다. 대조적으로 afterChunk는 chunk가 커밋된 후에 호출된다. (혹은 rollback이 없는 경우)

 

이 인터페이스에 해당하는 어노테이션은 @BeforeChunk, @AfterChunk, @AfterChunkError이다. 

 

chunk 선언이 없을 때 ChunkListener를 적용할 수 있다. TaskletStep은 ChunkListener를 호출을 담당하는데 non-time-oriented tasklet에 적용한다. ( tasklet전에 그리고 후에 호출된다)

 

3. ItemReaderListener

이전의 생략 로직에 대해 논할 때 나중에 다뤄질 수 도 있기 때문에 스킵한 레코드들을 기록하는하는게 이익일지도 모른다고 언급했다. read error의 경우에 아래 인터페이스 정의가 보여주듯이 ItemReaderListener로 이 일을 해낸다.

 

public interface ItemReadListener<T> extends StepListener {

    void beforeRead();
    void afterRead(T item);
    void onReadError(Exception ex);

}

 

beforeRead 메소드는 각 호출 전에 Item Reader에서 읽기 위해 호출된다. afterRead 메소드는 각 성공 호출 후에 읽기 위해서 그리고 읽힌 아이템을 넘기기 위해 호출된다. 만약 읽는 동안 에러가 있다면 onReadError 메소드가 호출된다. 맞닥드린 예외는 기록할 수 있도록 제공된다.

 

이 인터페이스에 맞는 어노테이션은 다음과 같다.

  • @BeforeRead
  • @AfterRead
  • @OnReadError

4. ItemProcessorListener

ItemProcessorListener로 아이템 처리는 "listened"될 수 있는데 다음 인터페이스 정의가 보여준다.

 

public interface ItemProcessListener<T, S> extends StepListener {

    void beforeProcess(T item);
    void afterProcess(T item, S result);
    void onProcessError(T item, Exception e);

}

 

beforeProcess 메소드는 ItemProcessor의 처리 전에 호출되고 처리된 아이템을 다룬다. afterProcess 메소드는 item이 성공적으로 처리된 후에 호출된다. 처리하는동안 에러가 발생한다면 onProcessError 메소드가 호출된다. 맞닥드린 예외와 처리되기 위해 시도된 item이 제공되며 기록될 수 다. 

 

이 인터페이스에 맞는 어노테이션은 다음과 같다.

  • @BeforeProcess
  • @AfterProcess
  • @OnProcessError

5. ItemWriteListener

public interface ItemWriteListener<S> extends StepListener {

    void beforeWrite(List<? extends S> items);
    void afterWrite(List<? extends S> items);
    void onWriteError(Exception exception, List<? extends S> items);

}

 

... 생략 (앞의 Listener들과 내용이 흡사)

 

무언가 계속 반복되는 것 같지 않은가?

listener들을 읽어보니 뭐... 대략적으로 비슷해서 뭐가 있겠거니~ 하면 있을 것 같은 느낌이다. 느낌느낌!

(루키루키! 넌나의 수퍼루키루키! 마치마치 이 느낌적인 느낌느낌!)

6. SkipListener

ItemReaderListener, ItemProcessListener 그리고 ItemWriteListener 모두 에러를 알 수 있는 메커니즘을 제공하지만 기록이 실제로 스킵되었다고 알려주지는 않는다. 예를 들어 onWriteError는 아이템이 재시도되고 성공한다면 호출된다.

이러한 이유로 다음 인터페이스 정의에서 보듯이 생략된 아이템을 추척하는 별도의 인터페이스가 있다. 

 

public interface SkipListener<T,S> extends StepListener {

    void onSkipInRead(Throwable t);
    void onSkipInProcess(T item, Throwable t);
    void onSkipInWrite(S item, Throwable t);

}

 

onSkipInRead는 아이템이 읽을 동안 생략될 때 마다 호출된다. rollback이 한 번 이상으로 생략됐을 때 같은 item이 등록되도록 할 수 있다는 점을 유의하라. onSkipInWrite는 item이 쓰는 동안 생략되면 호출된다. 

 

  • @OnSkipInRead
  • @OnSkipInWrite
  • @OnSkipInProcess

 

7. SkipListener and Transactions

SkipListener의 가장 흔한 use case중의 하나는 생략된 item을 기록해서 바른 배치 처리 혹은 다른 인간 과정(human process ?)가 생략을 유발한 issue를 평가하고 고치는데 사용하도록 하는 것이다. 오리지널 트랜잭션이 롤백될지도 모르는 많은 경우가 있기 때문에 Spring Batch는 두 가지를 보장한다.

 

  • 적절한 스킵 메소드가 item 하나에 오직 한 번만 호출된다.
  • SkipListener는 항상 트랜잭션이 커밋되기 전에 호출된다. 이것은 listener에 의해 호출된 모든 트랜잭션 자원들이 ItemWrite이내에서 실패로 rollback되지 않는 점을 명확히 한다.

8. TaskletStep

청크 기반(Chunk-oriented)처리는 Step을 처리하는 유일한 방법이 아니다. Step이 stored procesure 호출로 구성되야 한다면 어떨까? ItemReader로서 호출을 구현할 수 있고 처리가 끝난 후에 null을 반환할 수 있다. 하지만 그렇게 하는건 약간 부자연스러운데 No-op Item Writer가 있어야 하기 때문이다. Spring Batch는 이 시나리오를 위해 TaskletStep을 제공한다.

 

Tasklet 인터페이스는 execute 메소드 하나를 가지고 있는데 RepeatStatus.FINISHED를 반환할 때 까지 혹은 실패 신호로 예외를 던질 때 까지 TaskletStep에 의해 반복적으로 호출된다. Tasklet의 각각의 호출은 트랜잭션 안에 감싸져 있다. Tasklet을 구현하는 사람은 stored procedure 혹은 SQL update statement를 호출할 수도 있다.

 

Java에서 TaskletStep을 생성하려면 빌더에 tasklet 메소드를 넘겨받은 빈이 Tasklet 인퍼테이스를 구현해야만 한다. TaskletStep을 작성할 때 청크에 대한 호출을 수행할 수 없다.

 

@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
    return new StepBuilder("step1", jobRepository)
    			.tasklet(myTasklet(), transactionManager)
    			.build();
}

 

* Note

만약 TaskelStep이 StepListener 인터페이스를 구현한다면, TaskletStep은 자동적으로 StepListener를 tasklet에 등록한다.

 

9. TaskletAdapter

ItemRedaer와 ItemWriter 인터페이스에 대한 다른 어댑터와 마찬가지로 Tasklet 인터페이스는 이미 존재하는 클래스 자체를 적응할 수 있는 구현체를 포함한다 : TaskletAdapter. 이것이 유용할 수도 있는 한 예시는 레코드의 집합 flag를 업데이트 할 때 사용되는 기존의 DAO가 있다. Tasklet 인터페이스에 adapter를 사용하는것 없이 TaskletAdapter를 사용해서 이 클래스를 호출할 수 있다.

 

Java Configuration

@Bean
public MethodInvokingTaskletAdapter myTasklet() {
	MethodInvokingTaskletAdapter adapter = new MethodInvokingTaskletAdapter();

	adapter.setTargetObject(fooDao());
	adapter.setTargetMethod("updateFoo");

	return adapter;
}

 

위의 코드는 스프링 배치 작업 실행시 'fooDao'의 'updateFoo' 메소드가 자동으로 호출되도록 구성한 것이다.

 

10. Example Tasklet Implementation

많은 배치 job들이 주요 처리가 시작하기 전에 다양한 리소스를 세팅하기 위해 실행되야 하거나 처리가 끝난 후에 관련 리스소들을 지우기 위한 작업을 하도록 하는 step들을 포함한다. 파일을 가지고 열심히 작업하는 잡의 경우에 자주 특정 파일을 지역적으로 삭제할 필요가 있다. 다음 예시는 그러한 책임을 가진 Tasklet의 구현체이다. (Spring Batch Sample Project에 있다.)

 

public class FileDeletingTasklet implements Tasklet, InitializingBean {

    private Resource directory;

    public RepeatStatus execute(StepContribution contribution,
                                ChunkContext chunkContext) throws Exception {
        File dir = directory.getFile();
        Assert.state(dir.isDirectory());

        File[] files = dir.listFiles();
        for (int i = 0; i < files.length; i++) {
            boolean deleted = files[i].delete();
            if (!deleted) {
                throw new UnexpectedJobExecutionException("Could not delete file " +
                                                          files[i].getPath());
            }
        }
        return RepeatStatus.FINISHED;
    }

    public void setDirectoryResource(Resource directory) {
        this.directory = directory;
    }

    public void afterPropertiesSet() throws Exception {
        Assert.state(directory != null, "directory must be set");
    }
}

 

앞의 tasklet 구현체는 주어진 디렉토리 내에 모든 파일들을 삭제한다. 실행 메소드는 오직 한 번만 호출된다는 점을 기억하라. 남는 것은 step에서 tasklet을 참조하는 것 뿐이다.

다음 예시는 Java의 step으로부터 어떻게 tasklet을 참조하는지를 보여준다.

 

@Bean
public Job taskletJob(JobRepository jobRepository) {
	return new JobBuilder("taskletJob", jobRepository)
				.start(deleteFilesInDir())
				.build();
}

@Bean
public Step deleteFilesInDir(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("deleteFilesInDir", jobRepository)
				.tasklet(fileDeletingTasklet(), transactionManager)
				.build();
}

@Bean
public FileDeletingTasklet fileDeletingTasklet() {
	FileDeletingTasklet tasklet = new FileDeletingTasklet();

	tasklet.setDirectoryResource(new FileSystemResource("target/test-outputs/test-dir"));

	return tasklet;
}

 


 

이번 글에서는 Listener 설정과 tasklet설정에 대해 집중적으로 알아보았다.

 

다만 다시 이 내용을 복기하고 이 글을 마무리하려고 한다.

 

Tasklet 은 단계 내에서 단일 태스크를 수행하기 위한 것으로 임의의 Step을 실행할 때 읽기/처리/쓰기를 하나의 작업으로 처리하는 방식이다.

chunk-oriented processing은 
한 번에 모든 행을 읽고 처리하고 쓰는 대신 한 번에 고정 된 양의 레코드(청크)를 읽고 처리하는 방식이다. 

 

Tasklet의 경우에는 읽기, 처리, 쓰기를 하나의 작업으로 처리를 하니 상대적으로 복잡하지 않은 간단한 업무의 경우에는 유용할 것이다. 

 

하지만 좀 복잡한 상황이라면 chunk로 job을 처리하는것이 현명할 것이다.

 

이러한 것을 사용하는 와중에 listener라는 것이 있는 것이고, tasklet도 설정할 수 있는 것이다. 

 

 

출처

인프런 - 스프링 배치(정수원)

https://docs.spring.io/spring-batch/docs/5.0.4/reference/html/step.html#configureStep