Spring AI with Spring Batch — Bulk AI Processing for Large Datasets
When you need to process thousands of records through an AI model — rewriting product descriptions, classifying support tickets, extracting data from documents — you need batch processing. Spring Batch provides exactly the right framework: chunk-based processing, restartability, parallel execution, and progress tracking. This tutorial combines Spring Batch with Spring AI for production bulk AI workflows.
Use Cases for Batch AI Processing
✔ Classify 50,000 support tickets into categories
✔ Generate SEO descriptions for 10,000 product listings
✔ Extract structured data from 5,000 PDF invoices
✔ Translate 100,000 knowledge base articles
✔ Moderate content across 1 million user posts
✔ Generate embeddings for a document corpus
Maven Dependencies
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<!-- In Spring AI 1.0 and later this starter is named spring-ai-starter-model-openai -->
<dependency>
    <groupId>org.springframework.ai</groupId>
    <artifactId>spring-ai-openai-spring-boot-starter</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
Domain Model
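import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.Id;
import jakarta.persistence.Table;
import java.time.LocalDateTime;

@Entity
@Table(name = "support_tickets")
public class SupportTicket {
    @Id @GeneratedValue
    private Long id;
    private String title;
    private String description;
    private String category;   // filled by batch AI job
    private String priority;   // filled by batch AI job
    private boolean aiProcessed;
    private LocalDateTime processedAt;
    // getters and setters...
}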
ItemReader — Read Unprocessed Tickets
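import javax.sql.DataSource;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamReader;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.stereotype.Component;

@Component
@StepScope
public class TicketItemReader implements ItemStreamReader<SupportTicket> {

    private final JdbcCursorItemReader<SupportTicket> delegate;

    public TicketItemReader(DataSource dataSource) {
        this.delegate = new JdbcCursorItemReaderBuilder<SupportTicket>()
                .name("ticketReader")
                .dataSource(dataSource)
                .sql("SELECT * FROM support_tickets WHERE ai_processed = false ORDER BY id")
                // The ai_processed flag already records progress, so no cursor position is saved.
                .saveState(false)
                .rowMapper((rs, rowNum) -> {
                    SupportTicket t = new SupportTicket();
                    t.setId(rs.getLong("id"));
                    t.setTitle(rs.getString("title"));
                    t.setDescription(rs.getString("description"));
                    return t;
                })
                .build();
    }

    // Let the step open and close the cursor instead of opening it in the constructor,
    // so the database connection is released when the step ends.
    @Override
    public void open(ExecutionContext executionContext) {
        delegate.open(executionContext);
    }

    @Override
    public void close() {
        delegate.close();
    }

    @Override
    public SupportTicket read() throws Exception {
        return delegate.read();
    }
}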
ItemProcessor — Call AI for Classification
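import java.time.LocalDateTime;
import java.util.Objects;
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;

@Component
@StepScope
public class TicketClassificationProcessor implements ItemProcessor<SupportTicket, SupportTicket> {

    private final ChatClient chatClient;

    public TicketClassificationProcessor(ChatClient.Builder builder) {
        this.chatClient = builder
                .defaultSystem("""
                        You are a support ticket classifier. Classify tickets precisely.
                        Always respond with valid JSON only.
                        """)
                .build();
    }

    public record Classification(String category, String priority) {}

    @Override
    public SupportTicket process(SupportTicket ticket) throws Exception {
        // Spring AI renders the user text as a template, so literal braces (such as a
        // hand-written JSON example) can be misread as placeholders. Ticket text is
        // therefore passed as parameters, and entity() supplies the JSON format instructions.
        Classification result = chatClient.prompt()
                .user(u -> u.text("""
                        Classify this support ticket:
                        Title: {title}
                        Description: {description}
                        Category options: BILLING, TECHNICAL, ACCOUNT, FEATURE_REQUEST, BUG
                        Priority options: CRITICAL, HIGH, MEDIUM, LOW
                        """)
                        .param("title", Objects.toString(ticket.getTitle(), ""))
                        .param("description", Objects.toString(ticket.getDescription(), "")))
                .call()
                .entity(Classification.class);
        if (result == null) {
            throw new IllegalStateException("No classification returned for ticket " + ticket.getId());
        }
        ticket.setCategory(result.category());
        ticket.setPriority(result.priority());
        ticket.setAiProcessed(true);
        ticket.setProcessedAt(LocalDateTime.now());
        return ticket;
    }
}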
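A model can return a label outside the lists given in the prompt (for example "urgent" or "feature request"), and the processor above would store it unchanged. The following is a minimal sketch of a validation step, assuming a hypothetical helper class named ClassificationValidator that is not part of Spring AI or Spring Batch. The label sets are copied from the prompt.

```java
import java.util.Locale;
import java.util.Optional;
import java.util.Set;

// Hypothetical helper (not a Spring AI class): checks model output against the allowed labels.
final class ClassificationValidator {

    // Label sets copied from the classification prompt.
    static final Set<String> CATEGORIES =
            Set.of("BILLING", "TECHNICAL", "ACCOUNT", "FEATURE_REQUEST", "BUG");
    static final Set<String> PRIORITIES =
            Set.of("CRITICAL", "HIGH", "MEDIUM", "LOW");

    private ClassificationValidator() {}

    // Returns the normalized label if it is in the allowed set, otherwise an empty Optional.
    static Optional<String> normalize(String raw, Set<String> allowed) {
        if (raw == null) {
            return Optional.empty();
        }
        String candidate = raw.trim().toUpperCase(Locale.ROOT).replace(' ', '_');
        return allowed.contains(candidate) ? Optional.of(candidate) : Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(normalize(" feature request ", CATEGORIES)); // Optional[FEATURE_REQUEST]
        System.out.println(normalize("urgent", PRIORITIES));            // Optional.empty
    }
}
```

In the processor, an empty result could be turned into an exception that the step is configured to skip, or the processor could return null, which Spring Batch treats as "filter this item out".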
ItemWriter — Save Results
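import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ItemWriter;
import org.springframework.stereotype.Component;

@Component
@StepScope
public class TicketItemWriter implements ItemWriter<SupportTicket> {

    // SupportTicketRepository is assumed to be a Spring Data JPA repository for SupportTicket.
    private final SupportTicketRepository repository;

    public TicketItemWriter(SupportTicketRepository repository) {
        this.repository = repository;
    }

    @Override
    public void write(Chunk<? extends SupportTicket> chunk) {
        repository.saveAll(chunk.getItems());
        System.out.printf("Saved %d classified tickets%n", chunk.size());
    }
}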
Batch Job Configuration
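import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

// With Spring Boot 3 and Spring Batch 5, @EnableBatchProcessing is not required;
// adding it makes Boot's batch auto-configuration back off, so it is omitted here.
@Configuration
public class TicketClassificationJobConfig {

    @Bean
    public Step classificationStep(
            JobRepository jobRepository,
            PlatformTransactionManager transactionManager,
            TicketItemReader reader,
            TicketClassificationProcessor processor,
            TicketItemWriter writer) {
        return new StepBuilder("classificationStep", jobRepository)
                .<SupportTicket, SupportTicket>chunk(10, transactionManager) // commit every 10 items
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .faultTolerant()
                .retryLimit(3)
                .retry(Exception.class)             // retry AI call failures
                .skipLimit(100)
                // AiProcessingException is an application-defined exception (not shown here)
                // that the processor must throw for records it cannot classify.
                .skip(AiProcessingException.class)  // skip and log bad records
                .build();
    }

    @Bean
    public Job ticketClassificationJob(
            JobRepository jobRepository,
            Step classificationStep,
            JobExecutionListener listener) {
        return new JobBuilder("ticketClassificationJob", jobRepository)
                .listener(listener)
                .start(classificationStep)
                .build();
    }

    @Bean
    public JobExecutionListener jobListener() {
        return new JobExecutionListener() {
            @Override
            public void afterJob(JobExecution jobExecution) {
                // The job has a single step, so the first step execution holds all counts.
                StepExecution step = jobExecution.getStepExecutions().iterator().next();
                System.out.printf("Job completed. Status: %s. " +
                                "Read: %d, Written: %d, Skipped: %d%n",
                        jobExecution.getStatus(),
                        step.getReadCount(),
                        step.getWriteCount(),
                        step.getProcessSkipCount());
            }
        };
    }
}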
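To make the interaction of chunk size, retry, and skip easier to picture, here is a simplified plain-Java model of the loop that the step configuration describes. It is an illustration only, not Spring Batch's implementation: the real framework also wraps each chunk in a transaction and rolls back on failure. The class name ChunkRetrySkipSketch, the maxAttempts parameter, and the flaky demo processor are all assumptions made for this sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Simplified model of chunked processing with retry and skip (no transactions, no rollback).
final class ChunkRetrySkipSketch {

    record Result(int written, int skipped) {}

    static <I, O> Result run(List<I> items, int chunkSize, int maxAttempts, int skipLimit,
                             Function<I, O> processor) {
        if (chunkSize < 1 || maxAttempts < 1) {
            throw new IllegalArgumentException("chunkSize and maxAttempts must be positive");
        }
        int written = 0;
        int skipped = 0;
        for (int start = 0; start < items.size(); start += chunkSize) {
            List<I> chunk = items.subList(start, Math.min(start + chunkSize, items.size()));
            List<O> outputs = new ArrayList<>();
            for (I item : chunk) {
                boolean succeeded = false;
                for (int attempt = 1; attempt <= maxAttempts && !succeeded; attempt++) {
                    try {
                        outputs.add(processor.apply(item));
                        succeeded = true;
                    } catch (RuntimeException e) {
                        // Treat the failure as transient and retry until attempts run out.
                    }
                }
                // An item that never succeeded is skipped; too many skips fail the whole run.
                if (!succeeded && ++skipped > skipLimit) {
                    throw new IllegalStateException("Skip limit exceeded: " + skipLimit);
                }
            }
            written += outputs.size(); // stands in for one writer call per chunk
        }
        return new Result(written, skipped);
    }

    // Hypothetical processor for the demo: item 3 always fails, item 5 fails on its first attempt only.
    static Function<Integer, String> flakyProcessor() {
        Map<Integer, Integer> attempts = new HashMap<>();
        return n -> {
            int attempt = attempts.merge(n, 1, Integer::sum);
            if (n == 3 || (n == 5 && attempt == 1)) {
                throw new RuntimeException("simulated failure for item " + n);
            }
            return "classified-" + n;
        };
    }

    public static void main(String[] args) {
        Result result = run(List.of(1, 2, 3, 4, 5, 6, 7), 3, 3, 2, flakyProcessor());
        System.out.println(result); // Result[written=6, skipped=1]
    }
}
```

In this model item 5 recovers on its second attempt, while item 3 exhausts its attempts and is counted as a skip, which is the same split between transient and permanent failures that retryLimit and skipLimit are meant to handle.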
Trigger Job via REST Endpoint
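import java.time.LocalDateTime;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/batch")
public class BatchJobController {

    private final JobLauncher jobLauncher;
    private final Job ticketClassificationJob;

    public BatchJobController(JobLauncher jobLauncher, Job ticketClassificationJob) {
        this.jobLauncher = jobLauncher;
        this.ticketClassificationJob = ticketClassificationJob;
    }

    @PostMapping("/classify-tickets")
    public String runClassification() throws Exception {
        // A unique timestamp parameter makes every request a new job instance.
        JobParameters params = new JobParametersBuilder()
                .addLocalDateTime("runTime", LocalDateTime.now())
                .toJobParameters();
        // The default JobLauncher is synchronous: run() returns only when the job has finished.
        JobExecution execution = jobLauncher.run(ticketClassificationJob, params);
        return "Job started: " + execution.getJobId() + " Status: " + execution.getStatus();
    }
}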
Output
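POST /batch/classify-tickets
→ Job started: 42 Status: COMPLETED
(The default JobLauncher runs the job synchronously, so the response arrives after the job finishes. Status: STARTED is returned only when the launcher is configured with an asynchronous TaskExecutor.)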
Console:
Saved 10 classified tickets
Saved 10 classified tickets
...
Job completed. Status: COMPLETED. Read: 1000, Written: 987, Skipped: 13
Database after job:
id | title                        | category        | priority | ai_processed
─────────────────────────────────────────────────────────────────────────────
1  | Login page not loading       | TECHNICAL       | HIGH     | true
2  | Charged twice for order      | BILLING         | CRITICAL | true
3  | Add dark mode please         | FEATURE_REQUEST | LOW      | true
4  | Password reset email missing | ACCOUNT         | HIGH     | true
Key Points
- Set chunk size to 10–50 for AI processing. Smaller chunks lose less work when a chunk fails and rolls back, but add more per-transaction overhead.
- Enable .retryLimit(3) to handle transient AI API errors (rate limits, timeouts) automatically.
- Use .skipLimit(100) with .skip(AiProcessingException.class) to skip malformed records without stopping the job.
- Spring Batch stores job state in a database. A failed job relaunched with the same job parameters restarts from the last checkpoint without reprocessing completed records.
- For very large datasets (1M+ records), enable parallel processing with TaskExecutor-based partitioned steps.
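Partitioned steps need a rule for dividing the work between workers. One common choice is to split the primary-key range into contiguous blocks. The sketch below shows only that arithmetic in plain Java. In a real job a Spring Batch Partitioner would put each range into its own ExecutionContext and the reader SQL would filter on it. The class name IdRangePartitionSketch and its Range record are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Splits an inclusive id range into contiguous blocks, one per partition.
final class IdRangePartitionSketch {

    record Range(long minId, long maxId) {}

    static List<Range> split(long minId, long maxId, int partitions) {
        if (partitions < 1 || maxId < minId) {
            throw new IllegalArgumentException("need partitions >= 1 and maxId >= minId");
        }
        long total = maxId - minId + 1;
        long size = (total + partitions - 1) / partitions; // ceiling division
        List<Range> ranges = new ArrayList<>();
        // May produce fewer ranges than requested when the id range is small.
        for (long start = minId; start <= maxId; start += size) {
            ranges.add(new Range(start, Math.min(start + size - 1, maxId)));
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Four workers over one million ticket ids.
        split(1, 1_000_000, 4).forEach(System.out::println);
        // Range[minId=1, maxId=250000]
        // Range[minId=250001, maxId=500000]
        // Range[minId=500001, maxId=750000]
        // Range[minId=750001, maxId=1000000]
    }
}
```

Splitting by id assumes ids are spread fairly evenly. If large gaps exist, some partitions end up with far fewer rows than others, and splitting by row count may balance the load better.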