這篇文章主要講解了“Elasticsearch reindex及Java使用sliceScorll優(yōu)化查詢的方法”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“Elasticsearch reindex及Java使用sliceScorll優(yōu)化查詢的方法”吧!
創(chuàng)新互聯(lián)主要從事成都做網(wǎng)站、成都網(wǎng)站建設(shè)、網(wǎng)頁設(shè)計(jì)、企業(yè)做網(wǎng)站、公司建網(wǎng)站等業(yè)務(wù)。立足成都服務(wù)興縣,10多年網(wǎng)站建設(shè)經(jīng)驗(yàn),價(jià)格優(yōu)惠、服務(wù)專業(yè),歡迎來電咨詢建站服務(wù):18980820575
Reindex會(huì)將一個(gè)索引的數(shù)據(jù)復(fù)制到另一個(gè)已存在的索引,但是并不會(huì)復(fù)制原索引的mapping(映射)、shard(分片)、replicas(副本)等配置信息。
簡(jiǎn)單實(shí)例如下
POST _reindex { "source": { "remote": { "host": "http://otherhost:9200", // 遠(yuǎn)程es的ip和port列表 "socket_timeout": "1m", "connect_timeout": "10s" // 超時(shí)時(shí)間設(shè)置 }, "index": "my_index_name", // 源索引名稱 "query": { // 滿足條件的數(shù)據(jù) "match": { "test": "data" } } }, "dest": { "index": "dest_index_name" // 目標(biāo)索引名稱 } }
具體詳細(xì)的使用參考
ElasticSearch 6.3版本 Document APIs之Reindex API
elasticsearch 基礎(chǔ) —— ReIndex
在java中對(duì)于reindexapi沒有找到,于是作者采用了別名轉(zhuǎn)換和全I(xiàn)ndex查詢加上bulk插入的方式對(duì)于索引進(jìn)行遷移。
但是轉(zhuǎn)移數(shù)據(jù)實(shí)在太慢,所以使用了slice對(duì)scorll查詢進(jìn)行優(yōu)化
多線程reindex
具體開啟線程數(shù)根據(jù)Index分片數(shù)進(jìn)行調(diào)整,最好和主分片數(shù)相同,本例子為五個(gè)分片,同時(shí)還使用了別名轉(zhuǎn)換對(duì)索引進(jìn)行無縫銜接避免數(shù)據(jù)正常插入讀取
//建新索引 createUserRecordIndex(newIndexName, typeName); //篩選時(shí)間 BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); RangeQueryBuilder rangeQueryBuilder; rangeQueryBuilder = QueryBuilders.rangeQuery("createTime") .gte(DateUtil.format(DateUtil.parse(createBeginDate, DateUtil.LONG_WEB_FORMAT), DateUtil.LONG_WEB_FORMAT)) .lte(DateUtil.format(DateUtil.parse(createEndDate, DateUtil.LONG_WEB_FORMAT), DateUtil.LONG_WEB_FORMAT)); boolQueryBuilder.must(rangeQueryBuilder); try { //多線程處理查詢請(qǐng)求 Listlist = new ArrayList<>(); for (int i = 0; i < 5; i++) { SliceBuilder sliceBuilder = new SliceBuilder(i, 5); SearchResponse response = EsBuildersServiceUtil.getESClient() .prepareSearch(userRecordAlias) .setTypes(userRecordType) .setQuery(boolQueryBuilder) .setSize(1000).setScroll(new TimeValue(10000)) .slice(sliceBuilder) .execute() .actionGet(); SliceQuery sliceQuery = new SliceQuery(newIndexName, typeName, response); Future submit = threadPoolTaskExecutor.submit(sliceQuery); list.add(submit); } for (Future future : list) { future.get(); } } catch (Exception e) { log.error("reindex error =", e); throw new MembershipDataException(MembershipDataErrorCode.ES_INDEX_CONVERT_ERROR); } try { //別名轉(zhuǎn)換 EsBuildersServiceUtil.getESClient().admin().indices().prepareAliases().removeAlias(oldIndexName, userRecordAlias).execute().actionGet(); EsBuildersServiceUtil.getESClient().admin().indices().prepareAliases().addAlias(newIndexName, userRecordAlias).execute().actionGet(); } catch (Exception e) { log.error(" convertAlias error =", e); throw new MembershipDataException(MembershipDataErrorCode.ES_ALIASES_CONVERT_ERROR); }
slice線程
class SliceQuery implements Callable { private String newIndexName; private String typeName; private SearchResponse response; private SliceQuery(String newIndexName, String typeName, SearchResponse response) { this.newIndexName = newIndexName; this.typeName = typeName; this.response = response; } @Override public Void call() { //獲取總數(shù)量 long totalCount = response.getHits().getTotalHits(); //計(jì)算總次數(shù),每次搜索數(shù)量為分片數(shù)*設(shè)置的size大小 int page = (int) totalCount / 1000; operateRecordList(response, newIndexName, typeName); for (int i = 0; i < page; i++) { //再次發(fā)送請(qǐng)求,并使用上次搜索結(jié)果的ScrollId response = EsBuildersServiceUtil.getESClient().prepareSearchScroll(response.getScrollId()) .setScroll(new TimeValue(10000)).execute() .actionGet(); operateRecordList(response, newIndexName, typeName); } return null; } }
批量插入
/** * 從查詢數(shù)據(jù)中獲取并批量插入Index * * @param response * @param indexName * @param typeName */ private void operateRecordList(SearchResponse response, String indexName, String typeName) { try { SearchHits hits = response.getHits(); Listlist = new ArrayList<>(); for (SearchHit hit : hits) { String sourceAsString = hit.getSourceAsString(); list.add(JSON.parseObject(sourceAsString, AddUserRecordRequest.class)); } //批量插入 saveBulkRecord(list, indexName, typeName); } catch (Exception e) { log.error("operateRecordList error =", e); throw new MembershipDataException(MembershipDataErrorCode.ES_DATA_ADD_ERROR); } } /** * 批量插入 * * @param list * @param indexName * @param typeName */ private void saveBulkRecord(List list, String indexName, String typeName) { try { BulkRequestBuilder bulkRequest = EsBuildersServiceUtil.getESClient().prepareBulk(); for (AddUserRecordRequest recordRequest : list) { JSONObject json = JSONObject.fromObject(recordRequest); bulkRequest.add(EsBuildersServiceUtil.getESClient() .prepareIndex(indexName, typeName) .setSource(json)); } if (list.size() > 0) { bulkRequest.execute().actionGet(); } } catch (Exception e) { log.error("saveBulkRecord error =", e); throw new MembershipDataException(MembershipDataErrorCode.ES_DATA_ADD_ERROR); } }
感謝各位的閱讀,以上就是“Elasticsearch reindex及Java使用sliceScorll優(yōu)化查詢的方法”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對(duì)Elasticsearch reindex及Java使用sliceScorll優(yōu)化查詢的方法這一問題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!