真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

SpringBatch快速入門

Spring Batch簡(jiǎn)介

Spring Batch簡(jiǎn)單來(lái)說(shuō)就是一個(gè)輕量級(jí)的批處理框架,從名字就可以知道它是Spring 的子項(xiàng)目。我們?cè)谄髽I(yè)開發(fā)中可能會(huì)面臨到一些需要處理較大數(shù)據(jù)量的場(chǎng)景,例如將一個(gè)表的全部數(shù)據(jù)導(dǎo)入到另一張表結(jié)構(gòu)類似的表中、批量讀取一個(gè)或多個(gè)文件內(nèi)容并寫入到數(shù)據(jù)庫(kù)中,又或者將一張表的數(shù)據(jù)批量更新到另一張表中。而Spring Batch可以幫助我們快速的開發(fā)這種場(chǎng)景下的批處理應(yīng)用程序。

成都創(chuàng)新互聯(lián)公司是一家專業(yè)提供宜城企業(yè)網(wǎng)站建設(shè),專注與網(wǎng)站設(shè)計(jì)、成都網(wǎng)站設(shè)計(jì)、成都h5網(wǎng)站建設(shè)、小程序制作等業(yè)務(wù)。10年已為宜城眾多企業(yè)、政府機(jī)構(gòu)等服務(wù)。創(chuàng)新互聯(lián)專業(yè)的建站公司優(yōu)惠進(jìn)行中。

Spring Batch提供了在處理大量數(shù)據(jù)時(shí)必不可少的可重用功能,包括日志記錄/跟蹤、事務(wù)管理、作業(yè)處理統(tǒng)計(jì)信息、作業(yè)重新啟動(dòng)、跳過(guò)和資源管理。對(duì)于大數(shù)據(jù)量和高性能的批處理任務(wù),Spring Batch 同樣提供了高級(jí)功能和特性來(lái)支持,例如分區(qū)功能、遠(yuǎn)程功能等,大大簡(jiǎn)化了批處理應(yīng)用的開發(fā),將開發(fā)人員從復(fù)雜的任務(wù)配置管理過(guò)程中解放出來(lái),讓我們可以更多地去關(guān)注核心的業(yè)務(wù)的處理過(guò)程??傊ㄟ^(guò) Spring Batch 我們就能夠?qū)崿F(xiàn)簡(jiǎn)單的或者復(fù)雜的和大數(shù)據(jù)量的批處理作業(yè)。

Spring Batch的結(jié)構(gòu)圖如下:
Spring Batch快速入門

  • JobRepository:用來(lái)注冊(cè)job的容器
  • JobLauncher:用來(lái)啟動(dòng)Job的接口
  • Job:實(shí)際執(zhí)行的任務(wù),包含一個(gè)或多個(gè)Step
  • Step:包含ItemReader、ItemProcessor和ItemWriter
  • ItemReader:用來(lái)讀取數(shù)據(jù)的接口
  • ItemProcessor:用來(lái)處理數(shù)據(jù)的接口
  • ItemWriter: 用來(lái)輸出數(shù)據(jù)的接口

本文目的主要是教大家如何快速地使用Spring Boot集成Spring Batch實(shí)現(xiàn)一個(gè)定時(shí)的批處理作業(yè)Demo,所以不會(huì)對(duì)Spring Batch理論部分進(jìn)行過(guò)多的介紹,因?yàn)槠涔倬W(wǎng)及網(wǎng)絡(luò)上都有詳細(xì)的參考文檔。

官網(wǎng)地址如下:

  • https://spring.io/projects/spring-batch

創(chuàng)建數(shù)據(jù)庫(kù)表格

本文以操作數(shù)據(jù)庫(kù)的批處理示例,當(dāng)我們的批處理作業(yè)需要操作數(shù)據(jù)庫(kù)時(shí),Spring Batch要求在數(shù)據(jù)庫(kù)中創(chuàng)建好批處理作業(yè)的元數(shù)據(jù)的存儲(chǔ)表格。如下,其中以batch開頭的表,是Spring Batch用來(lái)存儲(chǔ)每次執(zhí)行作業(yè)所產(chǎn)生的元數(shù)據(jù)。而student表則是作為我們這個(gè)Demo中數(shù)據(jù)的來(lái)源:
Spring Batch快速入門

