SpringBoot:使用Spring Batch实现批处理任务

引言

在这里插入图片描述

在企业级应用中,批处理任务是不可或缺的一部分。它们通常用于处理大量数据,如数据迁移、数据清洗、生成报告等。Spring Batch是Spring框架的一部分,专为批处理任务设计,提供了简化的配置和强大的功能。本文将介绍如何使用Spring Batch与SpringBoot结合,构建和管理批处理任务。

项目初始化

首先,我们需要创建一个SpringBoot项目,并添加Spring Batch相关的依赖项。可以通过Spring Initializr快速生成项目。

添加依赖

pom.xml中添加以下依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
    <groupId>org.hsqldb</groupId>
    <artifactId>hsqldb</artifactId>
    <scope>runtime</scope>
</dependency>

配置Spring Batch

基本配置

Spring Batch需要一个数据库来存储批处理的元数据。我们可以使用HSQLDB作为内存数据库。配置文件application.properties

spring.datasource.url=jdbc:hsqldb:mem:testdb
spring.datasource.driverClassName=org.hsqldb.jdbc.JDBCDriver
spring.datasource.username=sa
spring.datasource.password=
spring.batch.initialize-schema=always
创建批处理任务

一个典型的Spring Batch任务包括三个主要部分:ItemReader、ItemProcessor和ItemWriter。

  1. ItemReader:读取数据的接口。
  2. ItemProcessor:处理数据的接口。
  3. ItemWriter:写数据的接口。
创建示例实体类

创建一个示例实体类,用于演示批处理操作:

import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;

@Entity
public class Person {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private String firstName;
    private String lastName;

    // getters and setters
}
创建ItemReader

我们将使用一个简单的FlatFileItemReader从CSV文件中读取数据:

import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.mapping.DelimitedLineTokenizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;

@Configuration
public class BatchConfiguration {

    @Bean
    public FlatFileItemReader<Person> reader() {
        return new FlatFileItemReaderBuilder<Person>()
                .name("personItemReader")
                .resource(new ClassPathResource("sample-data.csv"))
                .delimited()
                .names(new String[]{"firstName", "lastName"})
                .fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
                    setTargetType(Person.class);
                }})
                .build();
    }
}
创建ItemProcessor

创建一个简单的ItemProcessor,将读取的数据进行处理:

import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;

@Component
public class PersonItemProcessor implements ItemProcessor<Person, Person> {

    @Override
    public Person process(Person person) throws Exception {
        final String firstName = person.getFirstName().toUpperCase();
        final String lastName = person.getLastName().toUpperCase();

        final Person transformedPerson = new Person();
        transformedPerson.setFirstName(firstName);
        transformedPerson.setLastName(lastName);

        return transformedPerson;
    }
}
创建ItemWriter

我们将使用一个简单的JdbcBatchItemWriter将处理后的数据写入数据库:

import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;

@Configuration
public class BatchConfiguration {

    @Bean
    public JdbcBatchItemWriter<Person> writer(NamedParameterJdbcTemplate jdbcTemplate) {
        return new JdbcBatchItemWriterBuilder<Person>()
                .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
                .sql("INSERT INTO person (first_name, last_name) VALUES (:firstName, :lastName)")
                .dataSource(jdbcTemplate.getJdbcTemplate().getDataSource())
                .build();
    }
}

配置Job和Step

一个Job由多个Step组成,每个Step包含一个ItemReader、ItemProcessor和ItemWriter。

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job importUserJob(JobCompletionNotificationListener listener, Step step1) {
        return jobBuilderFactory.get("importUserJob")
                .listener(listener)
                .flow(step1)
                .end()
                .build();
    }

    @Bean
    public Step step1(JdbcBatchItemWriter<Person> writer) {
        return stepBuilderFactory.get("step1")
                .<Person, Person>chunk(10)
                .reader(reader())
                .processor(processor())
                .writer(writer)
                .build();
    }
}

监听Job完成事件

创建一个监听器,用于监听Job完成事件:

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.stereotype.Component;

@Component
public class JobCompletionNotificationListener implements JobExecutionListener {

    @Override
    public void beforeJob(JobExecution jobExecution) {
        System.out.println("Job Started");
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        System.out.println("Job Ended");
    }
}

测试与运行

创建一个简单的CommandLineRunner,用于启动批处理任务:

