Spring Boot + Spring Batch: Implementing Batch Jobs
1. Preface
2. Main Content
This article covers two scenarios:

- Read data from a csv file, run business processing, then store the result
- Read data from the database, run business processing, then store the result


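First, prepare a test table, bloginfo, in MySQL: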
CREATE TABLE `bloginfo` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'primary key',
`blogAuthor` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'blog author identifier',
`blogUrl` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'blog URL',
`blogTitle` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'blog title',
`blogItem` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'blog category',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 89031 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
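The Maven dependencies for the demo (pom.xml):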
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>org.hibernate</groupId>
<artifactId>hibernate-validator</artifactId>
<version>6.0.7.Final</version>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.18</version>
</dependency>
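And the application.yml configuration: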
spring:
  batch:
    job:
      # set to false: the job then runs only when triggered via jobLauncher.run
      enabled: false
    initialize-schema: always
    # table-prefix: my-batch
  datasource:
    druid:
      username: root
      password: root
      url: jdbc:mysql://localhost:3306/hellodemo?useSSL=false&useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8&zeroDateTimeBehavior=convertToNull
      driver-class-name: com.mysql.cj.jdbc.Driver
      initialSize: 5
      minIdle: 5
      maxActive: 20
      maxWait: 60000
      timeBetweenEvictionRunsMillis: 60000
      minEvictableIdleTimeMillis: 300000
      validationQuery: SELECT 1 FROM DUAL
      testWhileIdle: true
      testOnBorrow: false
      testOnReturn: false
      poolPreparedStatements: true
      maxPoolPreparedStatementPerConnectionSize: 20
      useGlobalDataSourceStat: true
      connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
server:
  port: 8665
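With initialize-schema: always, Spring Boot creates Spring Batch's metadata tables (BATCH_JOB_INSTANCE, BATCH_JOB_EXECUTION, BATCH_STEP_EXECUTION, and so on) in the configured database at startup; the commented-out table-prefix property would change their name prefix.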

PS: we are using the Druid connection pool here. There is actually a small pitfall with it, which a later article will cover.
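The entity class mapped to the bloginfo table: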
public class BlogInfo {
private Integer id;
private String blogAuthor;
private String blogUrl;
private String blogTitle;
private String blogItem;
// getters and setters omitted for brevity
@Override
public String toString() {
return "BlogInfo{" +
"id=" + id +
", blogAuthor='" + blogAuthor + '\'' +
", blogUrl='" + blogUrl + '\'' +
", blogTitle='" + blogTitle + '\'' +
", blogItem='" + blogItem + '\'' +
'}';
}
}
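The MyBatis mapper interface: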
import org.apache.ibatis.annotations.*;
import java.util.List;
import java.util.Map;
@Mapper
public interface BlogMapper {
@Insert("INSERT INTO bloginfo ( blogAuthor, blogUrl, blogTitle, blogItem ) VALUES ( #{blogAuthor}, #{blogUrl}, #{blogTitle}, #{blogItem} )")
@Options(useGeneratedKeys = true, keyProperty = "id")
int insert(BlogInfo bloginfo);
@Select("select blogAuthor, blogUrl, blogTitle, blogItem from bloginfo where blogAuthor < #{authorId}")
List<BlogInfo> queryInfoById(Map<String, Integer> map);
}
JobLauncher: the job runner
Job: the batch job, made up of one or more Steps
Step: a single stage, containing an ItemReader, ItemProcessor and ItemWriter
ItemReader: reads the data
ItemProcessor: processes the data
ItemWriter: writes the data out
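Before walking through the configuration, here is a minimal sketch of how these pieces relate (bean names here are invented for illustration):

// a chunk-oriented Step pipes reader -> processor -> writer;
// a Job strings one or more Steps together; a JobLauncher runs the Job
Step step = stepBuilderFactory.get("demoStep")
        .<BlogInfo, BlogInfo>chunk(100) // accumulate 100 processed items, then write them in one batch
        .reader(reader)
        .processor(processor)
        .writer(writer)
        .build();
Job job = jobBuilderFactory.get("demoJob").start(step).build();
JobExecution execution = jobLauncher.run(job, new JobParameters()); // throws checked Job*Exceptions

The configuration class below registers each of these pieces as Spring beans.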

/**
* JobRepository definition: the registry for Jobs, and the component that talks to the database (transaction management, run metadata)
* @param dataSource
* @param transactionManager
* @return
* @throws Exception
*/
@Bean
public JobRepository myJobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception{
JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
jobRepositoryFactoryBean.setDatabaseType("mysql");
jobRepositoryFactoryBean.setTransactionManager(transactionManager);
jobRepositoryFactoryBean.setDataSource(dataSource);
return jobRepositoryFactoryBean.getObject();
}
/**
* jobLauncher definition: the job runner, bound to the jobRepository above
* @param dataSource
* @param transactionManager
* @return
* @throws Exception
*/
@Bean
public SimpleJobLauncher myJobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception{
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
// bind the jobRepository
jobLauncher.setJobRepository(myJobRepository(dataSource, transactionManager));
return jobLauncher;
}
/**
* Job definition
* @param jobs
* @param myStep
* @return
*/
@Bean
public Job myJob(JobBuilderFactory jobs, Step myStep){
return jobs.get("myJob")
.incrementer(new RunIdIncrementer())
.flow(myStep)
.end()
.listener(myJobListener())
.build();
}
/**
* Register the job listener
* @return
*/
@Bean
public MyJobListener myJobListener(){
return new MyJobListener();
}
/**
* @Author : JCccc
* @Description : Watches Job execution; implements JobExecutionListener and is bound to the Job bean in the batch configuration class
**/
public class MyJobListener implements JobExecutionListener {
private Logger logger = LoggerFactory.getLogger(MyJobListener.class);
@Override
public void beforeJob(JobExecution jobExecution) {
logger.info("job started, id={}", jobExecution.getJobId());
}
@Override
public void afterJob(JobExecution jobExecution) {
logger.info("job finished, id={}", jobExecution.getJobId());
}
}
/**
* ItemReader definition: reads the file and maps each line onto the entity class
* @return
*/
@Bean
public ItemReader<BlogInfo> reader(){
// use a FlatFileItemReader to read the csv file, one line per record
FlatFileItemReader<BlogInfo> reader = new FlatFileItemReader<>();
// location of the file
reader.setResource(new ClassPathResource("static/bloginfo.csv"));
// map the csv columns onto the entity
reader.setLineMapper(new DefaultLineMapper<BlogInfo>() {
{
setLineTokenizer(new DelimitedLineTokenizer() {
{
setNames(new String[]{"blogAuthor","blogUrl","blogTitle","blogItem"});
}
});
}
});
setFieldSetMapper(new BeanWrapperFieldSetMapper<BlogInfo>() {
{
setTargetType(BlogInfo.class);
}
});
}
});
return reader;
}
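For reference, the reader expects static/bloginfo.csv to hold one record per line, comma-delimited, in the configured column order (blogAuthor,blogUrl,blogTitle,blogItem); a made-up sample:

10001,https://blog.example.com/10001,some title,springboot
10002,https://blog.example.com/10002,another title,springcloud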

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ItemReadListener;
import static java.lang.String.format;
public class MyReadListener implements ItemReadListener<BlogInfo> {
private Logger logger = LoggerFactory.getLogger(MyReadListener.class);
@Override
public void beforeRead() {
}
@Override
public void afterRead(BlogInfo item) {
}
@Override
public void onReadError(Exception ex) {
try {
logger.info(format("%s%n", ex.getMessage()));
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* Register the ItemProcessor: processes and validates the data
* @return
*/
@Bean
public ItemProcessor<BlogInfo, BlogInfo> processor(){
MyItemProcessor myItemProcessor = new MyItemProcessor();
// attach the validator
myItemProcessor.setValidator(myBeanValidator());
return myItemProcessor;
}
/**
* Register the validator
* @return
*/
@Bean
public MyBeanValidator myBeanValidator(){
return new MyBeanValidator<BlogInfo>();
}
The processing logic here: take the blogItem field of each record that was read, and if it equals springboot, replace the blogTitle value.
import org.springframework.batch.item.validator.ValidatingItemProcessor;
import org.springframework.batch.item.validator.ValidationException;
public class MyItemProcessor extends ValidatingItemProcessor<BlogInfo> {
@Override
public BlogInfo process(BlogInfo item) throws ValidationException {
/**
* super.process(item) must be called for the custom validator to run
*/
super.process(item);
/**
* simple processing of the data
*/
if (item.getBlogItem().equals("springboot")) {
item.setBlogTitle("springboot series - check out my posts by Jc");
} else {
item.setBlogTitle("unknown series");
}
return item;
}
}
import org.springframework.batch.item.validator.Validator;
import org.springframework.beans.factory.InitializingBean;
import javax.validation.ConstraintViolation;
import javax.validation.Validation;
import javax.validation.ValidatorFactory;
import java.util.Set;
public class MyBeanValidator<T> implements Validator<T>, InitializingBean {
private javax.validation.Validator validator;
@Override
public void validate(T value) throws ValidationException {
/**
* validate the data with the JSR-303 Validator's validate method
*/
Set<ConstraintViolation<T>> constraintViolations =
validator.validate(value);
if (constraintViolations.size() > 0) {
StringBuilder message = new StringBuilder();
for (ConstraintViolation<T> constraintViolation : constraintViolations) {
message.append(constraintViolation.getMessage() + "\n");
}
throw new ValidationException(message.toString());
}
}
/**
* Initialize the JSR-303 Validator that validates our data
* @throws Exception
*/
@Override
public void afterPropertiesSet() throws Exception {
ValidatorFactory validatorFactory =
Validation.buildDefaultValidatorFactory();
validator = validatorFactory.usingContext().getValidator();
}
}
PS: this article doesn't actually exercise the validator. If you want to use it, add validation annotations such as @NotNull, @Max or @Email to the entity class. I prefer to do such checks directly inside the processor, so that all the data-handling code lives in one place.
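A minimal sketch of that option (the annotations and messages here are illustrative, not from the project):

import javax.validation.constraints.NotNull;
import javax.validation.constraints.Size;

public class BlogInfo {
    // hypothetical constraints; any JSR-303 annotation works the same way
    @NotNull(message = "blogAuthor must not be null")
    private String blogAuthor;

    @Size(max = 255, message = "blogTitle is limited to 255 characters")
    private String blogTitle;

    // remaining fields, getters and setters unchanged
}

With MyBeanValidator attached to the processor, any violation is collected and rethrown as a ValidationException from process().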
/**
* ItemWriter definition: binds the datasource and the batch insert sql, then writes to the database
* @param dataSource
* @return
*/
@Bean
public ItemWriter<BlogInfo> writer(DataSource dataSource){
// use a JdbcBatchItemWriter to write the data to the database
JdbcBatchItemWriter<BlogInfo> writer = new JdbcBatchItemWriter<>();
// named-parameter sql; values are pulled from BlogInfo bean properties
writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<BlogInfo>());
String sql = "insert into bloginfo (blogAuthor,blogUrl,blogTitle,blogItem) "
+ "values(:blogAuthor,:blogUrl,:blogTitle,:blogItem)";
writer.setSql(sql);
writer.setDataSource(dataSource);
return writer;
}
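BeanPropertyItemSqlParameterSourceProvider resolves each :named placeholder in the sql against the matching BlogInfo property, so no manual parameter mapping is needed; JdbcBatchItemWriter then executes the statement for the whole chunk as one JDBC batch.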

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ItemWriteListener;
import java.util.List;
import static java.lang.String.format;
public class MyWriteListener implements ItemWriteListener<BlogInfo> {
private Logger logger = LoggerFactory.getLogger(MyWriteListener.class);
@Override
public void beforeWrite(List<? extends BlogInfo> items) {
}
@Override
public void afterWrite(List<? extends BlogInfo> items) {
}
@Override
public void onWriteError(Exception exception, List<? extends BlogInfo> items) {
try {
logger.info(format("%s%n", exception.getMessage()));
for (BlogInfo message : items) {
logger.info(format("Failed writing BlogInfo : %s", message.toString()));
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* Step definition, wiring together:
* ItemReader (read)
* ItemProcessor (process)
* ItemWriter (write)
* @param stepBuilderFactory
* @param reader
* @param writer
* @param processor
* @return
*/
@Bean
public Step myStep(StepBuilderFactory stepBuilderFactory, ItemReader<BlogInfo> reader,
ItemWriter<BlogInfo> writer, ItemProcessor<BlogInfo, BlogInfo> processor){
return stepBuilderFactory
.get("myStep")
.<BlogInfo, BlogInfo>chunk(65000) // chunk mechanism: items are read and processed one at a time, then handed to the writer in a single batch once the chunk size is reached
.reader(reader).faultTolerant().retryLimit(3).retry(Exception.class).skip(Exception.class).skipLimit(2)
.listener(new MyReadListener())
.processor(processor)
.writer(writer).faultTolerant().skip(Exception.class).skipLimit(2)
.listener(new MyWriteListener())
.build();
}
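On the fault-tolerance settings: retryLimit(3).retry(Exception.class) retries a failing item up to three times, and skip(Exception.class).skipLimit(2) then allows at most two items to be skipped before the whole step fails.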

/**
* @Author : JCccc
* @Description :
**/
@RestController
public class TestController {
@Autowired
SimpleJobLauncher jobLauncher;
@Autowired
Job myJob;
@GetMapping("testJob")
public void testJob() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
// bind runtime parameters with JobParametersBuilder (addLong, addString, etc.) if needed
JobParameters jobParameters = new JobParametersBuilder().toJobParameters();
jobLauncher.run(myJob, jobParameters);
}
}
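Because spring.batch.job.enabled is false, nothing runs at startup; a request to GET http://localhost:8665/testJob launches the job once. Note that launching again with the same (empty) JobParameters targets the same JobInstance, which Spring Batch rejects as already complete, so for repeatable runs add a unique parameter, as the second example below does with timeNew.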

PS: a heads-up again that using Druid here has a pitfall; it will be covered later.
A quick recap of the pieces we now have:

JobLauncher: the job runner
Job: the batch job, made up of one or more Steps
Step: a single stage, containing an ItemReader, ItemProcessor and ItemWriter
ItemReader: reads the data
ItemProcessor: processes the data
ItemWriter: writes the data out
job listener
reader listener
writer listener
processor data validator
For the database-driven scenario, the changes are:

- Reader: previously we used FlatFileItemReader; we now switch to MyBatisCursorItemReader
- Processor: new scenario, so for easy extension it is best to create a new processor
- Writer: new scenario, so for easy extension it is best to create a new writer
- Step wiring: new scenario, so for easy extension it is best to create a new step
- Job: naturally, a brand-new one as well
import org.springframework.batch.item.validator.ValidatingItemProcessor;
import org.springframework.batch.item.validator.ValidationException;
public class MyItemProcessorNew extends ValidatingItemProcessor<BlogInfo> {
@Override
public BlogInfo process(BlogInfo item) throws ValidationException {
/**
* super.process(item) must be called for the custom validator to run
*/
super.process(item);
/**
* simple processing of the data
*/
Integer authorId = Integer.valueOf(item.getBlogAuthor());
if (authorId < 20000) {
item.setBlogTitle("this is data with authorId below 20000");
} else if (authorId > 20000 && authorId < 30000) {
item.setBlogTitle("this is data with authorId above 20000 but below 30000");
} else {
item.setBlogTitle("an old book never tires of a hundred readings");
}
return item;
}
}
/**
* Job definition (new scenario)
* @param jobs
* @param stepNew
* @return
*/
@Bean
public Job myJobNew(JobBuilderFactory jobs, Step stepNew){
return jobs.get("myJobNew")
.incrementer(new RunIdIncrementer())
.flow(stepNew)
.end()
.listener(myJobListener())
.build();
}
@Bean
public Step stepNew(StepBuilderFactory stepBuilderFactory, MyBatisCursorItemReader<BlogInfo> itemReaderNew,
ItemWriter<BlogInfo> writerNew, ItemProcessor<BlogInfo, BlogInfo> processorNew){
return stepBuilderFactory
.get("stepNew")
.<BlogInfo, BlogInfo>chunk(65000) // chunk mechanism: items are read and processed one at a time, then handed to the writer in a single batch once the chunk size is reached
.reader(itemReaderNew).faultTolerant().retryLimit(3).retry(Exception.class).skip(Exception.class).skipLimit(10)
.listener(new MyReadListener())
.processor(processorNew)
.writer(writerNew).faultTolerant().skip(Exception.class).skipLimit(2)
.listener(new MyWriteListener())
.build();
}
@Bean
public ItemProcessor<BlogInfo, BlogInfo> processorNew(){
MyItemProcessorNew csvItemProcessor = new MyItemProcessorNew();
// attach the validator
csvItemProcessor.setValidator(myBeanValidator());
return csvItemProcessor;
}
@Autowired
private SqlSessionFactory sqlSessionFactory;
@Bean
@StepScope
// Spring Batch's StepScope is a custom Spring bean scope tied to a running step: the bean is only instantiated when the step starts, which lets its configuration and parameters (here the authorId job parameter) be late-bound for each execution.
public MyBatisCursorItemReader<BlogInfo> itemReaderNew(@Value("#{jobParameters[authorId]}") String authorId) {
System.out.println("starting the database query");
MyBatisCursorItemReader<BlogInfo> reader = new MyBatisCursorItemReader<>();
reader.setQueryId("com.example.batchdemo.mapper.BlogMapper.queryInfoById");
reader.setSqlSessionFactory(sqlSessionFactory);
Map<String, Object> map = new HashMap<>();
map.put("authorId", Integer.valueOf(authorId));
reader.setParameterValues(map);
return reader;
}
/**
* ItemWriter definition: binds the datasource and the batch insert sql, then writes to the database
* @param dataSource
* @return
*/
@Bean
public ItemWriter<BlogInfo> writerNew(DataSource dataSource){
// use a JdbcBatchItemWriter to write the data to the database
JdbcBatchItemWriter<BlogInfo> writer = new JdbcBatchItemWriter<>();
// named-parameter sql; values are pulled from BlogInfo bean properties
writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<BlogInfo>());
String sql = "insert into bloginfonew (blogAuthor,blogUrl,blogTitle,blogItem) "
+ "values(:blogAuthor,:blogUrl,:blogTitle,:blogItem)";
writer.setSql(sql);
writer.setDataSource(dataSource);
return writer;
}
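Note that writerNew inserts into a separate bloginfonew table, which must already exist with the same columns as bloginfo.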

@Autowired
SimpleJobLauncher jobLauncher;
@Autowired
Job myJobNew;
@GetMapping("testJobNew")
public void testJobNew(@RequestParam(“authorId”) String authorId) throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
JobParameters jobParametersNew = new JobParametersBuilder().addLong("timeNew", System.currentTimeMillis())
.addString("authorId", authorId)
.toJobParameters();
jobLauncher.run(myJobNew,jobParametersNew);
}
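A request such as GET http://localhost:8665/testJobNew?authorId=30000 (the value is only an example) late-binds authorId into the step-scoped reader, and the timeNew timestamp makes every run a fresh JobInstance, so the endpoint can be called repeatedly.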

As the Spring Boot documentation puts it: "We prefer HikariCP for its performance and concurrency. If HikariCP is available, we always choose it."