真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Serverless如何解決數(shù)據(jù)采集分析痛點

本篇文章給大家分享的是有關(guān)Serverless如何解決數(shù)據(jù)采集分析痛點,小編覺得挺實用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。

成都創(chuàng)新互聯(lián)主營本溪網(wǎng)站建設(shè)的網(wǎng)絡(luò)公司,主營網(wǎng)站建設(shè)方案,成都APP應(yīng)用開發(fā),本溪h5小程序開發(fā)搭建,本溪網(wǎng)站營銷推廣歡迎本溪等地區(qū)企業(yè)咨詢

簡介:眾所周知,游戲行業(yè)在當(dāng)今的互聯(lián)網(wǎng)行業(yè)中算是一棵常青樹。在疫情之前的 2019 年,中國游戲市場營收規(guī)模約 2884.8 億元,同比增長 17.1%。2020 年因為疫情,游戲行業(yè)更是突飛猛進(jìn)。玩游戲本就是中國網(wǎng)民最普遍的娛樂方式之一,疫情期間更甚。據(jù)不完全統(tǒng)計,截至 2019 年,中國移動游戲用戶規(guī)模約 6.6 億人,占中國總網(wǎng)民規(guī)模 8.47 億的 77.92%,可見游戲作為一種低門檻、低成本的娛樂手段,已成為大部分人生活中習(xí)以為常的一部分。

眾所周知,游戲行業(yè)在當(dāng)今的互聯(lián)網(wǎng)行業(yè)中算是一棵常青樹。在疫情之前的 2019 年,中國游戲市場營收規(guī)模約 2884.8 億元,同比增長 17.1%。2020 年因為疫情,游戲行業(yè)更是突飛猛進(jìn)。玩游戲本就是中國網(wǎng)民最普遍的娛樂方式之一,疫情期間更甚。據(jù)不完全統(tǒng)計,截至 2019 年,中國移動游戲用戶規(guī)模約 6.6 億人,占中國總網(wǎng)民規(guī)模 8.47 億的 77.92%,可見游戲作為一種低門檻、低成本的娛樂手段,已成為大部分人生活中習(xí)以為常的一部分。

對于玩家而言,市面上的游戲數(shù)量多如牛毛,那么玩家如何能發(fā)現(xiàn)和認(rèn)知到一款游戲,并且持續(xù)的玩下去恐怕是所有游戲廠商需要思考的問題。加之 2018 年游戲版號停發(fā)事件,游戲廠商更加珍惜每一個已獲得版號的游戲產(chǎn)品,所以這也使得“深度打磨產(chǎn)品質(zhì)量”和“提高運(yùn)營精細(xì)程度”這兩個游戲產(chǎn)業(yè)發(fā)展方向成為廣大游戲廠商的發(fā)展思路,無論是新游戲還是老游戲都在努力落實這兩點:

  • 新游戲:面向玩家需要提供更充足的推廣資源和更完整的游戲內(nèi)容。

  • 老游戲:通過用戶行為分析,投入更多的精力和成本,制作更優(yōu)質(zhì)的版本內(nèi)容。

這里我們重點來看新游戲。一家游戲企業(yè)辛辛苦苦研發(fā)三年,等著新游戲發(fā)售時一飛沖天。那么問題來了,新游戲如何被廣大玩家看到?

首先來看看游戲行業(yè)公司的分類:

  • 游戲研發(fā)商:研發(fā)游戲的公司,生產(chǎn)和制作游戲內(nèi)容。比如王者榮耀的所有英雄設(shè)計、游戲戰(zhàn)斗場景、戰(zhàn)斗邏輯等,全部由游戲研發(fā)公司提供。

  • 游戲發(fā)行商:游戲發(fā)行商的主要工作分三大塊:市場工作、運(yùn)營工作、客服工作。游戲發(fā)行商把控游戲命脈,市場工作核心是導(dǎo)入玩家,運(yùn)營工作核心是將用戶價值最大化、賺取更多利益。

  • 游戲平臺/渠道商:游戲平臺和渠道商的核心目的就是曝光游戲,讓盡量多的人能發(fā)現(xiàn)你的游戲。

