压在透明的玻璃上c-国产精品国产一级A片精品免费-国产精品视频网-成人黄网站18秘 免费看|www.tcsft.com

魅族大數據之流平臺設計部署實踐

本篇文章內容來自第八期魅族開放日魅族數據架構師沈輝煌的現場分享,由IT大咖說提供現場速錄,由msup整理編輯。

沈輝煌

沈輝煌 ??魅族數據架構師

2010年加入魅族,負責大數據、云服務相關設計與研發;

專注于分布式服務、分布式存儲、海量數據下rdb與nosq融合等技術。

主要技術點:推薦算法、文本處理、ranking算法

導讀:魅族大數據的流平臺系統擁有自設計的采集SDK,自設計支持多種數據源采集的Agent組件,還結合了Flume、Spark、Metaq、Storm、Kafka、Hadoop等技術組件,本文就魅族流平臺對大量數據的采集、實時計算、系統分析方法,全球多機房數據采集等問題進行介紹。

流平臺是魅族大數據平臺的重要部分,包括數據采集、數據處理、數據存儲、數據計算等模塊,流平臺為大數據提供了強大的支撐能力。

文章還介紹了魅族大數據流平臺的架構、設計方式、常用組件、核心技術框架等方面的內容,還原魅族大數據平臺的搭建過程及遇到的問題。

  • 魅族大數據平臺架構

如圖所示便是魅族的大數據平臺架構。

1

左邊是多樣性的數據源接入;

右上是離線數據的采集;

下面是流平臺(也是今天分享的主角);

中間是集群的部署;

右邊是ETL的數據挖掘、算法庫和一些數據模型;

左上角是數據開發平臺,比如webIDE可以使得開發人員更便捷地做一些數據查詢和管理;

最右邊的是一個數據產品門戶,包括我們的用戶畫像、統計系統等,這里面包含大數據的很多組件,比如數據采集、數據處理、數據存儲、數據挖掘等,最后產生大數據的雛形。

  • 流平臺介紹

流平臺是大數據平臺一個比較重要的部分,主要包括四個部分:數據采集、數據處理、數據存儲、計算能力。

數據采集

?“誰擁有了整個世界的數據,他就是最大的贏家”,這句話雖然有點夸張,但是卻表達了數據采集的重要性。一個大數據平臺數據的多樣性、數據量的級別很大程度上決定了大數據的能力和豐富程度。

數據處理

這里講的數據處理并不是像末端那么專業的數據清洗,更多的是為后續入庫做一些簡單處理,以及實時計算。

數據存儲

計算能力,包括離線計算和實時計算

?

流平臺為大數據提供非常強大的支撐,數據統計分析、數據挖掘、神經網絡的圖形計算等都可以依靠計算能力進行。

實時計算是指在一定單位的時間延遲范圍內,基于增量的數據推算出結果,再結合歷史數據得到期望的分析結果。這個時間是根據業務需求而定。

  • 流平臺架構

?2

上圖是我們的流平臺架構圖

左邊是數據源,像NoSQL、RDB、文件類型;

最右邊是集群,下面還有其他的一些Hadoop(存儲);

中間的框是核心,也就是流平臺;

最上面的是AS-Manager(我們的流管理平臺),承載了非常多的管理功能;

下面是Zookeeper,這是一個非常流行的集成管理中心,魅族的一些架構都會用到它,流平臺也不例外,Zookeeper可以說貫穿了我們整個流平臺的架構;

最下面是AS-Protocol,我們自己設計的流平臺的數據對象協議,打通了整個流平臺的數據鏈路;

中間四個框是核心的四個模塊:采集模塊、數據中轉模塊、緩存模塊、實時計算模塊,也叫合并層

  • 具體架構介紹

?3

這是我們的具體架構圖。

業務規模:從這邊采集數據到經過流平臺最后經過實時計算或入庫,它的數據量量級在千億級別。

3、組件

數據源渠道

前面提到采集數據源渠道的多樣性決定了大數據平臺的相應能力和綜合程度。我們這邊首先會有一個文件類的業務數據,包括業務日志、業務數據、數據庫文件,這些都會經過采集服務采集。

