訂閱
糾錯
加入自媒體

Shuffle核心概念、Shuffle調優及故障排除

2021-03-23 14:42
園陌
關注

三、 SortShuffle解析

SortShuffleManager的運行機制主要分成兩種,一種是普通運行機制,另一種是bypass運行機制。當shuffle read task的數量小于等于spark.shuffle.sort.bypassMergeThreshold參數的值時(默認為200),就會啟用bypass機制。

1. 普通運行機制

在該模式下,數據會先寫入一個內存數據結構中,此時根據不同的shuffle算子,可能選用不同的數據結構。如果是reduceByKey這種聚合類的shuffle算子,那么會選用Map數據結構,一邊通過Map進行聚合,一邊寫入內存;如果是join這種普通的shuffle算子,那么會選用Array數據結構,直接寫入內存。接著,每寫一條數據進入內存數據結構之后,就會判斷一下,是否達到了某個臨界閾值。如果達到臨界閾值的話,那么就會嘗試將內存數據結構中的數據溢寫到磁盤,然后清空內存數據結構。

在溢寫到磁盤文件之前,會先根據key對內存數據結構中已有的數據進行排序。排序過后,會分批將數據寫入磁盤文件。默認的batch數量是10000條,也就是說,排序好的數據,會以每批1萬條數據的形式分批寫入磁盤文件。寫入磁盤文件是通過Java的BufferedOutputStream實現的。BufferedOutputStream是Java的緩沖輸出流,首先會將數據緩沖在內存中,當內存緩沖滿溢之后再一次寫入磁盤文件中,這樣可以減少磁盤IO次數,提升性能。

一個task將所有數據寫入內存數據結構的過程中,會發生多次磁盤溢寫操作,也就會產生多個臨時文件。最后會將之前所有的臨時磁盤文件都進行合并,這就是merge過程,此時會將之前所有臨時磁盤文件中的數據讀取出來,然后依次寫入最終的磁盤文件之中。此外,由于一個task就只對應一個磁盤文件,也就意味著該task為下游stage的task準備的數據都在這一個文件中,因此還會單獨寫一份索引文件,其中標識了下游各個task的數據在文件中的start offset與end offset。

SortShuffleManager由于有一個磁盤文件merge的過程,因此大大減少了文件數量。比如第一個stage有50個task,總共有10個Executor,每個Executor執行5個task,而第二個stage有100個task。由于每個task最終只有一個磁盤文件,因此此時每個Executor上只有5個磁盤文件,所有Executor只有50個磁盤文件。

普通運行機制的SortShuffleManager工作原理如下圖所示:

普通運行機制的SortShuffleManager工作原理2. bypass運行機制

bypass運行機制的觸發條件如下:

shuffle map task數量小于spark.shuffle.sort.bypassMergeThreshold=200參數的值。不是聚合類的shuffle算子。

此時,每個task會為每個下游task都創建一個臨時磁盤文件,并將數據按key進行hash然后根據key的hash值,將key寫入對應的磁盤文件之中。當然,寫入磁盤文件時也是先寫入內存緩沖,緩沖寫滿之后再溢寫到磁盤文件的。最后,同樣會將所有臨時磁盤文件都合并成一個磁盤文件,并創建一個單獨的索引文件。

該過程的磁盤寫機制其實跟未經優化的HashShuffleManager是一模一樣的,因為都要創建數量驚人的磁盤文件,只是在最后會做一個磁盤文件的合并而已。因此少量的最終磁盤文件,也讓該機制相對未經優化的HashShuffleManager來說,shuffle read的性能會更好。

而該機制與普通SortShuffleManager運行機制的不同在于:第一,磁盤寫機制不同;第二,不會進行排序。也就是說,啟用該機制的最大好處在于,shuffle write過程中,不需要進行數據的排序操作,也就節省掉了這部分的性能開銷。

bypass運行機制的SortShuffleManager工作原理如下圖所示:

bypass運行機制的SortShuffleManager工作原理

四、map和reduce端緩沖區大小

在Spark任務運行過程中,如果shuffle的map端處理的數據量比較大,但是map端緩沖的大小是固定的,可能會出現map端緩沖數據頻繁spill溢寫到磁盤文件中的情況,使得性能非常低下,通過調節map端緩沖的大小,可以避免頻繁的磁盤IO操作,進而提升Spark任務的整體性能。

map端緩沖的默認配置是32KB,如果每個task處理640KB的數據,那么會發生640/32 = 20次溢寫,如果每個task處理64000KB的數據,即會發生64000/32=2000次溢寫,這對于性能的影響是非常嚴重的。

map端緩沖的配置方法:

val conf = new SparkConf()
 .set("spark.shuffle.file.buffer", "64")

Spark Shuffle過程中,shuffle reduce task的buffer緩沖區大小決定了reduce task每次能夠緩沖的數據量,也就是每次能夠拉取的數據量,如果內存資源較為充足,適當增加拉取數據緩沖區的大小,可以減少拉取數據的次數,也就可以減少網絡傳輸的次數,進而提升性能。

reduce端數據拉取緩沖區的大小可以通過spark.reducer.maxSizeInFlight參數進行設置,默認為48MB。該參數的設置方法如下:

reduce端數據拉取緩沖區配置:

val conf = new SparkConf()
 .set("spark.reducer.maxSizeInFlight", "96")

五、reduce端重試次數和等待時間間隔

Spark Shuffle過程中,reduce task拉取屬于自己的數據時,如果因為網絡異常等原因導致失敗會自動進行重試。對于那些包含了特別耗時的shuffle操作的作業,建議增加重試最大次數(比如60次),以避免由于JVM的full gc或者網絡不穩定等因素導致的數據拉取失敗。在實踐中發現,對于針對超大數據量(數十億~上百億)的shuffle過程,調節該參數可以大幅度提升穩定性。

reduce端拉取數據重試次數可以通過spark.shuffle.io.maxRetries參數進行設置,該參數就代表了可以重試的最大次數。如果在指定次數之內拉取還是沒有成功,就可能會導致作業執行失敗,默認為3,該參數的設置方法如下:

reduce端拉取數據重試次數配置:

val conf = new SparkConf()
 .set("spark.shuffle.io.maxRetries", "6")

Spark Shuffle過程中,reduce task拉取屬于自己的數據時,如果因為網絡異常等原因導致失敗會自動進行重試,在一次失敗后,會等待一定的時間間隔再進行重試,可以通過加大間隔時長(比如60s),以增加shuffle操作的穩定性。

reduce端拉取數據等待間隔可以通過spark.shuffle.io.retryWait參數進行設置,默認值為5s,該參數的設置方法如下:

reduce端拉取數據等待間隔配置:

val conf = new SparkConf()
 .set("spark.shuffle.io.retryWait", "60s")

<上一頁  1  2  3  下一頁>  
聲明: 本文由入駐維科號的作者撰寫,觀點僅代表作者本人,不代表OFweek立場。如有侵權或其他問題,請聯系舉報。

發表評論

0條評論,0人參與

請輸入評論內容...

請輸入評論/評論長度6~500個字

您提交的評論過于頻繁,請輸入驗證碼繼續

暫無評論

暫無評論

    人工智能 獵頭職位 更多
    掃碼關注公眾號
    OFweek人工智能網
    獲取更多精彩內容
    文章糾錯
    x
    *文字標題:
    *糾錯內容:
    聯系郵箱:
    *驗 證 碼:

    粵公網安備 44030502002758號