1???場景說明
網(wǎng)站建設(shè)哪家好,找成都創(chuàng)新互聯(lián)!專注于網(wǎng)頁設(shè)計、網(wǎng)站建設(shè)、微信開發(fā)、小程序設(shè)計、集團(tuán)企業(yè)網(wǎng)站建設(shè)等服務(wù)項目。為回饋新老客戶創(chuàng)新互聯(lián)還提供了海鹽免費建站歡迎大家使用!
讀取CVS文件,經(jīng)過處理后,保存到數(shù)據(jù)庫。
?
2???項目結(jié)構(gòu)
應(yīng)用程序 | 啟動主程序 | DemoApplication.java |
讀取文件(輸入文件) | UserItemReader.java | |
處理數(shù)據(jù) | UserItemProcess.java | |
輸出文件 | UserItemWriter.java | |
調(diào)度批作業(yè) | 定時處理配置 | QuartzConfiguration.java |
定時調(diào)度 | QuartzJobLauncher.java | |
輔助文件 | 數(shù)據(jù)文件 | User.txt |
對象實體(傳遞對象) | User.java | |
Meaven配置文件 | Pom.xml |
2.1??Pom.xml
??? xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 ? http://maven.apache.org/xsd/maven-4.0.0.xsd"> ??? ? ??? ??? ??? ??? ? ??? ??? ? ??? ?????? ?????? ?????? ?????? ??? ? ??? ?????? ??? ??? ?????? ??? ? ??? ?????? ?????????? ?????????? ?????? ?????? ?????????? ?????????? ?????? ?????? ?????????? ?????????? ?????? ?????? ?????????? ?????????? ?????? ?????? ?????? ?????????? ?????????? ?????????? ?????? ?????? ?????????? ?????????? ?????????? ?????? ?????? ?????????? ?????????? ?????????? ?????? ?????? ?????????? ?????????? ?????? ?????? ?????????? ?????????? ?????????? ?????? ?????? ?????????? ?????????? ?????????? ?????? ??? ? ??? ?????? ?????????? ????????????? ????????????? ?????????? ?????? ??? ? ? |
2.2??User.java
package com.zy.model; ? public class User { ?????? private ? String id; ?????? private ? String name; ?????? private ? String age; ?????? ?????? public ? User(String id, String name, String age) { ????????????? this.id ? = id; ????????????? this.name ? = name; ????????????? this.age ? = age; ?????? } ? ?????? public ? String getId() { ????????????? return ? id; ?????? } ? ?????? public ? void setId(String id) { ????????????? this.id ? = id; ?????? } ? ?????? public ? String getName() { ????????????? return ? name; ?????? } ? ?????? public ? void setName(String name) { ????????????? this.name ? = name; ?????? } ? ?????? public ? String getAge() { ????????????? return ? age; ?????? } ? ?????? public ? void setAge(String age) { ????????????? this.age ? = age; ?????? } ? ?????? @Override ?????? public ? String toString() { ????????????? return ? "User [id=" + id + ", name=" + name + ", age=" ? + age + "]"; ?????? } ?????? } |
2.3??UserItemReader.java
package com.zy.reader; ? import ? org.springframework.batch.item.file.FlatFileItemReader; import ? org.springframework.batch.item.file.LineMapper; import ? org.springframework.batch.item.file.mapping.DefaultLineMapper; import org.springframework.batch.item.file.mapping.FieldSetMapper; import ? org.springframework.batch.item.file.transform.DelimitedLineTokenizer; import ? org.springframework.batch.item.file.transform.FieldSet; import ? org.springframework.batch.item.file.transform.LineTokenizer; import ? org.springframework.core.io.ClassPathResource; import ? org.springframework.validation.BindException; ? import com.zy.model.User; //從user.txt文件中讀取信息到User public class UserItemReader extends ? FlatFileItemReader ?????? public ? UserItemReader(){ ????????????? createReader(); ?????? } ?????? ?????? private ? void createReader(){ ????????????? this.setResource(new ? ClassPathResource("data/User.txt")); ????????????? this.setLinesToSkip(1); ????????????? this.setLineMapper(userLineMapper()); ?????? } ?????? ?????? private ? LineMapper ????????????? DefaultLineMapper ????????????? lineMapper.setLineTokenizer(userLineTokenizer()); ????????????? lineMapper.setFieldSetMapper(new ? UserFieldStepMapper()); ????????????? lineMapper.afterPropertiesSet(); ? ????????????? return ? lineMapper; ?????? } ?????? ??? ? private LineTokenizer userLineTokenizer(){ ?????? ? ?DelimitedLineTokenizer ? tokenizer = new DelimitedLineTokenizer(); ??????? ? tokenizer.setNames(new String[]{"ID", "NAME", ? "AGE"}); ??????? ? return tokenizer; ??? ? } ??? ? ??? ? private static class UserFieldStepMapper implements ? FieldSetMapper ????????????? @Override ????????????? public ? User mapFieldSet(FieldSet fieldSet) throws BindException { ??????????? return new ? User(fieldSet.readString("ID"), ??????????????????? ? fieldSet.readString("NAME"), ??????????????????? ? fieldSet.readString("AGE")); ????????????? } ? ??? ? } ? ??? ? } |
2.4??User.txt
ID,NAME,AGE 1,zy,28 2,tom,20 3,terry,30 4,lerry,18 5,bob,25 6,linda,27 7,marry,39 8,long,22 9,kin,33 10,王五,40 |
?
2.5??UserItemProcessor.java
package com.zy.processor; import ? org.springframework.batch.item.ItemProcessor; import com.zy.model.User; ? public class UserItemProcessor implements ? ItemProcessor ? ?????? @Override ?????? public ? User process(User item) throws Exception { ????????????? if ? (Integer.parseInt(item.getAge()) > 20) { ???????????????? ???????????????????? return ? item; ????????????? } ????????????? return ? null; ?????? } ? } |
?
2.6??UserItemWriter.java
package com.zy.writer; import java.util.List; import ? org.springframework.batch.item.ItemWriter; import com.zy.model.User; ? public class UserItemWriter implements ? ItemWriter ? ?????? @Override ?????? public ? void write(List extends User> items) throws Exception { ????????????? for(User ? user : items){ ???????????????????? System.out.println(user); ????????????? } ?????? } ? } |
2.7??QuartzJobLauncher
package com.zy.QuartzConfiguration; ? import java.text.SimpleDateFormat; import java.util.Date; import org.quartz.JobDataMap; import org.quartz.JobDetail; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; import org.quartz.JobKey; import ? org.springframework.batch.core.Job; import org.springframework.batch.core.JobExecution; import ? org.springframework.batch.core.JobParameters; import ? org.springframework.batch.core.configuration.JobLocator; import ? org.springframework.batch.core.launch.JobLauncher; import ? org.springframework.scheduling.quartz.QuartzJobBean; ? public class QuartzJobLauncher extends ? QuartzJobBean { ?????? @Override ?????? protected ? void executeInternal(JobExecutionContext context) throws JobExecutionException ? { ????????????? ????????????? JobDetail ? jobDetail = context.getJobDetail(); ????????????? JobDataMap ? jobDataMap = jobDetail.getJobDataMap(); ????????????? String ? jobName = jobDataMap.getString("jobName"); ????????????? JobLauncher ? jobLauncher = (JobLauncher) jobDataMap.get("jobLauncher"); ????????????? JobLocator ? jobLocator = (JobLocator) jobDataMap.get("jobLocator"); ????????????? System.out.println("jobName ? : " + jobName); ????????????? System.out.println("jobLauncher ? : " + jobLauncher); ????????????? System.out.println("jobLocator ? : " + jobLocator); ????????????? JobKey ? key = context.getJobDetail().getKey(); ????????????? System.out.println(key.getName() ? + " : " + key.getGroup()); ????????????? SimpleDateFormat ? sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); ????????????? System.out.println("Current ? time : " + sf.format(new Date())); ????????????? ????????????? try ? { ???????????????????? Job ? job = jobLocator.getJob(jobName); ???????????????????? JobExecution ? jobExecution = jobLauncher.run(job, new JobParameters()); ????????????? } ? catch (Exception e) { ???????????????????? e.printStackTrace(); ????????????? } ????????????? ?????? } ? } |
?
2.8??QuartzConfiguration
package com.zy.QuartzConfiguration; ? import java.util.HashMap; import java.util.Map; ? import ? org.springframework.batch.core.configuration.JobLocator; import ? org.springframework.batch.core.configuration.JobRegistry; import ? org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor; import ? org.springframework.batch.core.launch.JobLauncher; import ? org.springframework.beans.factory.annotation.Autowired; import ? org.springframework.context.annotation.Bean; import ? org.springframework.context.annotation.Configuration; import ? org.springframework.scheduling.quartz.CronTriggerFactoryBean; import ? org.springframework.scheduling.quartz.JobDetailFactoryBean; import ? org.springframework.scheduling.quartz.SchedulerFactoryBean; ? @Configuration public class QuartzConfiguration { ?????? ?????? //自動注入進(jìn)來的是SimpleJobLauncher ?????? @Autowired ?????? private ? JobLauncher jobLauncher; ?????? ?????? @Autowired ?????? private ? JobLocator jobLocator; ?????? ?????? /*用來注冊job*/ ?????? /*JobRegistry會自動注入進(jìn)來*/ ?????? @Bean ?????? public ? JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(JobRegistry ? jobRegistry){ ????????????? JobRegistryBeanPostProcessor ? jobRegistryBeanPostProcessor = new JobRegistryBeanPostProcessor(); ????????????? jobRegistryBeanPostProcessor.setJobRegistry(jobRegistry); ????????????? return ? jobRegistryBeanPostProcessor; ?????? } ?????? ?????? @Bean ?????? public ? JobDetailFactoryBean jobDetailFactoryBean(){ ????????????? JobDetailFactoryBean ? jobFactory = new JobDetailFactoryBean(); ????????????? jobFactory.setJobClass(QuartzJobLauncher.class); ????????????? jobFactory.setGroup("my_group"); ????????????? jobFactory.setName("my_job"); ????????????? Map ????????????? map.put("jobName", ? "zyJob"); ????????????? map.put("jobLauncher", ? jobLauncher); ????????????? map.put("jobLocator", ? jobLocator); ????????????? jobFactory.setJobDataAsMap(map); ????????????? return ? jobFactory; ?????? } ?????? ?????? @Bean ?????? public ? CronTriggerFactoryBean cronTriggerFactoryBean(){ ????????????? CronTriggerFactoryBean ? cTrigger = new CronTriggerFactoryBean(); ????????????? System.out.println("------- ? : " + jobDetailFactoryBean().getObject()); ????????????? cTrigger.setJobDetail(jobDetailFactoryBean().getObject()); ????????????? cTrigger.setStartDelay(3000); ????????????? cTrigger.setName("my_trigger"); ????????????? cTrigger.setGroup("trigger_group"); ????????????? cTrigger.setCronExpression("0/3 ? * * * * ? "); //每間隔3s觸發(fā)一次Job任務(wù) ????????????? return ? cTrigger; ?????? } ?????? ?????? @Bean ?????? public ? SchedulerFactoryBean schedulerFactoryBean(){ ????????????? SchedulerFactoryBean ? schedulerFactor = new SchedulerFactoryBean(); ????????????? schedulerFactor.setTriggers(cronTriggerFactoryBean().getObject()); ????????????? return ? schedulerFactor; ?????? } ? } |
?
?
2.9??BatchConfiguration
package com.zy.config; import ? org.springframework.batch.core.Job; import ? org.springframework.batch.core.Step; import ? org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import ? org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import ? org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import ? org.springframework.beans.factory.annotation.Autowired; import ? org.springframework.context.annotation.Bean; import ? org.springframework.context.annotation.Configuration; import ? org.springframework.context.annotation.Import; import com.zy.QuartzConfiguration.QuartzConfiguration; import com.zy.model.User; import ? com.zy.processor.UserItemProcessor; import com.zy.reader.UserItemReader; import com.zy.writer.UserItemWriter; ? @Configuration @EnableBatchProcessing //@Import({QuartzConfiguration.class}) public class BatchConfiguration { ?????? ?????? @Autowired ?????? public ? JobBuilderFactory jobBuilderFactory; ?????? @Autowired ?????? public ? StepBuilderFactory stepBuilderFactory; ?????? ?????? ?????? /*創(chuàng)建job*/ ?????? @Bean ?????? public ? Job jobMethod(){ ????????????? return ? jobBuilderFactory.get("zyJob") ??????????????????????????? .start(stepMethod()) ??????????????????????????? .build(); ?????? } ?????? ?????? /*創(chuàng)建step*/ ?????? @Bean ?????? public ? Step stepMethod(){ ????????????? return ? stepBuilderFactory.get("myStep1") ??????????????????????????? . ??????????????????????????? .reader(new ? UserItemReader()) ??????????????????????????? .processor(new ? UserItemProcessor()) ??????????????????????????? .writer(new ? UserItemWriter()) ??????????????????????????? .allowStartIfComplete(true) ??????????????????????????? .build(); ?????? } ?????? ? } |
?
3???執(zhí)行Job輸出結(jié)果
2019-04-30 21:31:48.049? INFO 9344 --- [ryBean_Worker-5] ? o.s.b.c.l.support.SimpleJobLauncher????? ? : Job: [SimpleJob: [name=zyJob]] completed with the following ? parameters: [{}] and the following status: [COMPLETED] jobName : zyJob jobLauncher : ? org.springframework.batch.core.launch.support.SimpleJobLauncher@2d27244d jobLocator : org.springframework.batch.core.configuration.support.MapJobRegistry@6fc00b5 my_job : my_group Current time : 2019-04-30 21:31:51 2019-04-30 21:31:51.012? INFO 9344 --- [ryBean_Worker-6] ? o.s.b.c.l.support.SimpleJobLauncher????? ? : Job: [SimpleJob: [name=zyJob]] launched with the following ? parameters: [{}] 2019-04-30 21:31:51.028? INFO 9344 --- [ryBean_Worker-6] ? o.s.batch.core.job.SimpleStepHandler???? ? : Executing step: [myStep1] User [id=1, name=zy, age=28] User [id=3, name=terry, age=30] User [id=5, name=bob, age=25] User [id=6, name=linda, age=27] User [id=7, name=marry, age=39] User [id=8, name=long, age=22] User [id=9, name=kin, age=33] User [id=10, name=ww, age=40] |
?
4???概念總結(jié)
Job Repository | 作業(yè)倉庫,負(fù)責(zé)Job,Step執(zhí)行過程中的狀態(tài)保存。 | |
Job Launcher | 作業(yè)調(diào)度器,提供執(zhí)行Job的入口 | |
Job | 作業(yè),多個Step組成,封裝整個批處理操作。 | |
Step | 作業(yè)步,Job的一個執(zhí)行環(huán)節(jié),由多個或者一個Step組裝成Job | |
Tasklet | Step中具體執(zhí)行的邏輯的操作,可以重復(fù)執(zhí)行,可以具體的設(shè)置同步,異步操作。 | |
Chunk | 給定數(shù)量的Item集合,可以定義對Chunk的讀操作,處理操作,寫操作,提交間隔。 | |
Item | 一條數(shù)據(jù)記錄。 | |
ItemReader | 從數(shù)據(jù)源(文件系統(tǒng),數(shù)據(jù)庫,隊列等)讀取Item | |
ItemProcessor | 在寫入數(shù)據(jù)源之前,對數(shù)據(jù)進(jìn)行處理(如:數(shù)據(jù)清洗,轉(zhuǎn)換,過濾,數(shù)據(jù)校驗等)。 | |
ItemWriter | 將Item批量寫入數(shù)據(jù)源(文件系統(tǒng),數(shù)據(jù)庫,隊列等)。 |
5???Spring Batch結(jié)構(gòu)
Spring Batch的一個基本層級結(jié)構(gòu)。
首先,Spring Batch運行的基本單位是一個Job,一個Job就做一件批處理的事情。
一個Job包含很多Step,step就是每個job要執(zhí)行的單個步驟。
如下圖所示,Step里面,會有Tasklet,Tasklet是一個任務(wù)單元,它是屬于可以重復(fù)利用的東西。
然后是Chunk,chunk就是數(shù)據(jù)塊,你需要定義多大的數(shù)據(jù)量是一個chunk。
Chunk里面就是不斷循環(huán)的一個流程,讀數(shù)據(jù),處理數(shù)據(jù),然后寫數(shù)據(jù)。Spring Batch會不斷的循環(huán)這個流程,直到批處理數(shù)據(jù)完成。