下圖顯示了所有6張表的ERD模型及其相互關(guān)系(摘自官網(wǎng)):
Spring Batch快速入門

綜上,所以我們需要在數(shù)據(jù)庫(kù)中執(zhí)行如下來(lái)自官方的元數(shù)據(jù)模式SQL腳本:

-- do not edit this file
-- BATCH JOB 實(shí)例表 包含與aJobInstance相關(guān)的所有信息
-- JOB ID由batch_job_seq分配
-- JOB 名稱,與spring配置一致
-- JOB KEY 對(duì)job參數(shù)的MD5編碼,正因?yàn)橛羞@個(gè)字段的存在,同一個(gè)job如果第一次運(yùn)行成功,第二次再運(yùn)行會(huì)拋出JobInstanceAlreadyCompleteException異常。
CREATE TABLE BATCH_JOB_INSTANCE  (
    JOB_INSTANCE_ID BIGINT  NOT NULL PRIMARY KEY ,
    VERSION BIGINT ,
    JOB_NAME VARCHAR(100) NOT NULL,
    JOB_KEY VARCHAR(32) NOT NULL,
    constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY)
) ENGINE=InnoDB;

-- 該BATCH_JOB_EXECUTION表包含與該JobExecution對(duì)象相關(guān)的所有信息
CREATE TABLE BATCH_JOB_EXECUTION  (
    JOB_EXECUTION_ID BIGINT  NOT NULL PRIMARY KEY ,
    VERSION BIGINT  ,
    JOB_INSTANCE_ID BIGINT NOT NULL,
    CREATE_TIME DATETIME NOT NULL,
    START_TIME DATETIME DEFAULT NULL ,
    END_TIME DATETIME DEFAULT NULL ,
    STATUS VARCHAR(10) ,
    EXIT_CODE VARCHAR(2500) ,
    EXIT_MESSAGE VARCHAR(2500) ,
    LAST_UPDATED DATETIME,
    JOB_CONFIGURATION_LOCATION VARCHAR(2500) NULL,
    constraint JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID)
    references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
) ENGINE=InnoDB;

-- 該表包含與該JobParameters對(duì)象相關(guān)的所有信息
CREATE TABLE BATCH_JOB_EXECUTION_PARAMS  (
    JOB_EXECUTION_ID BIGINT NOT NULL ,
    TYPE_CD VARCHAR(6) NOT NULL ,
    KEY_NAME VARCHAR(100) NOT NULL ,
    STRING_VAL VARCHAR(250) ,
    DATE_VAL DATETIME DEFAULT NULL ,
    LONG_VAL BIGINT ,
    DOUBLE_VAL DOUBLE PRECISION ,
    IDENTIFYING CHAR(1) NOT NULL ,
    constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)
    references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;

-- 該表包含與該StepExecution 對(duì)象相關(guān)的所有信息
CREATE TABLE BATCH_STEP_EXECUTION  (
    STEP_EXECUTION_ID BIGINT  NOT NULL PRIMARY KEY ,
    VERSION BIGINT NOT NULL,
    STEP_NAME VARCHAR(100) NOT NULL,
    JOB_EXECUTION_ID BIGINT NOT NULL,
    START_TIME DATETIME NOT NULL ,
    END_TIME DATETIME DEFAULT NULL ,
    STATUS VARCHAR(10) ,
    COMMIT_COUNT BIGINT ,
    READ_COUNT BIGINT ,
    FILTER_COUNT BIGINT ,
    WRITE_COUNT BIGINT ,
    READ_SKIP_COUNT BIGINT ,
    WRITE_SKIP_COUNT BIGINT ,
    PROCESS_SKIP_COUNT BIGINT ,
    ROLLBACK_COUNT BIGINT ,
    EXIT_CODE VARCHAR(2500) ,
    EXIT_MESSAGE VARCHAR(2500) ,
    LAST_UPDATED DATETIME,
    constraint JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID)
    references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;

