真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Python對ElasticSearch獲取數(shù)據(jù)及操作-創(chuàng)新互聯(lián)

使用Python對ElasticSearch獲取數(shù)據(jù)及操作,供大家參考,具體內(nèi)容如下

成都創(chuàng)新互聯(lián)公司10多年企業(yè)網(wǎng)站建設(shè)服務(wù);為您提供網(wǎng)站建設(shè),網(wǎng)站制作,網(wǎng)頁設(shè)計及高端網(wǎng)站定制服務(wù),企業(yè)網(wǎng)站建設(shè)及推廣,對成都自拌料攪拌車等多個方面擁有多年的網(wǎng)站營銷經(jīng)驗的網(wǎng)站建設(shè)公司。

Version

Python :2.7

ElasticSearch:6.3

代碼:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
  @Time  : 2018/7/4
  @Author : LiuXueWen
  @Site  : 
  @File  : ElasticSearchOperation.py
  @Software: PyCharm
  @Description: 對elasticsearch數(shù)據(jù)的操作,包括獲取數(shù)據(jù),發(fā)送數(shù)據(jù)
"""
import elasticsearch
import json

import Util_Ini_Operation

class elasticsearch_data():
  def __init__(self,hosts,username,password,maxsize,is_ssl):
    # 初始化ini操作腳本,獲取配置文件
    try:
      # 判斷請求方式是否ssl加密
      if is_ssl == "true":
        # 獲取證書地址
        cert_pem = Util_Ini_Operation.get_ini("config.ini").get_key_value("certs","certs")
        es_ssl = elasticsearch.Elasticsearch(
          # 地址
          hosts=hosts,
          # 用戶名密碼
          http_auth=(username,password),
          # 開啟ssl
          use_ssl=True,
          # 確認有加密證書
          verify_certs=True,
          # 對應(yīng)的加密證書地址
          client_cert=cert_pem
        )
        self.es = es_ssl
      elif is_ssl == "false":
        # 創(chuàng)建普通類型的ES客戶端
        es_ordinary = elasticsearch.Elasticsearch(hosts, http_auth=(username, password), maxsize=int(maxsize))
        self.es = es_ordinary
    except Exception as e:
      print(e)


  def query_data(self,keywords_list,date):
    gte = "now-"+str(date)
    query_data = {
      # 查詢語句
      "query": {
        "bool": {
          "must": [
            {
              "query_string": {
                "query": keywords_list,
                "analyze_wildcard": True
              }
            },
            {
              "range": {
                "@timestamp": {
                  "gte": gte,
                  "lte": "now",
                  "format": "epoch_millis"
                }
              }
            }
          ],
          "must_not": []
        }
      }
    }
    return query_data

  # 從es獲取數(shù)據(jù)
  def get_datas_by_query(self,index_name,keywords,param,date):
    '''
    :param index_name: 索引名稱
    :param keywords: 關(guān)鍵字詞,數(shù)組
    :param param: 需要數(shù)據(jù)條件,例如_source
    :param date: 過去時間范圍,字符串格式,例如過去30分鐘內(nèi)數(shù)據(jù),"30m"
    :return: all_datas 返回查詢到的所有數(shù)據(jù)(已經(jīng)過param過濾)
    '''

    all_datas = []
    # 遍歷所有的查詢條件
    for keywords_list in keywords:
      # DSL語句
      query_data = self.query_data(keywords_list,date)
      res = self.es.search(
        index=index_name,
        body=query_data
      )
      for hit in res['hits']['hits']:
        # 獲取指定的內(nèi)容
        response = hit[param]
        # 添加所有數(shù)據(jù)到數(shù)據(jù)集中
        all_datas.append(response)
    # 返回所有數(shù)據(jù)內(nèi)容
    return all_datas

  # 當(dāng)索引不存在創(chuàng)建索引
  def create_index(self,index_name):
    '''
    :param index_name: 索引名稱
    :return:如果創(chuàng)建成功返回創(chuàng)建結(jié)果信息,試過已經(jīng)存在創(chuàng)建新的index失敗返回index的名稱
    '''
    # 獲取索引的映射
    # index_mapping = IndexMapping.index_mapping
    # # 判斷索引是否存在
    # if self.es.indices.exists(index=index_name) is not True:
    #   # 創(chuàng)建索引
    #   res = self.es.indices.create(index=index_name,body=index_mapping)
    #   # 返回結(jié)果
    #   return res
    # else:
    #   # 返回索引名稱
    #   return index_name
    pass

  # 插入指定的單條數(shù)據(jù)內(nèi)容
  def insert_single_data(self,index_name,doc_type,data):
    '''
    :param index_name: 索引名稱
    :param doc_type: 文檔類型
    :param data: 需要插入的數(shù)據(jù)內(nèi)容
    :return: 執(zhí)行結(jié)果
    '''
    res = self.es.index(index=index_name,doc_type=doc_type,body=data)
    return res

  # 向ES中新增數(shù)據(jù),批量插入
  def insert_datas(self,index_name):
    '''
    :desc 通過讀取指定的文件內(nèi)容獲取需要插入的數(shù)據(jù)集
    :param index_name: 索引名稱
    :return: 插入成功的數(shù)據(jù)條數(shù)
    '''
    insert_datas = []
    # 判斷插入數(shù)據(jù)的索引是否存在
    self.createIndex(index_name=index_name)
    # 獲取插入數(shù)據(jù)的文件地址
    data_file_path = self.ini.get_key_value("datafile","datafilepath")
    # 獲取需要插入的數(shù)據(jù)集
    with open(data_file_path,"r+") as data_file:
      # 獲取文件所有數(shù)據(jù)
      data_lines = data_file.readlines()
      for data_line in data_lines:
        # string to json
        data_line = json.loads(data_line)
        insert_datas.append(data_line)
    # 批量處理
    res = self.es.bulk(index=index_name,body=insert_datas,raise_on_error=True)
    return res

  # 從ES中在指定的索引中刪除指定數(shù)據(jù)(根據(jù)id判斷)
  def delete_data_by_id(self,index_name,doc_type,id):
    '''
    :param index_name: 索引名稱
    :param index_type: 文檔類型
    :param id: 唯一標(biāo)識id
    :return: 刪除結(jié)果信息
    '''
    res = self.es.delete(index=index_name,doc_type=doc_type,id=id)
    return res

  # 根據(jù)條件刪除數(shù)據(jù)
  def delete_data_by_query(self,index_name,doc_type,param,gt_time,lt_time):
    '''
    :param index_name:索引名稱,為空查詢所有索引
    :param doc_type:文檔類型,為空查詢所有文檔類型
    :param param:過濾條件值
    :param gt_time:時間范圍,大于該時間
    :param lt_time:時間范圍,小于該時間
    :return:執(zhí)行條件刪除后的結(jié)果信息
    '''
    # DSL語句
    query_data = {
      # 查詢語句
      "query": {
        "bool": {
          "must": [
            {
              "query_string": {
                "query": param,
                "analyze_wildcard": True
              }
            },
            {
              "range": {
                "@timestamp": {
                  "gte": gt_time,
                  "lte": lt_time,
                  "format": "epoch_millis"
                }
              }
            }
          ],
          "must_not": []
        }
      }
    }
    res = self.es.delete_by_query(index=index_name,doc_type=doc_type,body=query_data,_source=True)
    return res

  # 指定index中刪除指定時間段內(nèi)的全部數(shù)據(jù)
  def delete_all_datas(self,index_name,doc_type,gt_time,lt_time):
    '''
    :param index_name:索引名稱,為空查詢所有索引
    :param doc_type:文檔類型,為空查詢所有文檔類型
    :param gt_time:時間范圍,大于該時間
    :param lt_time:時間范圍,小于該時間
    :return:執(zhí)行條件刪除后的結(jié)果信息
    '''
    # DSL語句
    query_data = {
      # 查詢語句
      "query": {
        "bool": {
          "must": [
            {
              "match_all": {}
            },
            {
              "range": {
                "@timestamp": {
                  "gte": gt_time,
                  "lte": lt_time,
                  "format": "epoch_millis"
                }
              }
            }
          ],
          "must_not": []
        }
      }
    }
    res = self.es.delete_by_query(index=index_name, doc_type=doc_type, body=query_data, _source=True)
    return res

  # 修改ES中指定的數(shù)據(jù)
  def update_data_by_id(self,index_name,doc_type,id,data):
    '''
    :param index_name: 索引名稱
    :param doc_type: 文檔類型,為空表示所有類型
    :param id: 文檔唯一標(biāo)識編號
    :param data: 更新的數(shù)據(jù)
    :return: 更新結(jié)果信息
    '''
    res = self.es.update(index=index_name,doc_type=doc_type,id=id,body=data)
    return res

另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國服務(wù)器、虛擬主機、免備案服務(wù)器”等云主機租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡單易用、服務(wù)可用性高、性價比高”等特點與優(yōu)勢,專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場景需求。


標(biāo)題名稱:Python對ElasticSearch獲取數(shù)據(jù)及操作-創(chuàng)新互聯(lián)
本文URL:http://weahome.cn/article/dgihdo.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部