這三種類型的業(yè)務(wù),有專注于其中某一領(lǐng)域的獨立公司,也有能承接全部業(yè)務(wù)的公司,但無論那一種,這三者之間的關(guān)系是不會變的:

所以不難理解,想讓更多的玩家看到你的游戲,游戲發(fā)行和運(yùn)營是關(guān)鍵。通俗來講,如果你的游戲出現(xiàn)在目前所有大家熟知的平臺廣告中,那么最起碼游戲的新用戶注冊數(shù)量是很可觀的。因此這就引入了一個關(guān)鍵詞:買量

根據(jù)數(shù)據(jù)顯示,2019 年月均買量手游數(shù)達(dá) 6000+ 款,而 2018 年僅為 4200 款。另一方面,隨著抖音、微博等超級 APP 在游戲買量市場的資源傾斜,也助推手游買量的效果和效率有所提升,游戲廠商更愿意使用買量的方式來吸引用戶。

但需要注意的是,在游戲買量的精準(zhǔn)化程度不斷提高的同時,買量的成本也在節(jié)節(jié)攀升,唯有合理配置買量、渠道與整合營銷之間的關(guān)系,才能將宣發(fā)資源發(fā)揮到最大的效果。

通俗來講,買量其實就是在各大主流平臺投放廣告,廣大用戶看到游戲廣告后,有可能會點擊廣告,然后進(jìn)入游戲廠商的宣傳頁面,同時會采集用戶的一些信息,然后游戲廠商對采集到的用戶信息進(jìn)行大數(shù)據(jù)分析,進(jìn)行進(jìn)一步的定向推廣。

游戲運(yùn)營核心訴求

游戲廠商花錢買量,換來的用戶信息以及新用戶注冊信息是為持續(xù)的游戲運(yùn)營服務(wù)的,那么這個場景的核心訴求就是采集用戶信息的完整性。

比如說,某游戲廠商一天花 5000w 投放廣告,在某平臺某時段產(chǎn)生了每秒 1w 次的廣告點擊率,那么在這個時段內(nèi)每一個點擊廣告的用戶信息要完整的被采集到,然后入庫進(jìn)行后續(xù)分析。這就對數(shù)據(jù)采集系統(tǒng)提出了很高的要求。

這其中,最核心的一點就是系統(tǒng)暴露接口的環(huán)節(jié)要能夠平穩(wěn)承載買量期間不定時的流量脈沖。在買量期間,游戲廠商通常會在多個平臺投放廣告,每個平臺投放廣告的時間是不一樣的,所以就會出現(xiàn)全天不定時的流量脈沖現(xiàn)象。如果這個環(huán)節(jié)出現(xiàn)問題,那么相當(dāng)于買量的錢就打水漂了。

數(shù)據(jù)采集系統(tǒng)傳統(tǒng)架構(gòu)

上圖是一個相對傳統(tǒng)的數(shù)據(jù)采集系統(tǒng)架構(gòu),最關(guān)鍵的就是暴露 HTTP 接口回傳數(shù)據(jù)這部分,這部分如果出問題,那么采集數(shù)據(jù)的鏈路就斷了。但這部分往往會面臨兩個挑戰(zhàn):

  • 當(dāng)流量脈沖來的時候,這部分是否可以快速擴(kuò)容以應(yīng)對流量沖擊。

  • 游戲運(yùn)營具備潮汐特性,并非天天都在進(jìn)行,這就需要考慮如何優(yōu)化資源利用率。

通常情況下,在游戲有運(yùn)營活動之前,會提前通知運(yùn)維同學(xué),對這個環(huán)節(jié)的服務(wù)增加節(jié)點,但要增加多少其實是無法預(yù)估的,只能大概拍一個數(shù)字。這是在傳統(tǒng)架構(gòu)下經(jīng)常會出現(xiàn)的場景,這就會導(dǎo)致兩個問題:

  • 流量太大,節(jié)點加少了,導(dǎo)致一部分流量的數(shù)據(jù)沒有采集到。

  • 流量沒有預(yù)期那么大,節(jié)點加多了,導(dǎo)致資源浪費(fèi)。