-- 該BATCH_STEP_EXECUTION_CONTEXT表包含ExecutionContext與Step相關(guān)的所有信息
CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT  (
    STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
    SHORT_CONTEXT VARCHAR(2500) NOT NULL,
    SERIALIZED_CONTEXT TEXT ,
    constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID)
    references BATCH_STEP_EXECUTION(STEP_EXECUTION_ID)
) ENGINE=InnoDB;

-- 該表包含ExecutionContext與Job相關(guān)的所有信息
CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT  (
    JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
    SHORT_CONTEXT VARCHAR(2500) NOT NULL,
    SERIALIZED_CONTEXT TEXT ,
    constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)
    references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;

CREATE TABLE BATCH_STEP_EXECUTION_SEQ (
    ID BIGINT NOT NULL,
    UNIQUE_KEY CHAR(1) NOT NULL,
    constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;
INSERT INTO BATCH_STEP_EXECUTION_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_STEP_EXECUTION_SEQ);

CREATE TABLE BATCH_JOB_EXECUTION_SEQ (
    ID BIGINT NOT NULL,
    UNIQUE_KEY CHAR(1) NOT NULL,
    constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;
INSERT INTO BATCH_JOB_EXECUTION_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_JOB_EXECUTION_SEQ);

CREATE TABLE BATCH_JOB_SEQ (
    ID BIGINT NOT NULL,
    UNIQUE_KEY CHAR(1) NOT NULL,
    constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;
INSERT INTO BATCH_JOB_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_JOB_SEQ);

而student表的建表SQL如下:

CREATE TABLE `student` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(20) NOT NULL,
  `age` int(11) NOT NULL,
  `sex` varchar(20) NOT NULL,
  `address` varchar(100) NOT NULL,
  `cid` int(11) NOT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=19 DEFAULT CHARSET=utf8;

并且student表中有如下簡(jiǎn)單的數(shù)據(jù):
Spring Batch快速入門


創(chuàng)建項(xiàng)目

數(shù)據(jù)庫(kù)準(zhǔn)備完畢后,接下來(lái)我們就是創(chuàng)建Spring Boot項(xiàng)目:
Spring Batch快速入門

填寫項(xiàng)目名、包名等信息:
Spring Batch快速入門

勾選如下紅框標(biāo)注的依賴項(xiàng):
Spring Batch快速入門

點(diǎn)擊Finish完成項(xiàng)目的創(chuàng)建:
Spring Batch快速入門

項(xiàng)目最終的依賴項(xiàng)如下:


    
        org.springframework.boot
        spring-boot-starter-batch
    
    
        org.springframework.boot
        spring-boot-starter-data-jpa
    
    
        org.springframework.boot
        spring-boot-starter-web
    

    
        MySQL
        mysql-connector-java
        runtime
    
    
        org.projectlombok
        lombok
        true
    
    
        org.springframework.boot
        spring-boot-starter-test
        test
    
    
        org.springframework.batch
        spring-batch-test
        test
    

SpringBoot的配置文件內(nèi)容如下(本人習(xí)慣于使用.yml文件格式):

spring:
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/springbatch?serverTimezone=Asia/Shanghai&characterEncoding=UTF-8&autoReconnect=true
    hikari:
      password: password
      username: root
  jpa:
    open-in-view: true
    show-sql: true
    hibernate:
      ddl-auto: update
    database: mysql
  # 禁止項(xiàng)目啟動(dòng)時(shí)運(yùn)行job
  batch:
    job:
      enabled: false

基于Spring Batch的批處理Demo

本小節(jié)我們來(lái)開始編寫實(shí)際的代碼,項(xiàng)目最終結(jié)構(gòu)如下:
Spring Batch快速入門

首先是 student 表格的實(shí)體類,我們需要通過(guò)這個(gè)類去操作student表格中的數(shù)據(jù),代碼如下:

package org.zero.example.springbatchdemo.model;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import javax.persistence.*;

