코딩공부

Spring Boot 3.x에서 Chunk 사용해서 데이터 read, write 사용률 알아보기

integerJI 2024. 12. 24. 14:28

 

0. Chunk를 사용하는 이유는 뭘까?

  • 메모리 관리에 용이
    • 전체 데이터를 한 번에 로드하지 않고 설정된 Chunk 크기만큼 메모리에 유지시킨다. 
    • 대용량 데이터 처리 시에도 OutOfMemoryError를 방지할 수 있다. 
    • Garbage Collection 부하가 감소된다. 
  • 2. 트랜잭션 관리
    • Chunk 단위로 트랜잭션 처리가 되기 때문에 전체적인 프로세스 안정성에 향상된다. 
  • 3. 성능 최적화
    • JDBC Batch 처리와 결합하여 데이터베이스 작업에 최적화한다. 
    • 네트워크 통신 횟수를 감소시킨다. 
    • 데이터베이스에 부하를 분산시킨다. 

크게 이 3가지로 보인다. 

 

나는 회사 프로젝트에서 대용량 데이터를 조금 더 효율적으로 다루어 보기 위해 테스트를 진행해 보았다. 

 

1. 프로젝트 설정

1-1. 사용된 언어와 라이브러리 버전

Java 17
spring boot 3.3.6

dependencies {
	implementation 'org.springframework.boot:spring-boot-starter-batch:3.3.6'
	implementation 'org.springframework.boot:spring-boot-starter-jdbc:3.3.6'
	compileOnly 'org.projectlombok:lombok:1.18.36'
	runtimeOnly 'com.microsoft.sqlserver:mssql-jdbc:12.6.4.jre11'
	annotationProcessor 'org.projectlombok:lombok:1.18.36'
	testImplementation 'org.springframework.boot:spring-boot-starter-test:3.3.6'
	testImplementation 'org.springframework.batch:spring-batch-test:5.1.2'
	testRuntimeOnly 'org.junit.platform:junit-platform-launcher:1.10.5'
}

 

1-2. 테이블 생성 및 데이터 Insert (mssql)

-- 테스트 데이터를 저장할 테이블
CREATE TABLE test_table (
    id BIGINT IDENTITY PRIMARY KEY, -- 자동 증가 ID
    name NVARCHAR(100),             -- 이름
    value INT,                      -- 랜덤 값
    created_at DATETIME             -- 생성 날짜
);

-- 처리된 데이터를 저장할 테이블
CREATE TABLE processed_table (
    id BIGINT IDENTITY PRIMARY KEY, -- 자동 증가 ID
    name NVARCHAR(100),             -- 변환된 이름
    value INT,                      -- 변환된 값
    created_at DATETIME             -- 생성 날짜
);
-- 대용량 데이터 삽입
SET NOCOUNT ON;

DECLARE @Counter INT = 1;

WHILE @Counter <= 300000
BEGIN
    INSERT INTO test_table (name, value, created_at)
    VALUES (
        CONCAT('TestName_', @Counter),        -- 고유한 이름
        ABS(CHECKSUM(NEWID()) % 10000),       -- 랜덤 값 (0~9999)
        DATEADD(SECOND, @Counter, GETDATE()) -- 고유한 생성 날짜
    );
    SET @Counter = @Counter + 1;
END;

 

2. 배치 생성

Chunk를 사용하여 데이터를 Read, Write 하는 함수를 만듭니다. 

package com.home.batch.config;

import com.home.batch.listener.PerformanceStepListener;
import com.home.batch.model.InputType;
import com.home.batch.model.OutputType;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;

@Configuration
public class BatchConfig {

    private final DataSource dataSource;

    public BatchConfig(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    @Bean
    public Job batchJob(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return new JobBuilder("batchJob", jobRepository)
                .start(step1(jobRepository, transactionManager))
                .build();
    }

    @Bean
    public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return new StepBuilder("step1", jobRepository)
                .<InputType, OutputType>chunk(1000, transactionManager)
                .reader(itemReader())
                .processor(itemProcessor())
                .writer(itemWriter())
                .listener(new PerformanceStepListener())
                .allowStartIfComplete(false)
                .build();
    }