數(shù)據(jù)采集系統(tǒng) Serverless 架構(gòu)

我們可以通過函數(shù)計算 FC 來取代傳統(tǒng)架構(gòu)中暴露 HTTP 回傳數(shù)據(jù)這部分,從而完美解決傳統(tǒng)架構(gòu)中存在問題。

傳統(tǒng)架構(gòu)中的兩個問題均可以通過函數(shù)計算百毫秒彈性的特性來解決。我們并不需要去估算營銷活動會帶來多大的流量,也不需要去擔(dān)心和考慮對數(shù)據(jù)采集系統(tǒng)的性能,運(yùn)維同學(xué)更不需要提前預(yù)備 ECS。

因為函數(shù)計算的極致彈性特性,當(dāng)沒有買量、沒有營銷活動的時候,函數(shù)計算的運(yùn)行實例是零。有買量活動時,在流量脈沖的情況下,函數(shù)計算會快速拉起實例來承載流量壓力;當(dāng)流量減少時,函數(shù)計算會及時釋放沒有請求的實例進(jìn)行縮容。所以 Serverless 架構(gòu)帶來的優(yōu)勢有以下三點:

  • 無需運(yùn)維介入,研發(fā)同學(xué)就可以很快的搭建出來。

  • 無論流量大小,均可以平穩(wěn)的承接。

  • 函數(shù)計算拉起的實例數(shù)量可以緊貼流量大小的曲線,做到資源利用率最優(yōu)化,再加上按量計費(fèi)的模式,可以最大程度優(yōu)化成本。

架構(gòu)解析

從上面的架構(gòu)圖可以看到,整個采集數(shù)據(jù)階段,分了兩個函數(shù)來實現(xiàn),第一個函數(shù)的作用是單純的暴露 HTTP 接口接收數(shù)據(jù),第二個函數(shù)用于處理數(shù)據(jù),然后將數(shù)據(jù)發(fā)送至消息隊列 Kafka 和數(shù)據(jù)庫 RDS。

1. 接收數(shù)據(jù)函數(shù)

我們打開函數(shù)計算控制臺,創(chuàng)建一個函數(shù):

  • 函數(shù)類型:HTTP(即觸發(fā)器為 HTTP)

  • 函數(shù)名稱:receiveData

  • 運(yùn)行環(huán)境:Python3

  • 函數(shù)實例類型:彈性實例

  • 函數(shù)執(zhí)行內(nèi)存:512MB

  • 函數(shù)運(yùn)行超時時間:60 秒

  • 函數(shù)單實例并發(fā)度:1

  • 觸發(fā)器類型:HTTP 觸發(fā)器

  • 觸發(fā)器名稱:defaultTrigger

  • 認(rèn)證方式:anonymous(即無需認(rèn)證)

  • 請求方式:GET,POST

創(chuàng)建好函數(shù)之后,我們通過在線編輯器編寫代碼:

# -*- coding: utf-8 -*-
import logging
import json
import urllib.parse
HELLO_WORLD = b'Hello world!\n'
def handler(environ, start_response):
    logger = logging.getLogger() 
    context = environ['fc.context']
    request_uri = environ['fc.request_uri']
    for k, v in environ.items():
      if k.startswith('HTTP_'):
        # process custom request headers
        pass
    try:        
        request_body_size = int(environ.get('CONTENT_LENGTH', 0))    
    except (ValueError):        
        request_body_size = 0   
    # 接收回傳的數(shù)據(jù)
    request_body = environ['wsgi.input'].read(request_body_size)  
    request_body_str = urllib.parse.unquote(request_body.decode("GBK"))
    request_body_obj = json.loads(request_body_str)
    logger.info(request_body_obj["action"])
    logger.info(request_body_obj["articleAuthorId"])

    status = '200 OK'
    response_headers = [('Content-type', 'text/plain')]
    start_response(status, response_headers)
    return [HELLO_WORLD]

