真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

heka從kalka中讀取數(shù)據(jù)的示例分析

這期內(nèi)容當(dāng)中小編將會(huì)給大家?guī)碛嘘P(guān)heka從kalka中讀取數(shù)據(jù)的示例分析,文章內(nèi)容豐富且以專業(yè)的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

創(chuàng)新互聯(lián)是專業(yè)的如皋網(wǎng)站建設(shè)公司,如皋接單;提供網(wǎng)站制作、成都做網(wǎng)站,網(wǎng)頁設(shè)計(jì),網(wǎng)站設(shè)計(jì),建網(wǎng)站,PHP網(wǎng)站建設(shè)等專業(yè)做網(wǎng)站服務(wù);采用PHP框架,可快速的進(jìn)行如皋網(wǎng)站開發(fā)網(wǎng)頁制作和功能擴(kuò)展;專業(yè)做搜索引擎喜愛的網(wǎng)站,專業(yè)的做網(wǎng)站團(tuán)隊(duì),希望更多企業(yè)前來合作!

heka從kalka中讀取數(shù)據(jù)。

配置:

[hekad]
maxprocs = 2

[KafkaInputExample]
type = "KafkaInput"
topic = "test"
addrs = ["localhost:9092"]

[RstEncoder]

[LogOutput]
message_matcher = "TRUE"
encoder = "RstEncoder"

上述配置只有從kalfka中讀取數(shù)據(jù)并顯示到console,寫到kalfka中數(shù)據(jù),

heka從kalka中讀取數(shù)據(jù)的示例分析

結(jié)果

:Timestamp: 2016-07-21 09:39:46.342093657 +0000 UTC
:Type: heka.kafka
:Hostname: master
:Pid: 0
:Uuid: 501b0a0e-63a9-4eee-b9ca-ab572c17d273
:Logger: KafkaInputExample
:Payload: {"msg":"Start Request","event":"artemis.web.ensure-running1","userid":"12","extra":{"workspace-id":"cN907xLngi"},"time":"2015-05-06T    20:40:05.509926234Z","severity":1}
:EnvVersion: 
:Severity: 7
:Fields:
    | name:"Key" type:bytes value:
    | name:"Topic" type:string value:"test"
    | name:"Partition" type:integer value:0
    | name:"Offset" type:integer value:8

讀取出來的數(shù)據(jù)放到了payload中,而fileds中存放了讀取kalkfa中的一些信息。那么可以使用jsondecoder進(jìn)行解析。

[hekad]
maxprocs = 2

[KafkaInputExample]
type = "KafkaInput"
topic = "test"
addrs = ["localhost:9092"]
decoder="JsonDecoder"

[JsonDecoder]
type = "SandboxDecoder"
filename = "lua_decoders/json.lua"

        [JsonDecoder.config]
        type = "artemis"
        payload_keep = true
        map_fields = true
        Severity = "severity"

[RstEncoder]

[LogOutput]
message_matcher = "TRUE"
encoder = "RstEncoder"

結(jié)果如下:

:Timestamp: 2016-07-21 09:42:34 +0000 UTC
:Type: artemis
:Hostname: master
:Pid: 0
:Uuid: 3965285c-70ac-4069-a1a3-a9bcf518d3e8
:Logger: KafkaInputExample
:Payload: {"msg":"Start Request","event":"artemis.web.ensure-running2","userid":"11","extra":{"workspace-id":"cN907xLngi"},"time":"2015-05-06T    20:40:05.509926234Z","severity":1}
:EnvVersion: 
:Severity: 1
:Fields:
    | name:"time" type:string value:"2015-05-06T    20:40:05.509926234Z"
    | name:"msg" type:string value:"Start Request"
    | name:"userid" type:string value:"11"
    | name:"event" type:string value:"artemis.web.ensure-running2"
    | name:"extra.workspace-id" type:string value:"cN907xLngi"

經(jīng)過decoder解析之后,fileds發(fā)生了改變,但是我們可以看到Logger顯示的還是KafkaInputExample,說明數(shù)據(jù)不是decoder產(chǎn)生,而是Input產(chǎn)生,只不過使用了decoder進(jìn)行了解析,重寫改寫了fields而已。