    @Bean
    public JdbcCursorItemReader<InputType> itemReader() {
        return new JdbcCursorItemReaderBuilder<InputType>()
                .name("jdbcItemReader")
                .dataSource(dataSource)
                .sql("SELECT id, name, value, created_at FROM test_table")
                .rowMapper((rs, rowNum) -> new InputType(
                        rs.getString("name"),
                        rs.getInt("value"),
                        rs.getTimestamp("created_at").toLocalDateTime()
                ))
                .build();
    }

    @Bean
    public ItemProcessor<InputType, OutputType> itemProcessor() {
        return input -> new OutputType(
                input.getName().toUpperCase(),
                input.getValue() * 2,
                input.getCreatedAt()
        );
    }

    @Bean
    public JdbcBatchItemWriter<OutputType> itemWriter() {
        return new JdbcBatchItemWriterBuilder<OutputType>()
                .dataSource(dataSource)
                .sql("INSERT INTO processed_table (name, value, created_at) VALUES (:name, :value, :createdAt)")
                .beanMapped()
                .build();
    }
}

 

하나씩 알아보자

2-1. Step

@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
    return new StepBuilder("step1", jobRepository)
            .<InputType, OutputType>chunk(1000, transactionManager)
            .reader(itemReader())
            .processor(itemProcessor())
            .writer(itemWriter())
            .listener(new PerformanceStepListener())
            .allowStartIfComplete(false)
            .build();
}

 

chunk를 100으로 설정한다. 

 

reader -> processor -> writer 하나의 묶음으로 동작한다. 

 

listener을 통하여 메타데이터를 확인한다. 

 

allowstartIfComplete를 false로 설정하여 완료된 Step은 재실행되지 않도록 한다.

 

2-2. reader

@Bean
public JdbcCursorItemReader<InputType> itemReader() {
    return new JdbcCursorItemReaderBuilder<InputType>()
            .name("jdbcItemReader")
            .dataSource(dataSource)
            .sql("SELECT id, name, value, created_at FROM test_table")
            .rowMapper((rs, rowNum) -> new InputType(
                    rs.getString("name"),
                    rs.getInt("value"),
                    rs.getTimestamp("created_at").toLocalDateTime()
            ))
            .build();
}

 

test_table 테이블에서 데이터를 조회한다. 

 

테스트를 위해 30만 건 Insert.

 

2-3. processor

@Bean
public ItemProcessor<InputType, OutputType> itemProcessor() {
    return input -> new OutputType(
            input.getName().toUpperCase(),
            input.getValue() * 2,
            input.getCreatedAt()
    );
}

 

로직 수행을 위해 read에서 읽은 데이터를 변환한다. 

name는 대문자로, value는 두배로 증폭.

 

2-4. writer

@Bean
public JdbcBatchItemWriter<OutputType> itemWriter() {
    return new JdbcBatchItemWriterBuilder<OutputType>()
            .dataSource(dataSource)
            .sql("INSERT INTO processed_table (name, value, created_at) VALUES (:name, :value, :createdAt)")
            .beanMapped()
            .build();
}

 

작업 완료된 데이터를 processed_table에 inert를 진행한다.

 

3. 메모리 체크 함수 생성

배치가 수행되는 동안 애플리케이션의 메모리를 체크합니다.

package com.home.batch.listener;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.listener.StepExecutionListenerSupport;

import java.lang.management.ManagementFactory;
import com.sun.management.OperatingSystemMXBean;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

public class PerformanceStepListener extends StepExecutionListenerSupport {

    private final Runtime runtime;