此時的代碼非常簡單,就是接收用戶傳來的參數(shù),我們可以調(diào)用接口進(jìn)行驗證:

可以在函數(shù)的日志查詢中看到此次調(diào)用的日志:

同時,我們也可以查看函數(shù)的鏈路追蹤來分析每一個步驟的調(diào)用耗時,比如函數(shù)接到請求→冷啟動(無活躍實例時)→準(zhǔn)備代碼→執(zhí)行初始化方法→執(zhí)行入口函數(shù)邏輯這個過程:

從調(diào)用鏈路圖中可以看到,剛才的那次請求包含了冷啟動的時間,因為當(dāng)時沒有活躍實例,整個過程耗時 418 毫秒,真正執(zhí)行入口函數(shù)代碼的時間為 8 毫秒。

當(dāng)再次調(diào)用接口時,可以看到就直接執(zhí)行了入口函數(shù)的邏輯,因為此時已經(jīng)有實例在運(yùn)行,整個耗時只有 2.3 毫秒

2. 處理數(shù)據(jù)的函數(shù)

第一個函數(shù)是通過在函數(shù)計算控制臺在界面上創(chuàng)建的,選擇了運(yùn)行環(huán)境是 Python3,我們可以在官方文檔中查看預(yù)置的 Python3 運(yùn)行環(huán)境內(nèi)置了哪些模塊,因為第二個函數(shù)要操作 Kafka 和 RDS,所以需要我們確認(rèn)對應(yīng)的模塊。

從文檔中可以看到,內(nèi)置的模塊中包含 RDS 的 SDK 模塊,但是沒有 Kafka 的 SDK 模塊,此時就需要我們手動安裝 Kafka SDK 模塊,并且創(chuàng)建函數(shù)也會使用另一種方式。

1)Funcraft

Funcraft 是一個用于支持 Serverless 應(yīng)用部署的命令行工具,能幫助我們便捷地管理函數(shù)計算、API 網(wǎng)關(guān)、日志服務(wù)等資源。它通過一個資源配置文件(template.yml),協(xié)助我們進(jìn)行開發(fā)、構(gòu)建、部署操作。
所以第二個函數(shù)我們需要使用 Fun 來進(jìn)行操作,整個操作分為四個步驟:

  • 安裝 Fun 工具。

  • 編寫 template.yml 模板文件,用來描述函數(shù)。

  • 安裝我們需要的第三方依賴。

  • 上傳部署函數(shù)。

2)安裝 Fun

Fun 提供了三種安裝方式:

  • 通過 npm 包管理安裝 —— 適合所有平臺(Windows/Mac/Linux)且已經(jīng)預(yù)裝了 npm 的開發(fā)者。

  • 通過下載二進(jìn)制安裝 —— 適合所有平臺(Windows/Mac/Linux)。

  • 通過 Homebrew 包管理器安裝 —— 適合 Mac 平臺,更符合 MacOS 開發(fā)者習(xí)慣。

文本示例環(huán)境為 Mac,所以使用 npm 方式安裝,非常的簡單,一行命令搞定:

sudo npm install @alicloud/fun -g

安裝完成之后。在控制終端輸入 fun 命令可以查看版本信息:

$ fun --version
3.6.20

在第一次使用 fun 之前需要先執(zhí)行 fun config 命令進(jìn)行配置,按照提示,依次配置 Account ID、Access Key Id、Secret Access Key、 Default Region Name 即可。其中 Account ID、Access Key Id 你可以從函數(shù)計算控制臺首頁的右上方獲得:

fun config

? Aliyun Account ID *01
? Aliyun Access Key ID *qef6j
? Aliyun Access Key Secret *UFJG
? Default region name cn-hangzhou
? The timeout in seconds for each SDK client invoking 60
? The maximum number of retries for each SDK client 3

