Java SpringAI

Spring AI with Spring Batch — Bulk AI Processing for Large Datasets

Spring AI with Spring Batch — Bulk AI Processing for Large Datasets

When you need to process thousands of records through an AI model — rewriting product descriptions, classifying support tickets, extracting data from documents — you need batch processing. Spring Batch provides exactly the right framework: chunk-based processing, restartability, parallel execution, and progress tracking. This tutorial combines Spring Batch with Spring AI for production bulk AI workflows.

Use Cases for Batch AI Processing

✔ Classify 50,000 support tickets into categories
✔ Generate SEO descriptions for 10,000 product listings
✔ Extract structured data from 5,000 PDF invoices
✔ Translate 100,000 knowledge base articles
✔ Moderate content across 1 million user posts
✔ Generate embeddings for a document corpus

Maven Dependencies

<!-- Spring Batch, plus Spring Boot's batch auto-configuration. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<!-- Spring AI OpenAI starter; supplies the ChatClient.Builder injected into
     TicketClassificationProcessor.
     NOTE(review): no <version> here - presumably managed by an imported spring-ai-bom; confirm.
     NOTE(review): Spring AI 1.0 GA renamed this artifact to spring-ai-starter-model-openai. -->
<dependency>
    <groupId>org.springframework.ai</groupId>
    <artifactId>spring-ai-openai-spring-boot-starter</artifactId>
</dependency>
<!-- JPA, for the SupportTicket entity and SupportTicketRepository used by TicketItemWriter.
     NOTE(review): a JDBC driver / database (not listed) is also needed for the DataSource
     read by TicketItemReader and for Spring Batch's job repository tables. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>

Domain Model

/**
 * JPA entity for one row of the {@code support_tickets} table.
 *
 * <p>{@code title} and {@code description} are the input to classification. {@code category},
 * {@code priority}, {@code aiProcessed} and {@code processedAt} are filled in by the batch AI
 * job. The reader selects only rows whose {@code ai_processed} column is false, so a ticket is
 * picked up again until its classification has been saved.
 */
@Entity
@Table(name = "support_tickets")
public class SupportTicket {

    @Id @GeneratedValue
    private Long id;

    private String title;
    // NOTE(review): with no explicit @Column(length = ...), a generated schema uses the JPA
    // default length of 255 for String columns - long ticket descriptions may not fit; confirm.
    private String description;
    private String category;         // filled by batch AI job
    private String priority;         // filled by batch AI job
    private boolean aiProcessed;
    private LocalDateTime processedAt;

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getTitle() {
        return title;
    }

    public void setTitle(String title) {
        this.title = title;
    }

    public String getDescription() {
        return description;
    }

    public void setDescription(String description) {
        this.description = description;
    }

    public String getCategory() {
        return category;
    }

    public void setCategory(String category) {
        this.category = category;
    }

    public String getPriority() {
        return priority;
    }

    public void setPriority(String priority) {
        this.priority = priority;
    }

    public boolean isAiProcessed() {
        return aiProcessed;
    }

    public void setAiProcessed(boolean aiProcessed) {
        this.aiProcessed = aiProcessed;
    }

    public LocalDateTime getProcessedAt() {
        return processedAt;
    }

    public void setProcessedAt(LocalDateTime processedAt) {
        this.processedAt = processedAt;
    }
}

ItemReader — Read Unprocessed Tickets

@Component
@StepScope
public class TicketItemReader implements ItemReader<SupportTicket> {

    private final JdbcCursorItemReader<SupportTicket> delegate;

    public TicketItemReader(DataSource dataSource) {
        this.delegate = new JdbcCursorItemReaderBuilder<SupportTicket>()
                .name("ticketReader")
                .dataSource(dataSource)
                .sql("SELECT * FROM support_tickets WHERE ai_processed = false ORDER BY id")
                .rowMapper((rs, rowNum) -> {
                    SupportTicket t = new SupportTicket();
                    t.setId(rs.getLong("id"));
                    t.setTitle(rs.getString("title"));
                    t.setDescription(rs.getString("description"));
                    return t;
                })
                .build();
        this.delegate.open(new ExecutionContext());
    }

    @Override
    public SupportTicket read() throws Exception {
        return delegate.read();
    }
}

ItemProcessor — Call AI for Classification

/**
 * Asks the chat model to classify a {@link SupportTicket} into a category and a priority.
 *
 * <p>The model's answer is normalized (trimmed, upper-cased) and checked against the allowed
 * values. If the response is missing or contains a value outside the allowed sets,
 * {@link #process} returns {@code null}. Spring Batch then filters the item: it is not written,
 * so the ticket keeps {@code aiProcessed == false} and is retried by the next run instead of
 * being saved with an invalid label.
 */
@Component
@StepScope
public class TicketClassificationProcessor implements ItemProcessor<SupportTicket, SupportTicket> {

    private static final System.Logger LOG =
            System.getLogger(TicketClassificationProcessor.class.getName());

    private final ChatClient chatClient;

    public TicketClassificationProcessor(ChatClient.Builder builder) {
        this.chatClient = builder
                .defaultSystem("""
                    You are a support ticket classifier. Classify tickets precisely.
                    Always respond with valid JSON only.
                    """)
                .build();
    }

    /** Shape of the model's JSON answer, bound by {@code .entity(Classification.class)}. */
    public record Classification(String category, String priority) {}

    /**
     * Classifies one ticket.
     *
     * @param ticket ticket read from the database (id, title and description populated)
     * @return the same ticket with category, priority, {@code aiProcessed} and
     *         {@code processedAt} set; or {@code null} to filter it when the model's answer
     *         is unusable
     * @throws Exception whatever the chat client throws (e.g. API errors), left to the step's
     *         retry/skip policy
     */
    @Override
    public SupportTicket process(SupportTicket ticket) throws Exception {
        // Ticket text goes in as template parameters rather than via String.formatted().
        // Spring AI may render the user text as a {placeholder} template (e.g. when .entity()
        // appends its format instructions). Literal braces in the prompt, or in user-written
        // ticket text, would then break rendering; parameter values are inserted verbatim.
        Classification result = chatClient.prompt()
                .user(u -> u
                        .text("""
                              Classify this support ticket:
                              Title: {title}
                              Description: {description}

                              Category options: BILLING, TECHNICAL, ACCOUNT, FEATURE_REQUEST, BUG
                              Priority options: CRITICAL, HIGH, MEDIUM, LOW

                              Return JSON with the fields "category" and "priority".
                              """)
                        .param("title", nullToEmpty(ticket.getTitle()))
                        .param("description", nullToEmpty(ticket.getDescription())))
                .call()
                .entity(Classification.class);

        String category = result == null ? null : normalize(result.category());
        String priority = result == null ? null : normalize(result.priority());
        if (!isValidCategory(category) || !isValidPriority(priority)) {
            LOG.log(System.Logger.Level.WARNING,
                    "Not classifying ticket {0}: unusable model response {1}",
                    String.valueOf(ticket.getId()), result);
            return null; // filtered: not written, stays unprocessed
        }

        ticket.setCategory(category);
        ticket.setPriority(priority);
        ticket.setAiProcessed(true);
        ticket.setProcessedAt(LocalDateTime.now());

        return ticket;
    }

    /** Template parameters must not be null; an absent title/description becomes "". */
    private static String nullToEmpty(String value) {
        return value == null ? "" : value;
    }

    /** Trims and upper-cases a model-supplied label so "billing " matches "BILLING". */
    private static String normalize(String value) {
        return value == null ? null : value.strip().toUpperCase(java.util.Locale.ROOT);
    }

    /** True only for the category options listed in the prompt. */
    private static boolean isValidCategory(String category) {
        return category != null && switch (category) {
            case "BILLING", "TECHNICAL", "ACCOUNT", "FEATURE_REQUEST", "BUG" -> true;
            default -> false;
        };
    }

    /** True only for the priority options listed in the prompt. */
    private static boolean isValidPriority(String priority) {
        return priority != null && switch (priority) {
            case "CRITICAL", "HIGH", "MEDIUM", "LOW" -> true;
            default -> false;
        };
    }
}

ItemWriter — Save Results

/**
 * Saves each chunk of classified tickets through the JPA repository and prints how many
 * tickets the chunk contained.
 */
@Component
@StepScope
public class TicketItemWriter implements ItemWriter<SupportTicket> {

    private final SupportTicketRepository tickets;

    public TicketItemWriter(SupportTicketRepository repository) {
        this.tickets = repository;
    }

    /**
     * Persists every ticket in {@code chunk}; runs inside the step's chunk transaction.
     *
     * @param chunk tickets produced by the processor for this commit interval
     */
    @Override
    public void write(Chunk<? extends SupportTicket> chunk) {
        var batch = chunk.getItems();
        tickets.saveAll(batch);
        System.out.printf("Saved %d classified tickets%n", batch.size());
    }
}

Batch Job Configuration

/**
 * Defines the ticket classification job: one chunk-oriented step (reader, AI processor,
 * JPA writer) plus a listener that prints a run summary when the job ends.
 */
@Configuration
// NOTE(review): with Spring Boot 3 / Spring Batch 5, @EnableBatchProcessing makes Boot's batch
// auto-configuration back off (spring.batch.* properties such as job-repository schema
// initialization are then not applied) - confirm this is intended.
@EnableBatchProcessing
public class TicketClassificationJobConfig {

    /**
     * Chunk-oriented classification step: reads unprocessed tickets, classifies each one with
     * the AI processor, and writes results in transactions of 10 items.
     */
    @Bean
    public Step classificationStep(
            JobRepository jobRepository,
            PlatformTransactionManager transactionManager,
            TicketItemReader reader,
            TicketClassificationProcessor processor,
            TicketItemWriter writer) {

        return new StepBuilder("classificationStep", jobRepository)
                .<SupportTicket, SupportTicket>chunk(10, transactionManager)  // commit every 10 items
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .faultTolerant()
                // Up to 3 attempts in total, for *every* Exception type - not only transient AI
                // API errors. A retry rolls the chunk back, so items already processed in that
                // chunk may be sent to the model again.
                .retryLimit(3)
                .retry(Exception.class)
                // Only AiProcessingException (declared elsewhere) is skippable; any other
                // exception that exhausts its retries fails the step.
                .skipLimit(100)
                .skip(AiProcessingException.class)
                .build();
    }

    /** The job: runs {@code classificationStep} once, with {@code listener} attached. */
    @Bean
    public Job ticketClassificationJob(
            JobRepository jobRepository,
            Step classificationStep,
            JobExecutionListener listener) {

        return new JobBuilder("ticketClassificationJob", jobRepository)
                .listener(listener)
                .start(classificationStep)
                .build();
    }

    /**
     * Prints a one-line summary after the job finishes, whatever its final status.
     *
     * <p>Counts are summed over all step executions. A job that failed before any step ran
     * reports zeros instead of throwing, and the totals stay correct if more steps are added.
     * "Skipped" is the total of read, process and write skips.
     */
    @Bean
    public JobExecutionListener jobListener() {
        return new JobExecutionListener() {
            @Override
            public void afterJob(JobExecution jobExecution) {
                long read = 0;
                long written = 0;
                long skipped = 0;
                for (var step : jobExecution.getStepExecutions()) {
                    read += step.getReadCount();
                    written += step.getWriteCount();
                    skipped += step.getSkipCount();
                }
                System.out.printf("Job completed. Status: %s. " +
                        "Read: %d, Written: %d, Skipped: %d%n",
                        jobExecution.getStatus(), read, written, skipped);
            }
        };
    }
}

Trigger Job via REST Endpoint

/**
 * REST endpoint that launches {@code ticketClassificationJob} on demand.
 */
@RestController
@RequestMapping("/batch")
public class BatchJobController {

    private final JobLauncher launcher;
    private final Job job;

    public BatchJobController(JobLauncher jobLauncher, Job ticketClassificationJob) {
        launcher = jobLauncher;
        job = ticketClassificationJob;
    }

    /**
     * Launches a new instance of the job and reports its id and current status.
     *
     * <p>The current timestamp is passed as the {@code runTime} job parameter, so every call
     * creates a fresh job instance rather than restarting an earlier one.
     *
     * <p>NOTE(review): if the injected JobLauncher is synchronous (Spring Batch's default
     * launcher uses a synchronous task executor), this request blocks until the job ends and
     * the returned status is the final one - confirm the launcher configuration.
     */
    @PostMapping("/classify-tickets")
    public String runClassification() throws Exception {
        var execution = launcher.run(job, new JobParametersBuilder()
                .addLocalDateTime("runTime", LocalDateTime.now())
                .toJobParameters());
        return "Job started: %s Status: %s".formatted(execution.getJobId(), execution.getStatus());
    }
}

Output

POST /batch/classify-tickets
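→ Job started: 42 Status: STARTED
  (STARTED is returned only when the JobLauncher runs jobs asynchronously; with Spring Batch's default synchronous launcher the request blocks until the job ends and reports the final status, e.g. COMPLETED)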

Console:
Saved 10 classified tickets
Saved 10 classified tickets
...
Job completed. Status: COMPLETED. Read: 1000, Written: 987, Skipped: 13

Database after job:
id | title                        | category        | priority | ai_processed
─────────────────────────────────────────────────────────────────────────────
1  | Login page not loading       | TECHNICAL       | HIGH     | true
2  | Charge me twice for order    | BILLING         | CRITICAL | true
3  | Add dark mode please         | FEATURE_REQUEST | LOW      | true
4  | Password reset email missing | ACCOUNT         | HIGH     | true

Key Points

  • Set chunk size to 10–50 for AI processing — smaller chunks mean faster restart after failure but more overhead
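  • Enable .retryLimit(3) to retry transient AI API errors (rate limits, timeouts) automatically. Note that .retry(Exception.class) retries every exception, not only transient ones, and a retry rolls back the chunk — so items already processed in that chunk may be sent to the model (and billed) again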
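  • Use .skipLimit(100) with .skip(AiProcessingException.class) — or whichever exception type signals a bad record — to skip malformed records without stopping the job; exceptions not listed as skippable still fail the step once their retries are exhausted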
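  • Spring Batch stores job state in a database (the job repository), and every committed chunk stays committed. In this job, the ai_processed flag, which the reader filters on, is what lets a rerun after a failure pick up only the tickets not yet classified, without reprocessing completed records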
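  • For very large datasets (1M+ records), process in parallel with a partitioned step (for example, partitioned by ticket ID range) executed on a TaskExecutor; a JdbcCursorItemReader is not thread-safe, so do not share one reader across threads in a multi-threaded step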
Topics: Java SpringAI
← Newer Post Older Post →