新聞中心
如今大數(shù)據(jù)在各行業(yè)的應用越來越廣泛:運營基于數(shù)據(jù)關注運營效果,產品基于數(shù)據(jù)分析關注轉化率情況,開發(fā)基于數(shù)據(jù)衡量系統(tǒng)優(yōu)化效果等。

美圖公司有美拍、美圖秀秀、美顏相機等十幾個 app,每個 app 都會基于數(shù)據(jù)做個性化推薦、搜索、報表分析、反作弊、廣告等,整體對數(shù)據(jù)的業(yè)務需求比較多、應用也比較廣泛。
因此美圖數(shù)據(jù)技術團隊的業(yè)務背景主要體現(xiàn)在:業(yè)務線多以及應用比較廣泛。這也是促使我們搭建數(shù)據(jù)平臺的一個最主要的原因,由業(yè)務驅動。
美圖數(shù)據(jù)平臺整體架構
如圖所示是我們數(shù)據(jù)平臺的整體架構。在數(shù)據(jù)收集這部分,我們構建一套采集服務端日志系統(tǒng) Arachnia,支持各 app 集成的客戶端 SDK,負責收集 app 客戶端數(shù)據(jù);同時也有基于 DataX 實現(xiàn)的數(shù)據(jù)集成(導入導出);Mor 爬蟲平臺支持可配置的爬取公網數(shù)據(jù)的任務開發(fā)。
數(shù)據(jù)存儲層主要是根據(jù)業(yè)務特點來選擇不同的存儲方案,目前主要有用到 HDFS、MongoDB、Hbase、ES 等。在數(shù)據(jù)計算部分,當前離線計算主要還是基于 Hive&MR、實時流計算是 Storm 、 Flink 以及還有另外一個自研的 bitmap 系統(tǒng) Naix。
在數(shù)據(jù)開發(fā)這塊我們構建了一套數(shù)據(jù)工坊、數(shù)據(jù)總線分發(fā)、任務調度等平臺。數(shù)據(jù)可視化與應用部分主要是基于用戶需求構建一系列數(shù)據(jù)應用平臺,包括:A/B 實驗平臺、渠道推廣跟蹤平臺、數(shù)據(jù)可視化平臺、用戶畫像等等。
右側展示的是一些各組件都可能依賴的基礎服務,包括地理位置、元數(shù)據(jù)管理、唯一設備標識等。
如下圖所示是基本的數(shù)據(jù)架構流圖,典型的 lamda 架構,從左端數(shù)據(jù)源收集開始,Arachnia、AppSDK 分別將服務端、客戶端數(shù)據(jù)上報到代理服務 collector,通過解析數(shù)據(jù)協(xié)議,把數(shù)據(jù)寫到 kafka,然后實時流會經過一層數(shù)據(jù)分發(fā),最終業(yè)務消費 kafka 數(shù)據(jù)進行實時計算。
離線會由 ETL 服務負責從 Kafka dump 數(shù)據(jù)到 HDFS,然后異構數(shù)據(jù)源(比如 MySQL、Hbase 等)主要基于 DataX 以及 Sqoop 進行數(shù)據(jù)的導入導出,最終通過 hive、kylin、spark 等計算把數(shù)據(jù)寫入到各類的存儲層,最后通過統(tǒng)一的對外 API 對接業(yè)務系統(tǒng)以及我們自己的可視化平臺等。
數(shù)據(jù)平臺的階段性發(fā)展
企業(yè)級數(shù)據(jù)平臺建設主要分三個階段:
- 剛開始是基本使用免費的第三方平臺,這個階段的特點是能快速集成并看到 app 的一些統(tǒng)計指標,但是缺點也很明顯,沒有原始數(shù)據(jù)除了那些第三方提供的基本指標其他分析、推薦等都無法實現(xiàn)。所以有從 0 到 1 的過程,讓我們自己有數(shù)據(jù)可以用;
- 在有數(shù)據(jù)可用后,因為業(yè)務線、需求量的爆發(fā),需要提高開發(fā)效率,讓更多的人參與數(shù)據(jù)開發(fā)、使用到數(shù)據(jù),而不僅僅局限于數(shù)據(jù)研發(fā)人員使用,所以就涉及到把數(shù)據(jù)、計算存儲能力開放給各個業(yè)務線,而不是握在自己手上;
- 在當數(shù)據(jù)開放了以后,業(yè)務方會要求數(shù)據(jù)任務能否跑得更快,能否秒出,能否更實時;另外一方面,為了滿足業(yè)務需求集群的規(guī)模越來越大,因此會開始考慮滿足業(yè)務的同時,如何實現(xiàn)更節(jié)省資源。
美圖現(xiàn)在是處于第二與第三階段的過渡期,在不斷完善數(shù)據(jù)開放的同時,也逐步提升查詢分析效率,以及開始考慮如何進行優(yōu)化成本。接下來會重點介紹 0 到 1 以及數(shù)據(jù)開放這兩個階段我們平臺的實踐以及優(yōu)化思路。
從 0 到 1
從 0 到 1 解決從數(shù)據(jù)采集到最終可以使用數(shù)據(jù)。如圖 4 所示是數(shù)據(jù)收集的演進過程,從剛開始使用類似 umeng、flurry 這類的免費第三方平臺,到后面快速使用 rsync 同步日志到一臺服務器上存儲、計算,再到后面快速開發(fā)了一個簡單的python腳本支持業(yè)務服務器上報日志,最終我們開發(fā)了服務端日志采集系統(tǒng) Arachnia 以及客戶端 AppSDK。
數(shù)據(jù)采集是數(shù)據(jù)的源頭,在整個數(shù)據(jù)鏈路中是相對重要的環(huán)節(jié),需要更多關注:數(shù)據(jù)是否完整、數(shù)據(jù)是否支持實時上報、數(shù)據(jù)埋點是否規(guī)范準確、以及維護管理成本。因此我們的日志采集系統(tǒng)需要滿足以下需求:
- 能集成管理維護,包括 Agent 能自動化部署安裝升級卸載、配置熱更、延遲方面的監(jiān)控;
- 在可靠性方面至少需要保證 at least once;
- 美圖現(xiàn)在有多 IDC 的情況,需要能支持多個 IDC 數(shù)據(jù)采集匯總到數(shù)據(jù)中心;
- 在資源消耗方面盡量小,盡量做到不影響業(yè)務。
基于以上需求我們沒有使用 flume、scribe、fluentd,最終選擇自己開發(fā)一套采集系統(tǒng) Arachnia。
上圖是 Arachnia 的簡易架構圖,它通過系統(tǒng)大腦進行集中式管理。puppet 模塊主要作為單個 IDC 內統(tǒng)一匯總 Agent 的 metrics,中轉轉發(fā)的 metrics 或者配置熱更命令。采集器 Agent 主要是運維平臺負責安裝、啟動后從 brain 拉取到配置,并開始采集上報數(shù)據(jù)到 collector。
接著看 Arachnia 的實踐優(yōu)化,首先是 at least once 的可靠性保證。不少的系統(tǒng)都是采用把上報失敗的數(shù)據(jù)通過 WAL 的方式記錄下來,重試再上報,以免上報失敗丟失。我們的實踐是去掉 WAL,增加了 coordinator 來統(tǒng)一的分發(fā)管理 tx 狀態(tài)。
開始采集前會從 coordinator 發(fā)出 txid,source 接收到信號后開始采集,并交由 sink 發(fā)送數(shù)據(jù),發(fā)送后會ack tx,告訴 coordinator 已經 commit。coordinator 會進行校驗確認,然后再發(fā)送 commit 的信號給 source、sink 更新狀態(tài),最終 tx 完 source 會更新采集進度到持久層(默認是本地 file)。該方式如果在前面 3 步有問題,則數(shù)據(jù)沒有發(fā)送成功,不會重復執(zhí)行;如果后面 4 個步驟失敗,則數(shù)據(jù)會重復,該 tx 會被重放。
基于上文的 at least once 可靠性保證,有些業(yè)務方是需要唯一性的,我們這邊支持為每條日志生成唯一 ID 標識。另外一個數(shù)據(jù)采集系統(tǒng)的主要實踐是:唯一定位一個文件以及給每條日志做唯一的 MsgID,方便業(yè)務方可以基于 MsgID 在發(fā)生日志重復時能在后面做清洗。
我們一開始是使用 filename,后面發(fā)現(xiàn) filename 很多業(yè)務方都會變更,所以改為 inode,但是 inode linux 會回收重復利用,最后是以 inode & 文件頭部內容做 hash 來作為fileID。而 MsgID 是通過 agentID & fileID & offset 來唯一確認。
數(shù)據(jù)上報之后由 collector 負責解析協(xié)議推送數(shù)據(jù)到 Kafka,那么 Kafka 如何落地到 HDFS 呢? 首先看美圖的訴求:
- 支持分布式處理;
- 涉及到較多業(yè)務線因此有多種數(shù)據(jù)格式,所以需要支持多種數(shù)據(jù)格式的序列化,包括 json、avro、特殊分隔符等;
- 支持因為機器故障、服務問題等導致的數(shù)據(jù)落地失敗重跑,而且需要能有比較快速的重跑能力,因為一旦這塊故障,會影響到后續(xù)各個業(yè)務線的數(shù)據(jù)使用;
- 支持可配置的 HDFS 分區(qū)策略,能支持各個業(yè)務線相對靈活的、不一樣的分區(qū)配置;
- 支持一些特殊的業(yè)務邏輯處理,包括:數(shù)據(jù)校驗、過期過濾、測試數(shù)據(jù)過濾、注入等;
基于上述訴求痛點,美圖從 Kafka 落地到 HDFS 的數(shù)據(jù)服務實現(xiàn)方式如圖 7 所示。
基于 Kafka 和 MR 的特點,針對每個 kafka topic 的 partition,組裝 mapper 的 inputsplit,然后起一個 mapper 進程處理消費這個批次的 kafka 數(shù)據(jù),經過數(shù)據(jù)解析、業(yè)務邏輯處理、校驗過濾、最終根據(jù)分區(qū)規(guī)則落地寫到目標 HDFS 文件。
落地成功后會把這次處理的 meta 信息(包括 topic、partition、開始的 offset、結束的offset)存儲到 MySQL。下次再處理的時候,會從上次處理的結束的 offset 開始讀取消息,開始新一批的數(shù)據(jù)消費落地。
實現(xiàn)了基本功能后難免會遇到一些問題,比如不同的業(yè)務 topic 的數(shù)據(jù)量級是不一樣的,這樣會導致一次任務需要等待 partition 數(shù)據(jù)量最多以及處理時間最長的 mapper 結束,才能結束整個任務。那我們怎么解決這個問題呢?系統(tǒng)設計中有個不成文原則是:分久必合、合久必分,針對數(shù)據(jù)傾斜的問題我們采用了類似的思路。
首先對數(shù)據(jù)量級較小的 partition 合并到一個 inputsplit,達到一個 mapper 可以處理多個業(yè)務的 partition 數(shù)據(jù),最終落地寫多份文件。
另外對數(shù)據(jù)量級較大的 partition 支持分段拆分,平分到多個 mapper 處理同一個 partition,這樣就實現(xiàn)了更均衡的 mapper 處理,能更好地應對業(yè)務量級的突增。
除了數(shù)據(jù)傾斜的問題,還出現(xiàn)各種原因導致數(shù)據(jù) dump 到 HDFS 失敗的情況,比如因為 kafka 磁盤問題、hadoop 集群節(jié)點宕機、網絡故障、外部訪問權限等導致該 ETL 程序出現(xiàn)異常,最終可能導致因為未 close HDFS 文件導致文件損壞等,需要重跑數(shù)據(jù)。那我們的數(shù)據(jù)時間分區(qū)基本都是以天為單位,用原來的方式可能會導致一個天粒度的文件損壞,解析無法讀取。
我們采用了分兩階段處理的方式:mapper 1 先把數(shù)據(jù)寫到一個臨時目錄,mapper 2 把 Hdfs 的臨時目錄的數(shù)據(jù) append 到目標文件。這樣當 mapper1 失敗的時候可以直接重跑這個批次,而不用重跑整天的數(shù)據(jù);當 mapper2 失敗的時候能直接從臨時目錄 merge 數(shù)據(jù)替換最終文件,減少了重新 ETL 天粒度的過程。
在數(shù)據(jù)的實時分發(fā)訂閱寫入到 kafka1 的數(shù)據(jù)基本是每個業(yè)務的全量數(shù)據(jù),但是針對需求方大部分業(yè)務都只關注某個事件、某小類別的數(shù)據(jù),而不是任何業(yè)務都消費全量數(shù)據(jù)做處理,所以我們增加了一個實時分發(fā) Databus 來解決這個問題。
Databus 支持業(yè)務方自定義分發(fā) rules 往下游的 kafka 集群寫數(shù)據(jù),方便業(yè)務方訂閱處理自己想要的數(shù)據(jù),并且支持更小粒度的數(shù)據(jù)重復利用。
上圖可以看出 Databus 的實現(xiàn)方式,它的主體基于 Storm 實現(xiàn)了 databus topology。Databus 有兩個 spout,一個支持拉取全量以及新增的 rules,然后更新到下游的分發(fā) bolt 更新緩存規(guī)則,另外一個是從 kafka 消費的 spout。而 distributionbolt 主要是負責解析數(shù)據(jù)、規(guī)則 match,以及把數(shù)據(jù)往下游的 kafka 集群發(fā)送。
數(shù)據(jù)開放
有了原始數(shù)據(jù)并且能做離線、實時的數(shù)據(jù)開發(fā)以后,隨之而來的是數(shù)據(jù)開發(fā)需求的井噴,數(shù)據(jù)研發(fā)團隊應接不暇。所以我們通過數(shù)據(jù)平臺的方式開放數(shù)據(jù)計算、存儲能力,賦予業(yè)務方有數(shù)據(jù)開發(fā)的能力。
對實現(xiàn)元數(shù)據(jù)管理、任務調度、數(shù)據(jù)集成、DAG 任務編排、可視化等不一一贅述,主要介紹數(shù)據(jù)開放后,美圖對穩(wěn)定性方面的實踐心得。
數(shù)據(jù)開放和系統(tǒng)穩(wěn)定性是相愛相殺的關系:一方面,開放了之后不再是有數(shù)據(jù)基礎的研發(fā)人員來做,經常會遇到提交非法、高資源消耗等問題的數(shù)據(jù)任務,給底層的計算、存儲集群的穩(wěn)定性造成了比較大的困擾;另外一方面,其實也是因為數(shù)據(jù)開放,才不斷推進我們必須提高系統(tǒng)穩(wěn)定性。
針對不少的高資源、非法的任務,我們首先考慮能否在 HiveSQL 層面能做一些校驗、限制。如圖 13 所示是 HiveSQL 的整個解析編譯為可執(zhí)行的 MR 的過程:
首先基于 Antlr 做語法的解析,生成 AST,接著做語義解析,基于AST 會生成 JAVA 對象 QueryBlock?;?QueryBlock 生成邏輯計劃后做邏輯優(yōu)化,最后生成物理計劃,進行物理優(yōu)化后,最終轉換為一個可執(zhí)行的 MR 任務。
我們主要在語義解析階段生成 QueryBlock 后,拿到它做了不少的語句校驗,包括:非法操作、查詢條件限制、高資源消耗校驗判斷等。
第二個在穩(wěn)定性方面的實踐,主要是對集群的優(yōu)化,包括:
- 我們完整地對 Hive、Hadoop 集群做了一次升級。主要是因為之前在低版本有 fix 一些問題以及合并一些社區(qū)的 patch,在后面新版本都有修復;另外一個原因是新版本的特性以及性能方面的優(yōu)化。我們把 Hive 從 0.13 版本升級到 2.1 版本,Hadoop 從 2.4 升級到 2.7;
- 對 Hive 做了 HA 的部署優(yōu)化。我們把 HiveServer 和 MetaStoreServer 拆分開來分別部署了多個節(jié)點,避免合并在一個服務部署運行相互影響;
- 之前執(zhí)行引擎基本都是 On MapReduce 的,我們也在做 Hive On Spark 的遷移,逐步把線上任務從 Hive On MR 切換到 Hive On Spark;
- 拉一個內部分支對平時遇到的一些問題做 bugfix 或合并社區(qū) patch 的特性;
在平臺穩(wěn)定性方面的實踐最后一部分是提高權限、安全性,防止對集群、數(shù)據(jù)的非法訪問、攻擊等。提高權限主要分兩塊:API 訪問與集群。
- API Server :上文提到我們有 OneDataAPI,提供給各個業(yè)務系統(tǒng)訪問數(shù)據(jù)的統(tǒng)一 API。這方面主要是額外實現(xiàn)了一個統(tǒng)一認證 CA 服務,業(yè)務系統(tǒng)必須接入 CA 拿到 token 后來訪問OneDataAPI,OneDataAPI 在 CA 驗證過后,合法的才允許真正訪問數(shù)據(jù),從而防止業(yè)務系統(tǒng)可以任意訪問所有數(shù)據(jù)指標。
- 集群:目前主要是基于 Apache Ranger 來統(tǒng)一各類集群,包括 Kafka、Hbase、Hadoop 等做集群的授權管理和維護;
以上就是美圖在搭建完數(shù)據(jù)平臺并開放給各個業(yè)務線使用后,對平臺穩(wěn)定性做的一些實踐和優(yōu)化。
總結
- 首先在搭建數(shù)據(jù)平臺之前,一定要先了解業(yè)務,看業(yè)務的整體體量是否比較大、業(yè)務線是否比較廣、需求量是否多到嚴重影響我們的生產力。如果都是肯定答案,那可以考慮盡快搭建數(shù)據(jù)平臺,以更高效、快速提高數(shù)據(jù)的開發(fā)應用效率。如果本身的業(yè)務量級、需求不多,不一定非得套大數(shù)據(jù)或者搭建多么完善的數(shù)據(jù)平臺,以快速滿足支撐業(yè)務優(yōu)先。
- 在平臺建設過程中,需要重點關注數(shù)據(jù)質量、平臺的穩(wěn)定性,比如關注數(shù)據(jù)源采集的完整性、時效性、設備的唯一標識,多在平臺的穩(wěn)定性方面做優(yōu)化和實踐,為業(yè)務方提供一個穩(wěn)定可靠的平臺。
- 在提高分析決策效率以及規(guī)模逐漸擴大后需要對成本、資源做一些優(yōu)化和思考。
分享名稱:架構師詳解:從0到1構建大數(shù)據(jù)平臺
路徑分享:http://fisionsoft.com.cn/article/dpoisgp.html


咨詢
建站咨詢