3)編寫 template.yml

新建一個目錄,在該目錄下創(chuàng)建一個名為 template.yml 的 YAML 文件,該文件主要描述要創(chuàng)建的函數(shù)的各項配置,說白了就是將函數(shù)計算控制臺上配置的那些配置信息以 YAML 格式寫在文件里:

ROSTemplateFormatVersion: '2015-09-01'
Transform: 'Aliyun::Serverless-2018-04-03'
Resources:
FCBigDataDemo:
Type: 'Aliyun::Serverless::Service'
Properties:
Description: 'local invoke demo'
VpcConfig:
VpcId: 'vpc-xxxxxxxxxxx'
VSwitchIds: [ 'vsw-xxxxxxxxxx' ]
SecurityGroupId: 'sg-xxxxxxxxx'
LogConfig:
Project: fcdemo
Logstore: fc_demo_store
dataToKafka:
Type: 'Aliyun::Serverless::Function'
Properties:
Initializer: index.my_initializer
Handler: index.handler
CodeUri: './'
Description: ''
Runtime: python3

我們來解析以上文件的核心內(nèi)容:

  • FCBigDataDemo:自定義的服務(wù)名稱。通過下面的 Type 屬性標(biāo)明是服務(wù),即 Aliyun::Serverless::Service。

  • Properties:Properties 下的屬性都是該服務(wù)的各配置項。

  • VpcConfig:服務(wù)的 VPC 配置,包含:VpcId:VPC ID。VSwitchIds:交換機(jī) ID,這里是數(shù)組,可以配置多個交換機(jī)。SecurityGroupId:安全組 ID。

  • LogConfig:服務(wù)綁定的日志服務(wù)(SLS)配置,包含:Project:日志服務(wù)項目。Logstore:LogStore 名稱。

  • dataToKafka:該服務(wù)下自定義的函數(shù)名稱。通過下面的 Type 屬性標(biāo)明是函數(shù),即 Aliyun::Serverless::Function。

  • Properties:Properties下的屬性都是該函數(shù)的各配置項。

  • Initializer:配置初始化函數(shù)。

  • Handler:配置入口函數(shù)。

  • Runtime:函數(shù)運(yùn)行環(huán)境。

4)安裝第三方依賴

服務(wù)和函數(shù)的模板創(chuàng)建好之后,我們來安裝需要使用的第三方依賴。在這個示例的場景中,第二個函數(shù)需要使用 Kafka SDK,所以可以通過 fun 工具結(jié)合 Python 包管理工具 pip 進(jìn)行安裝:

fun install --runtime python3 --package-type pip kafka-python

執(zhí)行命令后有提示信息

此時我們會發(fā)現(xiàn)在目錄下會生成一個.fun文件夾 ,我們安裝的依賴包就在該目錄下:

5)部署函數(shù)

現(xiàn)在編寫好了模板文件以及安裝好了我們需要的 Kafka SDK 后,還需要添加我們的代碼文件 index.py,代碼內(nèi)容如下:

# -*- coding: utf-8 -*-
import logging
import json
import urllib.parse
from kafka import KafkaProducer
producer = None
def my_initializer(context):    
    logger = logging.getLogger() 
    logger.info("init kafka producer")
    global producer
    producer = KafkaProducer(bootstrap_servers='XX.XX.XX.XX:9092,XX.XX.XX.XX:9092,XX.XX.XX.XX:9092')
def handler(event, context):
    logger = logging.getLogger()   
    # 接收回傳的數(shù)據(jù)
    event_str = json.loads(event)
    event_obj = json.loads(event_str)
    logger.info(event_obj["action"])
    logger.info(event_obj["articleAuthorId"])
    # 向Kafka發(fā)送消息
    global producer
    producer.send('ikf-demo', json.dumps(event_str).encode('utf-8'))
    producer.close()
    return 'hello world'