/**
 * student 表格的實(shí)體類
 * 
 * @author 01
 * @date 2019-02-24
 **/
@Data
@Entity
@Table(name = "student")
@NoArgsConstructor
@AllArgsConstructor
public class Student {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Integer id;

    private String name;

    private Integer age;

    private String sex;

    private String address;

    private Integer cid;
}

由于批處理作業(yè)和定時(shí)任務(wù)都需要使用到多線程,所以我們需要配置一下Spring的線程池,代碼如下:

package org.zero.example.springbatchdemo.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/**
 * 配置任務(wù)線程池執(zhí)行器
 *
 * @author 01
 * @date 2019-02-24
 **/
@Configuration
public class ExecutorConfiguration {

    @Bean
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(50);
        threadPoolTaskExecutor.setMaxPoolSize(200);
        threadPoolTaskExecutor.setQueueCapacity(1000);
        threadPoolTaskExecutor.setThreadNamePrefix("Data-Job");

        return threadPoolTaskExecutor;
    }
}

實(shí)現(xiàn)一個(gè)作業(yè)的監(jiān)聽器,批處理作業(yè)在執(zhí)行前后會(huì)調(diào)用監(jiān)聽器的方法,這樣我們就可以根據(jù)實(shí)際的業(yè)務(wù)需求在作業(yè)執(zhí)行的前后進(jìn)行一些日志的打印或者邏輯處理等,代碼如下:

package org.zero.example.springbatchdemo.task.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

/**
 * 一個(gè)簡(jiǎn)單的job監(jiān)聽器
 *
 * @author 01
 * @date 2019-02-24
 **/
@Slf4j
@Component
public class JobListener implements JobExecutionListener {

    private final ThreadPoolTaskExecutor threadPoolTaskExecutor;
    private long startTime;

    @Autowired
    public JobListener(ThreadPoolTaskExecutor threadPoolTaskExecutor) {
        this.threadPoolTaskExecutor = threadPoolTaskExecutor;
    }

    /**
     * 該方法會(huì)在job開始前執(zhí)行
     */
    @Override
    public void beforeJob(JobExecution jobExecution) {
        startTime = System.currentTimeMillis();
        log.info("job before " + jobExecution.getJobParameters());
    }

    /**
     * 該方法會(huì)在job結(jié)束后執(zhí)行
     */
    @Override
    public void afterJob(JobExecution jobExecution) {
        log.info("JOB STATUS : {}", jobExecution.getStatus());
        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
            log.info("JOB FINISHED");
            threadPoolTaskExecutor.destroy();
        } else if (jobExecution.getStatus() == BatchStatus.FAILED) {
            log.info("JOB FAILED");
        }
        log.info("Job Cost Time : {}/ms", (System.currentTimeMillis() - startTime));
    }
}

核心的來(lái)了,我們需要配置一個(gè)最基本的Job,Job是真正進(jìn)行批處理業(yè)務(wù)的地方。一個(gè)Job 通常由一個(gè)或多個(gè)Step組成(基本就像是一個(gè)工作流);而一個(gè)Step通常由三部分組成(讀入數(shù)據(jù):ItemReader,處理數(shù)據(jù):ItemProcessor,寫入數(shù)據(jù):ItemWriter)。代碼如下:

package org.zero.example.springbatchdemo.task.job;

import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.batch.item.database.orm.JpaNativeQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.zero.example.springbatchdemo.model.Student;
import org.zero.example.springbatchdemo.task.listener.JobListener;

import javax.persistence.EntityManagerFactory;

/**
 * 配置一個(gè)最基本的Job
 *
 * @author 01
 * @date 2019-02-24
 **/
@Slf4j
@Component
public class DataBatchJob {
    /**
     * Job構(gòu)建工廠,用于構(gòu)建Job
     */
    private final JobBuilderFactory jobBuilderFactory;

    /**
     * Step構(gòu)建工廠,用于構(gòu)建Step
     */
    private final StepBuilderFactory stepBuilderFactory;

