批处理顾名思义是批量处理大量数据,但是这个大量数据又不是特别大的大数据,比Hadoop等要轻量得多,适合企业单位人数薪资计算,财务系统月底一次性结算等常规数据批量处理。SpringBatch教程 3 i$ z4 u% |6 J* \- |" v
Spring Batch是一个用于创建健壮的批处理应用程序的完整框架。您可以创建可重用的函数来处理大量数据或任务,通常称为批量处理。 - 定期提交批处理
- 并行处理作业的并发批处理
- 分阶段,企业消息驱动处理
- 大型并行批处理
- 手动或故障后的计划重新启动
- 依赖步骤的顺序处理(扩展到工作流程驱动的批处理)
- 部分处理:跳过记录(例如,回滚时)
- 整批事务:对于批量小或现有存储过程的情况/脚本. V: f. Z7 L' H% D9 B( o
! J9 i7 j6 @% ?7 _! _3 W
Spring Batch的特点有: - 事务管理,让您专注于业务处理,实现批处理机制,你可以引入平台事务机制或其他事务管理器机制
- 基于块Chunk的处理,通过将一大段大量数据分成一段段小数据来处理,。
- 启动/停止/重新启动/跳过/重试功能,以处理过程的非交互式管理。
- 基于Web的管理界面(Spring Batch Admin),它提供了一个用于管理任务的API。
- 基于Spring框架,因此它包括所有配置选项,包括依赖注入。
- 符合JSR 352:Java平台的批处理应用程序。
- 基于数据库管理的批处理,可与Spring Cloud Task结合,适合分布式集群下处理。
- 能够进行多线程并行处理,分布式系统下并行处理,变成一种弹性Job分布式处理框架。8 ~2 P4 L3 E, O4 B0 t: q
% e" X- l$ B6 W
" N$ E- K5 Q" }- S6 Q/ {
Spring批处理的基本单元是Job,你需要定义一个Job代表一次批处理工作,每个Job分很多步骤step,每个步骤里面有两种处理方式Tasklet(可重复执行的小任务)和Chunk(块),掌握Spring Batch主要是将这几个核心概念搞清楚。 1 c9 p9 j& U9 I! }4 R0 P& R, t
在SpringBoot架构下,我们只要做一个JobConfig组件作为JobLauncher,使用@Configuration配置,然后完成上图中Job和Step以及ItemReader,ItemProcessor和ItemWriter,后面这三个分别是存在一个步骤里,用于处理条目的输入读 、处理然后输出写出。至于图中JobRepository只要我们在Application.properties中配置上datasource,SpringBoot启动时会自动将batch需要的库表导入到数据库中。
: d2 p; @* H y# ~9 i
下图是每个步骤内部的事务处理过程,在进行读入 处理和写出时,如果有任何一个步骤出错,将会事务回滚,也只有这三个步骤全部完成,才能提交事务机制,进而完成一个步骤。 0 t o6 {- d# X4 |/ `) l" d
2 H1 B, N; G/ }. \ Q
下面我们看一个简单案例如何使用SpringBatch的,这个案例功能是从一个CSV文件中导入数据到数据库中。 首先导入pom.xml: [Java] 纯文本查看 复制代码 <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
这里使用MysSQL作为Job仓库,在Application.properties配置: [Java] 纯文本查看 复制代码 spring.batch.initialize-schema=always
spring.datasource.url=jdbc:mysql://localhost:3306/mytest
spring.datasource.username=banq
spring.datasource.password=XXX
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
9 A" c' |) S7 V- X/ p b; I, `6 S
配置了spring.batch.initialize-schema为always这样能自动启动时导入批处理需要的数据库表。 下面我们实现批处理的关键类@Configuration: 首先定义一个Job: [Java] 纯文本查看 复制代码 @Bean
public Job importUserJob() {
return jobBuilderFactory.get("importUserJob")
.incrementer(new RunIdIncrementer())
.flow(step1())
.end()
.build();
} 这个Job名称是importUserJob,其中使用了步骤step1: 2 q# z: [" O6 j. B0 Q
[Java] 纯文本查看 复制代码 @Bean
public Step step1() {
return stepBuilderFactory.get("step1").<User, User> chunk(3)
.reader(reader())
.processor(processor())
.writer(writer())
.build();
}
* n1 M4 g4 |# H2 i& c0 x o
这个步骤step1中使用了chunk,分块读取数据处理后输出。下面是依次看看输入 处理和输出的方法:
' y8 v1 L4 e5 `* j[Java] 纯文本查看 复制代码 @Bean
public FlatFileItemReader<User> reader(){
FlatFileItemReader<User> reader = new FlatFileItemReader<User>();
reader.setResource(new ClassPathResource("users.csv"));
reader.setLineMapper(new DefaultLineMapper<User>() {{
setLineTokenizer(new DelimitedLineTokenizer() {{
setNames(new String[] { "name" });
}});
setFieldSetMapper(new BeanWrapperFieldSetMapper<User>() {{
setTargetType(User.class);
}});
}});
return reader;
} 7 Q' x9 R9 K: @9 d1 j
这是输入,读取classpath下的uers.csv文件:
$ V* f0 [+ a% t* \7 ^9 ? Q3 _/ E, K[Java] 纯文本查看 复制代码 testdata1
testdata2
testdata3
1 t% _- a( Z9 ^一次读入三行,提取一行中数据作为User这个对象的name输入其中: 5 R F8 t, H) B% N0 S! _) J8 R
[Java] 纯文本查看 复制代码 @Entity
public class User {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private int id ;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
private String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
} ; G+ ^: U( s" p/ k- ^ ? o, d3 v
User是我们的一个实体数据,其中ID使用数据库自增,name由user.csv导入,User对应的数据表schema.sql是:
) O: P+ J: F" f( ] b2 n[Java] 纯文本查看 复制代码 CREATE TABLE `user` (
`id` int(11) NOT NULL auto_increment,
`name` varchar(45) NOT NULL default '',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8; , i0 N' A. ~' y3 x. \5 T" g
我们只要在pom.xml中导入JPA包: : C1 m8 `, b8 M" R# H% `& h4 C9 V" r+ e
[Java] 纯文本查看 复制代码 <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
' i, a0 t5 I/ M7 ~, Z
并在application.properties中加入,就可以在SpringBoot启动时,自动使用datasource配置的数据库建立User表了。 8 W% N# ^" Q) Y! e2 v1 Y
[Java] 纯文本查看 复制代码 spring.jpa.generate-ddl=true
( }$ u/ j* ]# s8 Z, a s8 o下面我们回到批处理,前面定义了输入,下面依次是条目处理:
( m3 u0 Z( V6 L: \5 t/ ][Java] 纯文本查看 复制代码 public class UserItemProcessor implements ItemProcessor<User, User> {
@Override
public User process(User user) throws Exception {
return user;
}
}
9 |4 b! v" m$ u Y* T! l7 x9 c这个条目处理就是对每个User对象进行处理,这时User对象已经包含了从CSV读取的数据,如果希望再进行加工处理就在这里进行。 下面是条目输出: [Java] 纯文本查看 复制代码 @Bean
public JdbcBatchItemWriter<User> writer(){
JdbcBatchItemWriter<User> writer = new JdbcBatchItemWriter<User>();
writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<User>());
writer.setSql("INSERT INTO user(name) VALUES (:name)");
writer.setDataSource(dataSource);
return writer;
} 每一行数据我们从CSV读出以后放入到User中,然后再插入数据表user保存。 至此,我们简单完成了一个批处理开发过程
! M5 X `( i" _8 q% U4 e, @
项目源码百度云链接地址: 提取码: gge6 (回帖即可获取项目源码,无任何套路!!!)
4 w( J, x, L4 N6 }( g# p( ]7 r% ~
/ F" g5 W. v0 V# K. K |