本篇文章給大家分享的是有關(guān)Serverless如何解決數(shù)據(jù)采集分析痛點,小編覺得挺實用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。
成都創(chuàng)新互聯(lián)主營本溪網(wǎng)站建設(shè)的網(wǎng)絡(luò)公司,主營網(wǎng)站建設(shè)方案,成都APP應(yīng)用開發(fā),本溪h5小程序開發(fā)搭建,本溪網(wǎng)站營銷推廣歡迎本溪等地區(qū)企業(yè)咨詢
簡介:眾所周知,游戲行業(yè)在當(dāng)今的互聯(lián)網(wǎng)行業(yè)中算是一棵常青樹。在疫情之前的 2019 年,中國游戲市場營收規(guī)模約 2884.8 億元,同比增長 17.1%。2020 年因為疫情,游戲行業(yè)更是突飛猛進(jìn)。玩游戲本就是中國網(wǎng)民最普遍的娛樂方式之一,疫情期間更甚。據(jù)不完全統(tǒng)計,截至 2019 年,中國移動游戲用戶規(guī)模約 6.6 億人,占中國總網(wǎng)民規(guī)模 8.47 億的 77.92%,可見游戲作為一種低門檻、低成本的娛樂手段,已成為大部分人生活中習(xí)以為常的一部分。
眾所周知,游戲行業(yè)在當(dāng)今的互聯(lián)網(wǎng)行業(yè)中算是一棵常青樹。在疫情之前的 2019 年,中國游戲市場營收規(guī)模約 2884.8 億元,同比增長 17.1%。2020 年因為疫情,游戲行業(yè)更是突飛猛進(jìn)。玩游戲本就是中國網(wǎng)民最普遍的娛樂方式之一,疫情期間更甚。據(jù)不完全統(tǒng)計,截至 2019 年,中國移動游戲用戶規(guī)模約 6.6 億人,占中國總網(wǎng)民規(guī)模 8.47 億的 77.92%,可見游戲作為一種低門檻、低成本的娛樂手段,已成為大部分人生活中習(xí)以為常的一部分。
對于玩家而言,市面上的游戲數(shù)量多如牛毛,那么玩家如何能發(fā)現(xiàn)和認(rèn)知到一款游戲,并且持續(xù)的玩下去恐怕是所有游戲廠商需要思考的問題。加之 2018 年游戲版號停發(fā)事件,游戲廠商更加珍惜每一個已獲得版號的游戲產(chǎn)品,所以這也使得“深度打磨產(chǎn)品質(zhì)量”和“提高運(yùn)營精細(xì)程度”這兩個游戲產(chǎn)業(yè)發(fā)展方向成為廣大游戲廠商的發(fā)展思路,無論是新游戲還是老游戲都在努力落實這兩點:
新游戲:面向玩家需要提供更充足的推廣資源和更完整的游戲內(nèi)容。
老游戲:通過用戶行為分析,投入更多的精力和成本,制作更優(yōu)質(zhì)的版本內(nèi)容。
這里我們重點來看新游戲。一家游戲企業(yè)辛辛苦苦研發(fā)三年,等著新游戲發(fā)售時一飛沖天。那么問題來了,新游戲如何被廣大玩家看到?
首先來看看游戲行業(yè)公司的分類:
游戲研發(fā)商:研發(fā)游戲的公司,生產(chǎn)和制作游戲內(nèi)容。比如王者榮耀的所有英雄設(shè)計、游戲戰(zhàn)斗場景、戰(zhàn)斗邏輯等,全部由游戲研發(fā)公司提供。
游戲發(fā)行商:游戲發(fā)行商的主要工作分三大塊:市場工作、運(yùn)營工作、客服工作。游戲發(fā)行商把控游戲命脈,市場工作核心是導(dǎo)入玩家,運(yùn)營工作核心是將用戶價值最大化、賺取更多利益。
游戲平臺/渠道商:游戲平臺和渠道商的核心目的就是曝光游戲,讓盡量多的人能發(fā)現(xiàn)你的游戲。
這三種類型的業(yè)務(wù),有專注于其中某一領(lǐng)域的獨立公司,也有能承接全部業(yè)務(wù)的公司,但無論那一種,這三者之間的關(guān)系是不會變的:
所以不難理解,想讓更多的玩家看到你的游戲,游戲發(fā)行和運(yùn)營是關(guān)鍵。通俗來講,如果你的游戲出現(xiàn)在目前所有大家熟知的平臺廣告中,那么最起碼游戲的新用戶注冊數(shù)量是很可觀的。因此這就引入了一個關(guān)鍵詞:買量。
根據(jù)數(shù)據(jù)顯示,2019 年月均買量手游數(shù)達(dá) 6000+ 款,而 2018 年僅為 4200 款。另一方面,隨著抖音、微博等超級 APP 在游戲買量市場的資源傾斜,也助推手游買量的效果和效率有所提升,游戲廠商更愿意使用買量的方式來吸引用戶。
但需要注意的是,在游戲買量的精準(zhǔn)化程度不斷提高的同時,買量的成本也在節(jié)節(jié)攀升,唯有合理配置買量、渠道與整合營銷之間的關(guān)系,才能將宣發(fā)資源發(fā)揮到最大的效果。
通俗來講,買量其實就是在各大主流平臺投放廣告,廣大用戶看到游戲廣告后,有可能會點擊廣告,然后進(jìn)入游戲廠商的宣傳頁面,同時會采集用戶的一些信息,然后游戲廠商對采集到的用戶信息進(jìn)行大數(shù)據(jù)分析,進(jìn)行進(jìn)一步的定向推廣。
游戲廠商花錢買量,換來的用戶信息以及新用戶注冊信息是為持續(xù)的游戲運(yùn)營服務(wù)的,那么這個場景的核心訴求就是采集用戶信息的完整性。
比如說,某游戲廠商一天花 5000w 投放廣告,在某平臺某時段產(chǎn)生了每秒 1w 次的廣告點擊率,那么在這個時段內(nèi)每一個點擊廣告的用戶信息要完整的被采集到,然后入庫進(jìn)行后續(xù)分析。這就對數(shù)據(jù)采集系統(tǒng)提出了很高的要求。
這其中,最核心的一點就是系統(tǒng)暴露接口的環(huán)節(jié)要能夠平穩(wěn)承載買量期間不定時的流量脈沖。在買量期間,游戲廠商通常會在多個平臺投放廣告,每個平臺投放廣告的時間是不一樣的,所以就會出現(xiàn)全天不定時的流量脈沖現(xiàn)象。如果這個環(huán)節(jié)出現(xiàn)問題,那么相當(dāng)于買量的錢就打水漂了。
上圖是一個相對傳統(tǒng)的數(shù)據(jù)采集系統(tǒng)架構(gòu),最關(guān)鍵的就是暴露 HTTP 接口回傳數(shù)據(jù)這部分,這部分如果出問題,那么采集數(shù)據(jù)的鏈路就斷了。但這部分往往會面臨兩個挑戰(zhàn):
當(dāng)流量脈沖來的時候,這部分是否可以快速擴(kuò)容以應(yīng)對流量沖擊。
游戲運(yùn)營具備潮汐特性,并非天天都在進(jìn)行,這就需要考慮如何優(yōu)化資源利用率。
通常情況下,在游戲有運(yùn)營活動之前,會提前通知運(yùn)維同學(xué),對這個環(huán)節(jié)的服務(wù)增加節(jié)點,但要增加多少其實是無法預(yù)估的,只能大概拍一個數(shù)字。這是在傳統(tǒng)架構(gòu)下經(jīng)常會出現(xiàn)的場景,這就會導(dǎo)致兩個問題:
流量太大,節(jié)點加少了,導(dǎo)致一部分流量的數(shù)據(jù)沒有采集到。
流量沒有預(yù)期那么大,節(jié)點加多了,導(dǎo)致資源浪費(fèi)。
我們可以通過函數(shù)計算 FC 來取代傳統(tǒng)架構(gòu)中暴露 HTTP 回傳數(shù)據(jù)這部分,從而完美解決傳統(tǒng)架構(gòu)中存在問題。
傳統(tǒng)架構(gòu)中的兩個問題均可以通過函數(shù)計算百毫秒彈性的特性來解決。我們并不需要去估算營銷活動會帶來多大的流量,也不需要去擔(dān)心和考慮對數(shù)據(jù)采集系統(tǒng)的性能,運(yùn)維同學(xué)更不需要提前預(yù)備 ECS。
因為函數(shù)計算的極致彈性特性,當(dāng)沒有買量、沒有營銷活動的時候,函數(shù)計算的運(yùn)行實例是零。有買量活動時,在流量脈沖的情況下,函數(shù)計算會快速拉起實例來承載流量壓力;當(dāng)流量減少時,函數(shù)計算會及時釋放沒有請求的實例進(jìn)行縮容。所以 Serverless 架構(gòu)帶來的優(yōu)勢有以下三點:
無需運(yùn)維介入,研發(fā)同學(xué)就可以很快的搭建出來。
無論流量大小,均可以平穩(wěn)的承接。
函數(shù)計算拉起的實例數(shù)量可以緊貼流量大小的曲線,做到資源利用率最優(yōu)化,再加上按量計費(fèi)的模式,可以最大程度優(yōu)化成本。
從上面的架構(gòu)圖可以看到,整個采集數(shù)據(jù)階段,分了兩個函數(shù)來實現(xiàn),第一個函數(shù)的作用是單純的暴露 HTTP 接口接收數(shù)據(jù),第二個函數(shù)用于處理數(shù)據(jù),然后將數(shù)據(jù)發(fā)送至消息隊列 Kafka 和數(shù)據(jù)庫 RDS。
我們打開函數(shù)計算控制臺,創(chuàng)建一個函數(shù):
函數(shù)類型:HTTP(即觸發(fā)器為 HTTP)
函數(shù)名稱:receiveData
運(yùn)行環(huán)境:Python3
函數(shù)實例類型:彈性實例
函數(shù)執(zhí)行內(nèi)存:512MB
函數(shù)運(yùn)行超時時間:60 秒
函數(shù)單實例并發(fā)度:1
觸發(fā)器類型:HTTP 觸發(fā)器
觸發(fā)器名稱:defaultTrigger
認(rèn)證方式:anonymous(即無需認(rèn)證)
請求方式:GET,POST
創(chuàng)建好函數(shù)之后,我們通過在線編輯器編寫代碼:
# -*- coding: utf-8 -*- import logging import json import urllib.parse HELLO_WORLD = b'Hello world!\n' def handler(environ, start_response): logger = logging.getLogger() context = environ['fc.context'] request_uri = environ['fc.request_uri'] for k, v in environ.items(): if k.startswith('HTTP_'): # process custom request headers pass try: request_body_size = int(environ.get('CONTENT_LENGTH', 0)) except (ValueError): request_body_size = 0 # 接收回傳的數(shù)據(jù) request_body = environ['wsgi.input'].read(request_body_size) request_body_str = urllib.parse.unquote(request_body.decode("GBK")) request_body_obj = json.loads(request_body_str) logger.info(request_body_obj["action"]) logger.info(request_body_obj["articleAuthorId"]) status = '200 OK' response_headers = [('Content-type', 'text/plain')] start_response(status, response_headers) return [HELLO_WORLD]
此時的代碼非常簡單,就是接收用戶傳來的參數(shù),我們可以調(diào)用接口進(jìn)行驗證:
可以在函數(shù)的日志查詢中看到此次調(diào)用的日志:
同時,我們也可以查看函數(shù)的鏈路追蹤來分析每一個步驟的調(diào)用耗時,比如函數(shù)接到請求→冷啟動(無活躍實例時)→準(zhǔn)備代碼→執(zhí)行初始化方法→執(zhí)行入口函數(shù)邏輯這個過程:
從調(diào)用鏈路圖中可以看到,剛才的那次請求包含了冷啟動的時間,因為當(dāng)時沒有活躍實例,整個過程耗時 418 毫秒,真正執(zhí)行入口函數(shù)代碼的時間為 8 毫秒。
當(dāng)再次調(diào)用接口時,可以看到就直接執(zhí)行了入口函數(shù)的邏輯,因為此時已經(jīng)有實例在運(yùn)行,整個耗時只有 2.3 毫秒
2. 處理數(shù)據(jù)的函數(shù)
第一個函數(shù)是通過在函數(shù)計算控制臺在界面上創(chuàng)建的,選擇了運(yùn)行環(huán)境是 Python3,我們可以在官方文檔中查看預(yù)置的 Python3 運(yùn)行環(huán)境內(nèi)置了哪些模塊,因為第二個函數(shù)要操作 Kafka 和 RDS,所以需要我們確認(rèn)對應(yīng)的模塊。
從文檔中可以看到,內(nèi)置的模塊中包含 RDS 的 SDK 模塊,但是沒有 Kafka 的 SDK 模塊,此時就需要我們手動安裝 Kafka SDK 模塊,并且創(chuàng)建函數(shù)也會使用另一種方式。
Funcraft 是一個用于支持 Serverless 應(yīng)用部署的命令行工具,能幫助我們便捷地管理函數(shù)計算、API 網(wǎng)關(guān)、日志服務(wù)等資源。它通過一個資源配置文件(template.yml),協(xié)助我們進(jìn)行開發(fā)、構(gòu)建、部署操作。
所以第二個函數(shù)我們需要使用 Fun 來進(jìn)行操作,整個操作分為四個步驟:
安裝 Fun 工具。
編寫 template.yml 模板文件,用來描述函數(shù)。
安裝我們需要的第三方依賴。
上傳部署函數(shù)。
Fun 提供了三種安裝方式:
通過 npm 包管理安裝 —— 適合所有平臺(Windows/Mac/Linux)且已經(jīng)預(yù)裝了 npm 的開發(fā)者。
通過下載二進(jìn)制安裝 —— 適合所有平臺(Windows/Mac/Linux)。
通過 Homebrew 包管理器安裝 —— 適合 Mac 平臺,更符合 MacOS 開發(fā)者習(xí)慣。
文本示例環(huán)境為 Mac,所以使用 npm 方式安裝,非常的簡單,一行命令搞定:
sudo npm install @alicloud/fun -g
安裝完成之后。在控制終端輸入 fun 命令可以查看版本信息:
$ fun --version 3.6.20
在第一次使用 fun 之前需要先執(zhí)行 fun config 命令進(jìn)行配置,按照提示,依次配置 Account ID、Access Key Id、Secret Access Key、 Default Region Name 即可。其中 Account ID、Access Key Id 你可以從函數(shù)計算控制臺首頁的右上方獲得:
fun config
? Aliyun Account ID *01
? Aliyun Access Key ID *qef6j
? Aliyun Access Key Secret *UFJG
? Default region name cn-hangzhou
? The timeout in seconds for each SDK client invoking 60
? The maximum number of retries for each SDK client 3
新建一個目錄,在該目錄下創(chuàng)建一個名為 template.yml 的 YAML 文件,該文件主要描述要創(chuàng)建的函數(shù)的各項配置,說白了就是將函數(shù)計算控制臺上配置的那些配置信息以 YAML 格式寫在文件里:
ROSTemplateFormatVersion: '2015-09-01' Transform: 'Aliyun::Serverless-2018-04-03' Resources: FCBigDataDemo: Type: 'Aliyun::Serverless::Service' Properties: Description: 'local invoke demo' VpcConfig: VpcId: 'vpc-xxxxxxxxxxx' VSwitchIds: [ 'vsw-xxxxxxxxxx' ] SecurityGroupId: 'sg-xxxxxxxxx' LogConfig: Project: fcdemo Logstore: fc_demo_store dataToKafka: Type: 'Aliyun::Serverless::Function' Properties: Initializer: index.my_initializer Handler: index.handler CodeUri: './' Description: '' Runtime: python3
我們來解析以上文件的核心內(nèi)容:
FCBigDataDemo:自定義的服務(wù)名稱。通過下面的 Type 屬性標(biāo)明是服務(wù),即 Aliyun::Serverless::Service。
Properties:Properties 下的屬性都是該服務(wù)的各配置項。
VpcConfig:服務(wù)的 VPC 配置,包含:VpcId:VPC ID。VSwitchIds:交換機(jī) ID,這里是數(shù)組,可以配置多個交換機(jī)。SecurityGroupId:安全組 ID。
LogConfig:服務(wù)綁定的日志服務(wù)(SLS)配置,包含:Project:日志服務(wù)項目。Logstore:LogStore 名稱。
dataToKafka:該服務(wù)下自定義的函數(shù)名稱。通過下面的 Type 屬性標(biāo)明是函數(shù),即 Aliyun::Serverless::Function。
Properties:Properties下的屬性都是該函數(shù)的各配置項。
Initializer:配置初始化函數(shù)。
Handler:配置入口函數(shù)。
Runtime:函數(shù)運(yùn)行環(huán)境。
服務(wù)和函數(shù)的模板創(chuàng)建好之后,我們來安裝需要使用的第三方依賴。在這個示例的場景中,第二個函數(shù)需要使用 Kafka SDK,所以可以通過 fun 工具結(jié)合 Python 包管理工具 pip 進(jìn)行安裝:
fun install --runtime python3 --package-type pip kafka-python
執(zhí)行命令后有提示信息
此時我們會發(fā)現(xiàn)在目錄下會生成一個.fun文件夾 ,我們安裝的依賴包就在該目錄下:
現(xiàn)在編寫好了模板文件以及安裝好了我們需要的 Kafka SDK 后,還需要添加我們的代碼文件 index.py,代碼內(nèi)容如下:
# -*- coding: utf-8 -*- import logging import json import urllib.parse from kafka import KafkaProducer producer = None def my_initializer(context): logger = logging.getLogger() logger.info("init kafka producer") global producer producer = KafkaProducer(bootstrap_servers='XX.XX.XX.XX:9092,XX.XX.XX.XX:9092,XX.XX.XX.XX:9092') def handler(event, context): logger = logging.getLogger() # 接收回傳的數(shù)據(jù) event_str = json.loads(event) event_obj = json.loads(event_str) logger.info(event_obj["action"]) logger.info(event_obj["articleAuthorId"]) # 向Kafka發(fā)送消息 global producer producer.send('ikf-demo', json.dumps(event_str).encode('utf-8')) producer.close() return 'hello world'
代碼很簡單,這里做以簡單的解析:
my_initializer:函數(shù)實例被拉起時會先執(zhí)行該函數(shù),然后再執(zhí)行 handler 函數(shù) ,當(dāng)函數(shù)實例在運(yùn)行時,之后的請求都不會執(zhí)行 my_initializer 函數(shù) 。一般用于各種連接的初始化工作,這里將初始化 Kafka Producer 的方法放在了這里,避免反復(fù)初始化 Produer。
handler:該函數(shù)只有兩個邏輯,接收回傳的數(shù)據(jù)和將數(shù)據(jù)發(fā)送至 Kafka 的指定 Topic。
下面通過 fun deploy 命令部署函數(shù),該命令會做兩件事:根據(jù) template.yml 中的配置創(chuàng)建服務(wù)和函數(shù)。將 index.py 和 .fun 上傳至函數(shù)中。
登錄函數(shù)計算控制臺,可以看到通過 fun 命令部署的服務(wù)和函數(shù)
進(jìn)入函數(shù),也可以清晰的看到第三方依賴包的目錄結(jié)構(gòu)
目前兩個函數(shù)都創(chuàng)建好了,下面的工作就是由第一個函數(shù)接收到數(shù)據(jù)后拉起第二個函數(shù)發(fā)送消息給 Kafka。我們只需要對第一個函數(shù)做些許改動即可:
# -*- coding: utf-8 -*- import logging import json import urllib.parse import fc2 HELLO_WORLD = b'Hello world!\n' client = None def my_initializer(context): logger = logging.getLogger() logger.info("init fc client") global client client = fc2.Client( endpoint="http://your_account_id.cn-hangzhou-internal.fc.aliyuncs.com", accessKeyID="your_ak", accessKeySecret="your_sk" ) def handler(environ, start_response): logger = logging.getLogger() context = environ['fc.context'] request_uri = environ['fc.request_uri'] for k, v in environ.items(): if k.startswith('HTTP_'): # process custom request headers pass try: request_body_size = int(environ.get('CONTENT_LENGTH', 0)) except (ValueError): request_body_size = 0 # 接收回傳的數(shù)據(jù) request_body = environ['wsgi.input'].read(request_body_size) request_body_str = urllib.parse.unquote(request_body.decode("GBK")) request_body_obj = json.loads(request_body_str) logger.info(request_body_obj["action"]) logger.info(request_body_obj["articleAuthorId"]) global client client.invoke_function( 'FCBigDataDemo', 'dataToKafka', payload=json.dumps(request_body_str), headers = {'x-fc-invocation-type': 'Async'} ) status = '200 OK' response_headers = [('Content-type', 'text/plain')] start_response(status, response_headers) return [HELLO_WORLD]
如上面代碼所示,對第一個函數(shù)的代碼做了三個地方的改動:
導(dǎo)入函數(shù)計算的庫:import fc2
添加初始化方法,用于創(chuàng)建函數(shù)計算 Client:
def my_initializer(context): logger = logging.getLogger() logger.info("init fc client") global client client = fc2.Client( endpoint="http://your_account_id.cn-hangzhou-internal.fc.aliyuncs.com", accessKeyID="your_ak", accessKeySecret="your_sk" )
這里需要注意的時,當(dāng)我們在代碼里增加了初始化方法后,需要在函數(shù)配置中指定初始化方法的入口
通過函數(shù)計算 Client 調(diào)用第二個函數(shù)
global client client.invoke_function( 'FCBigDataDemo', 'dataToKafka', payload=json.dumps(request_body_str), headers = {'x-fc-invocation-type': 'Async'} )
invoke_function 函數(shù)有四個參數(shù):
第一個參數(shù):調(diào)用函數(shù)所在的服務(wù)名稱。
第二個參數(shù):調(diào)用函數(shù)的函數(shù)名稱。
第三個參數(shù):向調(diào)用函數(shù)傳的數(shù)據(jù)。
第四個參數(shù):調(diào)用第二個函數(shù) Request Header 信息。這里主要通過 x-fc-invocation-type 這個 Key 來設(shè)置是同步調(diào)用還是異步調(diào)用。這里設(shè)置 Async 為異步調(diào)用。
如此設(shè)置,我們便可以驗證通過第一個函數(shù)提供的 HTTP 接口發(fā)起請求→采集數(shù)據(jù)→調(diào)用第二個函數(shù)→將數(shù)據(jù)作為消息傳給 Kafka 這個流程了。
到這里有些同學(xué)可能會有疑問,為什么需要兩個函數(shù),而不在第一個函數(shù)里直接向 Kafka 發(fā)送數(shù)據(jù)呢?
當(dāng)我們使用異步調(diào)用函數(shù)時,在函數(shù)內(nèi)部會默認(rèn)先將請求的數(shù)據(jù)放入消息隊列進(jìn)行第一道削峰填谷,然后每一個隊列在對應(yīng)函數(shù)實例,通過函數(shù)實例的彈性拉起多個實例進(jìn)行第二道削峰填谷。所以這也就是為什么這個架構(gòu)能穩(wěn)定承載大并發(fā)請求的核心原因之一。
在游戲運(yùn)營這個場景中,數(shù)據(jù)量是比較大的,所以對 Kafka 的性能要求也是比較高的,相比開源自建,使用云上的 Kafka 省去很多的運(yùn)維操作,比如:
我們不再需要再維護(hù) Kafka 集群的各個節(jié)點。
不需要關(guān)心主從節(jié)點數(shù)據(jù)同步問題。
可以快速、動態(tài)擴(kuò)展 Kafka 集群規(guī)格,動態(tài)增加 Topic,動態(tài)增加分區(qū)數(shù)。
完善的指標(biāo)監(jiān)控功能,消息查詢功能。
總的來說,就是一切 SLA 都有云上兜底,我們只需要關(guān)注在消息發(fā)送和消息消費(fèi)即可。
所以我們可以打開 Kafka 開通界面,根據(jù)實際場景的需求一鍵開通 Kafka 實例,開通 Kafka 后登錄控制臺,在基本信息中可以看到 Kafka 的接入點:
默認(rèn)接入點:走 VPC 內(nèi)網(wǎng)場景的接入點。
SSL 接入點:走公網(wǎng)場景的接入點。
將默認(rèn)接入點配置到函數(shù)計算的第二個函數(shù)中即可。
.... producer = KafkaProducer(bootstrap_servers='XX.XX.XX.XX:9092,XX.XX.XX.XX:9092,XX.XX.XX.XX:9092') ....
然后點擊左側(cè)控制臺 Topic 管理,創(chuàng)建 Topic
將創(chuàng)建好的 Topic 配置到函數(shù)計算的第二個函數(shù)中即可。
... # 第一個參數(shù)為Topic名稱 producer.send('ikf-demo', json.dumps(event_str).encode('utf-8')) ...
上文已經(jīng)列舉過云上 Kafka 的優(yōu)勢,比如動態(tài)增加 Topic 的分區(qū)數(shù),我們可以在 Topic 列表中,對 Topic 的分區(qū)數(shù)進(jìn)行動態(tài)調(diào)整。
單 Topic 最大支持到 360 個分區(qū),這是開源自建無法做到的。
接下來點擊控制臺左側(cè) Consumer Group 管理,創(chuàng)建 Consumer Group。
至此,云上的 Kafka 就算配置完畢了,即 Producer 可以往剛剛創(chuàng)建的 Topic 中發(fā)消息了,Consumer 可以設(shè)置剛剛創(chuàng)建的 GID 以及訂閱 Topic 進(jìn)行消息接受和消費(fèi)。
在這個場景中,Kafka 后面往往會跟著 Flink,所以這里簡要給大家介紹一下在 Flink 中如何創(chuàng)建 Kafka Consumer 并消費(fèi)數(shù)據(jù)。代碼片段如下:
final ParameterTool parameterTool = ParameterTool.fromArgs(args); String kafkaTopic = parameterTool.get("kafka-topic","ikf-demo"); String brokers = parameterTool.get("brokers", "XX.XX.XX.XX:9092,XX.XX.XX.XX:9092,XX.XX.XX.XX:9092"); Properties kafkaProps = new Properties(); kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "ikf-demo"); FlinkKafkaConsumerkafka = new FlinkKafkaConsumer<>(kafkaTopic, new UserBehaviorEventSchema(), kafkaProps); kafka.setStartFromLatest(); kafka.setCommitOffsetsOnCheckpoints(false); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource dataStreamByEventTime = env.addSource(kafka);
以上就是構(gòu)建 Flink Kafka Consumer 和添加 Kafka Source 的代碼片段,還是非常簡單的。
至此,整個數(shù)據(jù)采集的架構(gòu)就搭建完畢了,下面我們通過壓測來檢驗一下整個架構(gòu)的性能。這里使用阿里云 PTS 來進(jìn)行壓測。
打開 PTS 控制臺,點擊左側(cè)菜單創(chuàng)建壓測/創(chuàng)建 PTS 場景
在場景配置中,將第一個函數(shù)計算函數(shù)暴露的 HTTP 接口作為串聯(lián)鏈路。
接口配置完后,我們來配置施壓
壓力模式:并發(fā)模式:指定有多少并發(fā)用戶同時發(fā)請求。RPS模式:指定每秒有多少請求數(shù)。
遞增模式:在壓測過程中可以通過手動調(diào)節(jié)壓力,也可以自動按百分比遞增壓力。
最大并發(fā):同時有多少個虛擬用戶發(fā)起請求。
遞增百分比:如果是自動遞增的話,按這里的百分比遞增。
單量級持續(xù)時長:在未完全達(dá)到壓力全量的時候,每一級梯度的壓力保持的時長。
壓測總時長:一共需要壓測的時長。
這里因為資源成本原因,并發(fā)用戶數(shù)設(shè)置為 2500 來進(jìn)行驗證。
從上圖壓測中的情況來看,TPS 達(dá)到了 2w 的封頂,549w+ 的請求,99.99% 的請求是成功的,那 369 個異常也可以點擊查看,都是壓測工具請求超時導(dǎo)致的。
至此,整個基于 Serverless 搭建的大數(shù)據(jù)采集傳輸?shù)募軜?gòu)就搭建好了,并且進(jìn)行了壓測驗證,整體的性能也是不錯的,并且整個架構(gòu)搭建起來也非常簡單和容易理解。這個架構(gòu)不光適用于游戲運(yùn)營行業(yè),其實任何大數(shù)據(jù)采集傳輸?shù)膱鼍岸际沁m用的,目前也已經(jīng)有很多客戶正在基于 Serverless 的架構(gòu)跑在生產(chǎn)環(huán)境,或者正走在改造 Serverless 架構(gòu)的路上。
以上就是Serverless如何解決數(shù)據(jù)采集分析痛點,小編相信有部分知識點可能是我們?nèi)粘9ぷ鲿姷交蛴玫降?。希望你能通過這篇文章學(xué)到更多知識。更多詳情敬請關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。