下面這一塊包括一些網站的js訪問、手機各APP埋點、特點的應用日志文件(它會通過手機端的一些埋點上訪到我們的埋點服務)。

數據采集

數據采集分為兩個部分:采集服務、獨立部署的埋點服務。圖中只顯示了一個埋點服務,里面還會有很多的第三方業務,第三方業務通過這個紅色的插件接入我們的采集。

數據中轉

通過采集模塊把數據流轉到中轉模塊,中轉模塊采用的是目前比較流行的flume組件,紅色sink是我們自己開發的。

Cache

sink把前面的數據轉給緩存層,緩存層里有metaq和Kafka。

Streaming

實時計算模塊上線了Spark和Storm,較早上線的是Spark,目前兩個都在用的原因是它會適應不同的業務場景。

Store

最后面是我們提供給落地的store層,像HIVE、Hbase等等。

流管理平臺

最下面是流管理平臺,圖中有四條線連著四個核心模塊,對這四個模塊進行非常重要且非常豐富的邏輯管理,包括數據管理、對各節點的監控、治理、實時命令的下發等。

  • 流平臺設計

1、概念解讀

Message,就是一條消息,是最小的數據單位。業務方給的一條數據就是一個message;我們去采集文件的話,一行數據就是一個message。

AS-Protocol,是我們自己設計的流平臺數據的對象,它會對一批量的message進行打包,然后再加上一些必要的變量做一個封裝。

Evnet,會提供一個類似的標準接口,這個地方其實更多的是為了打通采集的流平臺。它最重要的一個變量是Topic,就是說我拿到了我的AS-Protocol就可以根據對應的Topic發到相應的登錄去緩存提取,因為我們的AS-Protocol除了起始端和結束端以外,中間層是不用解析協議的。

Type,數據格式目前是Json和Hive格式,可以根據業務去擴展。

Compress,Hive格式在空間上也是非常有優勢的,非常適合于網絡傳輸壓縮。當壓縮數據源質量沒有達到一定量的程度的時候會越壓越大,所以我們要判斷是否需要壓縮。我們壓縮采用的是一個全系統

Data_timestamp,數據的時間是最上面的message,每一個message會攜帶一個數據時間.這個比較好理解,就是入庫之后會用做數據統計和分析的

Send_timestamp,發送時間會攜帶在我們的AS-Protocol里,它聲明了每一個數據包發送的時間。

Unique Key,每一個數據包都有一個唯一的標識,這個也是非常重要的,它會跟著AS-Protocol和Event走通整個平臺的數據鏈路,在做數據定位、問題定位的時候非常有用,可以明確查到每個數據包在哪個鏈路經歷了什么事情。

Topic。這個不需多言。

Data_Group,數據分組是我們非常核心的一個設計思想,原則上我們是一個業務對應一個數據分組。

Protobuf序列化,我們會對Event數據做一個PT序列化,然后再往上面傳,這是為了節省數據流量。

  • 協議設計

?4

如圖所示為Event、As-Protocol和Message的關系。

最上層是Event,里面有一個Unique Key和Topic包括了我們的As-Protocol,然后是數據格式、發動時間是否壓縮、用什么方式壓縮,還攜帶一些額外的變量。最后面是一個Body,Body其實就是一個message的宿主,以字節流的方式存儲。這個就是我們一個數據對象的協議設計。

接下來看數據在整個架構里是如何流轉和傳輸的。

首先是數據源渠道,最左邊的是message,任何業務方的數據過來都是一條message,經過數據采集把一批message打包封裝成Event,再發給數據中轉模塊,也叫flume。把Event拆出來,有一個topic,最后把As-protocol放到相應位置緩存,消費對應的Topic,拿到對應的As-Protocol,并把這個數據包解析出來,得到一條一條的message,這時就可以進行處理、入庫或實時計算。

需要特別注意的是message和Event。每個Message的業務量級是不一樣的,有幾十B、幾百B、幾千B的差別,打包成As-Protocol的時候要試試批量的數目有多少,原則上壓縮后的數據有個建議值,這個建議值視業務而定,DataGroup打包的數量是可以配的。

  • 數據分組設計