接下來,把數(shù)據(jù)錄入都es中吧。
[hekad]
maxprocs = 2

[KafkaInputExample]
type = "KafkaInput"
topic = "test"
addrs = ["localhost:9092"]
decoder="JsonDecoder"

[JsonDecoder]
type = "SandboxDecoder"
filename = "lua_decoders/json.lua"

        [JsonDecoder.config]
        type = "artemis"
        payload_keep = true
        map_fields = true
        Severity = "severity"

[ESJsonEncoder]
index = "%{Type}-%{%Y.%m.%d}"
es_index_from_timestamp = true
type_name = "%{Type}"
    [ESJsonEncoder.field_mappings]
    Timestamp = "@timestamp"
    Severity = "level"

[ElasticSearchOutput]
message_matcher = "TRUE"
encoder = "ESJsonEncoder"
flush_interval = 1

導(dǎo)入到es中,也需要json,所以使用ESJsonEncoder,同時(shí)指定索引名字和類型。執(zhí)行結(jié)果如下,

heka從kalka中讀取數(shù)據(jù)的示例分析

可以看到,除了heka中元數(shù)據(jù)field之外,還有JsonDecoder生成field啊,其實(shí)是截取JsonDecoder中的fields屬性中拿出。注意,Payload不解析。

:Fields:
    | name:"time" type:string value:"2015-05-06T    20:40:05.509926234Z"
    | name:"msg" type:string value:"Start Request"
    | name:"userid" type:string value:"11"
    | name:"event" type:string value:"artemis.web.ensure-running2"
    | name:"extra.workspace-id" type:string value:"cN907xLngi"

這些field當(dāng)然隨著數(shù)據(jù)不同而不同,那么稱之為dynamic fileds。

入es的時(shí)候,可以指定提取哪些dynamic fields,

fields=["Timestamp","Uuid","Type","Logger","Pid","Hostname","DynamicFields"]
dynamic_fields=["msg","userid"]

只要使用dynamic_fileds,就必須要在fields中指定DynamicFields。

如果沒有dynamic_fileds,那么fields只能列舉幾個(gè)固定的屬性,參照官方文檔即可。

完成的列子:

[hekad]
maxprocs = 2

[KafkaInputExample]
type = "KafkaInput"
topic = "test"
addrs = ["localhost:9092"]
decoder="JsonDecoder"

[JsonDecoder]
type = "SandboxDecoder"
[hekad]
maxprocs = 2

[KafkaInputExample]
type = "KafkaInput"
topic = "test"
addrs = ["localhost:9092"]
decoder="JsonDecoder"

[JsonDecoder]
type = "SandboxDecoder"
filename = "lua_decoders/json.lua"

        [JsonDecoder.config]
        type = "artemis"
        payload_keep = true
        map_fields = true
        Severity = "severity"

[ESJsonEncoder]
index = "%{Type}-%{%Y.%m.%d}"
es_index_from_timestamp = true
type_name = "%{Type}"
fields=["Timestamp","Uuid","Type","Logger","Pid","Hostname","DynamicFields"]
dynamic_fields=["msg","userid"]

raw_bytes_fields=["Payload"]
    [ESJsonEncoder.field_mappings]
    Timestamp = "@timestamp"
    Severity = "level"

[ElasticSearchOutput]
message_matcher = "TRUE"
encoder = "ESJsonEncoder"
flush_interval = 1

結(jié)果如下,

heka從kalka中讀取數(shù)據(jù)的示例分析

上述就是小編為大家分享的heka從kalka中讀取數(shù)據(jù)的示例分析了,如果剛好有類似的疑惑,不妨參照上述分析進(jìn)行理解。如果想知道更多相關(guān)知識(shí),歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。


當(dāng)前題目:heka從kalka中讀取數(shù)據(jù)的示例分析
標(biāo)題網(wǎng)址:http://weahome.cn/article/jgchjs.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部