Serverless在游戏运营行业进行数据采集分析的最佳实践 链接:
作者:計緣,阿里云解決方案架構師
眾所周知,游戲行業在當今的互聯網行業中算是一棵常青樹。在疫情之前的2019年,中國游戲市場營收規模約2884.8億元,同比增長17.1%。2020年因為疫情,游戲行業更是突飛猛進。玩游戲本就是中國網民最普遍的娛樂方式之一,疫情期間更甚。據不完全統計,截至2019年,中國移動游戲用戶規模約6.6億人,占中國總網民規模8.47億的77.92%,可見游戲作為一種低門檻、低成本的娛樂手段,已成為大部分人生活中習以為常的一部分。
對于玩家而言,市面上的游戲數量多如牛毛,那么玩家如何能發現和認知到一款游戲,并且持續的玩下去恐怕是所有游戲廠商需要思考的問題。加之2018年游戲版號停發事件,游戲廠商更加珍惜每一個已獲得版號的游戲產品,所以這也使得“深度打磨產品質量”和“提高運營精細程度”這兩個游戲產業發展方向成為廣大游戲廠商的發展思路,無論是新游戲還是老游戲都在努力落實這兩點:
? 新游戲:面向玩家需要提供更充足的推廣資源和更完整的游戲內容。
? 老游戲:通過用戶行為分析,投入更多的精力和成本,制作更優質的版本內容。
這里我們重點來看新游戲。一家游戲企業辛辛苦苦研發三年,等著新游戲發售時一飛沖天。那么問題來了,新游戲如何被廣大玩家看到?
首先來看看游戲行業公司的分類:
? 游戲研發商:研發游戲的公司,生產和制作游戲內容。比如王者榮耀的所有英雄設計、游戲戰斗場景、戰斗邏輯等,全部由游戲研發公司提供。
? 游戲發行商:游戲發行商的主要工作分三大塊:市場工作、運營工作、客服工作。游戲發行商把控游戲命脈,市場工作核心是導入玩家,運營工作核心是將用戶價值最大化、賺取更多利益。
? 游戲平臺/渠道商:游戲平臺和渠道商的核心目的就是曝光游戲,讓盡量多的人能發現你的游戲。
這三種類型的業務,有專注于其中某一領域的獨立公司,也有能承接全部業務的公司,但無論那一種,這三者之間的關系是不會變的:
所以不難理解,想讓更多的玩家看到你的游戲,游戲發行和運營是關鍵。通俗來講,如果你的游戲出現在目前所有大家熟知的平臺廣告中,那么最起碼游戲的新用戶注冊數量是很可觀的。因此這就引入了一個關鍵詞:買量。
根據數據顯示,2019年月均買量手游數達6000+款,而2018年僅為4200款。另一方面,隨著抖音、微博等超級APP在游戲買量市場的資源傾斜,也助推手游買量的效果和效率有所提升,游戲廠商更愿意使用買量的方式來吸引用戶。
但需要注意的是,在游戲買量的精準化程度不斷提高的同時,買量的成本也在節節攀升,唯有合理配置買量、渠道與整合營銷之間的關系,才能將宣發資源發揮到最大的效果。
通俗來講,買量其實就是在各大主流平臺投放廣告,廣大用戶看到游戲廣告后,有可能會點擊廣告,然后進入游戲廠商的宣傳頁面,同時會采集用戶的一些信息,然后游戲廠商對采集到的用戶信息進行大數據分析,進行進一步的定向推廣。
游戲運營核心訴求
游戲廠商花錢買量,換來的用戶信息以及新用戶注冊信息是為持續的游戲運營服務的,那么這個場景的核心訴求就是采集用戶信息的完整性。比如說,某游戲廠商一天花5000w投放廣告,在某平臺某時段產生了每秒1w次的廣告點擊率,那么在這個時段內每一個點擊廣告的用戶信息要完整的被采集到,然后入庫進行后續分析。這就對數據采集系統提出了很高的要求。這其中,最核心的一點就是系統暴露接口的環節要能夠平穩承載買量期間不定時的流量脈沖。在買量期間,游戲廠商通常會在多個平臺投放廣告,每個平臺投放廣告的時間是不一樣的,所以就會出現全天不定時的流量脈沖現象。如果這個環節出現問題,那么相當于買量的錢就打水漂了。
數據采集系統傳統架構
上圖是一個相對傳統的數據采集系統架構,最關鍵的就是暴露HTTP接口回傳數據這部分,這部分如果出問題,那么采集數據的鏈路就斷了。但這部分往往會面臨兩個挑戰:
? 當流量脈沖來的時候,這部分是否可以快速擴容以應對流量沖擊。
? 游戲運營具備潮汐特性,并非天天都在進行,這就需要考慮如何優化資源利用率。
通常情況下,在游戲有運營活動之前,會提前通知運維同學,對這個環節的服務增加節點,但要增加多少其實是無法預估的,只能大概拍一個數字。這是在傳統架構下經常會出現的場景,這就會導致兩個問題:
? 流量太大,節點加少了,導致一部分流量的數據沒有采集到。
? 流量沒有預期那么大,節點加多了,導致資源浪費。
數據采集系統Serverless架構
我們可以通過 函數計算(函數計算的基本概念可以參考這篇文章)來取代傳統架構中暴露HTTP回傳數據這部分,從而完美解決傳統架構中存在問題,先來看架構圖:
傳統架構中的兩個問題均可以通過函數計算百毫秒彈性的特性來解決。我們并不需要去估算營銷活動會帶來多大的流量,也不需要去擔心和考慮對數據采集系統的性能,運維同學更不需要提前預備ECS。
因為函數計算的極致彈性特性,當沒有買量、沒有營銷活動的時候,函數計算的運行實例是零。有買量活動時,在流量脈沖的情況下,函數計算會快速拉起實例來承載流量壓力;當流量減少時,函數計算會及時釋放沒有請求的實例進行縮容。所以Serverless架構帶來的優勢有以下三點:
? 無需運維介入,研發同學就可以很快的搭建出來。
? 無論流量大小,均可以平穩的承接。
? 函數計算拉起的實例數量可以緊貼流量大小的曲線,做到資源利用率最優化,再加上按量計費的模式,可以最大程度優化成本。
架構解析
從上面的架構圖可以看到,整個采集數據階段,分了兩個函數來實現,第一個函數的作用是單純的暴露HTTP接口接收數據,第二個函數用于處理數據,然后將數據發送至消息隊列Kafka和數據庫RDS。
1.接收數據函數
我們打開函數計算控制臺,創建一個函數:
? 函數類型:HTTP(即觸發器為HTTP)
? 函數名稱:receiveData
? 運行環境:Python3
? 函數實例類型:彈性實例
? 函數執行內存:512MB
? 函數運行超時時間:60秒
? 函數單實例并發度:1
? 觸發器類型:HTTP觸發器
? 觸發器名稱:defaultTrigger
? 認證方式:anonymous(即無需認證)
? 請求方式:GET,POST
創建好函數之后,我們通過在線編輯器編寫代碼:
# -*- coding: utf-8 -*- import logging import json import urllib.parse HELLO_WORLD = b'Hello world!\n' def handler(environ, start_response):logger = logging.getLogger() context = environ['fc.context']request_uri = environ['fc.request_uri']for k, v in environ.items():if k.startswith('HTTP_'):# process custom request headerspasstry: request_body_size = int(environ.get('CONTENT_LENGTH', 0)) except (ValueError): request_body_size = 0 # 接收回傳的數據request_body = environ['wsgi.input'].read(request_body_size) request_body_str = urllib.parse.unquote(request_body.decode("GBK"))request_body_obj = json.loads(request_body_str)logger.info(request_body_obj["action"])logger.info(request_body_obj["articleAuthorId"])status = '200 OK'response_headers = [('Content-type', 'text/plain')]start_response(status, response_headers)return [HELLO_WORLD]此時的代碼非常簡單,就是接收用戶傳來的參數,我們可以調用接口進行驗證:
可以在函數的日志查詢中看到此次調用的日志:
同時,我們也可以查看函數的鏈路追蹤來分析每一個步驟的調用耗時,比如函數接到請求→冷啟動(無活躍實例時)→準備代碼→執行初始化方法→執行入口函數邏輯這個過程:
從調用鏈路圖中可以看到,剛才的那次請求包含了冷啟動的時間,因為當時沒有活躍實例,整個過程耗時418毫秒,真正執行入口函數代碼的時間為8毫秒。
當再次調用接口時,可以看到就直接執行了入口函數的邏輯,因為此時已經有實例在運行,整個耗時只有2.3毫秒:
2.處理數據的函數
第一個函數是通過在函數計算控制臺在界面上創建的,選擇了運行環境是Python3,我們可以在官方文檔中查看預置的Python3運行環境內置了哪些模塊,因為第二個函數要操作Kafka和RDS,所以需要我們確認對應的模塊。
從文檔中可以看到,內置的模塊中包含RDS的SDK模塊,但是沒有Kafka的SDK模塊,此時就需要我們手動安裝Kafka SDK模塊,并且創建函數也會使用另一種方式。
Funcraft
Funcraft是一個用于支持 Serverless 應用部署的命令行工具,能幫助我們便捷地管理函數計算、API 網關、日志服務等資源。它通過一個資源配置文件(template.yml),協助我們進行開發、構建、部署操作。
所以第二個函數我們需要使用Fun來進行操作,整個操作分為四個步驟:
? 安裝fun工具。
? 編寫template.yml模板文件,用來描述函數。
? 安裝我們需要的第三方依賴。
? 上傳部署函數。
安裝Fun
Fun提供了三種安裝方式:
? 通過 npm 包管理安裝 —— 適合所有平臺(Windows/Mac/Linux)且已經預裝了 npm 的開發者。
? 通過下載二進制安裝 —— 適合所有平臺(Windows/Mac/Linux)。
? 通過 Homebrew 包管理器安裝 —— 適合 Mac 平臺,更符合 MacOS 開發者習慣。
文本示例環境為Mac,所以使用npm方式安裝,非常的簡單,一行命令搞定:
sudo npm install @alicloud/fun -g
安裝完成之后。在控制終端輸入 fun 命令可以查看版本信息:
$ fun --version 3.6.20在第一次使用 fun 之前需要先執行 fun config 命令進行配置,按照提示,依次配置 Account ID、Access Key Id、Secret Access Key、 Default Region Name 即可。其中 Account ID、Access Key Id 你可以從函數計算控制臺首頁的右上方獲得:
fun config
? Aliyun Account ID 01
? Aliyun Access Key ID qef6j
? Aliyun Access Key Secret ***UFJG
? Default region name cn-hangzhou
? The timeout in seconds for each SDK client invoking 60
? The maximum number of retries for each SDK client 3
編寫template.yml
新建一個目錄,在該目錄下創建一個名為template.yml的YAML文件,該文件主要描述要創建的函數的各項配置,說白了就是將函數計算控制臺上配置的那些配置信息以YAML格式寫在文件里:
ROSTemplateFormatVersion: '2015-09-01' Transform: 'Aliyun::Serverless-2018-04-03' Resources: FCBigDataDemo: Type: 'Aliyun::Serverless::Service' Properties: Description: 'local invoke demo' VpcConfig: VpcId: 'vpc-xxxxxxxxxxx' VSwitchIds: [ 'vsw-xxxxxxxxxx' ] SecurityGroupId: 'sg-xxxxxxxxx' LogConfig: Project: fcdemo Logstore: fc_demo_store dataToKafka: Type: 'Aliyun::Serverless::Function' Properties: Initializer: index.my_initializer Handler: index.handler CodeUri: './' Description: '' Runtime: python3我們來解析以上文件的核心內容:
? FCBigDataDemo:自定義的服務名稱。通過下面的Type屬性標明是服務,即Aliyun::Serverless::Service。
? Properties:Properties下的屬性都是該服務的各配置項。
? VpcConfig:服務的VPC配置,包含:
? LogConfig:服務綁定的日志服務(SLS)配置,包含:
? dataToKafka:該服務下自定義的函數名稱。通過下面的Type屬性標明是函數,即Aliyun::Serverless::Function。
? Properties:Properties下的屬性都是該函數的各配置項。
? Initializer:配置初始化函數。
? Handler:配置入口函數。
? Runtime:函數運行環境。
目錄結構為:
安裝第三方依賴
服務和函數的模板創建好之后,我們來安裝需要使用的第三方依賴。在這個示例的場景中,第二個函數需要使用Kafka SDK,所以可以通過fun工具結合Python包管理工具pip進行安裝:
fun install --runtime python3 --package-type pip kafka-python
執行命令后有如下提示信息:
此時我們會發現在目錄下會生成一個.fun文件夾 ,我們安裝的依賴包就在該目錄下:
部署函數
現在編寫好了模板文件以及安裝好了我們需要的Kafka SDK后,還需要添加我們的代碼文件index.py,代碼內容如下:
# -*- coding: utf-8 -*- import logging import json import urllib.parse from kafka import KafkaProducer producer = None def my_initializer(context): logger = logging.getLogger() logger.info("init kafka producer")global producerproducer = KafkaProducer(bootstrap_servers='XX.XX.XX.XX:9092,XX.XX.XX.XX:9092,XX.XX.XX.XX:9092') def handler(event, context):logger = logging.getLogger() # 接收回傳的數據event_str = json.loads(event)event_obj = json.loads(event_str)logger.info(event_obj["action"])logger.info(event_obj["articleAuthorId"])# 向Kafka發送消息global producerproducer.send('ikf-demo', json.dumps(event_str).encode('utf-8'))producer.close()return 'hello world'代碼很簡單,這里做以簡單的解析:
? my_initializer:函數實例被拉起時會先執行該函數,然后再執行handler函數 ,當函數實例在運行時,之后的請求都不會執行my_initializer函數 。一般用于各種連接的初始化工作,這里將初始化Kafka Producer的方法放在了這里,避免反復初始化Produer。
? handler:該函數只有兩個邏輯,接收回傳的數據和將數據發送至Kafka的指定Topic。
下面通過fun deploy命令部署函數,該命令會做兩件事:
? 根據template.yml中的配置創建服務和函數。
? 將index.py和.fun上傳至函數中。
登錄函數計算控制臺,可以看到通過fun命令部署的服務和函數:
進入函數,也可以清晰的看到第三方依賴包的目錄結構:
3.函數之間調用
目前兩個函數都創建好了,下面的工作就是由第一個函數接收到數據后拉起第二個函數發送消息給Kafka。我們只需要對第一個函數做些許改動即可:
# -*- coding: utf-8 -*- import logging import json import urllib.parse import fc2 HELLO_WORLD = b'Hello world!\n' client = None def my_initializer(context): logger = logging.getLogger() logger.info("init fc client")global clientclient = fc2.Client(endpoint="http://your_account_id.cn-hangzhou-internal.fc.aliyuncs.com",accessKeyID="your_ak",accessKeySecret="your_sk") def handler(environ, start_response):logger = logging.getLogger() context = environ['fc.context']request_uri = environ['fc.request_uri']for k, v in environ.items():if k.startswith('HTTP_'):# process custom request headerspasstry: request_body_size = int(environ.get('CONTENT_LENGTH', 0)) except (ValueError): request_body_size = 0 # 接收回傳的數據request_body = environ['wsgi.input'].read(request_body_size) request_body_str = urllib.parse.unquote(request_body.decode("GBK"))request_body_obj = json.loads(request_body_str)logger.info(request_body_obj["action"])logger.info(request_body_obj["articleAuthorId"])global clientclient.invoke_function('FCBigDataDemo','dataToKafka',payload=json.dumps(request_body_str),headers = {'x-fc-invocation-type': 'Async'})status = '200 OK'response_headers = [('Content-type', 'text/plain')]start_response(status, response_headers)return [HELLO_WORLD]如上面代碼所示,對第一個函數的代碼做了三個地方的改動:
? 導入函數計算的庫:import fc2
? 添加初始化方法,用于創建函數計算Client:
這里需要注意的時,當我們在代碼里增加了初始化方法后,需要在函數配置中指定初始化方法的入口:
? 通過函數計算Client調用第二個函數:
global clientclient.invoke_function('FCBigDataDemo','dataToKafka',payload=json.dumps(request_body_str),headers = {'x-fc-invocation-type': 'Async'} )invoke_function函數有四個參數:
? 第一個參數:調用函數所在的服務名稱。
? 第二個參數:調用函數的函數名稱。
? 第三個參數:向調用函數傳的數據。
? 第四個參數:調用第二個函數Request Header信息。這里主要通過x-fc-invocation-type這個Key來設置是同步調用還是異步調用。這里設置Async為異步調用。
如此設置,我們便可以驗證通過第一個函數提供的HTTP接口發起請求→采集數據→調用第二個函數→將數據作為消息傳給Kafka這個流程了。
使用兩個函數的目的
到這里有些同學可能會有疑問,為什么需要兩個函數,而不在第一個函數里直接向Kafka發送數據呢?我們先來看這張圖:
當我們使用異步調用函數時,在函數內部會默認先將請求的數據放入消息隊列進行第一道削峰填谷,然后每一個隊列在對應函數實例,通過函數實例的彈性拉起多個實例進行第二道削峰填谷。所以這也就是為什么這個架構能穩定承載大并發請求的核心原因之一。
4.配置Kafka
在游戲運營這個場景中,數據量是比較大的,所以對Kafka的性能要求也是比較高的,相比開源自建,使用云上的Kafka省去很多的運維操作,比如:
? 我們不再需要再維護Kafka集群的各個節點。
? 不需要關心主從節點數據同步問題。
? 可以快速、動態擴展Kafka集群規格,動態增加Topic,動態增加分區數。
? 完善的指標監控功能,消息查詢功能。
總的來說,就是一切SLA都有云上兜底,我們只需要關注在消息發送和消息消費即可。
所以我們可以打開Kafka開通界面,根據實際場景的需求一鍵開通Kafka實例,開通Kafka后登錄控制臺,在基本信息中可以看到Kafka的接入點:
? 默認接入點:走VPC內網場景的接入點。
? SSL接入點:走公網場景的接入點。
將默認接入點配置到函數計算的第二個函數中即可。
.... producer = KafkaProducer(bootstrap_servers='XX.XX.XX.XX:9092,XX.XX.XX.XX:9092,XX.XX.XX.XX:9092') ....然后點擊左側控制臺Topic管理,創建Topic:
將創建好的Topic配置到函數計算的第二個函數中即可。
... # 第一個參數為Topic名稱 producer.send('ikf-demo', json.dumps(event_str).encode('utf-8')) ...上文已經列舉過云上Kafka的優勢,比如動態增加Topic的分區數,我們可以在Topic列表中,對Topic的分區數進行動態調整:
單Topic最大支持到360個分區,這是開源自建無法做到的。
接下來點擊控制臺左側Consumer Group管理,創建Consumer Group:
至此,云上的Kafka就算配置完畢了,即Producer可以往剛剛創建的Topic中發消息了,Consumer可以設置剛剛創建的GID以及訂閱Topic進行消息接受和消費。
Flink Kafka消費者
在這個場景中,Kafka后面往往會跟著Flink,所以這里簡要給大家介紹一下在Flink中如何創建Kafka Consumer并消費數據。代碼片段如下:
final ParameterTool parameterTool = ParameterTool.fromArgs(args); String kafkaTopic = parameterTool.get("kafka-topic","ikf-demo"); String brokers = parameterTool.get("brokers", "XX.XX.XX.XX:9092,XX.XX.XX.XX:9092,XX.XX.XX.XX:9092"); Properties kafkaProps = new Properties(); kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "ikf-demo"); FlinkKafkaConsumer<UserBehaviorEvent> kafka = new FlinkKafkaConsumer<>(kafkaTopic, new UserBehaviorEventSchema(), kafkaProps); kafka.setStartFromLatest(); kafka.setCommitOffsetsOnCheckpoints(false); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<UserBehaviorEvent> dataStreamByEventTime = env.addSource(kafka);以上就是構建Flink Kafka Consumer和添加Kafka Source的代碼片段,還是非常簡單的。
壓測驗證
至此,整個數據采集的架構就搭建完畢了,下面我們通過壓測來檢驗一下整個架構的性能。這里使用阿里云PTS來進行壓測。
創建壓測場景
打開PTS控制臺,點擊左側菜單創建壓測/創建PTS場景:
在場景配置中,將第一個函數計算函數暴露的HTTP接口作為串聯鏈路,配置如下圖所示:
接口配置完后,我們來配置施壓:
? 壓力模式:
? 并發模式:指定有多少并發用戶同時發請求。
? RPS模式:指定每秒有多少請求數。
? 遞增模式:在壓測過程中可以通過手動調節壓力,也可以自動按百分比遞增壓力。
? 最大并發:同時有多少個虛擬用戶發起請求。
? 遞增百分比:如果是自動遞增的話,按這里的百分比遞增。
? 單量級持續時長:在未完全達到壓力全量的時候,每一級梯度的壓力保持的時長。
? 壓測總時長:一共需要壓測的時長。
這里因為資源成本原因,并發用戶數設置為2500來進行驗證。
從上圖壓測中的情況來看,TPS達到了2w的封頂,549w+的請求,99.99%的請求是成功的,那369個異常也可以點擊查看,都是壓測工具請求超時導致的。
總結
至此,整個基于Serverless搭建的大數據采集傳輸的架構就搭建好了,并且進行了壓測驗證,整體的性能也是不錯的,并且整個架構搭建起來也非常簡單和容易理解。這個架構不光適用于游戲運營行業,其實任何大數據采集傳輸的場景都是適用的,目前也已經有很多客戶正在基于Serverless的架構跑在生產環境,或者正走在改造Serverless 架構的路上。
基于Serverless還有很多其他的應用場景,之后我會一一分享給大家,大家如果有任何疑問也可以加入釘釘群:35712134來尋找答案,我們不見不散!
掃碼了解更多技術內容與客戶案例:
原文鏈接:https://developer.aliyun.com/article/781506?
版權聲明:本文內容由阿里云實名注冊用戶自發貢獻,版權歸原作者所有,阿里云開發者社區不擁有其著作權,亦不承擔相應法律責任。具體規則請查看《阿里云開發者社區用戶服務協議》和《阿里云開發者社區知識產權保護指引》。如果您發現本社區中有涉嫌抄襲的內容,填寫侵權投訴表單進行舉報,一經查實,本社區將立刻刪除涉嫌侵權內容。 與50位技術專家面對面20年技術見證,附贈技術全景圖總結
以上是生活随笔為你收集整理的Serverless在游戏运营行业进行数据采集分析的最佳实践 链接:的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 解读云原生下的可观察性发展方向
- 下一篇: 核桃编程 | 前端可观测性建设之路