코딩은 마라톤

[Spring Batch] 스프링 배치 아키텍쳐 본문

Backend/Spring Batch

[Spring Batch] 스프링 배치 아키텍쳐

anxi 2024. 4. 23. 03:51

스프링 배치 기본 구조

Spring Batch Arcitecture

 

  • Job Launcher 가 Job을 실행하고 Job이 Step을 실행한다.
  • Job Repository : db 또는 메모리에 스프링 배치가 실행할 수 있도록 메타 데이터를 관리하는 클래스
  • Job : 배치의 실행 단위
    • N개의 Step을 실행할 수 있고, 흐름(Flow)을 관리할 수 있다.
  • Step : Job의 세부 실행 단위, N개가 등록돼 실행된다.
    • Step의 실행 단위는 크게 2가지다.
      • Chunk 기반 : 하나의 큰 덩어리를 n개씩 나눠서 실행
      • Task 기반 : 하나의 작업 기반으로 실행
    • Chunk 기반 Step은 ItemReader, ItemProcessor, ItemWriter가 있다.
      • ItemReader : 배치 처리 대상 객체(Item) 를 읽어 ItemProcessor, ItemWriter에게 전달한다
        • ex) 파일 또는 DB에서 데이터를 읽는 동작
      • ItemProcessor : input 객체를 output 객체로 필터링 또는 processing해서 ItemWriter에게 전달한다.
        • ex) ItemReader에서 읽은 데이터를 수정 또는 ItemWriter 대상인지 필터링한다.
        • ItemProcessor은 Optional하다.
        • ItemProcessor이 하는 일은 ItemReader 또는 ItemWriter가 대체할 수 있다.
      • ItemWriter : 배치 처리 대상 객체를 처리한다.
        • ex) DB 업데이트, 처리 대상 사용자에게 알림을 보내는 동작

스프링 배치 테이블 구조

BATCH_ : 배치 실행을 위한 메타 데이터가 저장되는 테이블

  • BATCH_JOB_INSTANCE
    • Job이 실행되며 생성되는 최상위 계층 테이블
    • job_name과 job_key를 기준으로 하나의 행이 생성, 같은 job_name과 job_key가 저장될 수 없다.
    • job_key는 BATCH_JOB_EXECUTION_PARAMS에 저장되는 Parameter를 나열해 암호화해 저장한다.
  • BATCH_JOB_EXECUTION
    • Job이 실행되는 동안 시작/종료 시간, job 상태 등을 관리
  • BATCH_JOB_EXECUTION_PARAMS
    • Job을 실행하기 위해 주입된 parameter 정보 저장
  • BATCH_JOB_EXECUTION_CONTEXT
    • Job이 실행되며 공유해야할 데이터를 직렬화해 저장
  • BATCH_STEP_EXECUTION
    • Step이 실행되는 동안 필요한 데이터 또는 실행된 결과 저장
  • BATCH_STEP_EXECUTION_CONTEXT
    • Step이 실행되며 공유해야할 데이터를 직렬화해 저장

 

배치 테이블과 매핑되는 클래스 - Job, JobInstance, JobExecution, Step, StepExecution

 

  • JobInstance == BATCH_JOB_INSTANCE
    • JobInstance의 생성 기준은 JobParameters 중복 여부에 따라 생성
    • 다른 parameter로 Job이 실행되면, JobInstance가 생성
    • 같은 parameter로 Job이 실행되면, 이미 생성된 JobInstance가 실행
    • Job을 항상 새로운 JobInstance가 실행할 수 있도록 RunIdIncrementer 제공
      • RunIdIncermenter은 항상 다른 run.id를 parameter로 제공
  • JobExecution == BATCH_JOB_EXECUTION
    • JobExecution은 항상 새롭게 생성
  • JobParameters == BATCH_JOB_EXECUTION_PARAMS
  • ExecutionContext == BATCH_JOB_EXECUTION_CONTEXT, BATCH_STEP_EXECUTION_CONTEXT
    • BATCH_JOB_EXECUTION_CONTEXT : Job내에서 공유할 수 있다. (Job에 묶인 여러 Step에서 공유)
    • BATCH_STEP_EXECUTION_CONTEXT : 하나의 Step에서만 공유할 수 있다. (다른 Step 공유 X)
  • StepExecution == BATCH_STEP_EXECUTION

