Spring Batch

In this article I will show how to create a complete Spring Batch project. I will show how to configure a Job and its Steps. I will implement a Step with a reader, a processor and a writer. I will implement another Step with a Tasklet.

Content

  • Concepts
  • Create a Spring Batch application
  • Job & Steps
  • Tasklets
  • Database Followup Tables
  • Trigger by Request

Watch this explanatory video for more details.

All the code used is available in this repository.

Concepts

Let’s first go over some of the main components of Spring Batch. I will then implement a real example with Spring Batch.

Jobs and Steps

I can configure jobs to run at different times, in parallel, or when an endpoint is called. A job is composed of a single step or of multiple steps. The steps of a job usually share a final goal and are usually connected. For example, I may have a job which updates the users’ information with these steps: import the new users from a file, update the users’ activity and delete the old accounts.

I can chain all the steps in a linear way: if the first one fails, the rest won’t run. Or I can have independent steps: if the first one fails, the second one still runs. I can also connect the steps in more complicated ways. I can configure a step with a retry mechanism, or to be executed repeatedly, as a loop. I can configure a step to run depending on the result of the previous step. Or I can run multiple steps in parallel.
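To make the linear case concrete, here is a minimal plain-Java sketch. It is not Spring Batch code: the class `LinearFlowSketch`, its `run` method and the step names are hypothetical and only illustrate the idea that steps run in order and the chain stops at the first failure.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BooleanSupplier;

// Hypothetical illustration of a linear step chain; not the Spring Batch API.
final class LinearFlowSketch {

    // Runs the steps in insertion order and stops at the first one that fails.
    // Returns the names of the steps that were actually started.
    static List<String> run(Map<String, BooleanSupplier> steps) {
        List<String> started = new ArrayList<>();
        for (Map.Entry<String, BooleanSupplier> step : steps.entrySet()) {
            started.add(step.getKey());
            boolean succeeded = step.getValue().getAsBoolean();
            if (!succeeded) {
                break; // the remaining steps never run
            }
        }
        return started;
    }

    public static void main(String[] args) {
        Map<String, BooleanSupplier> steps = new LinkedHashMap<>();
        steps.put("importUsers", () -> true);
        steps.put("updateActivity", () -> false); // simulated failure
        steps.put("deleteOldAccounts", () -> true);

        System.out.println(run(steps)); // prints [importUsers, updateActivity]
    }
}
```

In Spring Batch itself I don’t write this loop: the job builder expresses the same ordering, and the framework decides whether the next step may start.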

Reader, Processor, Writer

A single step is usually composed of three items: a Reader, a Processor and a Writer. Spring Batch already has multiple readers and writers for relational databases, CSV files, JSON files, Kafka, MongoDB and more. But of course, I can create my own reader and writer. With this approach, the output of the Reader is passed as the input of the Processor. The Processor handles one item at a time and returns the result. That result is then passed to the Writer, which saves it in the desired destination.

Tasklet

If my job doesn’t fit this model, I also have Tasklets. A Tasklet is a simple action that can be run as a step. Inside the Tasklet definition, I can perform whatever action I want. Maybe I need some more complicated aggregation operations that hardly fit in the previous step definition.

So, why choose the reader, processor and writer instead of a Tasklet? What happens if I have so much data that it doesn’t even fit in memory? With a Tasklet, I have to manage the pagination myself. But with the reader, processor and writer, I can define a chunk size. With the chunk size, Spring Batch automatically reads, processes and writes the data one chunk at a time.
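The chunk idea can be sketched in plain Java. This is a simplified illustration under my own assumptions, not the Spring Batch implementation: `ChunkSketch` and `run` are hypothetical names, and the real framework also wraps each chunk in a transaction and handles restarts, which the sketch leaves out. The point it shows is that only one chunk of items is held in memory at a time.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical illustration of chunk-oriented processing; not the Spring Batch API.
final class ChunkSketch {

    // Reads up to chunkSize items, processes them, writes them as one list,
    // and repeats until the reader is exhausted. Returns the number of chunks written.
    static <I, O> int run(Iterator<I> reader,
                          Function<I, O> processor,
                          Consumer<List<O>> writer,
                          int chunkSize) {
        int chunks = 0;
        while (reader.hasNext()) {
            // Read phase: at most chunkSize items are loaded.
            List<I> items = new ArrayList<>(chunkSize);
            while (items.size() < chunkSize && reader.hasNext()) {
                items.add(reader.next());
            }
            // Process phase: one item at a time.
            List<O> processed = new ArrayList<>(items.size());
            for (I item : items) {
                processed.add(processor.apply(item));
            }
            // Write phase: the whole chunk at once.
            writer.accept(processed);
            chunks++;
        }
        return chunks;
    }

    public static void main(String[] args) {
        Function<Integer, String> processor = n -> "user-" + n;
        Consumer<List<String>> writer = chunk -> System.out.println("writing " + chunk);

        int chunks = run(List.of(1, 2, 3, 4, 5).iterator(), processor, writer, 2);
        System.out.println(chunks + " chunks"); // five items with a chunk size of 2 give 3 chunks
    }
}
```

With a chunk size of 2 and five items, the writer is called three times: twice with two items and once with the last remaining item.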

Create a Spring Batch application

Let’s now create a Spring Batch microservice with a real example. I will start by creating a new project with Spring Initializr, adding the following dependencies:

  • spring-boot-starter-batch, of course
  • spring-boot-starter-actuator to monitor my server
  • spring-boot-starter-web to let me trigger the job from an endpoint
  • spring-boot-starter-data-jpa as I will import and edit data in the database
  • spring-cloud-starter-bootstrap to have the cloud context
  • spring-cloud-starter-config as I have a distributed configuration server
  • spring-cloud-starter-netflix-eureka-client for the service discovery
  • postgresql as I have a PostgreSQL database
  • And Lombok for code generation.

I will create a job which will perform some actions on my users table and import some new users from a file.

Job & Steps

Let’s now create the job. I will first create the step which will import the new users from a CSV file.

    @Bean
    public Step importUsersStep() {
        return stepBuilderFactory.get("importUsersStep")
                .<BatchFileUser, BookstoreUser>chunk(100)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .build();
    }

At line 3, importUsersStep is the name of the step. It will be useful later, when monitoring the job. At line 4, I tell Spring Batch to handle the file in chunks of 100 items. Also at line 4, I need to indicate the types that the processor uses as input and output. Let’s now create the reader.

    @Bean
    public ItemReader<BatchFileUser> reader() {
        return new FlatFileItemReaderBuilder<BatchFileUser>()
                .name("userReader")
                .resource(new ClassPathResource("users.csv"))
                .linesToSkip(1)
                .delimited()
                .delimiter(";")
                .names("id", "name", "email", "birth_date")
                .targetType(BatchFileUser.class)
                .build();
    }

I will use a reader provided by Spring Batch, FlatFileItemReader, created with FlatFileItemReaderBuilder. This reader is already prepared to read CSV files. At line 5, I indicate the file to read, located in the resources folder. At line 6, I skip the first line, as it contains the headers. Then at lines 7 and 8, I specify the delimiter character. The default one is the comma. If I want another delimiter, I need to indicate it. The names of the fields, one per column, that will be mapped into my DTO are indicated at line 9. And finally, at line 10, the target class to store each line of the CSV file. Let’s continue with the processor.

    @Bean
    public ItemProcessor<BatchFileUser, BookstoreUser> processor() {
        return item -> {
            Random r = new SecureRandom(); // java.security.SecureRandom: a stronger random source for passwords
            String generatedPassword = r.ints(48, 122)
                    .limit(16)
                    .collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append)
                    .toString();

            return new BookstoreUser(item.getId(),
                    item.getEmail().split("@")[0],
                    passwordEncoder.encode(generatedPassword),
                    LocalDate.parse(item.getBirthDate()),
                    0
                    );
        };
    }

The item parameter of the lambda is the DTO produced by the reader. The main purpose of the processor is to map the DTO into the entity object BookstoreUser. The rest (lines 4 to 8) only generates a random password for the imported users. And finally, the writer.

    @Bean
    public ItemWriter<BookstoreUser> writer() {
        return userRepository::saveAll;
    }

The writer receives the items one chunk at a time, and I store each chunk with the repository. And now I need to integrate this step in a job.

    @Bean
    public Job updateUsersJob(Step importUsersStep) {
        return jobBuilderFactory.get("updateUsersJob")
                .incrementer(new RunIdIncrementer())
                .start(importUsersStep)
                .build();
    }

The name of the job is indicated at line 3. At line 4, I indicate that each time the job is run, it gets an incremented run ID. And at line 5, the job starts with the step I’ve created.

Tasklets

And if I want to add another step, like a Tasklet, I need to edit my job like this.

    @Bean
    public Job updateUsersJob(Step importUsersStep, Step updateAgeTasklet) {
        return jobBuilderFactory.get("updateUsersJob")
                .incrementer(new RunIdIncrementer())
                .start(importUsersStep)
                .next(updateAgeTasklet)
                .build();
    }

And let’s create this Tasklet, which is another step.

    @Bean
    public Step updateAgeTasklet() {
        return stepBuilderFactory.get("updateAgeTasklet")
                .tasklet((stepContribution, chunkContext) -> {
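                    List<BookstoreUser> users = userRepository.findAll();
                    users.forEach(user -> user.setAge(ChronoUnit.YEARS.between(user.getBirthDate(), LocalDate.now())));
                    // Save explicitly instead of relying on JPA dirty checking at commit time
                    userRepository.saveAll(users);
                    return RepeatStatus.FINISHED;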
                })
                .build();
    }

As with each step, I need to name it. Inside the Tasklet, I update the age of each user in the database, calculating it from the birth date. Finally, I return the status FINISHED.
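The age calculation can be checked in isolation. The following is a small sketch with fixed dates instead of LocalDate.now(), so that the result is reproducible; `AgeSketch` and `ageAt` are hypothetical names that are not part of the project. It shows that ChronoUnit.YEARS.between counts only complete years and returns a long.

```java
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;

// Hypothetical helper to illustrate the age calculation used in the Tasklet.
final class AgeSketch {

    // Number of complete years between the birth date and the given day.
    static long ageAt(LocalDate birthDate, LocalDate today) {
        return ChronoUnit.YEARS.between(birthDate, today);
    }

    public static void main(String[] args) {
        LocalDate birthDate = LocalDate.of(1990, 6, 15);

        // The day before the birthday, the 34th year is not complete yet.
        System.out.println(ageAt(birthDate, LocalDate.of(2024, 6, 14))); // prints 33
        // On the birthday itself, it is.
        System.out.println(ageAt(birthDate, LocalDate.of(2024, 6, 15))); // prints 34
    }
}
```

As the result is a long, an age field declared as an int would need a conversion such as Math.toIntExact. Whether that applies depends on how the entity declares the field, which is not shown in this article.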

Before starting the service, I need to edit my application.yml.

server:
  port: 0
  servlet:
    context-path: /batch

management:
  health:
    livenessState.enabled: true
    readinessState.enabled: true
  endpoint.health.probes.enabled: true

spring:
  application.name: service-batch
  datasource:
    platform: postgres
    driver-class-name: org.postgresql.Driver
    url: jdbc:postgresql://localhost:5434/bookstore
    username: service_users
    password: srv-usrs
    initialization-mode: always
  jpa:
    database-platform: org.hibernate.dialect.PostgreSQLDialect
    show-sql: false
    hibernate:
      ddl-auto: create
  jackson:
    deserialization:
      FAIL_ON_IGNORED_PROPERTIES: false
    serialization:
      INDENT_OUTPUT: false
      WRITE_DATES_AS_TIMESTAMPS: false
      WRITE_BIGDECIMAL_AS_PLAIN: true
  batch:
    initialize-schema: always
    schema: classpath:org/springframework/batch/core/schema-postgresql.sql

eureka:
  client:
    serviceUrl:
      defaultZone: http://localhost:8761/eureka

I’ve added the context path at line 4 to access the available endpoints. I will create one later. I’ve added the health check configuration for Actuator at lines 6 to 10. I’ve configured the database and JPA at lines 14 to 25. I’ve configured Eureka to talk with the service discovery at lines 37 to 40. And lines 33 to 35 hold the Batch-specific configuration. At line 34, I indicate to always try to create the schema with the follow-up tables of Spring Batch. And line 35 points to the script containing the queries that create those tables. Spring Batch ships similar SQL scripts for other databases.

Database Followup Tables

If I run my application, I can see that the job ran successfully without any other action from me: by default, Spring Boot launches the configured jobs at startup. Let’s check the database.

                          List of relations
    Schema     |             Name             | Type  |     Owner
---------------+------------------------------+-------+---------------
 service_users | batch_job_execution          | table | service_users
 service_users | batch_job_execution_context  | table | service_users
 service_users | batch_job_execution_params   | table | service_users
 service_users | batch_job_instance           | table | service_users
 service_users | batch_step_execution         | table | service_users
 service_users | batch_step_execution_context | table | service_users

I see multiple tables created by Spring Batch. batch_job_instance holds one row per job instance, that is, per job name and set of identifying parameters. batch_job_execution, batch_job_execution_context and batch_job_execution_params inform me about all the executions. I will see if they were successful or not, and with which parameters they were started. And if a job failed, I will see the stack trace. I have similar information in the step tables batch_step_execution and batch_step_execution_context. I will see the status of each step linked to each job execution.

Trigger by Request

The previous job started directly when the service started. But I may want to start the job when calling an endpoint. Let’s create this endpoint.

@RequiredArgsConstructor
@RestController
public class JobController {

    private final Job job;
    private final JobLauncher jobLauncher;

    @PostMapping("/run")
    public ResponseEntity<Void> runJob() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
        jobLauncher.run(job, new JobParameters());
        return ResponseEntity.noContent().build();
    }
}

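As I only have one job created in my project, the injection at line 5 is unambiguous. And my job doesn’t need any parameter, that’s why I leave the JobParameters empty. Be aware that Spring Batch identifies a job instance by the job name and its identifying parameters, so every call with empty parameters targets the same job instance; a unique parameter, such as a timestamp, is the usual way to get a fresh instance per request. Finally, the job launcher will start the job in a background thread and continue with the execution. This makes my job start asynchronously, and my endpoint won’t be blocked until my job finishes. Note that this only holds when the job launcher is configured with an asynchronous TaskExecutor, such as SimpleAsyncTaskExecutor; the default launcher runs the job synchronously, in the calling thread.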
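Whether the call returns before the job finishes depends on the executor the launcher hands the job to. The following plain-Java sketch illustrates the difference with the standard Executor interface. It is not Spring Batch code: `LaunchSketch`, `launch` and `awaitQuietly` are hypothetical names. In Spring Batch, the corresponding setting is the TaskExecutor of the job launcher.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration of synchronous vs asynchronous launching; not the Spring Batch API.
final class LaunchSketch {

    // Hands the job to the executor and reports whether the job had already
    // finished by the time this method returned.
    static boolean launch(Executor executor, Runnable job) {
        AtomicBoolean finished = new AtomicBoolean(false);
        executor.execute(() -> {
            job.run();
            finished.set(true);
        });
        return finished.get();
    }

    // Waits on the latch, restoring the interrupt flag if interrupted.
    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Synchronous executor: the job runs in the calling thread.
        Executor sync = Runnable::run;
        System.out.println("sync finished on return: " + launch(sync, () -> { })); // true

        // Asynchronous executor: the job runs in a background thread
        // and is kept busy until the latch is released.
        ExecutorService async = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);
        boolean finished = launch(async, () -> awaitQuietly(release));
        System.out.println("async finished on return: " + finished); // false

        release.countDown();
        async.shutdown();
        async.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

With the synchronous executor, an HTTP request would wait for the whole job. With the asynchronous one, the request returns immediately and the job status has to be checked afterwards, for example in the follow-up tables.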

Conclusion

  • I’ve created a new project with the Spring Batch dependency and some others to connect to my cloud architecture, such as the service discovery, the distributed configuration and the database.
  • I’ve added the entities and repositories, as usual with Spring Data JPA.
  • I’ve created the batch configuration bean, with the annotation @EnableBatchProcessing.
  • I’ve created two steps. The first one was composed of a reader, a processor and a writer, to read a CSV file and store it in the database. The second step was a Tasklet which only updates the age of each user.
  • I’ve created a job chaining those two steps.
  • Then I’ve edited my application.yml to indicate to create the follow-up tables.
  • And finally, I’ve created a controller with the job launcher to start the job on request.

References

Repository