관리 메뉴

나만의공간

Spring Batch #9 (Step의 이해) 본문

IT/Spring Batch

Spring Batch #9 (Step의 이해)

밥알이 2023. 10. 16. 19:23

StepBuilderFactory와 StepBuilder

  • StepBuilderFactory
    • StepBuilder를 생성하는 팩토리 클래스로서 get 메소드를 제공함.
    • StepBuilderFactory.get("stepName")
      1. stepName으로 step을 생성하도록 StepBuilder에게 전달
  • StepBuilder
    • Step을 구성하는 설정 조건에 따라 다섯 개의 하위 빌더 클래스를 생성하고, Step생성을 위임함.
      1. TaskletStepBuilder
        -. Tasklet을 생성하는 기본 빌더 클래스
      2. SimpleStepBuilder
        -. Tasklet을 생성하며, 내부적으로 청크기반 작업을 처리하는 ChunkOrientedTasklet 클래스를 생성합니다.
      3. PartitionStepBuilder
        -. PartitionStep을 생성, 멀티 스레드 방식으로 Job을 실행
      4. JobStepBuilder
        -. JobStep을 생성, Step안에서 Job을 실행
      5. FlowStepBuilder
        -. FlowStep을 생성, Step안에서 Flow를 실행

아키텍쳐

JobBuilderFactory와 유사한 구조를 가짐
StepBuilderFactory에서 get을 호출하면 내부적으로 StepBuilder를 호출
StepBuilder는 API의 파라미터 타입과 구분에 따라 적절한 하위 빌더를 생성

상속구조

상속 구조도 JobBuildrFactory와 유사함

TaskletStep

  • 스프링 배치에서 제공하는 Step의 구현체로서 Tasklet을 실행시키는 도메인 객체
  • TaskletStep은 Tasklet을 실행시키는데 있어, 중간에 RepeatTemplate을 사용하여, Tasklet의 구문을 트랜잭션 경계 내에서 반복해서 실행시킴
  • Task 기반과 Chunk 기반으로 나눠서 Tasklet을 실행

Task와 Chunk기반 비교

스프링 배치에서 Step의 실행 단위는 크게 Step과 Chunk로 나눠짐.

  • Chunk 기반
    • 하나의 큰 덩어리를 N개씩 나눠서 실행한다는 의미로 대량 처리를 하는 경우에 적합합니다.
    • ItemReader, ItemProcessor, ItemWriter를 사용하며 청크 기반 전용 Tasklet인 ChunkOrientedTasklet 구현체가 제공
  • Task 기반
    • 단일 작업 기반으로 처리되는 경우에 적합합니다.
    • 주로 Tasklet 구현체를 만들어서 사용합니다.

기본API

tasklet()

  • Tasklet 타입의 클래스를 세팅하는 API
  • TaskletStepBuilder가 반환되어 관련 API를 설정할 수 있음

Tasklet

  • 인터페이스로 execute 단일 메서드를 제공합니다.
  • Step 내에서 구성되고 실행되는 도메인 객체로 주로 단일 테스크를 수행하기 위한 것
  • TaskletStep에 의해 반복적으로 수행되며, 반환값에 따라 계속 수행 혹은 종료 여부가 결정됩니다.
    • RepeatStatus: Tasklet의 반복 여부 상태값
      1. FINSHED: 반복종료(null로 반환시에도 이값 적용0
      2. CONTINUABLE: 무한반복
  • 익명 클래스 혹은 구현 클래스로 만들어서 사용
  • Step에 오직 하나의 Tasklet설정이 가능 하며, 두 개 이상 설정할 경우 마지막 설정한 Tasklet이 적용됩니다.

Tasklet 기본 예시

package com.study.springbatch.domain.job;

import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class CustomTasklet implements Tasklet {

	@Override
	public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
		log.info("tasklet 1 complete");
		return RepeatStatus.FINISHED;
	}
}
package com.study.springbatch.domain.job;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@RequiredArgsConstructor
@Slf4j
@Configuration
public class TaskletSampleJobConfig {

	private final JobBuilderFactory jobBuilderFactory;
	private final StepBuilderFactory stepBuilderFactory;

	@Bean
	public Job helloJobConfiguration() {
		return jobBuilderFactory.get("taklet1")
			.start(step1())
			.incrementer(new RunIdIncrementer())
			.build();
	}

	public Step step1() {
		return stepBuilderFactory.get("taskletStep1")
			.tasklet(new CustomTasklet())
			.build();
	}
}

startLimit()

  • Step의 실행 횟수를 조정할 때 사용됩니다.
  • Step 마다 설정할 수 있습니다.
  • 설정 값을 초과한다면 StartLimitExceededException이 발생합니다.
  • 기본값은 Integer.MAX_VALUE입니다.

※ 아래 예시로 실행을 해도 해당 배치는 계속적인 실행이 되고 있어, startLimit가 어떤 경우에 발생을 하게 되는지 확인이 필요함.

startLimit 예시

package com.study.springbatch.domain.job;