代碼很簡單,這里做以簡單的解析:

  • my_initializer:函數(shù)實例被拉起時會先執(zhí)行該函數(shù),然后再執(zhí)行 handler 函數(shù) ,當(dāng)函數(shù)實例在運(yùn)行時,之后的請求都不會執(zhí)行 my_initializer 函數(shù) 。一般用于各種連接的初始化工作,這里將初始化 Kafka Producer 的方法放在了這里,避免反復(fù)初始化 Produer。

  •  handler:該函數(shù)只有兩個邏輯,接收回傳的數(shù)據(jù)和將數(shù)據(jù)發(fā)送至 Kafka 的指定 Topic。

  • 下面通過 fun deploy 命令部署函數(shù),該命令會做兩件事:根據(jù) template.yml 中的配置創(chuàng)建服務(wù)和函數(shù)。將 index.py 和 .fun 上傳至函數(shù)中。

登錄函數(shù)計算控制臺,可以看到通過 fun 命令部署的服務(wù)和函數(shù)

進(jìn)入函數(shù),也可以清晰的看到第三方依賴包的目錄結(jié)構(gòu)

3. 函數(shù)之間調(diào)用

目前兩個函數(shù)都創(chuàng)建好了,下面的工作就是由第一個函數(shù)接收到數(shù)據(jù)后拉起第二個函數(shù)發(fā)送消息給 Kafka。我們只需要對第一個函數(shù)做些許改動即可:

# -*- coding: utf-8 -*-
import logging
import json
import urllib.parse
import fc2
HELLO_WORLD = b'Hello world!\n'
client = None
def my_initializer(context):    
    logger = logging.getLogger() 
    logger.info("init fc client")
    global client
    client = fc2.Client(
        endpoint="http://your_account_id.cn-hangzhou-internal.fc.aliyuncs.com",
        accessKeyID="your_ak",
        accessKeySecret="your_sk"
    )
def handler(environ, start_response):
    logger = logging.getLogger() 
    context = environ['fc.context']
    request_uri = environ['fc.request_uri']
    for k, v in environ.items():
      if k.startswith('HTTP_'):
        # process custom request headers
        pass
    try:        
        request_body_size = int(environ.get('CONTENT_LENGTH', 0))    
    except (ValueError):        
        request_body_size = 0   
    # 接收回傳的數(shù)據(jù)
    request_body = environ['wsgi.input'].read(request_body_size)  
    request_body_str = urllib.parse.unquote(request_body.decode("GBK"))
    request_body_obj = json.loads(request_body_str)
    logger.info(request_body_obj["action"])
    logger.info(request_body_obj["articleAuthorId"])
    global client
    client.invoke_function(
        'FCBigDataDemo',
        'dataToKafka',
        payload=json.dumps(request_body_str),
        headers = {'x-fc-invocation-type': 'Async'}
    )

    status = '200 OK'
    response_headers = [('Content-type', 'text/plain')]
    start_response(status, response_headers)
    return [HELLO_WORLD]

如上面代碼所示,對第一個函數(shù)的代碼做了三個地方的改動:

  • 導(dǎo)入函數(shù)計算的庫:import fc2

  • 添加初始化方法,用于創(chuàng)建函數(shù)計算 Client:

def my_initializer(context):
        logger = logging.getLogger()
        logger.info("init fc client")
        global client
        client = fc2.Client(
            endpoint="http://your_account_id.cn-hangzhou-internal.fc.aliyuncs.com",
            accessKeyID="your_ak",
            accessKeySecret="your_sk"
)

這里需要注意的時,當(dāng)我們在代碼里增加了初始化方法后,需要在函數(shù)配置中指定初始化方法的入口

  • 通過函數(shù)計算 Client 調(diào)用第二個函數(shù)

global client
    client.invoke_function(
            'FCBigDataDemo',
            'dataToKafka',
          payload=json.dumps(request_body_str),
            headers = {'x-fc-invocation-type': 'Async'}
)

