请选择 进入手机版 | 继续访问电脑版

25 评论

0 收藏

分享

Spring Batch批处理框架介绍

批处理顾名思义是批量处理大量数据,但是这个大量数据又不是特别大的大数据,比Hadoop等要轻量得多,适合企业单位人数薪资计算,财务系统月底一次性结算等常规数据批量处理。SpringBatch教程
/ {+ x. ~+ V; q5 A+ ~
Spring Batch是一个用于创建健壮的批处理应用程序的完整框架。您可以创建可重用的函数来处理大量数据或任务,通常称为批量处理。
Spring Batch文档中所述,使用该框架的最常见方案如下:
  • 定期提交批处理
  • 并行处理作业的并发批处理
  • 分阶段,企业消息驱动处理
  • 大型并行批处理
  • 手动或故障后的计划重新启动
  • 依赖步骤的顺序处理(扩展到工作流程驱动的批处理)
  • 部分处理:跳过记录(例如,回滚时)
  • 整批事务:对于批量小或现有存储过程的情况/脚本
    0 _, l* N# o' j4 F$ j; b4 y

- X9 T% j7 T3 G
Spring Batch的特点有:
  • 事务管理,让您专注于业务处理,实现批处理机制,你可以引入平台事务机制或其他事务管理器机制
  • 基于块Chunk的处理,通过将一大段大量数据分成一段段小数据来处理,。
  • 启动/停止/重新启动/跳过/重试功能,以处理过程的非交互式管理。
  • 基于Web的管理界面(Spring Batch Admin),它提供了一个用于管理任务的API。
  • 基于Spring框架,因此它包括所有配置选项,包括依赖注入。
  • 符合JSR 352:Java平台的批处理应用程序。
  • 基于数据库管理的批处理,可与Spring Cloud Task结合,适合分布式集群下处理。
  • 能够进行多线程并行处理,分布式系统下并行处理,变成一种弹性Job分布式处理框架。4 k: j* w+ x! R  F& i0 q, J
    . ]- w! }% M+ ~* ]
    . o+ i4 N, |! o( Z
  Spring批处理的基本单元是Job,你需要定义一个Job代表一次批处理工作,每个Job分很多步骤step,每个步骤里面有两种处理方式Tasklet(可重复执行的小任务)和Chunk(块),掌握Spring Batch主要是将这几个核心概念搞清楚。
1.png

7 C0 T, c+ `# B; m3 [3 c
在SpringBoot架构下,我们只要做一个JobConfig组件作为JobLauncher,使用@Configuration配置,然后完成上图中Job和Step以及ItemReader,ItemProcessor和ItemWriter,后面这三个分别是存在一个步骤里,用于处理条目的输入读 、处理然后输出写出。至于图中JobRepository只要我们在Application.properties中配置上datasource,SpringBoot启动时会自动将batch需要的库表导入到数据库中。

1 j1 K3 _6 f: L  t$ W" L
下图是每个步骤内部的事务处理过程,在进行读入 处理和写出时,如果有任何一个步骤出错,将会事务回滚,也只有这三个步骤全部完成,才能提交事务机制,进而完成一个步骤。
! q4 d3 R: K- q) A" v1 B
2.png

! w3 L! d0 J5 Y2 V
下面我们看一个简单案例如何使用SpringBatch的,这个案例功能是从一个CSV文件中导入数据到数据库中。
首先导入pom.xml:
[Java] 纯文本查看 复制代码
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <scope>runtime</scope>
</dependency>
这里使用MysSQL作为Job仓库,在Application.properties配置:
[Java] 纯文本查看 复制代码
spring.batch.initialize-schema=always

spring.datasource.url=jdbc:mysql://localhost:3306/mytest
spring.datasource.username=banq
spring.datasource.password=XXX
spring.datasource.driver-class-name=com.mysql.jdbc.Driver

  b* b: x+ g, o* J, Z8 m8 Q  l* g. y
配置了spring.batch.initialize-schema为always这样能自动启动时导入批处理需要的数据库表。
下面我们实现批处理的关键类@Configuration:
首先定义一个Job:
[Java] 纯文本查看 复制代码
@Bean
public Job importUserJob() {
   return jobBuilderFactory.get("importUserJob")
         .incrementer(new RunIdIncrementer())
         .flow(step1())
         .end()
         .build();
}
这个Job名称是importUserJob,其中使用了步骤step1:
3 ?0 T' q: |/ R; v" X/ u
[Java] 纯文本查看 复制代码
@Bean
public Step step1() {
   return stepBuilderFactory.get("step1").<User, User> chunk(3)
         .reader(reader())
         .processor(processor())
         .writer(writer())
         .build();
}

5 A0 Q$ L3 y# ^( }( S2 A1 n
这个步骤step1中使用了chunk,分块读取数据处理后输出。下面是依次看看输入 处理和输出的方法:

7 B, Y9 J8 Q8 m3 _5 V
[Java] 纯文本查看 复制代码
@Bean
public FlatFileItemReader<User> reader(){
   FlatFileItemReader<User> reader = new FlatFileItemReader<User>();
   reader.setResource(new ClassPathResource("users.csv"));
reader.setLineMapper(new DefaultLineMapper<User>() {{
      setLineTokenizer(new DelimitedLineTokenizer() {{
         setNames(new String[] { "name" });
      }});
      setFieldSetMapper(new BeanWrapperFieldSetMapper<User>() {{
         setTargetType(User.class);
      }});

   }});

   return reader;
}

) `) Z( @* l1 _& H; L5 l; J6 z
这是输入,读取classpath下的uers.csv文件:

1 J4 Z% r, S) Z; }
[Java] 纯文本查看 复制代码
testdata1
testdata2
testdata3

  P( Q1 J6 n: t/ V* x7 P4 \: i
一次读入三行,提取一行中数据作为User这个对象的name输入其中:
9 ~/ b0 P. _$ a1 H2 `9 U
[Java] 纯文本查看 复制代码
@Entity
public class User {

   @Id
   @GeneratedValue(strategy = GenerationType.IDENTITY)
   private int id ;

   public int getId() {
      return id;
   }

   public void setId(int id) {
      this.id = id;
   }

   private String name;

   public String getName() {
      return name;
   }

   public void setName(String name) {
      this.name = name;
   }
}

+ @& w- Z+ J0 N  C9 Q4 w4 OUser是我们的一个实体数据,其中ID使用数据库自增,name由user.csv导入,User对应的数据表schema.sql是:2 v7 V( z" Y( W5 [% s- ]: Y+ M
[Java] 纯文本查看 复制代码
CREATE TABLE  `user` (
  `id` int(11) NOT NULL auto_increment,
  `name` varchar(45) NOT NULL default '',
  PRIMARY KEY  (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

4 K' S* X" h" h2 g
我们只要在pom.xml中导入JPA包:

: \8 _$ H1 A$ h- Z
[Java] 纯文本查看 复制代码
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
+ n' l+ A: o7 R% o5 s
并在application.properties中加入,就可以在SpringBoot启动时,自动使用datasource配置的数据库建立User表了。

% F, X/ X+ X* Z  r5 h
[Java] 纯文本查看 复制代码
spring.jpa.generate-ddl=true

+ O( [. i7 K+ H" v
下面我们回到批处理,前面定义了输入,下面依次是条目处理:

& M0 M! T1 W% R, Y# o( A8 F
[Java] 纯文本查看 复制代码
public class UserItemProcessor implements ItemProcessor<User, User> {

   @Override
   public User process(User user) throws Exception {
      return user;
   }
}
& {7 s; m3 i5 |2 {! h
这个条目处理就是对每个User对象进行处理,这时User对象已经包含了从CSV读取的数据,如果希望再进行加工处理就在这里进行。
下面是条目输出:
[Java] 纯文本查看 复制代码
@Bean
public JdbcBatchItemWriter<User> writer(){
   JdbcBatchItemWriter<User> writer = new JdbcBatchItemWriter<User>();
   writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<User>());
   writer.setSql("INSERT INTO user(name) VALUES (:name)");
   writer.setDataSource(dataSource);

   return writer;
}
每一行数据我们从CSV读出以后放入到User中,然后再插入数据表user保存。
至此,我们简单完成了一个批处理开发过程
" l1 t6 E' \5 D6 M( ?
项目源码百度云链接地址:
游客,如果您要查看本帖隐藏内容请回复
提取码: gge6  (回帖即可获取项目源码,无任何套路!!!)
. i" d+ }5 l$ R2 |! o6 v( ]. ]6 I2 b4 l
' m. f1 A- z: V) O
回复

举报 使用道具

相关帖子
全部回复 (25)
查看全部
学习了,spring batch入门案例

举报 回复 使用道具

测试看看,体验一下springbatch

举报 回复 使用道具

学习了,spring批处理入门案例

举报 回复 使用道具

学习一下,体验一下样例

举报 回复 使用道具

下载学习学习

举报 回复 使用道具

2122ASDSADSASADSADSA

举报 回复 使用道具

试一试获取地址

举报 回复 使用道具

下载学习下

举报 回复 使用道具

学习下,最近准备处理大批量数据

举报 回复 使用道具

主题 24
回复 1
粉丝 0
快速回复 返回顶部 返回列表