This article shows how to operate Elasticsearch from Java: connecting to a cluster with the transport client, indexing, getting, updating and deleting documents, sending bulk requests, and using the search API.
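Steps for working with an ES cluster from Java: 1: configure the cluster settings; 2: create the client; 3: inspect the cluster information
1: Cluster name
The default cluster name is elasticsearch. If the cluster's actual name differs from the one the client specifies, an error is raised when the client tries to use the nodes.
2: Sniffing
Setting client.transport.sniff enables sniffing. You then only need to specify one node of the cluster (not necessarily the master node), and the client loads the other nodes of the cluster from it. As long as the program keeps running, it can still connect to the other nodes even if that node goes down.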
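To make the failover idea concrete, here is a minimal sketch in plain Java. It is a toy model, not the Elasticsearch client: the class SniffingSketch, its methods, and the node names are all hypothetical. It only shows why a client that has learned the other nodes can keep working after its seed node goes down.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of a sniffing client; every name here is made up for illustration.
class SniffingSketch {
    private final List<String> knownNodes = new ArrayList<>();

    /** Start with the single seed node the application was configured with. */
    SniffingSketch(String seedNode) {
        knownNodes.add(seedNode);
    }

    /** Simulates one sniff round: remember every node the cluster reports. */
    void sniff(List<String> nodesReportedByCluster) {
        for (String node : nodesReportedByCluster) {
            if (!knownNodes.contains(node)) {
                knownNodes.add(node);
            }
        }
    }

    /** Returns the first known node that is not down, or null if none is reachable. */
    String pickNode(Set<String> downNodes) {
        for (String node : knownNodes) {
            if (!downNodes.contains(node)) {
                return node;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        SniffingSketch client = new SniffingSketch("node-a:9300");
        client.sniff(Arrays.asList("node-a:9300", "node-b:9300", "node-c:9300"));
        Set<String> down = new HashSet<>(Arrays.asList("node-a:9300"));
        System.out.println(client.pickNode(down)); // prints node-b:9300
    }
}
```

Without the sniff step the client would only know node-a:9300, and pickNode would return null once that node is down.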
3: Search type SearchType.QUERY_THEN_FETCH
ES has 4 search types:
QUERY_AND_FETCH:
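The node that receives the request (the coordinating node) forwards the query to all shards. Each shard scores and sorts by its own rules, that is, its local term frequency and document frequency, and returns its results to the coordinating node. The coordinating node merges and sorts all the data and returns it to the client. This approach needs only one round of interaction with ES.
This search type has a data-volume problem and a ranking problem. First, the coordinating node gathers the data returned by every shard, so the volume can be large: each shard returns its own top hits, so a request for N hits can come back with up to N times the number of shards. Second, the scoring rules on the individual shards may be inconsistent.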
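The data-volume problem is easy to see in a small simulation. The sketch below is plain Java with made-up names (QueryAndFetchSketch, Doc); it does not call Elasticsearch and only models the single round trip described above, assuming each shard returns its top `size` full documents.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of QUERY_AND_FETCH; all names here are made up for illustration.
class QueryAndFetchSketch {

    /** A full hit: id, score and the document body travel together. */
    static final class Doc {
        final String id;
        final double score;
        final String source;

        Doc(String id, double score, String source) {
            this.id = id;
            this.score = score;
            this.source = source;
        }
    }

    /** One shard: sort local hits by score and return the top `size` full documents. */
    static List<Doc> shardTop(List<Doc> shardDocs, int size) {
        List<Doc> sorted = new ArrayList<>(shardDocs);
        sorted.sort((a, b) -> Double.compare(b.score, a.score));
        return new ArrayList<>(sorted.subList(0, Math.min(size, sorted.size())));
    }

    /** Single round trip: merge and sort everything the shards sent back. */
    static List<Doc> search(List<List<Doc>> shards, int size) {
        List<Doc> merged = new ArrayList<>();
        for (List<Doc> shard : shards) {
            merged.addAll(shardTop(shard, size));
        }
        merged.sort((a, b) -> Double.compare(b.score, a.score));
        return merged; // up to size * shards.size() documents
    }

    public static void main(String[] args) {
        List<List<Doc>> shards = new ArrayList<>();
        for (int s = 0; s < 3; s++) {
            List<Doc> shard = new ArrayList<>();
            for (int d = 0; d < 5; d++) {
                shard.add(new Doc("s" + s + "-d" + d, d + s * 0.1, "{...}"));
            }
            shards.add(shard);
        }
        System.out.println(search(shards, 2).size()); // prints 6
    }
}
```

With 3 shards and size 2, the caller receives 6 full documents instead of 2.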
QUERY_THEN_FETCH:
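The node that receives the request (the coordinating node) forwards it to all shards. Each shard scores and sorts, then returns only the ids and scores of its hits. The coordinating node merges and sorts them, then uses the sorted ids to read the matching documents from the nodes that hold them and returns those to the client. This approach needs two rounds of interaction with ES.
This solves the data-volume problem, but the ranking problem remains. It is the default search type in ES.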
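The two phases can be sketched in plain Java as follows. This is a simulation under stated assumptions, not ES code: QueryThenFetchSketch, Hit, and the map-based "shards" are hypothetical stand-ins. Phase 1 moves only ids and scores; phase 2 fetches bodies for the global top hits only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of QUERY_THEN_FETCH; all names here are made up for illustration.
class QueryThenFetchSketch {

    /** Lightweight hit from the query phase: no document body. */
    static final class Hit {
        final int shard;
        final String id;
        final double score;

        Hit(int shard, String id, double score) {
            this.shard = shard;
            this.id = id;
            this.score = score;
        }
    }

    /** Phase 1 on one shard: return only the ids and scores of the local top `size` hits. */
    static List<Hit> queryPhase(int shard, Map<String, Double> localScores, int size) {
        List<Hit> hits = new ArrayList<>();
        for (Map.Entry<String, Double> e : localScores.entrySet()) {
            hits.add(new Hit(shard, e.getKey(), e.getValue()));
        }
        hits.sort((a, b) -> Double.compare(b.score, a.score));
        return new ArrayList<>(hits.subList(0, Math.min(size, hits.size())));
    }

    /** Coordinator: merge the light hits, keep the global top `size`, then fetch their bodies. */
    static List<String> search(List<Map<String, Double>> shardScores,
                               List<Map<String, String>> shardDocs, int size) {
        List<Hit> merged = new ArrayList<>();
        for (int s = 0; s < shardScores.size(); s++) {
            merged.addAll(queryPhase(s, shardScores.get(s), size));
        }
        merged.sort((a, b) -> Double.compare(b.score, a.score));
        List<String> result = new ArrayList<>();
        for (Hit hit : merged.subList(0, Math.min(size, merged.size()))) {
            result.add(shardDocs.get(hit.shard).get(hit.id)); // phase 2: fetch by id
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Double> scores0 = new HashMap<>();
        scores0.put("a", 0.9);
        scores0.put("b", 0.2);
        Map<String, Double> scores1 = new HashMap<>();
        scores1.put("c", 0.8);
        scores1.put("d", 0.7);
        Map<String, String> docs0 = new HashMap<>();
        docs0.put("a", "doc-a");
        docs0.put("b", "doc-b");
        Map<String, String> docs1 = new HashMap<>();
        docs1.put("c", "doc-c");
        docs1.put("d", "doc-d");

        List<Map<String, Double>> shardScores = new ArrayList<>();
        shardScores.add(scores0);
        shardScores.add(scores1);
        List<Map<String, String>> shardDocs = new ArrayList<>();
        shardDocs.add(docs0);
        shardDocs.add(docs1);

        System.out.println(search(shardScores, shardDocs, 2)); // prints [doc-a, doc-c]
    }
}
```

The caller gets exactly `size` documents, at the cost of a second round trip.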
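DFS_QUERY_AND_FETCH and DFS_QUERY_THEN_FETCH:
Both unify the scoring rules across shards before scoring. This solves the ranking problem, but DFS_QUERY_AND_FETCH still has the data-volume problem. DFS_QUERY_THEN_FETCH solves both problems, but it is the least efficient.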
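Why per-shard scoring can disagree is easiest to see with numbers. The sketch below uses a classic TF-IDF style inverse document frequency, 1 + ln(numDocs / (docFreq + 1)), as an assumption for illustration; the exact formula depends on the similarity ES is configured with. The class name DfsIdfSketch and the shard statistics are made up.

```java
// Toy illustration of local versus global term statistics; names and numbers are made up.
class DfsIdfSketch {

    /** Classic TF-IDF style inverse document frequency (assumed here for illustration). */
    static double idf(long docFreq, long numDocs) {
        return 1.0 + Math.log((double) numDocs / (docFreq + 1));
    }

    /** DFS pre-phase idea: add up the per-shard statistics so every shard scores with the same numbers. */
    static double globalIdf(long[] shardDocFreqs, long[] shardNumDocs) {
        long docFreq = 0;
        long numDocs = 0;
        for (int i = 0; i < shardDocFreqs.length; i++) {
            docFreq += shardDocFreqs[i];
            numDocs += shardNumDocs[i];
        }
        return idf(docFreq, numDocs);
    }

    public static void main(String[] args) {
        // The same term is rare on shard A and common on shard B.
        long[] docFreqs = {1, 499};
        long[] numDocs = {1000, 1000};
        System.out.println("shard A local idf: " + idf(docFreqs[0], numDocs[0]));
        System.out.println("shard B local idf: " + idf(docFreqs[1], numDocs[1]));
        System.out.println("global idf:        " + globalIdf(docFreqs, numDocs));
    }
}
```

With local statistics the same term is weighted much more heavily on shard A than on shard B, so scores from the two shards are not directly comparable. With the summed statistics both shards use one weight, which is what the DFS variants pay the extra round trip for.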
1, Getting the client; there are two ways to do it
@Before
public void before() throws Exception {
    Map<String, String> map = new HashMap<String, String>();
    map.put("cluster.name", "elasticsearch_wenbronk");
    Settings.Builder settings = Settings.builder().put(map);
    client = TransportClient.builder().settings(settings).build()
            .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("www.wenbronk.com"), Integer.parseInt("9300")));
}
@Before
public void before11() throws Exception {
    // Create the client with the default cluster name, "elasticsearch"
    // client = TransportClient.builder().build()
    //         .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("www.wenbronk.com"), 9300));

    // Specify the cluster configuration, including the cluster name, through a Settings object
    Settings settings = Settings.settingsBuilder()
            .put("cluster.name", "elasticsearch_wenbronk") // set the cluster name
            // .put("client.transport.sniff", true) // enable sniffing; with this on the client never connected, cause unknown
            // .put("network.host", "192.168.50.37")
            .put("client.transport.ignore_cluster_name", true) // skip cluster name validation; the client connects even if the name is wrong
            // .put("client.transport.nodes_sampler_interval", 5) // raises an error (possibly because the value needs a time unit such as "5s")
            // .put("client.transport.ping_timeout", 5) // raises an error; time to wait for a ping response
            .build();
    client = TransportClient.builder().settings(settings).build()
            .addTransportAddress(new InetSocketTransportAddress(new InetSocketAddress("192.168.50.37", 9300)));
    // default 5s
    // how often connections are opened, default 5s
    System.out.println("success connect");
}
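PS: Neither of the two approaches given on the official site worked on its own; they only worked once I combined them, which wasted my whole afternoon...
Meaning of the other parameters:
Code: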
package com.wenbronk.javaes;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkProcessor.Listener;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.script.Script;
import org.junit.Before;
import org.junit.Test;

import com.alibaba.fastjson.JSONObject;

/**
 * Operating Elasticsearch through the Java API
 *
 * @author 231
 */
public class JavaESTest {

    private TransportClient client;
    private IndexRequest source;

    /**
     * Get a connection, first approach.
     * Must carry @Before, otherwise client is null in every test.
     * @throws Exception
     */
    @Before
    public void before() throws Exception {
        Map<String, String> map = new HashMap<String, String>();
        map.put("cluster.name", "elasticsearch_wenbronk");
        Settings.Builder settings = Settings.builder().put(map);
        client = TransportClient.builder().settings(settings).build()
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("www.wenbronk.com"), Integer.parseInt("9300")));
    }

    /**
     * Inspect the cluster information
     */
    @Test
    public void testInfo() {
        List<DiscoveryNode> nodes = client.connectedNodes();
        for (DiscoveryNode node : nodes) {
            System.out.println(node.getHostAddress());
        }
    }

    /**
     * Build the JSON string, approach 1: plain concatenation
     */
    public String createJson1() {
        String json = "{" +
                "\"user\":\"kimchy\"," +
                "\"postDate\":\"2013-01-30\"," +
                "\"message\":\"trying out Elasticsearch\"" +
                "}";
        return json;
    }

    /**
     * Build the JSON from a map
     */
    public Map<String, Object> createJson2() {
        Map<String, Object> json = new HashMap<String, Object>();
        json.put("user", "kimchy");
        json.put("postDate", new Date());
        json.put("message", "trying out elasticsearch");
        return json;
    }

    /**
     * Build the JSON with fastjson
     */
    public JSONObject createJson3() {
        JSONObject json = new JSONObject();
        json.put("user", "kimchy");
        json.put("postDate", new Date());
        json.put("message", "trying out elasticsearch");
        return json;
    }

    /**
     * Build the JSON with the ES helper class
     */
    public XContentBuilder createJson4() throws Exception {
        // One of the ways to create a JSON object
        XContentBuilder source = XContentFactory.jsonBuilder()
                .startObject()
                    .field("user", "kimchy")
                    .field("postDate", new Date())
                    .field("message", "trying out Elasticsearch")
                .endObject();
        return source;
    }

    /**
     * Store a document in the index
     * @throws Exception
     */
    @Test
    public void test1() throws Exception {
        XContentBuilder source = createJson4();
        // Store the JSON in the index
        IndexResponse response = client.prepareIndex("twitter", "tweet", "1").setSource(source).get();
        // Read the result
        String index = response.getIndex();
        String type = response.getType();
        String id = response.getId();
        long version = response.getVersion();
        boolean created = response.isCreated();
        System.out.println(index + " : " + type + ": " + id + ": " + version + ": " + created);
    }

    /**
     * get API: fetch a specific document
     */
    @Test
    public void testGet() {
        // GetResponse response = client.prepareGet("twitter", "tweet", "1")
        //         .get();
        GetResponse response = client.prepareGet("twitter", "tweet", "1")
                .setOperationThreaded(false) // false: run on the calling thread instead of a separate thread
                .get();
        System.out.println(response.getSourceAsString());
    }

    /**
     * Test the delete API
     */
    @Test
    public void testDelete() {
        DeleteResponse response = client.prepareDelete("twitter", "tweet", "1")
                .get();
        String index = response.getIndex();
        String type = response.getType();
        String id = response.getId();
        long version = response.getVersion();
        System.out.println(index + " : " + type + ": " + id + ": " + version);
    }

    /**
     * Test the update API
     * using an UpdateRequest object
     * @throws Exception
     */
    @Test
    public void testUpdate() throws Exception {
        UpdateRequest updateRequest = new UpdateRequest();
        updateRequest.index("twitter");
        updateRequest.type("tweet");
        updateRequest.id("1");
        updateRequest.doc(XContentFactory.jsonBuilder()
                .startObject()
                    // Missing fields are added, existing fields are replaced
                    .field("gender", "male")
                    .field("message", "hello")
                .endObject());
        UpdateResponse response = client.update(updateRequest).get();

        // Print the result
        String index = response.getIndex();
        String type = response.getType();
        String id = response.getId();
        long version = response.getVersion();
        System.out.println(index + " : " + type + ": " + id + ": " + version);
    }

    /**
     * Test the update API through the client
     * @throws Exception
     */
    @Test
    public void testUpdate2() throws Exception {
        // Update with a Script object
        // UpdateResponse response = client.prepareUpdate("twitter", "tweet", "1")
        //         .setScript(new Script("ctx._source.gender = \"male\""))
        //         .get();

        // Update with XContentFactory.jsonBuilder()
        // UpdateResponse response = client.prepareUpdate("twitter", "tweet", "1")
        //         .setDoc(XContentFactory.jsonBuilder()
        //                 .startObject()
        //                     .field("gender", "malelelele")
        //                 .endObject()).get();

        // Update with an UpdateRequest object and a script
        // UpdateRequest updateRequest = new UpdateRequest("twitter", "tweet", "1")
        //         .script(new Script("ctx._source.gender=\"male\""));
        // UpdateResponse response = client.update(updateRequest).get();

        // Update with an UpdateRequest object and a partial document
        UpdateResponse response = client.update(new UpdateRequest("twitter", "tweet", "1")
                .doc(XContentFactory.jsonBuilder()
                        .startObject()
                            .field("gender", "male")
                        .endObject()
                )).get();
        System.out.println(response.getIndex());
    }

    /**
     * Test update
     * using an UpdateRequest with a script
     * @throws Exception
     * @throws InterruptedException
     */
    @Test
    public void testUpdate3() throws InterruptedException, Exception {
        UpdateRequest updateRequest = new UpdateRequest("twitter", "tweet", "1")
                .script(new Script("ctx._source.gender=\"male\""));
        UpdateResponse response = client.update(updateRequest).get();
    }

    /**
     * Test upsert
     * @throws Exception
     */
    @Test
    public void testUpsert() throws Exception {
        // Document to index when the id is not found
        IndexRequest indexRequest = new IndexRequest("twitter", "tweet", "2")
                .source(XContentFactory.jsonBuilder()
                        .startObject()
                            .field("name", "214")
                            .field("gender", "gfrerq")
                        .endObject());
        // Update to apply when the document is found
        UpdateRequest upsert = new UpdateRequest("twitter", "tweet", "2")
                .doc(XContentFactory.jsonBuilder()
                        .startObject()
                            .field("user", "wenbronk")
                        .endObject())
                .upsert(indexRequest);
        client.update(upsert).get();
    }

    /**
     * Test the multi get API
     * fetching from different indices, types and ids
     */
    @Test
    public void testMultiGet() {
        MultiGetResponse multiGetResponse = client.prepareMultiGet()
                .add("twitter", "tweet", "1")
                .add("twitter", "tweet", "2", "3", "4")
                .add("another", "type", "foo")
                .get();
        for (MultiGetItemResponse itemResponse : multiGetResponse) {
            GetResponse response = itemResponse.getResponse();
            if (response.isExists()) {
                String sourceAsString = response.getSourceAsString();
                System.out.println(sourceAsString);
            }
        }
    }

    /**
     * bulk: batch execution
     * a single request can index or delete multiple documents
     */
    @Test
    public void testBulk() throws Exception {
        BulkRequestBuilder bulkRequest = client.prepareBulk();
        bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")
                .setSource(XContentFactory.jsonBuilder()
                        .startObject()
                            .field("user", "kimchy")
                            .field("postDate", new Date())
                            .field("message", "trying out Elasticsearch")
                        .endObject()));
        bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")
                .setSource(XContentFactory.jsonBuilder()
                        .startObject()
                            .field("user", "kimchy")
                            .field("postDate", new Date())
                            .field("message", "another post")
                        .endObject()));
        BulkResponse response = bulkRequest.get();
        System.out.println(response.getHeaders());
    }

    /**
     * Using the bulk processor
     * @throws Exception
     */
    @Test
    public void testBulkProcessor() throws Exception {
        // Create the BulkProcessor object
        BulkProcessor bulkProcessor = BulkProcessor.builder(client, new Listener() {
            // Called before each bulk is executed
            public void beforeBulk(long executionId, BulkRequest request) {
            }

            // Called when the bulk fails
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
            }

            // Called after the bulk has executed
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
            }
        })
        // Execute a bulk every 10,000 requests
        .setBulkActions(10000)
        // Flush the bulk every 1 GB of data
        .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
        // Always flush every 5 s
        .setFlushInterval(TimeValue.timeValueSeconds(5))
        // Number of concurrent requests: 0 executes synchronously, 1 allows one bulk to run while new requests accumulate
        .setConcurrentRequests(1)
        // Backoff policy: start at 100 ms, retry at most 3 times
        .setBackoffPolicy(
                BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
        .build();

        // Add individual requests (a real index request also needs .source(...))
        bulkProcessor.add(new IndexRequest("twitter", "tweet", "1"));
        bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));

        // Close
        bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
        // or
        bulkProcessor.close();
    }
}
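The thresholds configured on the bulk processor (number of actions and total size) boil down to a buffer that flushes itself when either limit is reached. Below is a minimal plain-Java sketch of that idea. It is not the BulkProcessor implementation: BulkBufferSketch and its methods are hypothetical, requests are plain strings, and the time-based flush, concurrency and backoff are left out.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Toy model of threshold-based batching; all names here are made up for illustration.
class BulkBufferSketch {
    private final int maxActions;
    private final long maxBytes;
    private final List<String> pending = new ArrayList<>();
    private long pendingBytes = 0;
    private int flushCount = 0;

    BulkBufferSketch(int maxActions, long maxBytes) {
        this.maxActions = maxActions;
        this.maxBytes = maxBytes;
    }

    /** Buffers one request and flushes as soon as either threshold is reached. */
    void add(String request) {
        pending.add(request);
        pendingBytes += request.getBytes(StandardCharsets.UTF_8).length;
        if (pending.size() >= maxActions || pendingBytes >= maxBytes) {
            flush();
        }
    }

    /** "Sends" the buffered batch (here it is only counted) and resets the buffer. */
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        flushCount++;
        pending.clear();
        pendingBytes = 0;
    }

    int flushCount() {
        return flushCount;
    }

    int pendingSize() {
        return pending.size();
    }

    public static void main(String[] args) {
        BulkBufferSketch buffer = new BulkBufferSketch(3, 1000);
        for (int i = 0; i < 7; i++) {
            buffer.add("{\"id\":" + i + "}");
        }
        System.out.println(buffer.flushCount() + " flushes, " + buffer.pendingSize() + " pending"); // prints 2 flushes, 1 pending
        buffer.flush(); // like closing the processor: send whatever is left
    }
}
```

The final flush() mirrors why the test above closes the processor: anything still buffered below the thresholds would otherwise never be sent.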
Code for test2:
package com.wenbronk.javaes;

import java.net.InetSocketAddress;

import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.Settings.Builder;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.search.sort.SortParseElement;
import org.junit.Before;
import org.junit.Test;

/**
 * Operating Elasticsearch through the Java API
 * search API
 * @author 231
 */
public class JavaESTest2 {

    private TransportClient client;

    /**
     * Get the client object
     */
    @Before
    public void testBefore() {
        Builder builder = Settings.settingsBuilder();
        builder.put("cluster.name", "wenbronk_escluster");
        // .put("client.transport.ignore_cluster_name", true);
        Settings settings = builder.build();

        TransportClient.Builder transportBuild = TransportClient.builder();
        TransportClient client1 = transportBuild.settings(settings).build();
        client = client1.addTransportAddress(new InetSocketTransportAddress(new InetSocketAddress("192.168.50.37", 9300)));
        System.out.println("success connect to escluster");
    }

    /**
     * Test search
     */
    @Test
    public void testSearch() {
        // SearchRequestBuilder searchRequestBuilder = client.prepareSearch("twitter", "tweet", "1");
        // SearchResponse response = searchRequestBuilder.setTypes("type1", "type2")
        //         .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
        //         .setQuery(QueryBuilders.termQuery("user", "test"))
        //         .setPostFilter(QueryBuilders.rangeQuery("age").from(0).to(1))
        //         .setFrom(0).setSize(2).setExplain(true)
        //         .execute().actionGet();
        SearchResponse response = client.prepareSearch()
                .execute().actionGet();
        // SearchHits hits = response.getHits();
        // for (SearchHit searchHit : hits) {
        //     for (Iterator<SearchHitField> iterator = searchHit.iterator(); iterator.hasNext(); ) {
        //         SearchHitField next = iterator.next();
        //         System.out.println(next.getValues());
        //     }
        // }
        System.out.println(response);
    }

    /**
     * Test the scroll API
     * more efficient for processing large amounts of data
     */
    @Test
    public void testScrolls() {
        QueryBuilder queryBuilder = QueryBuilders.termQuery("twitter", "tweet");

        SearchResponse response = client.prepareSearch("twitter")
                .addSort(SortParseElement.DOC_FIELD_NAME, SortOrder.ASC)
                .setScroll(new TimeValue(60000))
                .setQuery(queryBuilder)
                .setSize(100).execute().actionGet();

        while (true) {
            for (SearchHit hit : response.getHits().getHits()) {
                System.out.println("i am coming");
            }
            // Fetch the next page and keep it, so the next iteration processes new hits
            response = client.prepareSearchScroll(response.getScrollId())
                    .setScroll(new TimeValue(60000)).execute().actionGet();
            if (response.getHits().getHits().length == 0) {
                System.out.println("oh no=====");
                break;
            }
        }
    }

    /**
     * Test multiSearch
     */
    @Test
    public void testMultiSearch() {
        QueryBuilder qb1 = QueryBuilders.queryStringQuery("elasticsearch");
        SearchRequestBuilder requestBuilder1 = client.prepareSearch().setQuery(qb1).setSize(1);

        QueryBuilder qb2 = QueryBuilders.matchQuery("user", "kimchy");
        SearchRequestBuilder requestBuilder2 = client.prepareSearch().setQuery(qb2).setSize(1);

        MultiSearchResponse multiResponse = client.prepareMultiSearch().add(requestBuilder1).add(requestBuilder2)
                .execute().actionGet();
        long nbHits = 0;
        for (MultiSearchResponse.Item item : multiResponse.getResponses()) {
            SearchResponse response = item.getResponse();
            nbHits = response.getHits().getTotalHits();
            SearchHit[] hits = response.getHits().getHits();
            System.out.println(nbHits);
        }
    }

    /**
     * Test aggregations
     */
    @Test
    public void testAggregation() {
        SearchResponse response = client.prepareSearch()
                .setQuery(QueryBuilders.matchAllQuery()) // first narrow the documents with a query
                .addAggregation(AggregationBuilders.terms("term").field("user"))
                .addAggregation(AggregationBuilders.dateHistogram("agg2").field("birth")
                        .interval(DateHistogramInterval.YEAR))
                .execute().actionGet();
        Aggregation termsAgg = response.getAggregations().get("term");
        Aggregation dateAgg = response.getAggregations().get("agg2");
        // SearchResponse response2 = client.search(new SearchRequest().searchType(SearchType.QUERY_AND_FETCH)).actionGet();
    }

    /**
     * Test terminate_after
     */
    @Test
    public void testTerminateAfter() {
        SearchResponse response = client.prepareSearch("twitter").setTerminateAfter(1000).get();
        if (response.isTerminatedEarly()) {
            System.out.println("terminated early");
        }
    }

    /**
     * Filtered search: gt = greater than, lt = less than, lte = less than or equal, gte = greater than or equal
     */
    @Test
    public void testFilter() {
        SearchResponse response = client.prepareSearch("twitter")
                .setTypes("tweet")
                .setQuery(QueryBuilders.matchAllQuery()) // match everything
                .setSearchType(SearchType.QUERY_THEN_FETCH)
                // FilterBuilders no longer exists in ES 2.x; filters are written as queries
                .setPostFilter(QueryBuilders.rangeQuery("age").gte(18).lte(22))
                .setExplain(true) // explain=true returns, for each hit, how its score was computed; it does not change the ordering
                .get();
    }

    /**
     * Group-by search
     */
    @Test
    public void testGroupBy() {
        client.prepareSearch("twitter").setTypes("tweet")
                .setQuery(QueryBuilders.matchAllQuery())
                .setSearchType(SearchType.QUERY_THEN_FETCH)
                .addAggregation(AggregationBuilders.terms("user")
                        .field("user").size(0) // group by user; size(0) asks for all buckets rather than the default 10
                ).get();
    }
}
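The scroll test follows a "process a page, ask for the next one, stop on an empty page" pattern. The sketch below isolates that loop in plain Java so it can be run without a cluster. FakeScroll is a hypothetical stand-in for the server-side scroll context, not an Elasticsearch class.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of scroll-style paging; all names here are made up for illustration.
class ScrollLoopSketch {

    /** Hypothetical stand-in for a scroll context: hands out fixed-size pages until exhausted. */
    static final class FakeScroll {
        private final List<String> all;
        private final int pageSize;
        private int cursor = 0;

        FakeScroll(List<String> all, int pageSize) {
            this.all = all;
            this.pageSize = pageSize;
        }

        /** Returns the next page, or an empty list once everything has been read. */
        List<String> nextPage() {
            int end = Math.min(cursor + pageSize, all.size());
            List<String> page = new ArrayList<>(all.subList(cursor, end));
            cursor = end;
            return page;
        }
    }

    /** Same shape as a scroll loop: handle the current page, then replace it with the next one. */
    static List<String> drain(FakeScroll scroll) {
        List<String> seen = new ArrayList<>();
        List<String> page = scroll.nextPage(); // plays the role of the initial search
        while (!page.isEmpty()) {
            seen.addAll(page);
            page = scroll.nextPage(); // must overwrite the page, or the loop re-reads the first one
        }
        return seen;
    }

    public static void main(String[] args) {
        List<String> docs = new ArrayList<>();
        for (int i = 1; i <= 5; i++) {
            docs.add("doc-" + i);
        }
        System.out.println(drain(new FakeScroll(docs, 2))); // prints [doc-1, doc-2, doc-3, doc-4, doc-5]
    }
}
```

The important detail is that the loop variable is overwritten with each new page. If the next page is stored in a different variable, the loop keeps processing the first page over and over.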
That covers how to operate Elasticsearch from Java. If you have run into similar questions, the walkthrough above may help.