5

如圖所示是我們的DataGroup設計。首先看最上面,一個Topic可以定義N個DataGroup。往下是Topic和streaming Job一比一的關系,就是說一個實時的Group只需要對應一個Topic,如果兩個業務不相關就對應的兩個Topic,用兩個Job去處理,最后得到想要的關系。

 

從架構圖可以看到DataGroup的扭轉關系。最初數據采集每一個節點會聲明它是屬于哪一個DataGroup,上傳數據會處于這個DataGroup,經過數據中轉發給我們的分布式緩存也對應了Topic下面不同的分組數據。最后Streaming交給我Topic,我可以帥選出在最上面的關系,去配置DataGroup,可以非常靈活地組合。這就是DataGroup的設計思想。

  • 采集組件Agent
  • 概述

6

如圖所示,這是完全由我們自己設計和實現的一款組件。右邊是采集組件,分為兩部分:一個是基于java環境的獨立工作程序;另一個是jar插件。插件叫Agen-Stub.jar;獨立層是Agent-File.zip,Agent-File有一個paresr支持不同的文件類型,目前支持的file和Binlog,可擴展。根據需要可以增加parser,也是接入Agent-stub,擁有Agent-stub的一些特性。

如上圖右側的示意圖,Agent-stub接入多個Business,前面提到的一個埋點服務就是一個Business,它把數據交給Agent-stub,Agent-stub會往后發展,與file和mysQL相對應的是file parser,出來是Agent-stub,流程是一樣的。

2、Agent-Stub.jar

接下來看Agent-Stub是如何設計的。

多線程、異步。這個毫無疑問,做插件化肯定是這樣考慮的,不能阻塞上層業務。

內存小隊列+磁盤壓縮隊列。這是我們改進最大的一個地方,早期版本中我們采用的是內存大隊列,如果只有內存大隊列缺點非常明顯:

程序正常啟動的時候大隊列里的數據怎么辦?要等他發完嗎?還是不發完?當大隊列塞滿的時候,還有對上層業務的侵入性怎么辦?程序遇到問題時怎么辦?大隊列可能是50萬、100萬甚至更多。

采用了內存小隊列+磁盤壓縮隊列后可以解決正常程序的啟停,保證數據沒有問題,還可以解決空間的占用清空性的問題,以此同時,磁盤壓縮隊列還可以在程序出錯的時候加速發送。

解釋一下磁盤壓縮隊列, 這次我們設計協議的思想很簡單:壓縮之后得到一個字節速度,存在磁盤的文件里,這個文件按照小時存儲,這時對于二次發送帶來的損耗并不大,不需要重新阻斷數據也不需要解析和壓縮,只需要讀出來發出去。后面還有一個提升就是磁盤發送隊列跟內存發送隊列是單獨分開的,這樣更能提升二次數據的發送性能。

無損啟停。正常的啟動和停止,數據是不會停止不會丟失的。

Agent的版本號自動上報平臺。這個非常重要,我們早期的版本是沒有的,可以想象一下當你的Agent節點是幾千上萬,如果沒有一個平臺直觀地管理,那將是一個怎樣恐怖的局面。現在我們每一個Agent啟動的時候都會創建一個node path,把版本號放到path里,在管理平臺解析這個path,然后做分類,我們的版本就是這樣上報的。

自動識別接入源,智能歸類。這個其實和上面那點是一樣的,在早期版本中我們做一個Agent的標識,其實就是一個IP+一個POD,就是說你有幾千個IP+POD量表需要人工管理,工作量非常大且乏味。我們優化了一個自動識別,把DataGroup放到Agent的node path里,管理平臺可以做到自動識別。

Agent的全面實時監控。包括內存隊列數、磁盤隊列數、運行狀態、出錯狀態、qps等,都可以Agent上報,并且在管理平臺直觀地看到哪一個節點是什么樣子的。其做法也依賴于zookeeper的實現和承載,這里其實就是對zk node的應用,我們有一個定時線程收集當前Agent必要的數據,然后傳到node的data上去,管理平臺會獲取這些date,最后做一個平臺化的展示。