    /**
     * 實(shí)體類管理工工廠,用于訪問(wèn)表格數(shù)據(jù)
     */
    private final EntityManagerFactory emf;

    /**
     * 自定義的簡(jiǎn)單Job監(jiān)聽器
     */
    private final JobListener jobListener;

    @Autowired
    public DataBatchJob(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory,
                        EntityManagerFactory emf, JobListener jobListener) {
        this.jobBuilderFactory = jobBuilderFactory;
        this.stepBuilderFactory = stepBuilderFactory;
        this.emf = emf;
        this.jobListener = jobListener;
    }

    /**
     * 一個(gè)最基礎(chǔ)的Job通常由一個(gè)或者多個(gè)Step組成
     */
    public Job dataHandleJob() {
        return jobBuilderFactory.get("dataHandleJob").
                incrementer(new RunIdIncrementer()).
                // start是JOB執(zhí)行的第一個(gè)step
                        start(handleDataStep()).
                // 可以調(diào)用next方法設(shè)置其他的step,例如:
                // next(xxxStep()).
                // next(xxxStep()).
                // ...
                // 設(shè)置我們自定義的JobListener
                        listener(jobListener).
                        build();
    }

    /**
     * 一個(gè)簡(jiǎn)單基礎(chǔ)的Step主要分為三個(gè)部分
     * ItemReader : 用于讀取數(shù)據(jù)
     * ItemProcessor : 用于處理數(shù)據(jù)
     * ItemWriter : 用于寫數(shù)據(jù)
     */
    private Step handleDataStep() {
        return stepBuilderFactory.get("getData").
                // <輸入對(duì)象, 輸出對(duì)象>  chunk通俗的講類似于SQL的commit; 這里表示處理(processor)100條后寫入(writer)一次
                        chunk(100).
                // 捕捉到異常就重試,重試100次還是異常,JOB就停止并標(biāo)志失敗
                        faultTolerant().retryLimit(3).retry(Exception.class).skipLimit(100).skip(Exception.class).
                // 指定ItemReader對(duì)象
                        reader(getDataReader()).
                // 指定ItemProcessor對(duì)象
                        processor(getDataProcessor()).
                // 指定ItemWriter對(duì)象
                        writer(getDataWriter()).
                        build();
    }

    /**
     * 讀取數(shù)據(jù)
     *
     * @return ItemReader Object
     */
    private ItemReader getDataReader() {
        // 讀取數(shù)據(jù),這里可以用JPA,JDBC,JMS 等方式讀取數(shù)據(jù)
        JpaPagingItemReader reader = new JpaPagingItemReader<>();

        try {
            // 這里選擇JPA方式讀取數(shù)據(jù)
            JpaNativeQueryProvider queryProvider = new JpaNativeQueryProvider<>();
            // 一個(gè)簡(jiǎn)單的 native SQL
            queryProvider.setSqlQuery("SELECT * FROM student");
            // 設(shè)置實(shí)體類
            queryProvider.setEntityClass(Student.class);
            queryProvider.afterPropertiesSet();

            reader.setEntityManagerFactory(emf);
            // 設(shè)置每頁(yè)讀取的記錄數(shù)
            reader.setPageSize(3);
            // 設(shè)置數(shù)據(jù)提供者
            reader.setQueryProvider(queryProvider);
            reader.afterPropertiesSet();

            // 所有ItemReader和ItemWriter實(shí)現(xiàn)都會(huì)在ExecutionContext提交之前將其當(dāng)前狀態(tài)存儲(chǔ)在其中,
            // 如果不希望這樣做,可以設(shè)置setSaveState(false)
            reader.setSaveState(true);
        } catch (Exception e) {
            e.printStackTrace();
        }

        return reader;
    }

    /**
     * 處理數(shù)據(jù)
     *
     * @return ItemProcessor Object
     */
    private ItemProcessor getDataProcessor() {
        return student -> {
            // 模擬處理數(shù)據(jù),這里處理就是打印一下
            log.info("processor data : " + student.toString());

            return student;
        };
    }

