如何從 Spark 的 DataFrame 中取出具體某一行?
如何從 Spark 的 DataFrame 中取出具體某一行?
根據(jù)阿里專家Spark的DataFrame不是真正的DataFrame-秦續(xù)業(yè)的文章-知乎[1]的文章:
- DataFrame 應(yīng)該有『保證順序,行列對(duì)稱』等規(guī)律
- 因此「Spark DataFrame 和 Koalas 不是真正的 DataFrame」
確實(shí)可以運(yùn)行,但卻看到一句話,大意是數(shù)據(jù)會(huì)被放到一個(gè)分區(qū)來(lái)執(zhí)行,這正是因?yàn)閿?shù)據(jù)本身之間并不保證順序,因此只能把數(shù)據(jù)收集到一起,排序,再調(diào)用 shift。這樣就不再是一個(gè)分布式的程序了,甚至比 pandas 本身更慢。
我們可以明確一個(gè)前提:Spark 中 DataFrame 是 RDD 的擴(kuò)展,限于其分布式與彈性內(nèi)存特性,我們沒(méi)法直接進(jìn)行類似 df.iloc(r, c) 的操作來(lái)取出其某一行。
但是現(xiàn)在我有個(gè)需求,分箱,具體來(lái)講,需要『排序后遍歷每一行及其鄰居比如 i 與 i+j』,因此,我們必須能夠獲取數(shù)據(jù)的某一行!
不知道有沒(méi)有高手有好的方法?我只想到了以下幾招!
1/3排序后select再collect
collect 是將 DataFrame 轉(zhuǎn)換為數(shù)組放到內(nèi)存中來(lái)。但是 Spark 處理的數(shù)據(jù)一般都很大,直接轉(zhuǎn)為數(shù)組,會(huì)爆內(nèi)存。
因此不能直接 collect 。
要處理哪一列,就直接 select('列名') 取出這一列就好,再 collect 。我的數(shù)據(jù)有 2e5 * 2e4 這么多,因此 select 后只剩一列大小為 2e5 * 1 ,還是可以 collect 的。
這顯然不是個(gè)好方法!因?yàn)闊o(wú)法處理真正的大數(shù)據(jù),比如行很多時(shí)。
2/3排序后加index然后用SQL查找
給 DataFrame 實(shí)例 .sort("列名") 后,用 SQL 語(yǔ)句查找:
- select 列名 from df_table where 索引列名 = i
我對(duì)于 SQL 不是很了解,因此這個(gè)做法只是在構(gòu)思階段。
此外,我不清楚 SQL 的性能!我要調(diào)用很多次 df.iloc[i, 列] ,那這樣會(huì)不會(huì)太慢了?
3/3排序后加index然后轉(zhuǎn)置查找列名
這個(gè)想法也只是停留在腦子里!因?yàn)闀?huì)有些難度。
給每一行加索引列,從0開(kāi)始計(jì)數(shù),然后把矩陣轉(zhuǎn)置,新的列名就用索引列來(lái)做。
之后再取第 i 個(gè)數(shù),就 df(i.toString) 就行。
這個(gè)方法似乎靠譜。
附加方案:ml.feature.Bucketizer
- import org.apache.spark.ml.feature.{Bucketizer, QuantileDiscretizer}
spark中 Bucketizer 的作用和我實(shí)現(xiàn)的需求差不多(盡管細(xì)節(jié)不同),我猜測(cè)其中也應(yīng)該有相似邏輯。有能力和精力了應(yīng)該去讀讀源碼,看看官方怎么實(shí)現(xiàn)的。
參考資料
[1]Spark的DataFrame不是真正的DataFrame-秦續(xù)業(yè)的文章-知乎:
https://zhuanlan.zhihu.com/p/135329592