支持實時命令。包括括限流,恢復限流、停止、調整心跳值等,大大提高了運維能力。其實現原理也是依賴于Agent,這里我們創建一個Data Group,通過管理平臺操作之后把數據放到Data Group里,然后會有一個監聽者去監聽獲取數據的變化并作出相應的邏輯。

兼容Docker。目前魅族在用Doker,Doker對我們這邊的Agent來講是一個挑戰,它的啟動和停止是非常態化的,就是你可能認為相同的Docker容器不會重啟第二次。

3、Agent-File.zip

接入Agent-Stub。Agent-file首先是接入Agent-stub,擁有Agent-stub的一些特性。

兼容Docker。因為啟動和停止的常態,假設我們剛剛一個業務接入了Agent-stub,那停止的時候它會通知我,Agent-stub會把小隊列里的數據抓到磁盤壓縮隊列里去。但是這里需要注意的是:磁盤壓縮隊列不能放到Docker自己的文件系統里,不然它停了之后數據就沒有人能夠得到了。

當Agent-stub停的時候,會有一個標識說磁盤要做隊列,我們的數據有沒有發完,磁盤壓縮隊列里有一個評級的標識文件,這時要用到Agent-file,Agent-file有一個單獨的掃描線程一個個地去掃描Docker目錄,掃到這個文件的時候判斷其數據有沒有發完,如果沒發完就只能當做一個發送者。

支持重發歷史數據。做大數據的可能都知道這些名詞,比如昨天的數據已經采集完了,但由于某些原因有可能數據有遺漏,需要再跑一次后端的補貼邏輯,或者上馬訓練,這時就要做數據重發。我們在管理平臺上就會有一個支持這種特定文件或特定時間段的選擇,Agent接收到這個命令的時候會把相應的數據發上去,當然前提是數據不要被清了。

管理平臺自助升級。這個可以理解成軟件升級,Agent可以說是非常常見的組件,但是我們重新設計時把自動升級考慮在內,這也是我們為什么設計自己做而不是用開源的組件。這樣做帶來的好處是非常大的,我們幾千個Agent在平臺里只需要一鍵就可以完成自動升級。

文件名正則表達式匹配。文件名的掃描是用自動表達式。

源目錄定時掃描 and Jnotify。重點介紹文件掃描機制。早期的版本是基于Agent-fire和KO-F兩者結合做的數據采集:Agent-file是加碼里對文件變更的事件鑒定,包括重命名、刪除、創建都有一個事件產生;KO-F是拿到文件下的最佳數據。假設源目錄里有一千個文件,KO-F現場就是一千個,Agent-file對應的文件變革賦予的追加、重命名等都可能會產生一系列事件,邏輯復雜。

所以我們設計了源目錄定時掃描的機制,首先有一個目標,就是我們的文件隊列,包括為未讀文件、已讀文件做區別,區別之后掃描,當然還會有像文件摘要等的存在這里不細講,掃描之后更新未讀文件、已讀文件列表。

之所以加Jnotify是因為我們發現只用定制掃描不能解決所有業務場景的問題,jootify在這里起到補充定制掃描的作用,解決文件風險和文件產程的問題。

單文件讀取。早期版本中這一點依賴于文件列表,當文件非常多時程序變得非常不穩定,因為可能要開幾百個或幾千個線程。后來我們改成了單文件的讀取,上文提到的掃描機制會產生一個文件隊列,然后從文件隊列里讀取,這樣一個個文件、一段段圖,程序就非常穩定了。

文件方式存儲offset,無損啟停。早期采用切入式PTE做存儲,銜接非常重,后來我們改成文件方式存儲,設計非常簡單就只有兩個文件:一個是目錄下面所有文件的offset;一個是正在讀的文件的offset。這里涉及到無損啟停和策略的問題,我們定了一個5次算法:就是每讀了5次就會刷盤一次,但只刷在讀文件,別的文件不會變化,所以可以想象得到,當這個程序被替換走的時候,最多也就是重復5條數據,大會導致數據丟失。

  • Agent示意圖

