10億數(shù)據(jù)如何最快速插入MySQL
最快的速度把10億條數(shù)據(jù)導入到數(shù)據(jù)庫,首先需要明確一下,10億條數(shù)據(jù)什么形式存在哪里,每條數(shù)據(jù)多大,是否有序?qū)?,是否不能重復,?shù)據(jù)庫是否是MySQL?
假設有如下約束
- 10億條數(shù)據(jù),每條數(shù)據(jù) 1 Kb
- 數(shù)據(jù)內(nèi)容是非結(jié)構(gòu)化的用戶訪問日志,需要解析后寫入到數(shù)據(jù)庫
- 數(shù)據(jù)存放在Hdfs 或 S3 分布式文件存儲里
- 10億條數(shù)據(jù)并不是1個大文件,而是被切分為多個文件,后綴標記順序
- 要求有序?qū)?,盡量不重復
- 數(shù)據(jù)庫是 MySQL
首先考慮10億數(shù)據(jù)寫到MySQL單表可行嗎?
數(shù)據(jù)庫單表能支持10億嗎?
答案是不能,單表推薦的值是2000W以下。這個值怎么計算出來的呢?
MySQL索引數(shù)據(jù)結(jié)構(gòu)是B+樹,全量數(shù)據(jù)存儲在主鍵索引,也就是聚簇索引的葉子結(jié)點上。B+樹插入和查詢的性能和B+樹層數(shù)直接相關,2000W以下是3層索引,而2000w以上則可能為四層索引。
Mysql b+索引的葉子節(jié)點每頁大小16K。當前每條數(shù)據(jù)正好1K,所以簡單理解為每個葉子節(jié)點存儲16條數(shù)據(jù)。b+索引每個非葉子節(jié)點大小也是16K,但是其只需要存儲主鍵和指向葉子節(jié)點的指針,我們假設主鍵的類型是 BigInt,長度為 8 字節(jié),而指針大小在 InnoDB 中設置為 6 字節(jié),這樣一共 14 字節(jié),這樣一個非葉子節(jié)點可以存儲 16 * 1024/14=1170。
也就是每個非葉子節(jié)點可關聯(lián)1170個葉子節(jié)點,每個葉子節(jié)點存儲16條數(shù)據(jù)。由此可得到B+樹索引層數(shù)和存儲數(shù)量的表格。2KW 以上 索引層數(shù)為 4 層,性能更差。
圖片
為了便于計算,我們可以設計單表容量在1KW,10億條數(shù)據(jù)共100個表。
如何高效的寫入數(shù)據(jù)庫
單條寫入數(shù)據(jù)庫性能比較差,可以考慮批量寫入數(shù)據(jù)庫,批量數(shù)值動態(tài)可調(diào)整。每條1K,默認可先調(diào)整為100條批量寫入。
批量數(shù)據(jù)如何保證數(shù)據(jù)同時寫成功?MySQL Innodb存儲引擎保證批量寫入事務同時成功或失敗。
寫庫時要支持重試,寫庫失敗重試寫入,如果重試N次后依然失敗,可考慮單條寫入100條到數(shù)據(jù)庫,失敗數(shù)據(jù)打印記錄,丟棄即可。
此外寫入時按照主鍵id順序順序?qū)懭肟梢赃_到最快的性能,而非主鍵索引的插入則不一定是順序的,頻繁地索引結(jié)構(gòu)調(diào)整會導致插入性能下降。最好不創(chuàng)建非主鍵索引,或者在表創(chuàng)建完成后再創(chuàng)建索引,以保證最快的插入性能。
是否需要并發(fā)寫同一個表
不能
- 并發(fā)寫同一個表無法保證數(shù)據(jù)寫入時是有序的。
- 提高批量插入的閾值,在一定程度上增加了插入并發(fā)度。無需再并發(fā)寫入單表
MySQL存儲引擎的選擇
Myisam 比innodb有更好的插入性能,但失去了事務支持,批量插入時無法保證同時成功或失敗,所以當批量插入超時或失敗時,如果重試,勢必對導致一些重復數(shù)據(jù)的發(fā)生。但是為了保證更快的導入速度,可以把myisam存儲引擎列為計劃之一。
圖片
從數(shù)據(jù)可以看到批量寫入明顯優(yōu)于單條寫入。并且在innodb關閉即時刷新磁盤策略后,innodb插入性能沒有比myisam差太多。
innodb_flush_log_at_trx_commit: 控制MySQL刷新數(shù)據(jù)到磁盤的策略。
- 默認=1,即每次事務提交都會刷新數(shù)據(jù)到磁盤,安全性最高不會丟失數(shù)據(jù)。
- 當配置為0、2 會每隔1s刷新數(shù)據(jù)到磁盤, 在系統(tǒng)宕機、mysql crash時可能丟失1s的數(shù)據(jù)。
考慮到Innodb在關閉即刷新磁盤策略時,批量性能也不錯,所以暫定先使用innodb(如果公司MySQL集群不允許改變這個策略值,可能要使用MyIsam了。)。線上環(huán)境測試時可以重點對比兩者的插入性能。
要不要進行分庫
mysql 單庫的并發(fā)寫入是有性能瓶頸的,一般情況5K TPS寫入就很高了。
當前數(shù)據(jù)都采用SSD 存儲,性能應該更好一些。但如果是HDD的話,雖然順序讀寫會有非常高的表現(xiàn),但HDD無法應對并發(fā)寫入,例如每個庫10張表,假設10張表在并發(fā)寫入,每張表雖然是順序?qū)懭?,由于多個表的存儲位置不同,HDD只有1個磁頭,不支持并發(fā)寫,只能重新尋道,耗時將大大增加,失去順序讀寫的高性能。
所以對于HDD而言,單庫并發(fā)寫多個表并不是好的方案?;氐絊SD的場景,不同SSD廠商的寫入能力不同,對于并發(fā)寫入的能力也不同,有的支持500M/s,有的支持1G/s讀寫,有的支持8個并發(fā),有的支持4個并發(fā)。在線上實驗之前,我們并不知道實際的性能表現(xiàn)如何。
所以在設計上要更加靈活,需要支持以下能力
- 支持配置數(shù)據(jù)庫的數(shù)量
- 支持配置并發(fā)寫表的數(shù)量,(如果MySQL是HDD磁盤,只讓一張表順序?qū)懭?,其他任務等待?/li>
通過以上配置,靈活調(diào)整線上數(shù)據(jù)庫的數(shù)量,以及寫表并發(fā)度,無論是HDD還是SSD,我們系統(tǒng)都能支持。不論是什么廠商型號的SSD,性能表現(xiàn)如何,都可調(diào)整配置,不斷獲得更高的性能。這也是后面設計的思路,不固定某一個閾值數(shù)量,都要動態(tài)可調(diào)整。
接下來聊一下文件讀取,10億條數(shù)據(jù),每條1K,一共是931G。近1T大文件,一般不會生成如此大的文件。所以我們默認文件已經(jīng)被大致切分為100個文件。每個文件數(shù)量大致相同即可。為什么切割為100個呢?切分為1000個,增大讀取并發(fā),不是可以更快導入數(shù)據(jù)庫嗎?剛才提到數(shù)據(jù)庫的讀寫性能受限于磁盤,但任何磁盤相比寫操作,讀操作都要更快。尤其是讀取時只需要從文件讀取,但寫入時MySQL要執(zhí)行建立索引,解析SQL、事務等等復雜的流程。所以寫的并發(fā)度最大是100,讀文件的并發(fā)度無需超過100。
更重要的是讀文件并發(fā)度等于分表數(shù)量,有利于簡化模型設計。即100個讀取任務,100個寫入任務,對應100張表。
如何保證寫入數(shù)據(jù)庫有序
既然文件被切分為100個10G的小文件,可以按照文件后綴+ 在文件行號 作為記錄的唯一鍵,同時保證同一個文件的內(nèi)容被寫入同一個表。例如
- index_90.txt 被寫入 數(shù)據(jù)庫database_9,table_0 ,
- index_67.txt被寫入數(shù)據(jù)庫 database_6,table_7。
這樣每個表都是有序的。整體有序通過數(shù)據(jù)庫后綴+表名后綴實現(xiàn)。
如何更快地讀取文件
10G的文件顯然不能一次性讀取到內(nèi)存中,場景的文件讀取包括
- Files.readAllBytes一次性加載內(nèi)內(nèi)存
- FileReader+ BufferedReader 逐行讀取
- File+ BufferedReader
- Scanner逐行讀取
- Java NIO FileChannel緩沖區(qū)方式讀取
在MAC上,使用這幾種方式的讀取3.4G大小文件的性能對比
圖片
由此可見 使用JavaNIO FileChannnel明顯更優(yōu),但是FileChannel的方式是先讀取固定大小緩沖區(qū),不支持按行讀取。也無法保證緩沖區(qū)正好包括整數(shù)行數(shù)據(jù)。如果緩沖區(qū)最后一個字節(jié)正好卡在一行數(shù)據(jù)中間,還需要額外配合讀取下一批數(shù)據(jù)。如何把緩沖區(qū)變?yōu)橐恍行袛?shù)據(jù),比較困難。
File file = new File("/xxx.txt");
FileInputStream fileInputStream = null;
long now = System.currentTimeMillis();
try {
fileInputStream = new FileInputStream(file);
FileChannel fileChannel = fileInputStream.getChannel();
int capacity = 1 * 1024 * 1024;//1M
ByteBuffer byteBuffer = ByteBuffer.allocate(capacity);
StringBuffer buffer = new StringBuffer();
int size = 0;
while (fileChannel.read(byteBuffer) != -1) {
//讀取后,將位置置為0,將limit置為容量, 以備下次讀入到字節(jié)緩沖中,從0開始存儲
byteBuffer.clear();
byte[] bytes = byteBuffer.array();
size += bytes.length;
}
System.out.println("file size:" + size);
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
//TODO close資源.
}
System.out.println("Time:" + (System.currentTimeMillis() - now));
JavaNIO 是基于緩沖區(qū)的,ByteBuffer可轉(zhuǎn)為byte數(shù)組,需要轉(zhuǎn)為字符串,并且要處理按行截斷。
但是BufferedReader JavaIO方式讀取可以天然支持按行截斷,況且性能還不錯,10G文件,大致只需要讀取30s,由于導入的整體瓶頸在寫入部分,即便30s讀取完,也不會影響整體性能。所以文件讀取使用BufferedReader 逐行讀取。即方案3
如果協(xié)調(diào)讀文件任務和寫數(shù)據(jù)庫任務
這塊比較混亂,請耐心看完。
100個讀取任務,每個任務讀取一批數(shù)據(jù),立即寫入數(shù)據(jù)庫是否可以呢?前面提到了由于數(shù)據(jù)庫并發(fā)寫入的瓶頸,無法滿足1個庫同時并發(fā)大批量寫入10個表,所以100個任務同時寫入數(shù)據(jù)庫,勢必導致每個庫同時有10個表同時在順序?qū)?,這加劇了磁盤的并發(fā)寫壓力。為盡可能提高速度,減少磁盤并發(fā)寫入帶來的性能下降, 需要一部分寫入任務被暫停的。那么讀取任務需要限制并發(fā)度嗎?不需要。
假設寫入任務和讀取任務合并,會影響讀取任務并發(fā)度。初步計劃讀取任務和寫入任務各自處理,誰也不耽誤誰。但實際設計時發(fā)現(xiàn)這個方案較為困難。
最初的設想是引入Kafka,即100個讀取任務把數(shù)據(jù)投遞到Kafka,由寫入任務消費kafka寫入DB。100個讀取任務把消息投遞到Kafka,此時順序就被打亂了,如何保證有序?qū)懭霐?shù)據(jù)庫呢?我想到可以使用Kafka partition路由,即讀取任務id把同一任務的消息都路由到同一個partition,保證每個partition內(nèi)有序消費。
要準備多少個分片呢?100個很明顯太多,如果partition小于100個,例如10個。那么勢必存在多個任務的消息混合在一起。如果同一個庫的多個表在一個Kafka partition,且這個數(shù)據(jù)庫只支持單表批量寫入,不支持并發(fā)寫多個表。這個庫多個表的消息混在一個分片中,由于并發(fā)度的限制,不支持寫入的表對應的消息只能被丟棄。所以這個方案既復雜,又難以實現(xiàn)。
所以最終放棄了Kafka方案,也暫時放棄了將讀取和寫入任務分離的方案。
最終方案簡化為 讀取任務讀一批數(shù)據(jù),寫入一批。即任務既負責讀文件、又負責插入數(shù)據(jù)庫。
如何保證任務的可靠性
如果讀取任務進行到一半,宕機或者服務發(fā)布如何處理呢?或者數(shù)據(jù)庫故障,一直寫入失敗,任務被暫時終止,如何保證任務再次拉起時,再斷點處繼續(xù)處理,不會存在重復寫入呢?
剛才我們提到可以 為每一個記錄設置一個主鍵Id,即 文件后綴index+文件所在行號??梢酝ㄟ^主鍵id的方式保證寫入的冪等。
文件所在的行號,最大值 大致為 10G/1k = 10M,即10000000。拼接最大的后綴99。最大的id為990000000。
所以也無需數(shù)據(jù)庫自增主鍵ID,可以在批量插入時指定主鍵ID。
如果另一個任務也需要導入數(shù)據(jù)庫呢?如何實現(xiàn)主鍵ID隔離,所以主鍵ID還是需要拼接taskId。例如{taskId}{fileIndex}{fileRowNumber} 轉(zhuǎn)化為Long類型。如果taskId較大,拼接后的數(shù)值過大,轉(zhuǎn)化為Long類型可能出錯。
最重要的是,如果有的任務寫入1kw,有的其他任務寫入100W,使用Long類型無法獲知每個占位符的長度,存在沖突的可能性。而如果拼接字符串{taskId}_{fileIndex}_{fileRowNumber} ,新增唯一索引,會導致插入性能更差,無法滿足最快導入數(shù)據(jù)的訴求。所以需要想另一個方案。
可以考慮使用Redis記錄當前任務的進度。例如Redis記錄task的進度,批量寫入數(shù)據(jù)庫成功后,更新 task進度。
INCRBY KEY_NAME INCR_AMOUNT
指定當前進度增加100,例如 incrby task_offset_{taskId} 100。如果出現(xiàn)批量插入失敗的,則重試插入。多次失敗,則單個插入,單個更新redis。要確保Redis更新成功,可以在Redis更新時 也加上重試。
如果還不放心Redis進度和數(shù)據(jù)庫更新的一致性,可以考慮 消費 數(shù)據(jù)庫binlog,每一條記錄新增則redis +1 。
如果任務出現(xiàn)中斷,則首先查詢?nèi)蝿盏膐ffset。然后讀取文件到指定的offset繼續(xù) 處理。
如何協(xié)調(diào)讀取任務的并發(fā)度
前面提到了為了避免單個庫插入表的并發(fā)度過高,影響數(shù)據(jù)庫性能??梢钥紤]限制并發(fā)度。如何做到呢?
既然讀取任務和寫入任務合并一起。那么就需要同時限制讀取任務。即每次只挑選一批讀取寫入任務執(zhí)行。
在此之前需要設計一下任務表的存儲模型。
圖片
- bizId為了以后支持別的產(chǎn)品線,預設字段。默認為1,代表當前業(yè)務線。
- datbaseIndex 代表被分配的數(shù)據(jù)庫后綴
- tableIndex 代表被分配的表名后綴
- parentTaskId,即總的任務id
- offset可以用來記錄當前任務的進度
- 10億條數(shù)據(jù)導入數(shù)據(jù)庫,切分為100個任務后,會新增100個taskId,分別處理一部分數(shù)據(jù),即一個10G文件。
- status 狀態(tài)用來區(qū)分當前任務是否在執(zhí)行,執(zhí)行完成。
如何把任務分配給每一個節(jié)點,可以考慮搶占方式。每個任務節(jié)點都需要搶占任務,每個節(jié)點同時只能搶占1個任務。具體如何實現(xiàn)呢?可以考慮 每個節(jié)點都啟動一個定時任務,定期掃表,掃到待執(zhí)行子任務,嘗試執(zhí)行該任務。
如何控制并發(fā)呢?可以使用redission的信號量。key為數(shù)據(jù)庫id、
RedissonClient redissonClient = Redisson.create(config);
RSemaphore rSemaphore = redissonClient.getSemaphore("semaphore");
// 設置1個并發(fā)度
rSemaphore.trySetPermits(1);
rSemaphore.tryAcquire();//申請加鎖,非阻塞。
由任務負責定期輪訓,搶到名額后,就開始執(zhí)行任務。將該任務狀態(tài)置為Process,任務完成后或失敗后,釋放信號量。
圖片
但是使用信號量限流有個問題,如果任務忘記釋放信號量,或者進程Crash無法釋放信號量,如何處理呢?可以考慮給信號量增加一個超時時間。那么如果任務執(zhí)行過長,導致提前釋放信號量,另一個客戶單爭搶到信號量,導致 兩個客戶端同時寫一個任務如何處理呢?
what,明明是將10億數(shù)據(jù)導入數(shù)據(jù)庫,怎么變成分布式鎖超時的類似問題?
實際上 Redisson的信號量并沒有很好的辦法解決信號量超時問題,正常思維:如果任務執(zhí)行過長,導致信號量被釋放,解決這個問題只需要續(xù)約就可以了,任務在執(zhí)行中,只要發(fā)現(xiàn)快信號量過期了,就續(xù)約一段時間,始終保持信號量不過期。但是 Redission并沒有提供信號量續(xù)約的能力,怎么辦?
不妨換個思路,我們一直在嘗試讓多個節(jié)點爭搶信號量,進而限制并發(fā)度??梢栽囋囘x取一個主節(jié)點,通過主節(jié)點輪訓任務表。分三種情況,
情況1 當前執(zhí)行中數(shù)量小于并發(fā)度。
- 則選取id最小的待執(zhí)行任務,狀態(tài)置為進行中,通知發(fā)布消息。
- 消費到消息的進程,申請分布式鎖,開始處理任務。處理完成釋放鎖。借助于Redission分布式鎖續(xù)約,保證任務完成前,鎖不會超時。
情況2 當前執(zhí)行中數(shù)量等于并發(fā)度。
- 主節(jié)點嘗試 get 進行中任務是否有鎖。
- 如果沒有鎖,說明有任務執(zhí)行失敗,此時應該重新發(fā)布任務。如果有鎖,說明有任務正在執(zhí)行中。
情況3 當前執(zhí)行中數(shù)量大于并發(fā)度
- 上報異常情況,報警,人工介入
使用主節(jié)點輪訓任務,可以減少任務的爭搶,通過kafka發(fā)布消息,接收到消息的進程處理任務。為了保證更多的節(jié)點參與消費,可以考慮增加Kafka分片數(shù)。雖然每個節(jié)點可能同時處理多個任務,但是不會影響性能,因為性能瓶頸在數(shù)據(jù)庫。
那么主節(jié)點應該如何選取呢?可以通過Zookeeper+curator 選取主節(jié)點??煽啃员容^高。
10億條數(shù)據(jù)插入數(shù)據(jù)庫的時間影響因素非常多。包括數(shù)據(jù)庫磁盤類型、性能。數(shù)據(jù)庫分庫數(shù)量如果能切分1000個庫當然性能更快,要根據(jù)線上實際情況決策分庫和分表數(shù)量,這極大程度決定了寫入的速率。最后數(shù)據(jù)庫批量插入的閾值也不是一成不變的,需要不斷測試調(diào)整,以求得最佳的性能??梢园凑?00,1000,10000等不斷嘗試批量插入的最佳閾值。
最后總結(jié)一下幾點重要的
總結(jié)
- 要首先確認約束條件,才能設計方案。確定面試官主要想問的方向,例如1T文件如何切割為小文件,雖是難點,然而可能不是面試官想考察的問題。
- 從數(shù)據(jù)規(guī)模看,需要分庫分表,大致確定分表的規(guī)模。
- 從單庫的寫入瓶頸分析,判斷需要進行分庫。
- 考慮到磁盤對并發(fā)寫的支持力度不同,同一個庫多個表寫入的并發(fā)需要限制。并且支持動態(tài)調(diào)整,方便在線上環(huán)境調(diào)試出最優(yōu)值。
- MySQL innodb、myisam 存儲引擎對寫入性能支持不同,也要在線上對比驗證
- 數(shù)據(jù)庫批量插入的最佳閾值需要反復測試得出。
- 由于存在并發(fā)度限制,所以基于Kafka分離讀取任務和寫入任務比較困難。所以合并讀取任務和寫入任務。
- 需要Redis記錄任務執(zhí)行的進度。任務失敗后,重新導入時,記錄進度,可避免數(shù)據(jù)重復問題。
- 分布式任務的協(xié)調(diào)工作是難點,使用Redission信號量無法解決超時續(xù)約問題??梢杂芍鞴?jié)點分配任務+分布式鎖保證任務排他寫入。主節(jié)點使用Zookeeper+Curator選取。