    public PerformanceStepListener() {
        this.runtime = Runtime.getRuntime();
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        LocalDateTime startTime = stepExecution.getStartTime() != null
                ? stepExecution.getStartTime()
                : LocalDateTime.now();

        LocalDateTime endTime = stepExecution.getEndTime() != null
                ? stepExecution.getEndTime()
                : LocalDateTime.now();

        Instant startInstant = localDateTimeToInstant(startTime);
        Instant endInstant = localDateTimeToInstant(endTime);

        long executionTimeMs = Duration.between(startInstant, endInstant).toMillis();

        float cpuUsage = getCpuUsage();
        double memoryUsage = getMemoryUsage();

        printPerformanceMetrics(stepExecution, executionTimeMs, cpuUsage, memoryUsage);
        return stepExecution.getExitStatus();
    }

    private Instant localDateTimeToInstant(LocalDateTime localDateTime) {
        return localDateTime.atZone(ZoneId.systemDefault()).toInstant();
    }

    private float getCpuUsage() {
        try {
            OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class);
            long startTime = System.nanoTime();
            double startCpuTime = osBean.getProcessCpuTime();

            Thread.sleep(100); // 100ms 대기

            long endTime = System.nanoTime();
            double endCpuTime = osBean.getProcessCpuTime();

            return (float) ((endCpuTime - startCpuTime) / (endTime - startTime) * 100 / osBean.getAvailableProcessors());
        } catch (Exception e) {
            e.printStackTrace();
            return 0.0f;
        }
    }

    private double getMemoryUsage() {
        return (runtime.totalMemory() - runtime.freeMemory()) / (1024.0 * 1024.0);
    }

    private static void printPerformanceMetrics(StepExecution stepExecution, long executionTimeMs, float cpuUsage, double memoryUsage) {
        System.out.printf("""
                === Batch Step Performance Metrics ===
                Step Name: %s
                Execution Time: %d ms
                CPU Usage: %.2f %%
                Memory Usage: %.2f MB
                Items Read: %d
                Items Written: %d
                ======================================
                """,
                stepExecution.getStepName(),
                executionTimeMs,
                cpuUsage,
                memoryUsage,
                stepExecution.getReadCount(),
                stepExecution.getWriteCount()
        );
    }
}

 

3-1. StepExecutionListenerSupport

public class PerformanceStepListener extends StepExecutionListenerSupport

 

Spring Batch의 StepExecutionListenerSupport를 상속받아 Step 실행 이후(afterStep)에 특정 작업을 수행할 수 있도록 확장했다.

 

3-2. runtime

public PerformanceStepListener() {
    this.runtime = Runtime.getRuntime();
}

 

JVM 메모리 정보를 얻기 위해 Java의 Runtime 인스턴스를 사용

 

3-3. afterStep

@Override
public ExitStatus afterStep(StepExecution stepExecution) {
    LocalDateTime startTime = stepExecution.getStartTime() != null
            ? stepExecution.getStartTime()
            : LocalDateTime.now();

    LocalDateTime endTime = stepExecution.getEndTime() != null
            ? stepExecution.getEndTime()
            : LocalDateTime.now();

    Instant startInstant = localDateTimeToInstant(startTime);
    Instant endInstant = localDateTimeToInstant(endTime);

    long executionTimeMs = Duration.between(startInstant, endInstant).toMillis();

    float cpuUsage = getCpuUsage();
    double memoryUsage = getMemoryUsage();

    printPerformanceMetrics(stepExecution, executionTimeMs, cpuUsage, memoryUsage);
    return stepExecution.getExitStatus();
}

private Instant localDateTimeToInstant(LocalDateTime localDateTime) {
    return localDateTime.atZone(ZoneId.systemDefault()).toInstant();
}

private float getCpuUsage() {
    try {
        OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class);
        long startTime = System.nanoTime();
        double startCpuTime = osBean.getProcessCpuTime();

        Thread.sleep(100); // 100ms 대기

        long endTime = System.nanoTime();
        double endCpuTime = osBean.getProcessCpuTime();

        return (float) ((endCpuTime - startCpuTime) / (endTime - startTime) * 100 / osBean.getAvailableProcessors());
    } catch (Exception e) {
        e.printStackTrace();
        return 0.0f;
    }
}