실습

실습을 통해 ExecutionContext에서 데이터 공유가 어떻게 일어나는지 확인하고자 한다.

  @Bean
  public Job sharedJob() {
    return jobBuilderFactory.get("shareJob")
        .incrementer(new RunIdIncrementer())
        .start(this.shareStep())
        .next(this.shareStep2())
        .build();
  }

 

  • shareJob이라는 Job이 실행된다.
    • shareStep()이라는 Step이 실행되고 나서 shareStep2()의 Step이 이어서 실행된다.
  @Bean
  public Step shareStep() {
    return stepBuilderFactory.get("shareStep")
        .tasklet(((contribution, chunkContext) -> {
          StepExecution stepExecution = contribution.getStepExecution();
          ExecutionContext stepExecutionContext = stepExecution.getExecutionContext();
          stepExecutionContext.putString("stepKey", "step execution context");

          JobExecution jobExecution = stepExecution.getJobExecution();
          JobInstance jobInstance = jobExecution.getJobInstance();
          ExecutionContext jobExecutionContext = jobExecution.getExecutionContext();
          jobExecutionContext.putString("jobKey", "job execution context");
          JobParameters jobParameters = jobExecution.getJobParameters();

          log.info("jobName : {}, stepName : {}, parameter : {}",
              jobInstance.getJobName(),
              stepExecution.getStepName(),
              jobParameters.getLong("run.id"));

          return RepeatStatus.FINISHED;
        }))
        .build();
  }

 

  @Bean
  public Step shareStep2() {
    return stepBuilderFactory.get("shareStep2")
        .tasklet(((contribution, chunkContext) -> {
          StepExecution stepExecution = contribution.getStepExecution();
          ExecutionContext stepExecutionContext = stepExecution.getExecutionContext();

          JobExecution jobExecution = stepExecution.getJobExecution();
          ExecutionContext jobExecutionContext = jobExecution.getExecutionContext();

          log.info("jobKey : {}, stepKey : {}",
              jobExecutionContext.getString("jobKey", "emptyJobKey"),
              stepExecutionContext.getString("stepKey", "emptyStepKey"));

          return RepeatStatus.FINISHED;
        }))
        .build();
  }

 

각 Step은 Task기반으로 실행되고 있다.

  • shareStep()
    • JobExecutionContext
      • key : jobKey, value : job execution context
    • StepExecutionContext
      • key : stepKey, value : step execution context

ExecutionContext에 위와 같이 값을 넣어준다. 이때 우리가 기대하는 것은 동일한 Job에서 Step에 따라 ExecutionContext 값이 공유되는지 확인하고자 한다.

  • shareStep2()
    • JobExecutionContext, StepExecutionContext
      • shareStep()에서 넣었던 jobKey와 stepKey를 통해 현재 값이 ExecutionContext에서 공유되는지 확인한다.
      • 공유가 되고 있다면 jobKey에 해당하는 값인 job execution context, 그렇지 않으면 default값인 emptyJobKey가 반환될 것이다.
      • 공유가 되고 있다면 stepKey에 해당하는 값인 step execution context, 그렇지 않으면 default값인 emptyStepKey가 반환될 것이다.

결과

 

Job은 RunIdIncermenter를 통해 run.id=2를 parameter로 가지고 있다. (1번 실행해서 run.id가 2로 되어있다.)

 

첫 번째 Step인 shareStep이 실행시 jobName은 shareJob, stepName은 shareStep으로 잘 나타내고 있다.

 

그러나 두 번째 Step인 shareStep2가 실행시 

  • JobExecutionContext는 같은 Job에서 실행되고 있어 job execution context 를 잘 가져 온다.
  • 그러나 StepExecutionContext는 독립적인 Step이므로 데이터 공유가 일어나지 않아 emptyStepKey를 가져오게 된다.

 

'Backend > Spring Batch' 카테고리의 다른 글

[Spring Batch] ItemReader - CSV, JDBC, JPA  (0) 2024.04.30