import org.springframework.batch.core.Job;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class BatchApplication implements CommandLineRunner {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job job;

    public static void main(String[] args) {
        SpringApplication.run(BatchApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        jobLauncher.run(job, new JobParameters());
    }
}

在完成配置后,可以运行应用程序,并检查控制台输出和数据库中的数据,确保批处理任务正常运行。

扩展功能

在基本的批处理任务基础上,可以进一步扩展功能,使其更加完善和实用。例如:

  • 多步骤批处理:一个Job可以包含多个Step,每个Step可以有不同的ItemReader、ItemProcessor和ItemWriter。
  • 并行处理:通过配置多个线程或分布式处理,提升批处理任务的性能。
  • 错误处理和重试:配置错误处理和重试机制,提高批处理任务的可靠性。
  • 数据验证:在处理数据前进行数据验证,确保数据的正确性。
多步骤批处理
@Bean
public Job multiStepJob(JobCompletionNotificationListener listener, Step step1, Step step2) {
    return jobBuilderFactory.get("multiStepJob")
            .listener(listener)
            .start(step1)
            .next(step2)
            .end()
            .build();
}

@Bean
public Step step2(JdbcBatchItemWriter<Person> writer) {
    return stepBuilderFactory.get("step2")
            .<Person, Person>chunk(10)
            .reader(reader())
            .processor(processor())
            .writer(writer)
            .build();
}
并行处理

可以通过配置多个线程来实现并行处理:

@Bean
public Step step1(JdbcBatchItemWriter<Person> writer) {
    return stepBuilderFactory.get("step1")
            .<Person, Person>chunk(10)
            .reader(reader())
            .processor(processor())
            .writer(writer)
            .taskExecutor(taskExecutor())
            .build();
}

@Bean
public TaskExecutor taskExecutor() {
    SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
    taskExecutor.setConcurrencyLimit(10);
    return taskExecutor;
}

结论

通过本文的介绍,我们了解了如何使用Spring Batch与SpringBoot结合,构建和管理批处理任务。从项目初始化、配置Spring Batch、实现ItemReader、ItemProcessor和ItemWriter,到配置Job和Step,Spring Batch提供了一系列强大的工具和框架,帮助开发者高效地实现批处理任务。通过合理利用这些工具和框架

,开发者可以构建出高性能、可靠且易维护的批处理系统。希望这篇文章能够帮助开发者更好地理解和使用Spring Batch,在实际项目中实现批处理任务的目标。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/755460.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

mysql wrnning Difficult to find free blocks in the buffer pool解决方法

mysql [InnoDB] Difficult to find free blocks in the buffer pool (140397 search iterations)! 我使用的是mysql8,。 原因&#xff1a;这种情况&#xff0c;多半出现在别人在非常大的写入&#xff0c;或者百万级的查询中。 解决方式&#xff0c;centos7在线安装的mysql&am…

【Linux进阶】windows和linux文件互传的两种方式

前言 我们在windows电脑上使用ssh工具&#xff08;比如Xshell&#xff09;来远程登录并使用linux云服务器的时候&#xff0c;难免要将我们的文件传输到linux服务器上&#xff0c;或者将linux服务器的文件传输到我们的windows电脑里&#xff0c;那么&#xff0c;我们要怎么来实…

FFmpeg教程-三-播放pcm文件-1

目录 一&#xff0c;下载SDL 二&#xff0c;在Qt中测试 1&#xff0c;在pro文件中加入路径 2&#xff0c;在.cpp文件中加入头文件 3&#xff0c;进行测试 4&#xff0c;显示结果 一&#xff0c;下载SDL 通过编程的方式播放音视频&#xff0c;也是需要用到这2个库: FFmpeg…

电脑数据恢复篇:如何恢复误删除的文件

在清理电脑或优化存储设备时无意中删除重要文件是人类常见的错误。不可否认的是&#xff0c;在批量删除文件时&#xff0c;您经常会同时删​​除垃圾文件和重要文件。后来您发现一堆重要的文档或文件不见了。在这种情况下&#xff0c;您唯一的选择就是寻找恢复已删除文件的方法…

【机器学习300问】135、决策树算法ID3的局限性在哪儿?C4.5算法做出了怎样的改进?

ID3算法是一种用于创建决策树的机器学习算法&#xff0c;该算法基于信息论中的信息增益概念来选择最优属性进行划分。信息增益是原始数据集熵与划分后数据集熵的差值&#xff0c;熵越小表示数据集的纯度越高。有关ID3算法的详细步骤和算法公式在我之前的文章中谈到&#xff0c;…

单调队列优化DP——AcWing 135. 最大子序和

单调队列优化DP 定义 单调队列优化DP是一种在动态规划&#xff08;Dynamic Programming, DP&#xff09;中应用的数据结构优化方法。它利用单调队列&#xff08;Monotonic Queue&#xff09;这一数据结构来高效维护一个区间内的最值&#xff08;通常是最大值或最小值&#xf…

自定义一个背景图片的高度,随着容器高度的变化而变化,小于图片的高度时裁剪,大于时拉伸100%展示

1、通过js创建<image?>标签来获取背景图片的宽高比&#xff1b; 2、当元素的高度大于原有比例计算出来的高度时&#xff0c;背景图片的高度拉伸自适应100%&#xff0c;否则高度为auto&#xff0c;会自动被裁减 3、背景图片容器高度变化时&#xff0c;自动计算背景图片的…

RFID固定资产管理系统在企业中的应用与优势

随着企业资产规模的不断扩大和管理复杂性的增加&#xff0c;传统的资产管理方式已无法满足企业高效管理的需求。RFID固定资产管理系统凭借其高效、准确、实时的特点&#xff0c;成为企业固定资产管理的新宠。 一、什么是RFID固定资产管理系统 RFID&#xff08;无线射频识别&…

浪潮信息存储的灵魂:平台化+场景化 全面释放数据价值

在数字化浪潮的席卷下&#xff0c;浪潮信息存储平台凭借卓越的性能和稳定性&#xff0c;正日益成为企业释放数据价值的重要力量。近日&#xff0c;浪潮信息出席了“2024数据基础设施技术峰会”&#xff0c;相关代表聚焦当前数据价值的释放话题&#xff0c;围绕先进存储基础设施…

CSS|01 CSS简介CSS的3种书写方式注释

CSS简介 什么是CSS CSS&#xff08;Cascading Style Sheet&#xff09;&#xff0c;层叠样式表 或者 级联样式表&#xff0c;简称样式表。CSS的作用 主要用来给 HTML网页 设置外观或者样式。CSS的语法规则 h1 {属性:属性值}注意&#xff1a;1. CSS代码是由选择器和一对括号…

Ubuntu Server 和 Ubuntu Desktop 组合使用

1.常见的组合使用方式 Ubuntu Server 和 Ubuntu Desktop 确实可以组合使用&#xff0c;但具体要看你的需求和使用场景。以下是一些常见的组合使用方式&#xff1a; 单一设备上安装&#xff1a;你可以在一台设备上同时安装 Ubuntu Server 和 Ubuntu Desktop。这样&#xff0c;你…

【ONE·Linux || 高级IO(一)】

总言 主要内容&#xff1a;介绍五种IO模型的基本概念、学习IO多路转接&#xff08;select、poll编程模型&#xff09;。       文章目录 总言1、问题引入1.1、网络通信与IO1.2、五种IO模型1.2.1、举例引入1.2.2、IO模型具体含义介绍1.2.2.1、阻塞式IO1.2.2.2、非阻塞轮询检…

mathcup大数据竞赛论文中集成学习(或模型融合)的运用分析

ps: (模型融合和集成学习是两个紧密相关但又有所区别的概念。集成学习是一种更广泛的范式&#xff0c;而模型融合可以被视为集成学习的一种特殊形式或策略。) 1.集成学习原理 图1 如图1所示&#xff0c;集成学习是一种通过结合多个机器学习模型的预测来提高整体性能的策略。其…

数据结构-循环链表和双向链表

目录 前言一、循环链表1.1 循环链表的介绍1.2 循环链表的实现 二、双向链表总结 前言 本篇文章介绍数据结构中的循环链表和双向链表 一、循环链表 1.1 循环链表的介绍 将单链表的形式稍作改变&#xff0c;单链表的最后一个结点指向第一个结点 对第一个结点概念的说明&#…

Echarts地图实现:山东省报考人数

Echarts地图实现&#xff1a;山东省报考人数 效果预览 设计思路 数据可视化&#xff1a;选择地图作为数据展示的方式&#xff0c;可以直观地展示山东省不同城市的报考人数分布。交互性&#xff1a;通过ECharts的交互功能&#xff0c;如提示框&#xff08;tooltip&#xff09;…

致远互联FE协作办公平台 codeMoreWidget SQL注入致RCE漏洞复现

0x01 产品简介 致远互联FE协作办公平台是一款为企业提供全方位协同办公解决方案的产品。它集成了多个功能模块&#xff0c;旨在帮助企业实现高效的团队协作、信息共享和文档管理。 0x02 漏洞概述 致远互联FE协作办公平台 codeMoreWidget.jsp接口处存在SQL注入漏洞,未经授权攻…

有哪些防爬虫的方法

防爬虫的方法有robots.txt文、user-agent过滤、ip限制、验证码、动态页面生成、频率限制、动态url参数和反爬虫技术等。详细介绍&#xff1a;1、robots.txt文件&#xff0c;用于告诉搜索引擎爬虫哪些页面可以访问&#xff0c;哪些页面禁止访问&#xff1b;2、ip限制&#xff0c…

机器学习入门指南:理解基本概念与常见算法

目录 什么是机器学习&#xff1f; 机器学习的基本概念 1. 训练数据 2. 特征工程 3. 模型评估 监督学习与非监督学习的区别 监督学习 非监督学习 常见的机器学习算法 1. 线性回归与逻辑回归 2. 决策树与随机森林 3. 支持向量机&#xff08;SVM&#xff09; 4. K近邻…

2小时动手学习扩散模型(pytorch版)【入门版】【代码讲解】

2小时动手学习扩散模型&#xff08;pytorch版&#xff09; 课程地址 2小时动手学习扩散模型&#xff08;pytorch版&#xff09; 课程目标 给零基础同学快速了解扩散模型的核心模块&#xff0c;有个整体框架的理解。知道扩散模型的改进和设计的核心模块。 课程特色&#xf…

学生宿舍管理系统

摘 要 随着高校规模的不断扩大和学生人数的增加&#xff0c;学生宿舍管理成为高校日常管理工作中的重要组成部分。传统的学生宿舍管理方式往往依赖于纸质记录和人工管理&#xff0c;这种方式不仅效率低下&#xff0c;而且容易出错&#xff0c;无法满足现代高校管理的需求。因此…