This article explains how to bulk-import data into Elasticsearch with Spring Batch. It should be a useful reference for anyone facing the same task, so read on and hopefully you will come away with something of value.
1. Introduction
When a system has a large amount of data to import from a database into Elasticsearch, Spring Batch can make the import much more efficient. Spring Batch reads data page by page with an ItemReader and writes it in bulk with an ItemWriter. Since Spring Batch does not ship an ItemWriter or ItemReader for Elasticsearch, this example defines a custom ElasticsearchItemWriter (and an ElasticsearchItemReader) to perform the bulk import.
2. Example
2.1 pom.xml
This article uses Spring Data Jest to connect to ES (Spring Data Elasticsearch could be used instead). The ES version is 5.5.3.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hfcsbc.estl</groupId>
    <artifactId>es-etl</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>es-etl</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.M7</version>
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <dependency>
            <groupId>com.github.vanroy</groupId>
            <artifactId>spring-boot-starter-data-jest</artifactId>
            <version>3.0.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>io.searchbox</groupId>
            <artifactId>jest</artifactId>
            <version>5.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <repository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <pluginRepositories>
        <pluginRepository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </pluginRepository>
        <pluginRepository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>

</project>
2.2 Entity classes and repositories
package com.hfcsbc.esetl.domain;

import lombok.Data;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;

import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.OneToOne;

/**
 * Create by pengchao on 2018/2/23
 */
@Document(indexName = "person", type = "person", shards = 1, replicas = 0, refreshInterval = "-1")
@Entity
@Data
public class Person {
    @Id
    private Long id;
    private String name;
    @OneToOne
    @Field(type = FieldType.Nested)
    private Address address;
}
package com.hfcsbc.esetl.domain;

import lombok.Data;

import javax.persistence.Entity;
import javax.persistence.Id;

/**
 * Create by pengchao on 2018/2/23
 */
@Entity
@Data
public class Address {
    @Id
    private Long id;
    private String name;
}
package com.hfcsbc.esetl.repository.jpa;

import com.hfcsbc.esetl.domain.Person;
import org.springframework.data.jpa.repository.JpaRepository;

/**
 * Create by pengchao on 2018/2/23
 */
public interface PersonRepository extends JpaRepository<Person, Long> {
}
package com.hfcsbc.esetl.repository.es;

import com.hfcsbc.esetl.domain.Person;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;

/**
 * Create by pengchao on 2018/2/23
 */
public interface EsPersonRepository extends ElasticsearchRepository<Person, Long> {
}
2.3 Configure the ElasticsearchItemWriter
package com.hfcsbc.esetl.itemWriter;

import com.hfcsbc.esetl.repository.es.EsPersonRepository;
import com.hfcsbc.esetl.domain.Person;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.ItemWriteListener;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.item.ItemWriter;

import java.util.List;

/**
 * Create by pengchao on 2018/2/23
 */
public class ElasticsearchItemWriter implements ItemWriter<Person>, ItemWriteListener<Person>, StepExecutionListener {

    private EsPersonRepository personRepository;

    public ElasticsearchItemWriter(EsPersonRepository personRepository) {
        this.personRepository = personRepository;
    }

    @Override
    public void beforeWrite(List<? extends Person> items) {
    }

    @Override
    public void afterWrite(List<? extends Person> items) {
    }

    @Override
    public void onWriteError(Exception exception, List<? extends Person> items) {
    }

    @Override
    public void beforeStep(StepExecution stepExecution) {
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        return null;
    }

    @Override
    public void write(List<? extends Person> items) throws Exception {
        // saveAll in the implementation class AbstractElasticsearchRepository calls
        // elasticsearchOperations.bulkIndex(queries), i.e. a bulk index request
        personRepository.saveAll(items);
    }
}
2.4 Configure the ElasticsearchItemReader (not used in this example; provided for reference)
package com.hfcsbc.esetl.itemReader;

import com.hfcsbc.esetl.domain.Person;
import org.springframework.batch.item.data.AbstractPaginatedDataItemReader;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.query.SearchQuery;

import java.util.Iterator;

/**
 * Create by pengchao on 2018/2/24
 */
public class ElasticsearchItemReader extends AbstractPaginatedDataItemReader<Person> implements InitializingBean {

    private final ElasticsearchOperations elasticsearchOperations;
    private final SearchQuery query;
    private final Class<? extends Person> targetType;

    public ElasticsearchItemReader(ElasticsearchOperations elasticsearchOperations, SearchQuery query, Class<? extends Person> targetType) {
        this.elasticsearchOperations = elasticsearchOperations;
        this.query = query;
        this.targetType = targetType;
    }

    @Override
    protected Iterator<Person> doPageRead() {
        return (Iterator<Person>) elasticsearchOperations.queryForList(query, targetType).iterator();
    }

    @Override
    public void afterPropertiesSet() throws Exception {
    }
}
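The reader is not wired into the job in this article, but if you later want to page documents back out of Elasticsearch in another step, the wiring could look roughly like the sketch below. This is a minimal, illustrative configuration, not part of the original project: the EsReaderConfig class, the bean name, the match-all query and the page size of 1000 are all assumptions; NativeSearchQueryBuilder, QueryBuilders and PageRequest come from Spring Data Elasticsearch / Spring Data.

package com.hfcsbc.esetl.config;

import com.hfcsbc.esetl.domain.Person;
import com.hfcsbc.esetl.itemReader.ElasticsearchItemReader;
import org.elasticsearch.index.query.QueryBuilders;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.data.elasticsearch.core.query.SearchQuery;

/**
 * Hypothetical configuration showing how the reader above could be used; not part of the original example.
 */
@Configuration
public class EsReaderConfig {

    @Bean
    public ElasticsearchItemReader personEsReader(ElasticsearchOperations elasticsearchOperations) {
        // Match-all query that pages through the whole "person" index, 1000 documents at a time.
        SearchQuery query = new NativeSearchQueryBuilder()
                .withQuery(QueryBuilders.matchAllQuery())
                .withPageable(PageRequest.of(0, 1000))
                .build();

        ElasticsearchItemReader reader = new ElasticsearchItemReader(elasticsearchOperations, query, Person.class);
        reader.setPageSize(1000);          // keep doPageRead() page size consistent with the query
        reader.setName("personEsReader");  // lets the reader store its state in the ExecutionContext
        return reader;
    }
}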
2.5 Spring Batch configuration
package com.hfcsbc.esetl.config;

import com.hfcsbc.esetl.itemWriter.ElasticsearchItemWriter;
import com.hfcsbc.esetl.repository.es.EsPersonRepository;
import com.hfcsbc.esetl.domain.Person;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.batch.item.database.orm.JpaNativeQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

import javax.persistence.EntityManagerFactory;
import javax.sql.DataSource;

/**
 * Create by pengchao on 2018/2/23
 */
@Configuration
@EnableBatchProcessing
public class BatchConfig {

    @Autowired
    private EsPersonRepository personRepository;

    @Bean
    public ItemReader<Person> orderItemReader(EntityManagerFactory entityManagerFactory) {
        JpaPagingItemReader<Person> reader = new JpaPagingItemReader<>();
        String sqlQuery = "select * from person";
        try {
            JpaNativeQueryProvider<Person> queryProvider = new JpaNativeQueryProvider<>();
            queryProvider.setSqlQuery(sqlQuery);
            queryProvider.setEntityClass(Person.class);
            queryProvider.afterPropertiesSet();
            reader.setEntityManagerFactory(entityManagerFactory);
            reader.setPageSize(10000);
            reader.setQueryProvider(queryProvider);
            reader.afterPropertiesSet();
            reader.setSaveState(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return reader;
    }

    @Bean
    public ElasticsearchItemWriter itemWriter() {
        return new ElasticsearchItemWriter(personRepository);
    }

    @Bean
    public Step step(StepBuilderFactory stepBuilderFactory,
                     ItemReader<Person> itemReader,
                     ItemWriter<Person> itemWriter) {
        return stepBuilderFactory
                .get("step1")
                .<Person, Person>chunk(10000)
                .reader(itemReader)
                .writer(itemWriter)
                .build();
    }

    @Bean
    public Job job(JobBuilderFactory jobBuilderFactory, Step step) {
        return jobBuilderFactory
                .get("importJob")
                .incrementer(new RunIdIncrementer())
                .flow(step)
                .end()
                .build();
    }

    /**
     * When Spring Batch runs it creates the metadata tables it needs;
     * this specifies the dataSource in which those tables are created.
     * @param dataSource
     * @param manager
     * @return
     */
    @Bean
    public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager manager) {
        JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
        jobRepositoryFactoryBean.setDataSource(dataSource);
        jobRepositoryFactoryBean.setTransactionManager(manager);
        jobRepositoryFactoryBean.setDatabaseType("postgres");
        try {
            return jobRepositoryFactoryBean.getObject();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
}
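With spring-boot-starter-batch on the classpath, Spring Boot launches the importJob bean automatically when the application starts. If you prefer to trigger the import yourself, for example from a scheduler, a launch could look roughly like the sketch below. The ImportJobRunner class and the "run.time" parameter name are illustrative assumptions; JobLauncher, Job and JobParametersBuilder are standard Spring Batch types.

package com.hfcsbc.esetl.runner;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.stereotype.Component;

/**
 * Hypothetical helper for triggering the import job manually; not part of the original example.
 */
@Component
public class ImportJobRunner {

    private final JobLauncher jobLauncher;
    private final Job importJob;

    public ImportJobRunner(JobLauncher jobLauncher, Job importJob) {
        this.jobLauncher = jobLauncher;
        this.importJob = importJob;
    }

    public void runImport() throws Exception {
        // A unique parameter gives each run its own JobInstance, so the job can be executed repeatedly.
        JobParameters parameters = new JobParametersBuilder()
                .addLong("run.time", System.currentTimeMillis())
                .toJobParameters();
        jobLauncher.run(importJob, parameters);
    }
}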
2.6 Configure the database and ES connection settings
spring:
  redis:
    host: 192.168.1.222
  data:
    jest:
      uri: http://192.168.1.222:9200
      username: elastic
      password: changeme
  jpa:
    database: POSTGRESQL
    show-sql: true
    hibernate:
      ddl-auto: update
  datasource:
    platform: postgres
    url: jdbc:postgresql://192.168.1.222:5433/person
    username: hfcb
    password: hfcb
    driver-class-name: org.postgresql.Driver
    max-active: 2

spring.batch.initialize-schema: always
2.7 Configure the application entry class
package com.hfcsbc.esetl;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.data.elasticsearch.ElasticsearchAutoConfiguration;
import org.springframework.boot.autoconfigure.data.elasticsearch.ElasticsearchDataAutoConfiguration;
import org.springframework.data.elasticsearch.repository.config.EnableElasticsearchRepositories;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;

@SpringBootApplication(exclude = {ElasticsearchAutoConfiguration.class, ElasticsearchDataAutoConfiguration.class})
@EnableElasticsearchRepositories(basePackages = "com.hfcsbc.esetl.repository")
@EnableJpaRepositories(basePackages = "com.hfcsbc.esetl.repository.jpa")
public class EsEtlApplication {

    public static void main(String[] args) {
        SpringApplication.run(EsEtlApplication.class, args);
    }
}
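Once the job has run, it can be worth checking that the documents actually reached the person index. The sketch below is a purely illustrative sanity check and not part of the original project: the ImportCheckConfig class is hypothetical, while count() is inherited by EsPersonRepository from Spring Data's CrudRepository. Note that the Person mapping above sets refreshInterval = "-1", which disables automatic index refresh, so documents may not be visible to counts or searches until the index is refreshed.

package com.hfcsbc.esetl;

import com.hfcsbc.esetl.repository.es.EsPersonRepository;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Hypothetical sanity check; not part of the original example.
 */
@Configuration
public class ImportCheckConfig {

    @Bean
    public CommandLineRunner importCheck(EsPersonRepository esPersonRepository) {
        return args -> {
            // Counts the documents currently searchable in the "person" index.
            long indexed = esPersonRepository.count();
            System.out.println("Persons indexed in Elasticsearch: " + indexed);
        };
    }
}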
Thank you for reading this far. Hopefully this walkthrough of bulk-importing data into Elasticsearch with Spring Batch has been helpful.