invoke_function 函數(shù)有四個參數(shù):

  • 第一個參數(shù):調(diào)用函數(shù)所在的服務(wù)名稱。

  • 第二個參數(shù):調(diào)用函數(shù)的函數(shù)名稱。

  • 第三個參數(shù):向調(diào)用函數(shù)傳的數(shù)據(jù)。

  • 第四個參數(shù):調(diào)用第二個函數(shù) Request Header 信息。這里主要通過 x-fc-invocation-type 這個 Key 來設(shè)置是同步調(diào)用還是異步調(diào)用。這里設(shè)置 Async 為異步調(diào)用。

如此設(shè)置,我們便可以驗證通過第一個函數(shù)提供的 HTTP 接口發(fā)起請求→采集數(shù)據(jù)→調(diào)用第二個函數(shù)→將數(shù)據(jù)作為消息傳給 Kafka 這個流程了。

使用兩個函數(shù)的目的

到這里有些同學(xué)可能會有疑問,為什么需要兩個函數(shù),而不在第一個函數(shù)里直接向 Kafka 發(fā)送數(shù)據(jù)呢?

當(dāng)我們使用異步調(diào)用函數(shù)時,在函數(shù)內(nèi)部會默認(rèn)先將請求的數(shù)據(jù)放入消息隊列進(jìn)行第一道削峰填谷,然后每一個隊列在對應(yīng)函數(shù)實例,通過函數(shù)實例的彈性拉起多個實例進(jìn)行第二道削峰填谷。所以這也就是為什么這個架構(gòu)能穩(wěn)定承載大并發(fā)請求的核心原因之一。

4. 配置 Kafka

在游戲運(yùn)營這個場景中,數(shù)據(jù)量是比較大的,所以對 Kafka 的性能要求也是比較高的,相比開源自建,使用云上的 Kafka 省去很多的運(yùn)維操作,比如:

  • 我們不再需要再維護(hù) Kafka 集群的各個節(jié)點。

  • 不需要關(guān)心主從節(jié)點數(shù)據(jù)同步問題。

  • 可以快速、動態(tài)擴(kuò)展 Kafka 集群規(guī)格,動態(tài)增加 Topic,動態(tài)增加分區(qū)數(shù)。

  • 完善的指標(biāo)監(jiān)控功能,消息查詢功能。

總的來說,就是一切 SLA 都有云上兜底,我們只需要關(guān)注在消息發(fā)送和消息消費(fèi)即可。

所以我們可以打開 Kafka 開通界面,根據(jù)實際場景的需求一鍵開通 Kafka 實例,開通 Kafka 后登錄控制臺,在基本信息中可以看到 Kafka 的接入點:

  • 默認(rèn)接入點:走 VPC 內(nèi)網(wǎng)場景的接入點。

  • SSL 接入點:走公網(wǎng)場景的接入點。

將默認(rèn)接入點配置到函數(shù)計算的第二個函數(shù)中即可。

....
producer = KafkaProducer(bootstrap_servers='XX.XX.XX.XX:9092,XX.XX.XX.XX:9092,XX.XX.XX.XX:9092')
....

然后點擊左側(cè)控制臺 Topic 管理,創(chuàng)建 Topic

將創(chuàng)建好的 Topic 配置到函數(shù)計算的第二個函數(shù)中即可。

...
# 第一個參數(shù)為Topic名稱
producer.send('ikf-demo', json.dumps(event_str).encode('utf-8'))
...

上文已經(jīng)列舉過云上 Kafka 的優(yōu)勢,比如動態(tài)增加 Topic 的分區(qū)數(shù),我們可以在 Topic 列表中,對 Topic 的分區(qū)數(shù)進(jìn)行動態(tài)調(diào)整。

單 Topic 最大支持到 360 個分區(qū),這是開源自建無法做到的。

接下來點擊控制臺左側(cè) Consumer Group 管理,創(chuàng)建 Consumer Group。