7?

如圖是Agent示意圖。上面是Agent-file和數據對象。Agent啟動的時候要把里面的offset文件取來,就會產生未讀文件和已讀文件列表,掃描文件目錄,然后更新文件隊列,還有一個fileJNotify是相對應的文件隊列。然后有一個比較重要的fileReader,我會先從文件隊列里拿到再去讀實際文件,讀完刷盤之后這一塊就成功了,我會根據我的刷盤去刷新offset。

上圖左邊有一個業務加了一個Agent-stub,最后變成flume,這里有一個QueueReceiver(隊列接收者),filereader和業務方的DataSender會把message發過來,QueueReceiver接受的數據就是一條條的message,然后發送到內存小隊列里,當這邊的小隊列滿了怎么辦呢?中間有一個額外的固定大小的性能提升的地方用于message歸類,當這個fIieReader往這個內存小隊列發的時候發現塞不進去了,就會在規定大小的隊列里發,當一個固定大小的隊列滿了之后就會打包壓縮,以字節處理的方式存到磁盤壓縮隊列。

再來說說我們為什么會提出二次數據的發送,其實就是多了一個countsender即壓縮隊列的發送者,直接的數據來源是磁盤壓縮隊列,與上面的并生沒有任何沖突。Countsender的數據對賬功能是我們整個平臺的核心功能之一,基于這個統計的數據確保了其完整性,少一條數據我們都知道,在采集層有一個countsender,以另外一個渠道發出去,和真正的數據源渠道不一樣,會更加的輕量化更加可靠,且數值非常小。 ?

最后是前文提到的監控和命令的實現,一邊是Agentnode,一邊是數據管理。

5、Agent的坑

丟數據。如前文提到內存大隊列帶來的問題。

版本管理的問題。

tailf -f的問題。

網絡原因導致zk刪節點問題。網絡不穩定的時候,ZK會有一個節點的心跳檢測,不穩定的時候監測會以為節點已經不存在了而把節點刪掉,這會導致管理平臺的節點監控、文件下發全部都失效。解決辦法就是在message加一層控制檢查線程,發現節點不在了再創建一遍。

亂碼的問題。可能會跟一些遠程訪問的軟件相關,原則上我們假設第二次啟動的時候沒有配置我們的編碼,默認與系統一致,但當遠程軟件啟動的時候可能會發生不一樣的地方,所以不要依賴于默認值,一定要在啟動程序里設置希望的編碼。

日志問題,在插件化的時候肯定要考慮到業務方的日志,我們把業務方的日志刷死了,當網絡出現問題的時候每發送一條就失敗一條,那是不是都要打印出來?我們的考慮是第一條不打印,后面可能十條打印一次,一百條打印一次,一千條打印一次,這個量取決于業務。補充一點,我們有一個統計線程,可以根據統計線程觀察Agent的正常與否。

  • 流管理平臺

?8

如圖所示,我們的流管理平臺界面比較簡單,但功能非常豐富,包括:

接入業務的管理、發布、上線;

對Agent節點進行實時監測、管理、命令;

對Flume進行監測、管理;

對實時計算的job的管理;

對全鏈路的數據流量對帳,這是我們自檢的功能;

智能監控報警,我們有一個非常人性化的報警閥值的建議。取一個平均值,比如一周或一天,設定一個閥值,比如一天的流量訪問次數可能是一千次,我們設計的報警是2000次,當連續一周都是2000次的時候就得改進。

  • 數據中轉

1、背景

業務發展可能從1到100再到1000,或者當公司互聯網發展到一定程度的時候業務可能遍布世界各地,魅族的云服務數據分為海外服務和國內服務,我們把業務拆分開來,大數據采集肯定也要跟著走,這就面臨著數據中轉的問題。

9

如圖所示是我們兩個案例的示意圖。黑色的是內網的線,橙色的是跨界性的線,有公網的、云端的、專線的,各種各樣的網絡情況。

