使用Spring Batch如何實(shí)現(xiàn)將txt文件寫入數(shù)據(jù)庫(kù)?針對(duì)這個(gè)問題,這篇文章詳細(xì)介紹了相對(duì)應(yīng)的分析和解答,希望可以幫助更多想解決這個(gè)問題的小伙伴找到更簡(jiǎn)單易行的方法。
在弋陽(yáng)等地區(qū),都構(gòu)建了全面的區(qū)域性戰(zhàn)略布局,加強(qiáng)發(fā)展的系統(tǒng)性、市場(chǎng)前瞻性、產(chǎn)品創(chuàng)新能力,以專注、極致的服務(wù)理念,為客戶提供成都網(wǎng)站設(shè)計(jì)、成都做網(wǎng)站 網(wǎng)站設(shè)計(jì)制作按需求定制開發(fā),公司網(wǎng)站建設(shè),企業(yè)網(wǎng)站建設(shè),品牌網(wǎng)站制作,成都營(yíng)銷網(wǎng)站建設(shè),外貿(mào)網(wǎng)站制作,弋陽(yáng)網(wǎng)站建設(shè)費(fèi)用合理。
1、創(chuàng)建 Maven 項(xiàng)目,并在 pom.xml 中添加依賴
org.springframework.boot spring-boot-starter-parent 1.5.2.RELEASE 1.8 org.springframework.boot spring-boot-starter-batch org.springframework.boot spring-boot-starter-data-jpa org.springframework.boot spring-boot-starter-test test org.mybatis.spring.boot mybatis-spring-boot-starter 1.2.0 org.projectlombok lombok 1.12.6 org.apache.commons commons-lang3 3.4 MySQL mysql-connector-java runtime com.alibaba druid 1.0.26 org.springframework.boot spring-boot-starter-web
這里是這個(gè)小項(xiàng)目中用到的所有依賴,包括連接數(shù)據(jù)庫(kù)的依賴以及工具類等。
2、編寫 Model 類
我們要從文檔中讀取的有效列就是 uid,tag,type,就是用戶 ID,用戶可能包含的標(biāo)簽(用于推送),用戶類別(用戶用戶之間互相推薦)。
UserMap.java 中的 @Entity,@Column 注解,是為了利用 JPA 生成數(shù)據(jù)表而寫的,可要可不要。
UserMap.java
@Data @EqualsAndHashCode @NoArgsConstructor @AllArgsConstructor //@Entity(name = "user_map") public class UserMap extends BaseModel { @Column(name = "uid", unique = true, nullable = false) private Long uid; @Column(name = "tag") private String tag; @Column(name = "type") private Integer type; }
3、實(shí)現(xiàn)批處理配置類
BatchConfiguration.java
@Configuration @EnableBatchProcessing public class BatchConfiguration { @Autowired public JobBuilderFactory jobBuilderFactory; @Autowired public StepBuilderFactory stepBuilderFactory; @Autowired @Qualifier("prodDataSource") DataSource prodDataSource; @Bean public FlatFileItemReaderreader() { FlatFileItemReader reader = new FlatFileItemReader<>(); reader.setResource(new ClassPathResource("c152.txt")); reader.setLineMapper(new DefaultLineMapper () {{ setLineTokenizer(new DelimitedLineTokenizer("|") {{ setNames(new String[]{"uid", "tag", "type"}); }}); setFieldSetMapper(new BeanWrapperFieldSetMapper () {{ setTargetType(UserMap.class); }}); }}); return reader; } @Bean public JdbcBatchItemWriter importWriter() { JdbcBatchItemWriter writer = new JdbcBatchItemWriter<>(); writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>()); writer.setSql("INSERT INTO user_map (uid,tag,type) VALUES (:uid, :tag,:type)"); writer.setDataSource(prodDataSource); return writer; } @Bean public JdbcBatchItemWriter updateWriter() { JdbcBatchItemWriter writer = new JdbcBatchItemWriter<>(); writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>()); writer.setSql("UPDATE user_map SET type = (:type),tag = (:tag) WHERE uid = (:uid)"); writer.setDataSource(prodDataSource); return writer; } @Bean public UserMapItemProcessor processor(UserMapItemProcessor.ProcessStatus processStatus) { return new UserMapItemProcessor(processStatus); } @Bean public Job importUserJob(JobCompletionNotificationListener listener) { return jobBuilderFactory.get("importUserJob") .incrementer(new RunIdIncrementer()) .listener(listener) .flow(importStep()) .end() .build(); } @Bean public Step importStep() { return stepBuilderFactory.get("importStep") . chunk(100) .reader(reader()) .processor(processor(IMPORT)) .writer(importWriter()) .build(); } @Bean public Job updateUserJob(JobCompletionNotificationListener listener) { return jobBuilderFactory.get("updateUserJob") .incrementer(new RunIdIncrementer()) .listener(listener) .flow(updateStep()) .end() .build(); } @Bean public Step updateStep() { return stepBuilderFactory.get("updateStep") . chunk(100) .reader(reader()) .processor(processor(UPDATE)) .writer(updateWriter()) .build(); } }
prodDataSource 是假設(shè)用戶已經(jīng)設(shè)置好的,如果不知道怎么配置,也可以參考之前的文章進(jìn)行配置:Springboot 集成 Mybatis。
reader(),這方法從文件中讀取數(shù)據(jù),并且設(shè)置了一些必要的參數(shù)。緊接著是寫操作 importWriter()
和 updateWriter()
,讀者看其中一個(gè)就好,因?yàn)槲疫@里是需要更新或者修改的,所以分為兩個(gè)。
processor(ProcessStatus status)
,該方法是對(duì)我們處理數(shù)據(jù)的類進(jìn)行實(shí)例化,這里我根據(jù) status 是 IMPORT 還是 UPDATE 來(lái)獲取不同的處理結(jié)果。
其他的看代碼就可以看懂了,哈哈,不詳細(xì)說(shuō)了。
4、將獲得的數(shù)據(jù)進(jìn)行清洗
UserMapItemProcessor.java
public class UserMapItemProcessor implements ItemProcessor{ private static final int MAX_TAG_LENGTH = 200; private ProcessStatus processStatus; public UserMapItemProcessor(ProcessStatus processStatus) { this.processStatus = processStatus; } @Autowired IUserMapService userMapService; private static final String TAG_PATTERN_STR = "^[a-zA-Z0-9\\u4E00-\\u9FA5_-]+$"; public static final Pattern TAG_PATTERN = Pattern.compile(TAG_PATTERN_STR); private static final Logger LOG = LoggerFactory.getLogger(UserMapItemProcessor.class); @Override public UserMap process(UserMap userMap) throws Exception { Long uid = userMap.getUid(); String tag = cleanTag(userMap.getTag()); Integer label = userMap.getType() == null ? Integer.valueOf(0) : userMap.getType(); if (StringUtils.isNotBlank(tag)) { Map params = new HashMap<>(); params.put("uid", uid); UserMap userMapFromDB = userMapService.selectOne(params); if (userMapFromDB == null) { if (this.processStatus == ProcessStatus.IMPORT) { return new UserMap(uid, tag, label); } } else { if (this.processStatus == ProcessStatus.UPDATE) { if (!tag.equals(userMapFromDB.getTag()) && !label.equals(userMapFromDB.getType())) { userMapFromDB.setType(label); userMapFromDB.setTag(tag); return userMapFromDB; } } } } return null; } /** * 清洗標(biāo)簽 * * @param tag * @return */ private static String cleanTag(String tag) { if (StringUtils.isNotBlank(tag)) { try { tag = tag.substring(tag.indexOf("{") + 1, tag.lastIndexOf("}")); String[] tagArray = tag.split(","); Optional reduce = Arrays.stream(tagArray).parallel() .map(str -> str.split(":")[0]) .map(str -> str.replaceAll("\'", "")) .map(str -> str.replaceAll(" ", "")) .filter(str -> TAG_PATTERN.matcher(str).matches()) .reduce((x, y) -> x + "," + y); Function str = (s -> s.length() > MAX_TAG_LENGTH ? s.substring(0, MAX_TAG_LENGTH) : s); return str.apply(reduce.get()); } catch (Exception e) { LOG.error(e.getMessage(), e); } } return null; } protected enum ProcessStatus { IMPORT, UPDATE; } public static void main(String[] args) { String distinctTag = cleanTag("Counter({'《重新定義》': 3, '輕想上的輕小說(shuō)': 3, '小說(shuō)': 2, 'Fate': 2, '同人小說(shuō)': 2, '雪狼八組': 1, " + "'社會(huì)': 1, '人文': 1, '短篇': 1, '重新定義': 1, 'AMV': 1, '《FBD》': 1, '《雪狼六組》': 1, '戰(zhàn)爭(zhēng)': 1, '《灰羽聯(lián)盟》': 1, " + "'誰(shuí)說(shuō)輕想沒人寫小說(shuō)': 1})"); System.out.println(distinctTag); } }
讀取到的數(shù)據(jù)格式如 main()
方法所示,清理之后的結(jié)果如:
輕想上的輕小說(shuō),小說(shuō),Fate,同人小說(shuō),雪狼八組,社會(huì),人文,短篇,重新定義,AMV,戰(zhàn)爭(zhēng),誰(shuí)說(shuō)輕想沒人寫小說(shuō) 。
去掉了特殊符號(hào)以及數(shù)字等。使用了 Java8 的 Lambda 表達(dá)式。
并且這里在處理的時(shí)候,判斷如果該數(shù)據(jù)用戶已經(jīng)存在,則進(jìn)行更新,如果不存在,則新增。
5、Job 執(zhí)行結(jié)束回調(diào)類
JobCompletionNotificationListener.java
@Component public class JobCompletionNotificationListener extends JobExecutionListenerSupport { private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class); private final JdbcTemplate jdbcTemplate; @Autowired public JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) { this.jdbcTemplate = jdbcTemplate; } @Override public void afterJob(JobExecution jobExecution) { System.out.println("end ....."); } }
具體的邏輯可自行實(shí)現(xiàn)。
完成以上幾個(gè)步驟,運(yùn)行項(xiàng)目,就可以讀取并寫入數(shù)據(jù)到數(shù)據(jù)庫(kù)了。
關(guān)于使用Spring Batch如何實(shí)現(xiàn)將txt文件寫入數(shù)據(jù)庫(kù)問題的解答就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道了解更多相關(guān)知識(shí)。