    /**
     * 寫入數(shù)據(jù)
     *
     * @return ItemWriter Object
     */
    private ItemWriter getDataWriter() {
        return list -> {
            for (Student student : list) {
                // 模擬寫數(shù)據(jù),為了演示的簡(jiǎn)單就不寫入數(shù)據(jù)庫(kù)了
                log.info("write data : " + student);
            }
        };
    }
}

完成以上Job配置后,就可以執(zhí)行了。通常運(yùn)行Job的方式有兩種,一種是我們把Job對(duì)象注入到Spring容器里,Spring Batch默認(rèn)在項(xiàng)目啟動(dòng)完成后就會(huì)運(yùn)行容器里配置好的Job,如果配置了多個(gè)Job也可以通過(guò)配置文件去指定。但是以我個(gè)人經(jīng)驗(yàn)來(lái)說(shuō)大多數(shù)業(yè)務(wù)場(chǎng)景都是要求定時(shí)去執(zhí)行Job的,所以這里采用定時(shí)任務(wù)去運(yùn)行Job。通過(guò)調(diào)用的方式主動(dòng)去運(yùn)行Job的話,需要使用到JobLauncher中的run方法。具體代碼如下:

package org.zero.example.springbatchdemo.task;

import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.*;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.zero.example.springbatchdemo.task.job.DataBatchJob;

/**
 * 簡(jiǎn)單的定時(shí)任務(wù)
 *
 * @author 01
 * @date 2019-02-24
 **/
@Slf4j
@Component
public class TimeTask {

    private final JobLauncher jobLauncher;
    private final DataBatchJob dataBatchJob;

    @Autowired
    public TimeTask(JobLauncher jobLauncher, DataBatchJob dataBatchJob) {
        this.jobLauncher = jobLauncher;
        this.dataBatchJob = dataBatchJob;
    }

    // 定時(shí)任務(wù),每十秒執(zhí)行一次
    @Scheduled(cron = "0/10 * * * * ?")
    public void runBatch() throws JobParametersInvalidException, JobExecutionAlreadyRunningException,
            JobRestartException, JobInstanceAlreadyCompleteException {
        log.info("定時(shí)任務(wù)執(zhí)行了...");
        // 在運(yùn)行一個(gè)job的時(shí)候需要添加至少一個(gè)參數(shù),這個(gè)參數(shù)最后會(huì)被寫到batch_job_execution_params表中,
        // 不添加這個(gè)參數(shù)的話,job不會(huì)運(yùn)行,并且這個(gè)參數(shù)在表中中不能重復(fù),若設(shè)置的參數(shù)已存在表中,則會(huì)拋出異常,
        // 所以這里才使用時(shí)間戳作為參數(shù)
        JobParameters jobParameters = new JobParametersBuilder()
                .addLong("timestamp", System.currentTimeMillis())
                .toJobParameters();

        // 獲取job并運(yùn)行
        Job job = dataBatchJob.dataHandleJob();
        JobExecution execution = jobLauncher.run(job, jobParameters);
        log.info("定時(shí)任務(wù)結(jié)束. Exit Status : {}", execution.getStatus());
    }
}

最后,我們需要在Spring Boot的啟動(dòng)類上加上兩個(gè)注解,以開啟批處理及定時(shí)任務(wù),否則批處理和定時(shí)任務(wù)都不會(huì)執(zhí)行,代碼如下:

package org.zero.example.springbatchdemo;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

/**
 * :@EnableBatchProcessing 用于開啟批處理作業(yè)的配置
 * :@EnableScheduling 用于開啟定時(shí)任務(wù)的配置
 *
 * @author 01
 * @date 2019-02-24
 */
@EnableScheduling
@EnableBatchProcessing
@SpringBootApplication
public class SpringBatchDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBatchDemoApplication.class, args);
    }
}

啟動(dòng)項(xiàng)目,等待十秒,控制臺(tái)輸出日志如下,證明我們的批處理程序正常執(zhí)行了:
Spring Batch快速入門


網(wǎng)站欄目:SpringBatch快速入門
分享路徑:http://weahome.cn/article/jjgiii.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部