上面的是Agent集群,B-IDC也有一個Agent集群,直接訪問我們登錄的集群。

這里第一個問題是我們的連接非常多,訪問Agent節點的時候有幾千個Agent節點就得訪問幾千個節點,這是不太友好的事情。另一個問題是當我們做升級遷移的時候,Agent要做修改和配置,必須得重啟,當整個B-IDC遷移到A-IDC,我們加了一個Flyme集群。同樣是一個Agent集群,下面有一個Flume集群,這樣的好處:一是里面的連接非常少,線上的Flume一個ID就三臺;二是這邊承載了所有的Agent,除了Agent還有其他的采集都在A-IDC里中轉,當這個片區要做升級的時候上面的業務是透明的,靈活性非常高。

  • Flume介紹
  • 10

Flume里有三個核心的部分:Source、Channel、Sink,Source是數據結構源;Channel相當于內存大隊列,Sink是輸出到不同的目標。官方提供了很多組件:Avro、HTTP、Thrift、Memory、File、Spillable Memory、Avro、Thrift、Hdfs、Hive。

3、Flume實踐

無Group,采用Zookeeper做集群

Agent采用LB做負載均衡,動態感知。結合Zookeeper可以感知到Agent列表,這時會采用負載均衡的做法找到當前的那個Flume,到后端的Flume直接變化的時候可以感知到從而下線。

硬盤緩存、無損啟停。采用memory可能會帶來些不好的問題,如果內存隊列改成文件就沒有這個問題。因為內存速度快,存儲強制刷新的時候就沒有數據了,所以我們做了優化:還是采用memory,在Flume停的時候把數據采集下來,下一次啟動的時候把數據發出去,這時就可以做到無損啟停,但是有一點千萬要注意:磁盤其實是固化在機器里面,當這臺機器停下不再啟動的時候,別忘了把數據移走發出去。

停止順序優化。在做優化的時候遇到源碼的修改,其實就是Flume停止順序的優化。原生里好像先停止Channel,然后提高sink,這就會導致想要做這個功能的時候做不到。我們應該先把這個數據改掉再去停止sink最后停止Channel,這樣就保證Channel里的數據可以全部固化到硬盤里。

多種轉發方式。我們現在是全球的RBC,支持公網、內網、跨域性專線,我們提供一個非常好的功能:http sink,它也是一個安全的支持ssl的轉換方式。

自定義Sink,多線程發送(channel的get只能單線程)。

  • 停止順序

?11

如圖是停止順序的修改。這是一個sourceRunner、sink、channel。

  • Memory的capacity

選擇內存之后,這個內存大小到底多少比較合適?如圖所示,左邊Flume是從500-1000,channel容量是5萬、10萬,還有Agent的個數、線程,我們發現在10萬的時候它的fullGC是非常頻繁的,所以我們最后定的大小是5萬。當然不同的機器根據不同的測試得到自己的值,這個值不是恒定的。

12 13

包大小從10K到30K到50K有什么不一樣呢?很明顯TPS從1萬多降到了2000多,因為包越大網卡就越慢了,這里看到其實已經到了200兆(雙網卡),把網卡跑滿了。我們做流平臺設計的時候,不希望鏈路被跑滿,所以我們給了個建議值,大小在5-10K。當然,線上我們采用的萬兆網卡。

  • 實時計算

1、實時計算集群

?

在SparkZK里直接寫HA,可以減少不必要的MR提高IO,減少IO消耗。

Kafka+Strom (ZK)

2、Spark實踐

直接寫HDFS底層文件

自動創建不存在的Hive分區

相應Metaq的日志切割,這一點上現在的Kafka是沒有問題的,當時的日志切割會導致網絡連接超時,我們查看源代碼發現確實會堵塞,我們的解決方法是把切割調成多色或分區調多。

?

不要定時的killJob。早期的Spark版本因為大批量的killJob導致一些不穩定的情況,某些job其實是沒有被完全覆蓋,假死在那里的。

上一篇:Joomla! 3.7.0 出現 SQL 注入漏洞

下一篇:WannaCry 不相信眼淚 它需要你的安全防御與響應能力