이전 글에서 이어지는 글입니다. 이 글은 프로젝트에서의 구체적인 Spring Batch Job 구현 내용과 파이프라인 안정화 과정에서 마주했던 이슈를 다룹니다.
Spring Batch Job 구현
Spring Batch는 대용량 데이터를 처리할 때, 지정된 개수(Chunk)만큼 데이터를 메모리에 읽어 한 번에 처리하고 저장하는 Chunk 지향 처리 모델을 사용한다. 이 모델은 ItemReader
, ItemProcessor
, ItemWriter
라는 세 가지 컴포넌트로 구성되며, 각 컴포넌트의 역할을 명확히 분리하여 구현했다.
kamisDailyPriceSyncJob
: 일일 가격 데이터 수집
매주 화~토요일 오전 3시에 전날의 지역별 가격 데이터를 수집하여 daily_regional_price
테이블에 저장하는 Job이다.
ItemReader
(KamisDailyPriceApiReader
):read()
메서드가 처음 호출될 때, 사전에 정의된 모든 지역과 상품 부류 코드 조합에 대해 KAMIS API를 호출하여 모든 가격 데이터를 메모리에 로드한다.- 이후
read()
가 호출될 때마다 메모리에 로드된 데이터를 하나씩 반환한다.
ItemProcessor
(KamisDailyPriceProcessor
):- API 응답 DTO를 DB에 저장하기 위한
DailyRegionalPrice
엔티티로 변환한다. - 매번 DB를 조회하는 오버헤드를 줄이기 위해,
product
와region
마스터 데이터를lazy
초기화를 통해 메모리에 캐싱하고, 이를 활용하여 엔티티를 조립한다.
- API 응답 DTO를 DB에 저장하기 위한
ItemWriter
(JpaItemWriter
):- Spring Batch가 기본으로 제공하는
JpaItemWriter
를 사용했다. Processor로부터 전달받은 엔티티 목록을 Chunk 단위로 묶어 효율적인 Bulk Insert를 수행한다.
- Spring Batch가 기본으로 제공하는
1
2
3
4
5
6
7
8
9
10
// KamisDailyPriceBatchConfiguration.kt
@Bean
fun kamisDailyPriceSyncStep(): Step {
return StepBuilder(STEP_NAME, jobRepository)
.chunk<KamisDailyPriceApiData, DailyRegionalPrice>(CHUNK_SIZE, transactionManager)
.reader(kamisDailyPriceApiReader(null))
.processor(kamisDailyPriceProcessor(null))
.writer(kamisDailyPriceJpaWriter())
.build()
}
kamisAnnualPriceSyncJob
: 연평균 가격 데이터 갱신
매주 일요일 오전 4시에 상품별 당해 전국 평균 가격을 annual_national_price
테이블에 갱신하는 Job이다.
ItemReader
(JpaPagingItemReader
):- 일일 가격 Job과 달리, 이 Job은 DB에 저장된 각 상품별로 API를 호출해야 한다.
- 따라서
JpaPagingItemReader
를 사용하여Product
테이블의 데이터를 페이징 기반으로 안전하게 조회했다.
ItemProcessor
(KamisAnnualPriceProcessor
):- Reader로부터 전달받은
Product
엔티티 정보를 기반으로 ‘연도별 가격 API’를 호출한다. - API 응답(JSON)을 파싱하여
AnnualNationalPrice
엔티티 리스트로 변환한다.
- Reader로부터 전달받은
ItemWriter
(커스텀KamisAnnualPriceJpaItemWriter
):- 이 Job은 데이터가 이미 존재하면
UPDATE
, 없다면INSERT
를 수행(Upsert)해야 했다. - 이를 위해 커스텀
ItemWriter
를 구현했다. Writer는 전달받은 엔티티에 대해 DB에 데이터가 존재하는지 먼저 확인하고, 존재 여부에 따라 엔티티의 상태를 변경하거나(UPDATE), 그대로 저장(INSERT)하는 로직을 수행한다.
- 이 Job은 데이터가 이미 존재하면
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// KamisAnnualPriceJpaItemWriter.kt
class KamisAnnualPriceJpaItemWriter(...) : ItemWriter<List<AnnualNationalPrice>> {
override fun write(chunk: Chunk<out List<AnnualNationalPrice>>) {
val flattenedList = chunk.items.flatten()
// ...
for (item in flattenedList) {
val existingPrice = annualNationalPriceRepository.findByProductIdAndSurveyYearAndMarketType(...)
if (existingPrice != null) {
existingPrice.updatePrice(item.price) // UPDATE
itemsToSave.add(existingPrice)
} else {
itemsToSave.add(item) // INSERT
}
}
annualNationalPriceRepository.saveAll(itemsToSave)
}
}
최종 ETL 파이프라인
이렇게 해서 완성된 ETL 파이프라인은 다음과 같다.
테스트용 API 구현
배치 Job을 개발하고 테스트하는 과정에서 스케줄링된 시간까지 기다리는 것은 매우 비효율적이다. 이를 해결하기 위해 @Profile
어노테이션을 활용하여 local
과 dev
환경에서만 활성화되는 배치 수동 실행 API를 구현했다.
1
2
3
4
5
6
7
8
9
10
11
// BatchJobController.kt
@RestController
@RequestMapping("/api/v1/batch")
@Profile("local", "dev")
class BatchJobController(...) {
@PostMapping("/run/{jobName}")
fun runBatchJob(@PathVariable jobName: String): ResponseEntity<String> {
// ... JobLauncher를 사용하여 Job 실행 ...
}
}
주요 이슈 및 해결 방안
외부 API 호출 시 간헐적 네트워크 오류 발생
현상
WebClientRequestException
이 불규칙적으로 발생하며 배치가 실패했다.1
WebClientRequestException: Connection prematurely closed BEFORE response
배치 실행 내역이 기록되는 Spring Batch 메타 테이블인
BATCH_JOB_EXECUTION
테이블을 통해 총 세 차례의 실패가 발생했으며,그 중 두 번이
WebClientRequestException
로 인한 실패라는 것을 발견했다.원인 분석
Reactor Netty 공식 문서와 GitHub reactor-netty 리포지토리에 등록된 issue를 찾아보니, 유사한 이슈에 대해 서버나 중간 네트워크 장비(로드밸런서, 프록시 등)의 유휴 시간 제한(Idle Timeout) 정책을 주된 원인으로 꼽았다.
보다 정확한 원인 분석을 위해서는
tcpdump
로 네트워크 패킷을 직접 분석해야 했지만, 해당 시점에는 분석에 제약이 있었다. 그 대신 배치 작업의 특성을 바탕으로 원인을 가정한 뒤, 문제가 해결되는지를 확인하는 식으로 검증하기로 했다.배치 작업은 연결이 유휴 상태에 빠질 틈 없이 단시간에 대량의 API 요청을 보낸다. 따라서 문서에서 언급된 ‘Idle Timeout’보다는, 요청이 집중되는 순간 서버의 ‘트래픽 제어(Throttling)’ 정책에 걸렸을 가능성이 높다고 판단했다. 이 외에도 일시적인 네트워크 불안정도 원인이 될 수 있다고 가정했다.
해결
이 문제를 해결하기 위해 두 가지 조치를 취했다.
- 속도 제어: 각 API 호출 사이에
Thread.sleep(200L)
을 추가하여 서버에 가해지는 부하를 분산시켰다. - 재시도 로직 도입: 보다 근본적인 해결책으로, Spring Batch가 제공하는
faultTolerant
기능을 활용했다. Step 설정에retryLimit(3)
과retry(WebClientRequestException.class)
를 추가하여, 특정 네트워크 예외가 발생했을 때 실패로 처리하지 않고 해당 Chunk를 최대 3회까지 자동으로 재시도하도록 구성했다.
1 2 3 4 5 6 7 8 9 10 11
// KamisDailyPriceBatchConfiguration.kt @Bean fun kamisDailyPriceSyncStep(): Step { return StepBuilder(STEP_NAME, jobRepository) .chunk<KamisDailyPriceApiData, DailyRegionalPrice>(CHUNK_SIZE, transactionManager) // ... reader, processor, writer .faultTolerant() // 재시도 기능 활성화 .retryLimit(3) // 최대 3회 재시도 .retry(WebClientRequestException::class.java) // 특정 예외에 대해서만 재시도 .build() }
- 속도 제어: 각 API 호출 사이에
이후 며칠동안 배치가 정상적으로 실행되는 것을 확인했다.
데이터 중복으로 인한 Unique Key 제약 조건 위반
현상
kamisDailyPriceSyncJob
을 처음 실행하자마자Duplicate entry ... for key 'uk_daily_price_product_region_date_market'
예외가 발생하며 Step이 실패했다.1
org.hibernate.exception.ConstraintViolationException: could not execute statement [Duplicate entry '18-1-2025-08-22-RETAIL' for key 'daily_regional_price.uk_daily_price_product_region_date_market'] [insert into daily_regional_price (created_at,market_type,price,product_id,region_id,survey_date,updated_at) values (?,?,?,?,?,?,?)]
- 원인 분석
daily_regional_price
테이블은(product_id, region_id, survey_date, market_type)
조합을 Unique Key로 설정한 상태였다.1 2 3 4 5 6 7 8 9 10 11 12
@Entity @Table( name = "daily_regional_price", uniqueConstraints = [ UniqueConstraint( name = "uk_daily_price_product_region_date_market", columnNames = ["product_id", "region_id", "survey_date", "market_type"], ) ], ) class DailyRegionalPrice
그런데 Postman으로 API 응답 명세를 검토한 결과, 동일한 품목(e.g., 고구마)에 대해 ‘상품’, ‘중품’ 등 복수의 품질 등급(rank) 데이터가 하나의 배열에 포함되어 있었다. 이로 인해 제약 조건 위반이 발생한 것이었다.
KAMIS API 응답 예시
1 2 3 4 5 6 7 8
[ { "item_name": "고구마", "kind_name": "밤(1kg)", "rank": "상품", "dpr1": "5,370" }, { "item_name": "고구마", "kind_name": "밤(1kg)", "rank": "중품", "dpr1": "4,425" } ]
해결
KamisDailyPriceProcessor
에서 서비스의 기준이 되는 ‘상품’ 등급의 데이터만 엔티티로 변환하고, 그 외 등급의 데이터는null
을 반환하도록 필터링 로직을 추가했다. Spring Batch의 Chunk 모델에서 Processor가null
을 반환하면, 해당 아이템은 Writer로 전달되지 않고 무시된다.1 2 3 4 5 6 7 8 9
// KamisDailyPriceProcessor.kt override fun process(wrapper: KamisDailyPriceApiData): DailyRegionalPrice? { val item = wrapper.item // '상품' 등급이 아니면 필터링 if (item.rank != "상품") { return null } // ... }
불필요한 I/O로 인한 효율 저하
문제 식별
kamisAnnualPriceSyncJob
의 실행 로그를 분석하던 중, 연평균 가격 데이터를 제공하지 않는 품목(축산물,categoryCode: '500'
)에 대해서도 불필요한 API 호출이 발생하고 있음을 발견했다. 또한 API는 과거 6년치 데이터를 한 번에 반환하는데, 우리 로직은 현재 연도의 데이터만 필요함에도 불구하고 불필요한 데이터를 가공하고 있었다.해결
- API Call 최적화:
ItemReader
인JpaPagingItemReader
의 JPQL 쿼리에WHERE p.categoryCode != '500'
조건을 추가하여 API를 호출할 대상을 사전에 필터링했다. - DB Update 최적화:
ItemProcessor
로직을 수정하여, API 응답 데이터 중 현재 연도(survey_year
)에 해당하는 데이터만 필터링하여 엔티티로 변환하도록 개선했다. 이로써 불필요한 데이터 처리 및 DB UPDATE 연산을 제거할 수 있었다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// KamisAnnualPriceBatchConfiguration.kt (Reader 최적화) @Bean fun kamisAnnualPriceReader(): ItemReader<Product> { return JpaPagingItemReaderBuilder<Product>() // ... // '축산물' 카테고리는 조회 대상에서 제외 .queryString("SELECT p FROM Product p WHERE p.categoryCode != '500' ORDER BY p.id ASC") .build() } // KamisAnnualPriceProcessor.kt (Processor 최적화) private fun convertToEntities(...) { val entities = priceItems .filter { it.div == currentYear } // 현재 연도 데이터만 필터링 .mapNotNull { /* ... to Entity ... */ } // ... }
- API Call 최적화:
결론
이번 프로젝트를 통해 Spring Batch 기반의 ETL 파이프라인을 성공적으로 구축하여, 서비스의 핵심인 가격 데이터를 매일 안정적으로 자동 동기화하는 시스템을 완성할 수 있었다. 이 과정에서 다음과 같은 것들을 배웠다.
- 실용적 아키텍처 설계의 중요성: 프로젝트의 현재 규모와 단계를 객관적으로 분석하고, 그에 맞는 기술을 선택하는 것이 빠른 개발 속도와 효율적인 자원 사용의 핵심임을 알게됐다.
- 외부 시스템 연동의 안정성 확보: 외부 API 연동은 항상 실패 가능성을 내포하고 있으며, 재시도(Retry)와 같은 방어적 프로그래밍 패턴을 통해 시스템의 안정성을 높일 수 있다는 것을 체감했다.
- 프레임워크의 깊이 있는 활용: Spring Batch가 제공하는
faultTolerant
,JpaPagingItemReader
, Chunk 기반 처리 모델 등을 적극적으로 활용함으로써, 복잡한 비기능적 요구사항을 쉽게 해결하고 비즈니스 로직 자체에 더 집중할 수 있었다.