至此,云上的 Kafka 就算配置完畢了,即 Producer 可以往剛剛創(chuàng)建的 Topic 中發(fā)消息了,Consumer 可以設(shè)置剛剛創(chuàng)建的 GID 以及訂閱 Topic 進(jìn)行消息接受和消費(fèi)。

Flink Kafka 消費(fèi)者

在這個場景中,Kafka 后面往往會跟著 Flink,所以這里簡要給大家介紹一下在 Flink 中如何創(chuàng)建 Kafka Consumer 并消費(fèi)數(shù)據(jù)。代碼片段如下:

final ParameterTool parameterTool = ParameterTool.fromArgs(args);
String kafkaTopic = parameterTool.get("kafka-topic","ikf-demo");
String brokers = parameterTool.get("brokers", "XX.XX.XX.XX:9092,XX.XX.XX.XX:9092,XX.XX.XX.XX:9092");
Properties kafkaProps = new Properties();
kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "ikf-demo");
FlinkKafkaConsumer kafka = new FlinkKafkaConsumer<>(kafkaTopic, new UserBehaviorEventSchema(), kafkaProps);
kafka.setStartFromLatest();
kafka.setCommitOffsetsOnCheckpoints(false);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource dataStreamByEventTime = env.addSource(kafka);

以上就是構(gòu)建 Flink Kafka Consumer 和添加 Kafka Source 的代碼片段,還是非常簡單的。

壓測驗證

至此,整個數(shù)據(jù)采集的架構(gòu)就搭建完畢了,下面我們通過壓測來檢驗一下整個架構(gòu)的性能。這里使用阿里云 PTS 來進(jìn)行壓測。

創(chuàng)建壓測場景

打開 PTS 控制臺,點擊左側(cè)菜單創(chuàng)建壓測/創(chuàng)建 PTS 場景

在場景配置中,將第一個函數(shù)計算函數(shù)暴露的 HTTP 接口作為串聯(lián)鏈路。

接口配置完后,我們來配置施壓

  • 壓力模式:并發(fā)模式:指定有多少并發(fā)用戶同時發(fā)請求。RPS模式:指定每秒有多少請求數(shù)。

  • 遞增模式:在壓測過程中可以通過手動調(diào)節(jié)壓力,也可以自動按百分比遞增壓力。

  • 最大并發(fā):同時有多少個虛擬用戶發(fā)起請求。

  • 遞增百分比:如果是自動遞增的話,按這里的百分比遞增。

  • 單量級持續(xù)時長:在未完全達(dá)到壓力全量的時候,每一級梯度的壓力保持的時長。

  • 壓測總時長:一共需要壓測的時長。

這里因為資源成本原因,并發(fā)用戶數(shù)設(shè)置為 2500 來進(jìn)行驗證。

從上圖壓測中的情況來看,TPS 達(dá)到了 2w 的封頂,549w+ 的請求,99.99% 的請求是成功的,那 369 個異常也可以點擊查看,都是壓測工具請求超時導(dǎo)致的。

至此,整個基于 Serverless 搭建的大數(shù)據(jù)采集傳輸?shù)募軜?gòu)就搭建好了,并且進(jìn)行了壓測驗證,整體的性能也是不錯的,并且整個架構(gòu)搭建起來也非常簡單和容易理解。這個架構(gòu)不光適用于游戲運(yùn)營行業(yè),其實任何大數(shù)據(jù)采集傳輸?shù)膱鼍岸际沁m用的,目前也已經(jīng)有很多客戶正在基于 Serverless 的架構(gòu)跑在生產(chǎn)環(huán)境,或者正走在改造 Serverless 架構(gòu)的路上。

以上就是Serverless如何解決數(shù)據(jù)采集分析痛點,小編相信有部分知識點可能是我們?nèi)粘9ぷ鲿姷交蛴玫降?。希望你能通過這篇文章學(xué)到更多知識。更多詳情敬請關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。


分享標(biāo)題:Serverless如何解決數(shù)據(jù)采集分析痛點
新聞來源:http://weahome.cn/article/gpjssj.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部