阿里Blink 正式開源重要優化點解讀
Apache Flink 是德國柏林工業大學的幾個博士生和研究生從學校開始做起來的項目,早期叫做Stratosphere。2014 年,StratoSphere 項目中的核心成員從學校出來開發了Flink,同時將Flink 計算的主流方向定位為流計算,並在同年將Flink 捐贈Apache,後來快速孵化成為Apache 的頂級項目。
阿里巴巴在2015 年開始嘗試使用Flink。但是阿里的業務體量非常龐大,挑戰也很多。彼時的Flink 不管是規模還是穩定性尚未經歷實踐,成熟度有待商榷。為了把這麼大的業務體量支持好,我們不得不在Flink 之上做了一系列的改進,所以阿里巴巴維護了一個內部版本的Flink,它的名字叫做Blink。
基於Blink 的計算平台於2016 年正式上線。截至目前,阿里絕大多數的技術部門都在使用Blink。Blink 一直在阿里內部錯綜複雜的業務場景中鍛煉成長著。對於內部用戶反饋的各種性能、資源使用率、易用性等諸多方面的問題,Blink 都做了針對性的改進。
雖然現在Blink 在阿里內部用的最多的場景主要還是流計算,但是在批計算場景也有不少業務上線使用了。例如,搜索和推薦的算法業務平台就同時將Blink 用於流計算和批處理。Blink 被用來實現了流批一體化的樣本生成和特徵抽取流程,能夠處理的特徵數達到了數千億,而且每秒鐘能處理數億條消息。在這個場景的批處理中,我們單個作業處理的數據量已經超過400T,並且為了節省資源,我們的批處理作業是和流計算作業以及搜索的在線引擎運行在同樣的機器上。流批一體化已經在阿里巴巴取得了極大的成功,我們希望這種成功以及阿里巴巴內部的經驗都能夠帶回給社區。
Blink 開源的背景
其實從我們選擇Flink 的第一天開始,我們就一直和社區緊密合作。過去的這幾年我們也一直在把阿里對Flink 的改進推回社區。從2016 年開始我們已經將流計算SQL 的大部分功能、針對runtime 的穩定性和性能優化做的若干重要設計都推回了社區。但是Blink 本身發展迭代的速度非常快,而社區有自己的步伐,很多時候可能無法把我們的變更及時推回去。對於社區來說,一些大的功能和重構,需要達成共識後,才能被接受,這樣才能更好地保證開源項目的質量,但是同時就會導致推入的速度變得相對較慢。
經過這幾年的迭代,我們這邊和社區之間的差距已經變得比較大了。Blink有一些很好的新功能,比如性能優越的批處理功能,在社區的版本是沒有的。在過去這段時間裡,我們不斷聽到有人在詢問Blink的各種新功能,期望Blink盡快開源的呼聲越來越大。我們一直在思考如何開源的問題。一種方案就是和以前一樣,繼續把各種功能和優化分解,逐個和社區討論,慢慢地推回Flink,但這顯然不是大家所期待的。另一個方案,就是先完整地盡可能多地把代碼開源,讓社區的開發者能夠盡快試用起來。第二個方案很快收到社區廣大用戶的支持。因此,從2018年年中開始我們就開始做開源的相關準備。經過半年的努力,我們終於把大部分Blink的功能梳理好,開源了出來。
Blink 開源的方式
**我們把代碼貢獻出來,是為了讓大家能先嘗試一些他們感興趣的功能。Blink 永遠不會單獨成為一個獨立的開源項目來運作,它一定是Flink 的一部分。開源後我們期望能找到辦法以最快的方式將Blink merge 到Flink 中去。**Blink 開源只有一個目的,就是希望Flink 做得更好。
Apache Flink 是一個社區項目,Blink 以什麼樣的形式進入Flink 是最合適的,怎麼貢獻是社區最希望的方式,我們都要和社區一起討論。在過去的一段時間內,我們在Flink 社區徵求了廣泛的意見,大家一致認為將本次開源的Blink 代碼作為Flink 的一個branch 直接推回到Apache Flink 項目中是最合適的方式。並且我們和社區也一起討論規劃出一套能夠快速merge Blink 到Flink master 中的方案(具體細節可以查看Flink 社區正在討論的FLIP32)。
我們期望這個merge能夠在很短的時間內完成。這樣我們之後的Machine Learning等其他新功能就可以直接推回到Flink master。相信用不了多久,Flink和Blink就完全合二為一了。在那之後,阿里巴巴將直接使用Flink用於生產,並同時協助社區一起來維護Flink。
本次開源的Blink 的主要功能和優化點
本次開源的Blink 代碼在Flink1.5.1 版本之上,加入了大量的新功能,以及在性能和穩定性上的各種優化。
主要貢獻包括:阿里巴巴在流計算上積累的一些新功能和性能的優化,一套完整的(能夠跑通全部TPC-H/TPC-DS,能夠讀取Hive meta 和data)高性能Batch SQL,以及一些以提升易用性為主的功能(包括支持更高效的interactive programming,與zeppelin 更緊密的結合,以及體驗和性能更佳的Flink web)。
未來我們還將繼續給Flink貢獻在AI、IoT以及其他新領域的功能和優化。更多的關於這一版本Blink release的細節,請參考Blink代碼根目錄下的README.md文檔。下面,我來分模塊介紹下Blink主要的新的功能和優化點。
Runtime
為了更好地支持batch processing,以及解決阿里巴巴大規模生產場景中遇到的各種挑戰,Blink 對Runtime 架構、效率、穩定性方面都做了大量改進。
在架構方面,首先Blink 引入了Pluggable Shuffle Architecture,開發者可以根據不同的計算模型或者新硬件的需要實現不同的shuffle 策略進行適配。此外Blink 還引入新的調度架構,容許開發者根據計算模型自身的特點定制不同調度器。為了優化性能,Blink 可以讓算子更加靈活的chain 在一起,避免了不必要的數據傳輸開銷。在Pipeline Shuffle 模式中,使用了ZeroCopy 減少了網絡層內存消耗。在BroadCast Shuffle 模式中,Blink 優化掉了大量的不必要的序列化和反序列化開銷。此外,Blink 提供了全新的JM FailOver 機制,JM 發生錯誤之後,新的JM 會重新接管整個JOB 而不是重啟JOB,從而大大減少了JM FailOver 對JOB 的影響。
最後,Blink 也開發了對Kubernetes 的支持。不同於Standalone 模式在Kubernetes 上的拉起方式,在基於Flink FLIP6 的架構上基礎之上,Blink 根據job 的資源需求動態的申請/ 釋放Pod 來運行TaskExecutor,實現了資源彈性,提升了資源的利用率。
SQL/TableAPI
SQL/TableAPI 架構上的重構和性能的優化是Blink 本次開源版本的一個重大貢獻。
首先,我們對SQL engine 的架構做了較大的調整。提出了全新的Query Processor(QP), 它包括了一個優化層(Query Optimizer)和一個算子層(Query Executor)。這樣一來,流計算和批計算的在這兩層大部分的設計工作就能做到盡可能地複用。另外,SQL 和TableAPI 的程序最終執行的時候將不會翻譯到DataStream 和DataSet 這兩個API 上,而是直接構建到可運行的DAG 上來,這樣就使得物理執行算子的設計不完全依賴底層的API,有了更大的靈活度,同時執行代碼也能夠被靈活的codegen 出來。
唯一的一個影響就是這個版本的SQL 和TableAPI 不能和DataSet 這個API 進行互相轉換,但仍然保留了和DataStream API 互相轉換的能力(將DataStream 註冊成表,或將Table 轉成DataStream 後繼續操作)。未來,我們計劃把dataset 的功能慢慢都在DataStream 和TableAPI 上面實現。到那時DataStream 和SQL 以及tableAPI 一樣,是一個可以同時描述bounded 以及unbounded processing 的API。
除了架構上的重構,Blink 還在具體實現上做了較多比較大的重構。
首先,Blink 引入了二進制的數據結構BinaryRow,極大的減少了數據存儲上的開銷以及數據在序列化和反序列化上計算的開銷。
其次,在算子的實現層面,Blink 在更廣範圍內引入了CodeGen 技術。由於預先知道算子需要處理的數據的類型,在QP 層內部就可以直接生成更有針對性更高效的執行代碼。Blink 的算子會動態的申請和使用資源,能夠更好的利用資源,提升效率,更加重要的是這些算子對資源有著比較好的控制,不會發生OutOfMemory 的問題。
此外,針對流計算場景,Blink 加入了miniBatch 的執行模式,在aggregate、join 等需要和state 頻繁交互且往往又能先做部分reduce 的場景中,使用miniBatch 能夠極大的減少IO,從而成數量級的提升性能。除了上面提到的這些重要的重構和功能點,Blink 還實現了完整的SQL DDL,帶emit 策略的流計算DML,若干重要的SQL 功能,以及大量的性能優化策略。
有了上面提到的諸多架構和實現上的重構。Blink 的SQL/tableAPI 在功能和性能方面都取得了脫胎換骨的變化。在批計算方面,首先Blink batch SQL 能夠完整地跑通TPC-H 和TPC-DS,且性能上有了極大的提升。
如上圖所示,是這次開源的Blink版本和spark 2.3.1的TPC-DS的benchmark性能對比。柱狀圖的高度代表了運行的總時間,高度越低說明性能越好。可以看出,Blink在TPC-DS上和Spark相比有著非常明顯的性能優勢,而且這種性能優勢隨著數據量的增加而變得越來越大。在實際的場景這種優勢已經超過Spark三倍,在流計算性能上我們也取得了類似的提升。我們線上的很多典型作業,性能是原來的3到5倍。在有數據傾斜的場景,以及若干比較有挑戰的TPC-H query,流計算性能甚至得到了數十倍的提升。
除了標準的Relational SQL API。TableAPI 在功能上是SQL 的超集,因此在SQL 上所有新加的功能,我們在tableAPI 也添加了相對應的API。除此之外,我們還在TableAPI 上引入了一些新的功能。其中一個比較重要是cache 功能。在批計算場景下,用戶可以根據需要來cache 計算的中間結果,從而避免不必要的重複計算,它極大地增強了interactive programming 體驗。我們後續會在tableAPI 上添加更多有用的功能。其實很多新功能已經在社區展開討論並被社區接受,例如我們在tableAPI 增加了對一整行操作的算子map/flatMap/aggregate/flatAggregate (Flink FLIP29) 等等。
Hive 的兼容性
我們這次開源的版本實現了在元數據(meta data)和數據層將Flink 和Hive 對接和打通。國內外很多公司都還在用Hive 在做自己的批處理。對於這些用戶,現在使用這次Blink 開源的版本,就可以直接用Flink SQL 去查詢Hive 的數據,真正能夠做到在Hive 引擎和Flink 引擎之間的自由切換。
為了打通元數據,我們重構了Flink catalog 的實現,並且增加了兩種catalog,一個是基於內存存儲的FlinkInMemoryCatalog,另外一個是能夠橋接Hive metaStore 的HiveCatalog。有了這個HiveCatalog,Flink 作業就能讀取Hive 的metaData。為了打通數據,我們實現了HiveTableSource,使得Flink job 可以直接讀取Hive 中普通表和分區表的數據。因此,通過這個版本,用戶可以使用Flink SQL 讀取已有的Hive meta 和data,做數據處理。未來我們將在Flink 上繼續加大對Hive 兼容性的支持,包括支持Hive 特有的query,data type,和Hive UDF 等等。
Zeppelin for Flink
為了提供更好的可視化和交互式體驗,我們做了大量的工作讓Zeppelin 能夠更好的支持Flink。這些改動有些是在Flink 上的,有些是在Zeppelin 上的。在這些改動全部推回Flink 和Zeppelin 社區之前,大家可以使用這個Zeppelin image (具體細節請參考Blink 代碼裡的docs/quickstart/zeppelin_quickstart.md) 來測試和使用這些功能。
這個用於測試的Zeppelin 版本,首先很好地融合和集成了Flink 的多種運行模式以及運維界面。使用文本SQL 和tableAPI 可以自如的查詢Flink 的static table 和dynamic table。此外,針對Flink 的流計算的特點,這一版Zeppelin 也很好地支持了savepoint,用戶可以在界面上暫停作業,然後再從savepoint 恢復繼續運行作業。
在數據展示方面,除了傳統的數據分析界面,我們也添加了流計算的翻牌器和時間序列展示等等功能。為了方便用戶試用,我們在這一版zeppelin 中提供3 個built-in 的Flink tutorial 的例子: 一個是做Streaming ETL 的例子, 另外兩個分別是做Flink Batch, Flink Stream 的基礎樣例。
Flink Web
我們對Flink Web 的易用性與性能等多個方面做了大量的改進,從資源使用、作業調優、日誌查詢等維度新增了大量功能,使得用戶可以更方便的對Flink 作業進行運維。
在資源使用方面,新增了Cluster、TaskManager 與Job 三個級別的資源信息,使得資源的申請與使用情況一目了然。作業的拓撲關係及數據流向可以追溯至Operator 級別,Vertex 增加了InQueue,OutQueue 等多項指標,可以方便的追踪數據的反壓、過濾及傾斜情況。TaskManager 和JobManager 的日誌功能得到大幅度加強,從Job、Vertex、SubTask 等多個維度都可以關聯至對應日誌,提供多日誌文件訪問入口,以及分頁展示查詢和日誌高亮功能。
另外,我們使用了較新的Angular 7.0對Flink web進行了全面重構,頁面運行性能有了一倍以上的提升。在大數據量情況下也不會發生頁面卡死或者卡頓情況。同時對頁面的交互邏輯進行了整體優化,絕大部分關聯信息在單個頁面就可以完成查詢和比對工作,減少了大量不必要的跳轉。
未來的規劃
Blink 邁出了全面開源的第一步,接下來我們會和社區合作,盡可能以最快的方式將Blink 的功能和性能上的優化merge 回Flink。
本次的開源版本一方面貢獻了Blink多年在流計算的積累,另一方面又重磅推出了在批處理上的成果。接下來,我們會持續給Flink社區貢獻其他方面的功能。我們期望每過幾個月就能看到技術上有一個比較大的亮點貢獻到社區。下一個亮點應該是對機器學習的支持。
要把機器學習支持好,有一系列的工作要做,包括引擎的功能、性能和易用性。這裡面大部分的工作我們已經開發完成,並且很多功能都已經在阿里巴巴內部服務上線了。除了技術上創新以及新功能之外,Flink的易用性和外圍生態也非常重要。我們已經啟動了若干這方面的項目,包括Python以及Go等多語言支持、Flink集群管理、Notebook以及機器學習平台等等。這些項目有些會成為Flink自身的一部分貢獻回社區,有些不是。但它們都基於Flink,是Flink生態的一個很好的補充。獨立於Flink之外的那些項目,我們都也在認真的考慮開源出來。
總之,Blink 在開源的第一天起,就已經完全all-in 的融入了Flink 社區,我們希望所有的開發者看到我們的誠意和決心。未來,無論是功能還是生態,我們都會在Flink 社區加大投入,我們也將投入力量做Flink 社區的運營,讓Flink 真正在中國、乃至全世界大規模地使用起來。我們衷心的希望更多的人加入,一起把Apache Flink 開源社區做的更好!
文/ AI前線