import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class CustomTasklet implements Tasklet {

	@Override
	public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
		log.info("================ tasklet 1 complete ======================");
		// throw new Exception("오류가 발생해여 재시작 해주나???");
		int error = 5/0;
		return RepeatStatus.FINISHED;
	}
}
package com.study.springbatch.domain.job;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@RequiredArgsConstructor
@Configuration
public class TaskletSample2JobConfig {

	private final JobBuilderFactory jobBuilderFactory;
	private final StepBuilderFactory stepBuilderFactory;

	@Bean
	public Job helloJob() {
		return jobBuilderFactory.get("taskletSample2")
			.start(step1())
			.incrementer(new RunIdIncrementer())
			.build();
	}

	@Bean
	public Step step1() {
		return stepBuilderFactory.get("taskletStep2")
			.tasklet(new CustomTasklet())
			.startLimit(3)	//taskletStep2는 3번만 실행이 가능하다.
			.build();
	}
}

allowStartIfComplete()

  • 실패로 인한 재시작이 가능한 Job이 다시 실행될때, 이전 실행에서 Step의 성공 여부와 관계없이 항상 Step을 실행하기 위한 설정
  • Job이 실행될때마다 무조건 해당 Step이 실행되게 하는 옵션
  • 실행시 유효성을 검증하는 Step이나 꼭 필요한 사전작업 Step의 경우에 사용함
  • 기본값은 false

TaskletStep 흐름도

Job이 TaskletStep을 호출하는 사이에서 StepExecution과 ExecutionContext가 생성되고, TaskletStep에 전달됨.
TaskLet이 실행되기 전에 StepListener의 beforeStep이 호출되고,
Tasklet 작업이 끝나면 StepListener의 afterStep이 호출됨
StepExecution에 Step의 완료 상태를 업데이트 합니다.
StepExecutionListener 호출 후 추가적으로 exitStatus 상태를 업데이트 할 수 있습니다.

JobStep

기본개념

  • 외부의 Job을 포함하고 있는 하나의 Step
  • 외부의 Job이 실패하면 해당 Step이 실패하므로 해당 Step을 품고 있는 기본 Job도 실패 합니다.
  • 모든 메타데이터는 기본 Job과 외부 Job별로 각각 저장됩니다.
  • 커다란 시스템을 작은 모듈로 쪼개고 Job의 흐름을 관리하고자 할때 사용합니다.
  • Step의 동작이었던 Tasklet대신 Job을 넣었다고 생각하면 이해가 편합니다.

API소개

예시

package com.study.springbatch.domain.job;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.step.job.DefaultJobParametersExtractor;
import org.springframework.batch.core.step.job.JobParametersExtractor;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@RequiredArgsConstructor
@Slf4j
@Configuration
public class JobStepSampleJobConfig {

	private final JobBuilderFactory jobBuilderFactory;
	private final StepBuilderFactory stepBuilderFactory;

	@Bean
	public Job JobSampleJob() {
		return jobBuilderFactory.get("parentJob1")
			.start(jobStep(null))// null값으로 설정하면 DI로 자동 주입됨.
			.incrementer(new RunIdIncrementer())
			.build();
	}

	/**
	 * JobStep을 호출하게 되면 해당 Step에서는 Tasklet을 만들수 없음.
	 * 코딩시 문법 오류가 발생하는데 맞는 정책인지 좀더 고민
	 * @param jobLauncher
	 * @return
	 */
	@Bean
	public Step jobStep(JobLauncher jobLauncher) {
		return stepBuilderFactory.get("parentStep1")
			.job(childJob())
			.parametersExtractor(jobParametersExtractor())
			.listener(new StepExecutionListener() {
				@Override
				public void beforeStep(StepExecution stepExecution) {
					stepExecution.getExecutionContext().putString("name","Hong Gil Dong");
					log.info("Step을 실행하기전 실행되는 메소드 beforeStep");
				}

				@Override
				public ExitStatus afterStep(StepExecution stepExecution) {
					log.info("Step실행을 완료후 실행하는 메소드 afterStep");
					return null;
				}
			})
			.build();

	}

	/**
	 * Step의 ExecutionContext에 name의 Key를 갖는 키밸류 값을 꺼내
	 * JobStep의 JobParameters로 만들어서 전달 한다.
	 * @return
	 */
	private JobParametersExtractor jobParametersExtractor() {
		DefaultJobParametersExtractor extractor = new DefaultJobParametersExtractor();
		extractor.setKeys(new String[]{"name"});
		return extractor;
	}

	/**
	 * JobStep에서 해당 Job을 호출하여 실행한다.
	 * @return
	 */
	@Bean
	public Job childJob() {
		return jobBuilderFactory.get("childJob")
			.start(childStep())
			.build();

	}

	/**
	 * JobStep에서 호출한 Job에서 호출 되는 Step
	 * @return
	 */
	@Bean
	public Step childStep() {
		return stepBuilderFactory.get("childStep1")
			.tasklet(((stepContribution, chunkContext) -> {
				JobParameters jobParameters = stepContribution.getStepExecution().getJobParameters();
				log.info("Job 에서 전달된 이름은 {} 입니다.",jobParameters.getString("name"));
				return RepeatStatus.FINISHED;
			}))
			.build();
	}

}

참고

https://velog.io/@backtony/Spring-Batch-Step

 

Comments