private double getMemoryUsage() {
    return (runtime.totalMemory() - runtime.freeMemory()) / (1024.0 * 1024.0);
}

 

  • 실행 시간 계산
    • Step의 시작 시간(startTime)과 종료 시간(endTime)을 기반으로 실행 시간을 계산
    • 시작/종료 시간이 null인 경우, 현재 시간을 사용
  • CPU 사용량 계산
    • OperatingSystemMXBean 사용
    • osBean.getProcessCpuTime()과 시스템 시간(System.nanoTime())을 사용하여 CPU 사용 시간 측정
    • Thread.sleep(100). Cpu 사용량을 계산하기 위해 100ms 대기
    • CPU 사용률 = CPU 사용 시간 차이 ÷ 시스템 시간 차이 ÷ CPU 코어 수 × 100
  • 메모리 사용량 계산
    • Runtime 객체 사용
    • totalMemory(): JVM에 할당된 총 메모리
    • freeMemory(): JVM에서 사용 가능한 메모리
    • 사용 메모리 = 총 메모리 - 사용 가능한 메모리
    • 계산 결과를 MB 단위로 변환(/ (1024.0 * 1024.0))
  • 4. 결과 출력
    • 계산된 값들을 print로 찍어낸다.

 

4. 배치 수행

1,500,000건으로 테스트를 진행해 보았다. 

 

mssql을 사용했고 서버는 docker container로 올렸다. 

 

4-1. 어플리케이션 수행 (chunk 1000)

=== Batch Step Performance Metrics ===
Step Name: step1
Execution Time: 80352 ms
CPU Usage: 0.01 %
Memory Usage: 48.31 MB
Items Read: 1500000
Items Written: 1500000
======================================

 

chunk 1000건으로 수행하면

 

1,500,000만 건 기준으로 

 

Cpu는 0.01%, Memory는 48MB 사용된 부분을 알 수 있다

 

execution time도 80초, 하지만 1천 단위로 끊어 썼기 때문에 더 안정적이라 생각된다.

 

이 정도면 많이 양호하다 생각된다 

 

4-2. 도커 상태 확인

평시

 

데이터를 삽입하기 위해 잠깐 Cpu가 튀긴 했지만 무난한 수준이다.

 

배치 수행 후 (chunk 1000)

 

Cpu 사용도 Max 80% 이하이다. 

 

양호하다고 생각된다. 

 

메모리 사용율도 양호한데?

 

청크를 높여보자 

 

chunk 5000

=== Batch Step Performance Metrics ===
Step Name: step1
Execution Time: 52986 ms
CPU Usage: 0.01 %
Memory Usage: 85.33 MB
Items Read: 1500000
Items Written: 1500000
======================================

 

시간은 빨리 처리 되었지만 메모리가 많이 먹는다. 

 

cpu는 아까부터 0.01인데 저부분은 확인해 봐야겠다.

 

 

Cpu가 조금 더 오르긴 하다.

 

85%까지 올랐다. 

 

어플리케이션의 메모리를 정리해 본다면

chunk 크기 실행 시간(ms) CPU 사용률 (%) 메모리 사용량 (MB)
1000 80352 0.01 48.31
5000 52986 0.01 85.33

 

5. 마무리

회사 프로젝트에서 사용하는 방식에서 문제가 생겨 다른 방법을 찾아보다 chunk는 얼마나 효율이 좋을지 증명하기 위해 해당 글을 작성해 보았다. 

chunk를 적용하지 않은 버전을 올리지는 않았지만 회사 프로젝트에서 사용하던 방식과 위의 로직을 적용한 버전으로 비교하면 성능이 개선되었기는 했다.

이제 이 연구를 바탕으로 업무에 적용해 결과를 도출할 계획이다. 

추가로 코프링을 이용하여 더 넓은 지식을 습득하고 최적의 방식을 도출해 낼 수 있을 것 같다.

'코딩공부' 카테고리의 다른 글

Java와 Kotlin, 컴파일 속도 차이가 많이날까?  (1) 2024.10.29
Kotlin을 알아보자  (0) 2024.10.27