How to Bulk-Import Data into Elasticsearch with Spring Batch

This article shows how to bulk-import data into Elasticsearch with Spring Batch, using a small example project as a reference.

1. Introduction

When a system needs to import a large volume of data from a database into Elasticsearch, Spring Batch can make the import more efficient. Spring Batch reads the data page by page through an ItemReader and writes it in batches through an ItemWriter. Because Spring Batch does not ship an ItemWriter or ItemReader for Elasticsearch, this example defines a custom ElasticsearchItemWriter (and an ElasticsearchItemReader) for the bulk import.
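The read-one-at-a-time, write-in-batches pattern described above can be sketched in plain Java. This is only a simplified model under stated assumptions: `ChunkLoopSketch`, `run`, and `write` are hypothetical names, not Spring Batch classes, and the real framework adds transactions, restart state, and listeners around the same loop.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Simplified, hypothetical model of a chunk-oriented step; not Spring Batch's real classes.
class ChunkLoopSketch {

    /** Reads items one by one, writes them chunk by chunk, and returns the size of each written chunk. */
    static List<Integer> run(Iterator<String> reader, int chunkSize) {
        List<Integer> writtenChunkSizes = new ArrayList<>();
        List<String> chunk = new ArrayList<>(chunkSize);
        while (reader.hasNext()) {
            chunk.add(reader.next());
            if (chunk.size() == chunkSize) {
                writtenChunkSizes.add(write(chunk));
                chunk = new ArrayList<>(chunkSize);
            }
        }
        // Flush the last, possibly smaller, chunk
        if (!chunk.isEmpty()) {
            writtenChunkSizes.add(write(chunk));
        }
        return writtenChunkSizes;
    }

    // Stand-in for the ItemWriter: a real writer would send one bulk request per call.
    static int write(List<String> chunk) {
        System.out.println("bulk write of " + chunk.size() + " items");
        return chunk.size();
    }

    public static void main(String[] args) {
        List<String> rows = new ArrayList<>();
        for (int i = 0; i < 25; i++) {
            rows.add("row-" + i);
        }
        System.out.println(run(rows.iterator(), 10));
    }
}
```

With 25 rows and a chunk size of 10, the writer is called three times, so the number of round trips to Elasticsearch grows with the number of chunks rather than the number of rows.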

2. Example

2.1 pom.xml

This article uses spring data jest to connect to ES (spring data elasticsearch works as well). The ES version is 5.5.3.



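<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.hfcsbc.estl</groupId>
  <artifactId>es-etl</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>es-etl</name>
  <description>Demo project for Spring Boot</description>

  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.0.M7</version>
    <relativePath/>
  </parent>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
    </dependency>

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>

    <dependency>
      <groupId>org.postgresql</groupId>
      <artifactId>postgresql</artifactId>
    </dependency>

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>

    <dependency>
      <groupId>com.github.vanroy</groupId>
      <artifactId>spring-boot-starter-data-jest</artifactId>
      <version>3.0.0.RELEASE</version>
    </dependency>

    <dependency>
      <groupId>io.searchbox</groupId>
      <artifactId>jest</artifactId>
      <version>5.3.2</version>
    </dependency>

    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
    </dependency>

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
    </plugins>
  </build>

  <repositories>
    <repository>
      <id>spring-snapshots</id>
      <name>Spring Snapshots</name>
      <url>https://repo.spring.io/snapshot</url>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
    </repository>
    <repository>
      <id>spring-milestones</id>
      <name>Spring Milestones</name>
      <url>https://repo.spring.io/milestone</url>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
    </repository>
  </repositories>

  <pluginRepositories>
    <pluginRepository>
      <id>spring-snapshots</id>
      <name>Spring Snapshots</name>
      <url>https://repo.spring.io/snapshot</url>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
    </pluginRepository>
    <pluginRepository>
      <id>spring-milestones</id>
      <name>Spring Milestones</name>
      <url>https://repo.spring.io/milestone</url>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
    </pluginRepository>
  </pluginRepositories>
</project>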

2.2 Entity classes and repositories

// Person.java
package com.hfcsbc.esetl.domain;
import lombok.Data;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.OneToOne;

/**
 * Created by pengchao on 2018/2/23
 */
@Document(indexName = "person", type = "person", shards = 1, replicas = 0, refreshInterval = "-1")
@Entity
@Data
public class Person {
  @Id
  private Long id;
  private String name;
  @OneToOne
  @Field(type = FieldType.Nested)
  private Address address;
}

// Address.java
package com.hfcsbc.esetl.domain;
import lombok.Data;
import javax.persistence.Entity;
import javax.persistence.Id;
/**
 * Created by pengchao on 2018/2/23
 */
@Entity
@Data
public class Address {
  @Id
  private Long id;
  private String name;
}

// PersonRepository.java (JPA repository, reads from the database)
package com.hfcsbc.esetl.repository.jpa;
import com.hfcsbc.esetl.domain.Person;
import org.springframework.data.jpa.repository.JpaRepository;
/**
 * Created by pengchao on 2018/2/23
 */
public interface PersonRepository extends JpaRepository<Person, Long> {
}

// EsPersonRepository.java (Elasticsearch repository, writes to ES)
package com.hfcsbc.esetl.repository.es;
import com.hfcsbc.esetl.domain.Person;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
/**
 * Created by pengchao on 2018/2/23
 */
public interface EsPersonRepository extends ElasticsearchRepository<Person, Long> {
}

2.3 Configuring the ElasticsearchItemWriter

package com.hfcsbc.esetl.itemWriter;
import com.hfcsbc.esetl.repository.es.EsPersonRepository;
import com.hfcsbc.esetl.domain.Person;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.ItemWriteListener;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.item.ItemWriter;
import java.util.List;
/**
 * Create by pengchao on 2018/2/23
 */
public class ElasticsearchItemWriter implements ItemWriter<Person>, ItemWriteListener<Person>, StepExecutionListener {

  private EsPersonRepository personRepository;

  public ElasticsearchItemWriter(EsPersonRepository personRepository) {
    this.personRepository = personRepository;
  }

  @Override
  public void beforeWrite(List<? extends Person> items) {

  }

  @Override
  public void afterWrite(List<? extends Person> items) {

  }

  @Override
  public void onWriteError(Exception exception, List<? extends Person> items) {

  }

  @Override
  public void beforeStep(StepExecution stepExecution) {

  }

  @Override
  public ExitStatus afterStep(StepExecution stepExecution) {
    // Returning null leaves the step's exit status unchanged
    return null;
  }

  @Override
  public void write(List<? extends Person> items) throws Exception {
    // saveAll in the implementing class AbstractElasticsearchRepository calls elasticsearchOperations.bulkIndex(queries), i.e. it indexes the whole chunk in bulk
    personRepository.saveAll(items);
  }
}
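To make "bulk indexing" more concrete, the following sketch builds the kind of newline-delimited body that the Elasticsearch `_bulk` endpoint accepts: one action line followed by one source line per document. It is an illustration only; `BulkBodySketch` and `bulkBody` are hypothetical names, the exact request that Jest sends is not shown in this article, and the sketch assumes names that need no JSON escaping.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that illustrates the _bulk request format; not part of the example project.
class BulkBodySketch {

    /** Builds an "index" action line plus a source line for every (id, name) pair. */
    static String bulkBody(String index, String type, Map<Long, String> namesById) {
        StringBuilder body = new StringBuilder();
        for (Map.Entry<Long, String> entry : namesById.entrySet()) {
            // Action line: which index, type and id the next source line belongs to
            body.append("{\"index\":{\"_index\":\"").append(index)
                .append("\",\"_type\":\"").append(type)
                .append("\",\"_id\":\"").append(entry.getKey()).append("\"}}\n");
            // Source line: the document itself (assumes the name needs no escaping)
            body.append("{\"id\":").append(entry.getKey())
                .append(",\"name\":\"").append(entry.getValue()).append("\"}\n");
        }
        return body.toString();
    }

    public static void main(String[] args) {
        Map<Long, String> people = new LinkedHashMap<>();
        people.put(1L, "Ann");
        people.put(2L, "Bob");
        System.out.print(bulkBody("person", "person", people));
    }
}
```

Sending all documents of a chunk in one such body is what makes a single `write` call cheaper than indexing each Person with its own request.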

2.4 Configuring the ElasticsearchItemReader (not used in this example; for reference only)

package com.hfcsbc.esetl.itemReader;
import org.springframework.batch.item.data.AbstractPaginatedDataItemReader;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.query.SearchQuery;
import org.springframework.util.ClassUtils;
import java.util.Iterator;
/**
 * Created by pengchao on 2018/2/24
 */
public class ElasticsearchItemReader<T> extends AbstractPaginatedDataItemReader<T> implements InitializingBean {

  private final ElasticsearchOperations elasticsearchOperations;

  private final SearchQuery query;

  private final Class<T> targetType;

  public ElasticsearchItemReader(ElasticsearchOperations elasticsearchOperations, SearchQuery query, Class<T> targetType) {
    // A name is needed so that the reader can save its state in the execution context
    setName(ClassUtils.getShortName(ElasticsearchItemReader.class));
    this.elasticsearchOperations = elasticsearchOperations;
    this.query = query;
    this.targetType = targetType;
  }

  @Override
  protected Iterator<T> doPageRead() {
    // Request the current page; without this every call would return the same results and the reader would never finish
    query.setPageable(PageRequest.of(page, pageSize));
    return elasticsearchOperations.queryForList(query, targetType).iterator();
  }

  @Override
  public void afterPropertiesSet() throws Exception {
  }
}
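A paginated reader keeps asking for the next page until a page comes back empty. The plain-Java sketch below models that contract with an in-memory list so the termination condition is easy to see; `PagedReadSketch`, `fetchPage`, and `readAll` are hypothetical names and no Elasticsearch call is involved.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical in-memory model of page-by-page reading; not a Spring Batch class.
class PagedReadSketch {

    /** Returns the rows of the requested page, or an empty list once the page is past the end. */
    static List<Integer> fetchPage(List<Integer> table, int page, int pageSize) {
        int from = Math.min(page * pageSize, table.size());
        int to = Math.min(from + pageSize, table.size());
        return table.subList(from, to);
    }

    /** Reads every row page by page; returns {itemsRead, pagesRequested}. */
    static int[] readAll(List<Integer> table, int pageSize) {
        int itemsRead = 0;
        int pagesRequested = 0;
        int page = 0;
        while (true) {
            Iterator<Integer> results = fetchPage(table, page, pageSize).iterator();
            pagesRequested++;
            page++; // advance, otherwise the same page would be fetched forever
            if (!results.hasNext()) {
                break; // an empty page signals the end of the data
            }
            while (results.hasNext()) {
                results.next();
                itemsRead++;
            }
        }
        return new int[] {itemsRead, pagesRequested};
    }

    public static void main(String[] args) {
        List<Integer> table = new ArrayList<>();
        for (int i = 0; i < 25; i++) {
            table.add(i);
        }
        int[] result = readAll(table, 10);
        System.out.println(result[0] + " items in " + result[1] + " page requests");
    }
}
```

For 25 rows and a page size of 10, the loop issues four page requests: three that return data and a final empty one that ends the read.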

2.5 Spring Batch configuration

package com.hfcsbc.esetl.config;
import com.hfcsbc.esetl.itemWriter.ElasticsearchItemWriter;
import com.hfcsbc.esetl.repository.es.EsPersonRepository;
import com.hfcsbc.esetl.domain.Person;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.batch.item.database.orm.JpaNativeQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
import javax.persistence.EntityManagerFactory;
import javax.sql.DataSource;
/**
 * Create by pengchao on 2018/2/23
 */
@Configuration
@EnableBatchProcessing
public class BatchConfig {
  @Autowired
  private EsPersonRepository personRepository;

  @Bean
  public ItemReader<Person> orderItemReader(EntityManagerFactory entityManagerFactory){
    JpaPagingItemReader<Person> reader = new JpaPagingItemReader<>();
    // Paging needs a stable row order, otherwise pages may overlap or skip rows
    String sqlQuery = "select * from person order by id";
    try {
      JpaNativeQueryProvider<Person> queryProvider = new JpaNativeQueryProvider<>();
      queryProvider.setSqlQuery(sqlQuery);
      queryProvider.setEntityClass(Person.class);
      queryProvider.afterPropertiesSet();
      reader.setEntityManagerFactory(entityManagerFactory);
      // Number of rows fetched from the database per page
      reader.setPageSize(10000);
      reader.setQueryProvider(queryProvider);
      reader.afterPropertiesSet();
      reader.setSaveState(true);
    } catch (Exception e) {
      e.printStackTrace();
    }

    return reader;
  }

  @Bean
  public ElasticsearchItemWriter itemWriter(){
    return new ElasticsearchItemWriter(personRepository);
  }

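  @Bean
  public Step step(StepBuilderFactory stepBuilderFactory,
           ItemReader<Person> itemReader,
           ItemWriter<Person> itemWriter){
    return stepBuilderFactory
        .get("step1")
        // Each chunk of 10000 items is passed to the writer in a single write() call
        .<Person, Person>chunk(10000)
        .reader(itemReader)
        .writer(itemWriter)
        .build();
  }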

  @Bean
  public Job job(JobBuilderFactory jobBuilderFactory, Step step){
    return jobBuilderFactory
        .get("importJob")
        .incrementer(new RunIdIncrementer())
        .flow(step)
        .end()
        .build();
  }

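  /**
   * Spring Batch creates some tables of its own when it runs; this bean specifies where they are created: dataSource
   * @param dataSource the data source that holds the Spring Batch metadata tables
   * @param manager the transaction manager used by the job repository
   * @return the configured JobRepository
   */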
  @Bean
  public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager manager){
    JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
    jobRepositoryFactoryBean.setDataSource(dataSource);
    jobRepositoryFactoryBean.setTransactionManager(manager);
    jobRepositoryFactoryBean.setDatabaseType("postgres");
    try {
      return jobRepositoryFactoryBean.getObject();
    } catch (Exception e) {
      e.printStackTrace();
    }
    return null;
  }
}
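The job above registers a RunIdIncrementer. Its effect can be modelled in a few lines: each new run gets a `run.id` parameter that is one higher than the previous one, so every launch counts as a new job instance instead of a repeat of a finished one. The sketch is a simplified model under stated assumptions: `RunIdSketch` and `next` are hypothetical names, and a plain `Map` stands in for Spring Batch's JobParameters.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of what a run-id incrementer does; a Map stands in for JobParameters.
class RunIdSketch {

    static final String RUN_ID_KEY = "run.id";

    /** Copies the previous parameters and raises run.id by one (starting at 1). */
    static Map<String, Long> next(Map<String, Long> previous) {
        Map<String, Long> next = new HashMap<>(previous);
        next.put(RUN_ID_KEY, previous.getOrDefault(RUN_ID_KEY, 0L) + 1);
        return next;
    }

    public static void main(String[] args) {
        Map<String, Long> first = next(new HashMap<>());
        Map<String, Long> second = next(first);
        System.out.println(first + " then " + second);
    }
}
```

Because the parameters differ on every launch, the import job can be started again after it has completed, which is convenient for a recurring full import.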

2.6 Database and ES connection settings

spring:
 redis:
  host: 192.168.1.222
 data:
  jest:
   uri: http://192.168.1.222:9200
   username: elastic
   password: changeme

 jpa:
  database: POSTGRESQL
  show-sql: true
  hibernate:
   ddl-auto: update

 datasource:
  platform: postgres
  url: jdbc:postgresql://192.168.1.222:5433/person
  username: hfcb
  password: hfcb
  driver-class-name: org.postgresql.Driver
  max-active: 2

spring.batch.initialize-schema: always

2.7 Application entry class

package com.hfcsbc.esetl;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.data.elasticsearch.ElasticsearchAutoConfiguration;
import org.springframework.boot.autoconfigure.data.elasticsearch.ElasticsearchDataAutoConfiguration;
import org.springframework.data.elasticsearch.repository.config.EnableElasticsearchRepositories;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;

@SpringBootApplication(exclude = {ElasticsearchAutoConfiguration.class, ElasticsearchDataAutoConfiguration.class})
@EnableElasticsearchRepositories(basePackages = "com.hfcsbc.esetl.repository")
@EnableJpaRepositories(basePackages = "com.hfcsbc.esetl.repository.jpa")
public class EsEtlApplication {

  public static void main(String[] args) {
    SpringApplication.run(EsEtlApplication.class, args);
  }
}
