自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

在SQLite中插入10億條Python VS Rust

開(kāi)發(fā) 后端
寫(xiě)腳本來(lái)進(jìn)行數(shù)據(jù)處理,比如說(shuō)給數(shù)據(jù)庫(kù)導(dǎo)入導(dǎo)出數(shù)據(jù),這種任務(wù)一般來(lái)說(shuō)最方便的方法是用python腳本,但是如果數(shù)據(jù)量比較大時(shí)候(比如上億條)時(shí)候Python就會(huì)超級(jí)慢,看到無(wú)法忍受。

在實(shí)際生活中,市場(chǎng)有這樣的案例:寫(xiě)腳本來(lái)進(jìn)行數(shù)據(jù)處理,比如說(shuō)給數(shù)據(jù)庫(kù)導(dǎo)入導(dǎo)出數(shù)據(jù),這種任務(wù)一般來(lái)說(shuō)最方便的方法是用python腳本,但是如果數(shù)據(jù)量比較大時(shí)候(比如上億條)時(shí)候Python就會(huì)超級(jí)慢,看到無(wú)法忍受。在這種案例時(shí)候該怎么做呢,有一個(gè)外國(guó)老哥分享了自己的實(shí)踐經(jīng)歷,并且對(duì)比了Python和Rust語(yǔ)言給SQLite插入十一條數(shù)據(jù)的情況,最后用Rust實(shí)現(xiàn)了在一分鐘來(lái)完成任務(wù)。我們?cè)诖朔窒硪幌略搶?shí)踐過(guò)程,希望能對(duì)大家有所啟迪,大家也可以嘗試自己最拿手方法來(lái)實(shí)現(xiàn)該例子,并對(duì)比一下具體性能。

概述

案例中的任務(wù)是SQLite數(shù)據(jù)庫(kù)插入10億條的數(shù)據(jù)。表(user)數(shù)據(jù)結(jié)構(gòu)和約束如下:

  1. create table IF NOT EXISTS user 
  2. id INTEGER not null primary key, 
  3. area CHAR(6), 
  4. age INTEGER not null, 
  5. active INTEGER not null 
  6. ); 

隨機(jī)生成數(shù)據(jù)。其中are列為六位數(shù)的區(qū)號(hào)(任何六位數(shù)字)。age將是5、10 或15中的一個(gè)數(shù)字。Active為0或1。

  • 實(shí)驗(yàn)環(huán)境硬件配置為:MacBook Pro,2019(2.4 GHz 四核i5,8GB內(nèi)存,256GB SSD硬盤(pán),Big Sur 11.1)。
  • 任務(wù)前提:任務(wù)無(wú)需保持程序穩(wěn)健性,如果進(jìn)程崩潰并且所有數(shù)據(jù)都丟失了也沒(méi)關(guān)系??梢栽俅芜\(yùn)行腳本。
  • 需要充分利用我的機(jī)器資源:100% CPU、8GB 內(nèi)存和千兆字節(jié)的SSD空間。

無(wú)需使用真正的隨機(jī)方法,stdlib偽隨機(jī)方法即可。

Python

首先是原始版本的Python方法。Python標(biāo)準(zhǔn)庫(kù)提供了一個(gè)SQLite模塊,首先使用它編寫(xiě)了第一個(gè)版本。代碼如下:

  1. import sqlite3 
  2. from commons import get_random_age, get_random_active, get_random_bool, get_random_area_code, create_table 
  3. DB_NAME = "naive.db" 
  4. def faker(con: sqlite3.Connection, count=100_000): 
  5. for _ in range(count): 
  6. age = get_random_age() 
  7. active = get_random_active() 
  8. # switch for area code 
  9. if get_random_bool(): 
  10. # random 6 digit number 
  11. area = get_random_area_code() 
  12. con.execute('INSERT INTO user VALUES (NULL,?,?,?)', (area, age, active)) 
  13. else: 
  14. con.execute('INSERT INTO user VALUES (NULL,NULL,?,?)', (age, active)) 
  15. con.commit() 
  16. def main(): 
  17. con = sqlite3.connect(DB_NAME, isolation_level=None
  18. con.execute('PRAGMA journal_mode = WAL;') 
  19. create_table(con) 
  20. faker(con, count=10_000_000
  21. if __name__ == '__main__': 
  22. main() 

在該腳本中,通for循環(huán)中一一插入1000萬(wàn)條數(shù)據(jù)。執(zhí)行花了將近15分鐘?;诖诉M(jìn)行優(yōu)化迭代,提高性能。

SQLite中,每次插入都是原子性的并且為一個(gè)事務(wù)。每個(gè)事務(wù)都需要保證寫(xiě)入磁盤(pán)(涉及IO操作),因此可能會(huì)很慢。為了優(yōu)化,可以嘗試通過(guò)不同大小的批量插入,對(duì)比發(fā)現(xiàn),100000是最佳選擇。通過(guò)這個(gè)簡(jiǎn)單的更改,運(yùn)行時(shí)間減少到了10分鐘,優(yōu)化了3分之一,但是仍然非常耗時(shí)。優(yōu)化后,批量插入版本源碼:

SQLite庫(kù)優(yōu)化

除了在代碼層優(yōu)化外,如果對(duì)于單純的數(shù)據(jù)寫(xiě)入,對(duì)數(shù)據(jù)庫(kù)本身搞的優(yōu)化也是非常重要的。對(duì)于SQLite優(yōu)化,可以做如下配置:

  1. PRAGMA journal_mode = OFF
  2. PRAGMA synchronous = 0
  3. PRAGMA cache_size = 1000000
  4. PRAGMA locking_mode = EXCLUSIVE
  5. PRAGMA temp_store = MEMORY

具體解釋:

首先,journal_mode設(shè)置為OFF,將會(huì)關(guān)閉回滾日志,禁用 SQLite 的原子提交和回滾功能,這樣在事務(wù)失敗情況下,無(wú)法恢復(fù),基于例子實(shí)例穩(wěn)健性要求可以設(shè)置,但是嚴(yán)禁在生產(chǎn)環(huán)境中使用。

其次,關(guān)閉synchronous,SQLite可以不再校驗(yàn)磁盤(pán)寫(xiě)入的數(shù)據(jù)可靠性。寫(xiě)入SQLite可能并不意味著它已刷新到磁盤(pán)。同樣,嚴(yán)禁在生產(chǎn)環(huán)境中啟用。

cache_size用戶指定SQLite允許在內(nèi)存中保留多少內(nèi)存頁(yè)。不要在生產(chǎn)中分配太高的的數(shù)值。

使用在EXCLUSIVE鎖定模式,SQLite連接持有的鎖永遠(yuǎn)不會(huì)被釋放。

設(shè)置temp_store到MEMOR將使其表現(xiàn)得像一個(gè)內(nèi)存數(shù)據(jù)庫(kù)。

優(yōu)化性能

對(duì)上面的兩個(gè)腳本,添加 SQLite優(yōu)化參數(shù),然后重新運(yùn)行:

  1. def main():     
  2. con = sqlite3.connect(DB_NAME, isolation_level=None)     
  3. con.execute('PRAGMA journal_mode = OFF;')     
  4. con.execute('PRAGMA synchronous = 0;')     
  5. con.execute('PRAGMA cache_size = 1000000;') # give it a GB     
  6. con.execute('PRAGMA locking_mode = EXCLUSIVE;')     
  7. con.execute('PRAGMA temp_store = MEMORY;')     
  8. create_table(con)     

faker(con, count=100_000_000)

優(yōu)化后版本,原始版本,插入1億行數(shù)據(jù),大概花了10分鐘;對(duì)比批量插入版本大概花了8.5分鐘。

pypy版本

對(duì)比CPython PyPy在數(shù)據(jù)處理中可以提高性能,據(jù)說(shuō)可以提高4倍以上的性能。本實(shí)驗(yàn)中也嘗試編譯PyPy解釋器,運(yùn)行腳本(代碼無(wú)需修改)。

使用pypy解釋器,批處理版本,插入1億行數(shù)據(jù)只需2.5分鐘。性能大概是Cpython的3.5倍,可見(jiàn)傳說(shuō)的4倍性能提高確實(shí)是真的,誠(chéng)不我欺也!。同時(shí),為了測(cè)試在純循環(huán)插入中消耗的時(shí)間,在腳本中刪除SQL指令并運(yùn)行:

以上腳本在CPython中耗時(shí)5.5分鐘 。PyPy執(zhí)行耗時(shí)1.5分鐘(同樣提高了3.5倍)。

Rust

在完成Python各種優(yōu)化折騰。又嘗試了Rust版本的插入,對(duì)比也有個(gè)原始版本和批量插入版本。原始版本,也是每行插入:

  1. use rusqlite::{params, Connection}; 
  2. mod common; 
  3. fn faker(mut conn: Connection, count: i64) { 
  4. let tx = conn.transaction().unwrap(); 
  5. for _ in 0..count { 
  6. let with_area = common::get_random_bool(); 
  7. let age = common::get_random_age(); 
  8. let is_active = common::get_random_active(); 
  9. if with_area { 
  10. let area_code = common::get_random_area_code(); 
  11. tx.execute( 
  12. "INSERT INTO user VALUES (NULL, ?, ?, ?)", 
  13. params![area_code, age, is_active], 
  14. .unwrap(); 
  15. } else { 
  16. tx.execute( 
  17. "INSERT INTO user VALUES (NULL, NULL, ?, ?)", 
  18. params![age, is_active], 
  19. .unwrap(); 
  20. tx.commit().unwrap(); 
  21. fn main() { 
  22. let conn = Connection::open("basic.db").unwrap(); 
  23. conn.execute_batch( 
  24. "PRAGMA journal_mode = OFF
  25. PRAGMA synchronous = 0
  26. PRAGMA cache_size = 1000000
  27. PRAGMA locking_mode = EXCLUSIVE
  28. PRAGMA temp_store = MEMORY;", 
  29. .expect("PRAGMA"); 
  30. conn.execute( 
  31. "CREATE TABLE IF NOT EXISTS user ( 
  32. id INTEGER not null primary key, 
  33. area CHAR(6), 
  34. age INTEGER not null, 
  35. active INTEGER not null)", 
  36. [], 
  37. .unwrap(); 
  38. faker(conn, 100_000_000) 

該版執(zhí)行,大概用時(shí)3分鐘。然后我做了進(jìn)一步的實(shí)驗(yàn):

將rusqlite,換成sqlx異步運(yùn)行。

  1. use std::str::FromStr;     
  2.  
  3. use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqliteSynchronous};     
  4. use sqlx::{ConnectOptions, Connection, Executor, SqliteConnection, Statement};     
  5.  
  6. mod common;     
  7.  
  8. async fn faker(mut conn: SqliteConnection, count: i64) -> Result<(), sqlx::Error> {     
  9. let mut tx = conn.begin().await?;     
  10. let stmt_with_area = tx     
  11. .prepare("INSERT INTO user VALUES (NULL, ?, ?, ?)")     
  12. .await?;     
  13. let stmt = tx     
  14. .prepare("INSERT INTO user VALUES (NULL, NULL, ?, ?)")     
  15. .await?;     
  16. for _ in 0..count {     
  17. let with_area = common::get_random_bool();     
  18. let age = common::get_random_age();     
  19. let is_active = common::get_random_active();     
  20. if with_area {     
  21. let area_code = common::get_random_area_code();     
  22. stmt_with_area     
  23. .query()     
  24. .bind(area_code)     
  25. .bind(age)     
  26. .bind(is_active)     
  27. .execute(&mut tx)     
  28. .await?;     
  29. } else {     
  30. stmt.query()     
  31. .bind(age)     
  32. .bind(is_active)     
  33. .execute(&mut tx)     
  34. .await?;     
  35. }     
  36. }     
  37. tx.commit().await?;     
  38. Ok(())     
  39. }     
  40.  
  41. #[tokio::main]     
  42. async fn main() -> Result<(), sqlx::Error> {     
  43. let mut conn = SqliteConnectOptions::from_str("basic_async.db")     
  44. .unwrap()     
  45. .create_if_missing(true)     
  46. .journal_mode(SqliteJournalMode::Off)     
  47. .synchronous(SqliteSynchronous::Off)     
  48. .connect()     
  49. .await?;     
  50. conn.execute("PRAGMA cache_size = 1000000;").await?;     
  51. conn.execute("PRAGMA locking_mode = EXCLUSIVE;").await?;     
  52. conn.execute("PRAGMA temp_store = MEMORY;").await?;     
  53. conn.execute(     
  54. "CREATE TABLE IF NOT EXISTS user (     
  55. id INTEGER not null primary key,     
  56. area CHAR(6),     
  57. age INTEGER not null,     
  58. active INTEGER not null);",     
  59. )     
  60. .await?;     
  61. faker(conn, 100_000_000).await?;     
  62. Ok(())     

這個(gè)版本花了大約14分鐘。性能反而下降下降了。比Python版本還要差(原因值得深析)。

對(duì)執(zhí)行的原始SQL語(yǔ)句,切換到準(zhǔn)備好的語(yǔ)句并在循環(huán)中插入行,但重用了準(zhǔn)備好的語(yǔ)句。該版本只用了大約一分鐘。

使用準(zhǔn)備好的語(yǔ)句并將它們插入到50行的批次中,插入10億條,耗時(shí)34.3 秒。

  1. use rusqlite::{Connection, ToSql, Transaction}; 
  2. mod common; 
  3. fn faker_wrapper(mut conn: Connection, count: i64) { 
  4. let tx = conn.transaction().unwrap(); 
  5. faker(&tx, count); 
  6. tx.commit().unwrap(); 
  7. fn faker(tx: &Transaction, count: i64) { 
  8. // that is, we will batch 50 inserts of rows at once 
  9. let min_batch_size: i64 = 50
  10. if count < min_batch_size { 
  11. panic!("count cant be less than min batch size"); 
  12. // jeez, refactor this! 
  13. let mut with_area_params = " (NULL, ?, ?, ?),".repeat(min_batch_size as usize); 
  14. with_area_params.pop(); 
  15. let with_area_paramswith_area_params = with_area_params.as_str(); 
  16. let mut without_area_params = " (NULL, NULL, ?, ?),".repeat(min_batch_size as usize); 
  17. without_area_params.pop(); 
  18. let without_area_paramswithout_area_params = without_area_params.as_str(); 
  19. let st1 = format!("INSERT INTO user VALUES {}", with_area_params); 
  20. let st2 = format!("INSERT INTO user VALUES {}", without_area_params); 
  21. let mut stmt_with_area = tx.prepare_cached(st1.as_str()).unwrap(); 
  22. let mut stmt = tx.prepare_cached(st2.as_str()).unwrap(); 
  23. for _ in 0..(count / min_batch_size) { 
  24. let with_area = common::get_random_bool(); 
  25. let age = common::get_random_age(); 
  26. let is_active = common::get_random_active(); 
  27. let mut param_values: Vec<_> = Vec::new(); 
  28. if with_area { 
  29. // lets prepare the batch 
  30. let mut vector = Vec::<(String, i8, i8)>::new(); 
  31. for _ in 0..min_batch_size { 
  32. let area_code = common::get_random_area_code(); 
  33. vector.push((area_code, age, is_active)); 
  34. for batch in vector.iter() { 
  35. param_values.push(&batch.0 as &dyn ToSql); 
  36. param_values.push(&batch.1 as &dyn ToSql); 
  37. param_values.push(&batch.2 as &dyn ToSql); 
  38. stmt_with_area.execute(&*param_values).unwrap(); 
  39. } else { 
  40. // lets prepare the batch 
  41. let mut vector = Vec::<(i8, i8)>::new(); 
  42. for _ in 0..min_batch_size { 
  43. vector.push((age, is_active)); 
  44. for batch in vector.iter() { 
  45. param_values.push(&batch.0 as &dyn ToSql); 
  46. param_values.push(&batch.1 as &dyn ToSql); 
  47. stmt.execute(&*param_values).unwrap(); 
  48. fn main() { 
  49. let conn = Connection::open("basic_batched.db").unwrap(); 
  50. conn.execute_batch( 
  51. "PRAGMA journal_mode = OFF
  52. PRAGMA synchronous = 0
  53. PRAGMA cache_size = 1000000
  54. PRAGMA locking_mode = EXCLUSIVE
  55. PRAGMA temp_store = MEMORY;", 
  56. .expect("PRAGMA"); 
  57. conn.execute( 
  58. "CREATE TABLE IF NOT EXISTS user ( 
  59. id INTEGER not null primary key, 
  60. area CHAR(6), 
  61. age INTEGER not null, 
  62. active INTEGER not null)", 
  63. [], 
  64. .unwrap(); 
  65. faker_wrapper(conn, 100_000_000) 
  66. 創(chuàng)建了一個(gè)線程版本,其中有一個(gè)從通道接收數(shù)據(jù)的寫(xiě)入線程和四個(gè)將數(shù)據(jù)推送到管道其他線程。 
  67. use rusqlite::{Connection, ToSql}; 
  68. use std::sync::mpsc; 
  69. use std::sync::mpsc::{Receiver, Sender}; 
  70. use std::thread; 
  71. mod common; 
  72. static MIN_BATCH_SIZE: i64 = 50
  73. enum ParamValues { 
  74. WithArea(Vec<(String, i8, i8)>), 
  75. WithoutArea(Vec<(i8, i8)>), 
  76. fn consumer(rx: Receiver<ParamValues>) { 
  77. let mut conn = Connection::open("threaded_batched.db").unwrap(); 
  78. conn.execute_batch( 
  79. "PRAGMA journal_mode = OFF
  80. PRAGMA synchronous = 0
  81. PRAGMA cache_size = 1000000
  82. PRAGMA locking_mode = EXCLUSIVE
  83. PRAGMA temp_store = MEMORY;", 
  84. .expect("PRAGMA"); 
  85. conn.execute( 
  86. "CREATE TABLE IF NOT EXISTS user ( 
  87. id INTEGER not null primary key, 
  88. area CHAR(6), 
  89. age INTEGER not null, 
  90. active INTEGER not null)", 
  91. [], 
  92. .unwrap(); 
  93. let tx = conn.transaction().unwrap(); 
  94. // jeez, refactor this! 
  95. let mut with_area_params = " (NULL, ?, ?, ?),".repeat(MIN_BATCH_SIZE as usize); 
  96. with_area_params.pop(); 
  97. let with_area_paramswith_area_params = with_area_params.as_str(); 
  98. let mut without_area_params = " (NULL, NULL, ?, ?),".repeat(MIN_BATCH_SIZE as usize); 
  99. without_area_params.pop(); 
  100. let without_area_paramswithout_area_params = without_area_params.as_str(); 
  101. let st1 = format!("INSERT INTO user VALUES {}", with_area_params); 
  102. let st2 = format!("INSERT INTO user VALUES {}", without_area_params); 
  103. let mut stmt_with_area = tx.prepare_cached(st1.as_str()).unwrap(); 
  104. let mut stmt_without_area = tx.prepare_cached(st2.as_str()).unwrap(); 
  105. for param_values in rx { 
  106. let mut row_values: Vec<&dyn ToSql> = Vec::new(); 
  107. match param_values { 
  108. ParamValues::WithArea(values) => { 
  109. for batch in values.iter() { 
  110. row_values.push(&batch.0 as &dyn ToSql); 
  111. row_values.push(&batch.1 as &dyn ToSql); 
  112. row_values.push(&batch.2 as &dyn ToSql); 
  113. stmt_with_area.execute(&*row_values).unwrap(); 
  114. ParamValues::WithoutArea(values) => { 
  115. for batch in values.iter() { 
  116. row_values.push(&batch.0 as &dyn ToSql); 
  117. row_values.push(&batch.1 as &dyn ToSql); 
  118. stmt_without_area.execute(&*row_values).unwrap(); 
  119. tx.commit().unwrap(); 
  120. fn producer(tx: Sender<ParamValues>, count: i64) { 
  121. if count < MIN_BATCH_SIZE { 
  122. panic!("count cant be less than min batch size"); 
  123. for _ in 0..(count / MIN_BATCH_SIZE) { 
  124. let with_area = common::get_random_bool(); 
  125. let age = common::get_random_age(); 
  126. let is_active = common::get_random_active(); 
  127. let mut param_values: Vec<_> = Vec::new(); 
  128. if with_area { 
  129. // lets prepare the batch 
  130. let mut vector = Vec::<(String, i8, i8)>::new(); 
  131. for _ in 0..MIN_BATCH_SIZE { 
  132. let area_code = common::get_random_area_code(); 
  133. vector.push((area_code, age, is_active)); 
  134. for batch in vector.iter() { 
  135. param_values.push(&batch.0 as &dyn ToSql); 
  136. param_values.push(&batch.1 as &dyn ToSql); 
  137. param_values.push(&batch.2 as &dyn ToSql); 
  138. // send the values 
  139. tx.send(ParamValues::WithArea(vector)).unwrap(); 
  140. } else { 
  141. // lets prepare the batch 
  142. let mut vector = Vec::<(i8, i8)>::new(); 
  143. for _ in 0..MIN_BATCH_SIZE { 
  144. vector.push((age, is_active)); 
  145. for batch in vector.iter() { 
  146. param_values.push(&batch.0 as &dyn ToSql); 
  147. param_values.push(&batch.1 as &dyn ToSql); 
  148. // send the values 
  149. tx.send(ParamValues::WithoutArea(vector)).unwrap(); 
  150. fn main() { 
  151. // setup the DB and tables 
  152. let (tx, rx): (Sender<ParamValues>, Receiver<ParamValues>) = mpsc::channel(); 
  153. // lets launch the consumer 
  154. let consumer_handle = thread::spawn(|| consumer(rx)); 
  155. let cpu_count = num_cpus::get(); 
  156. let total_rows = 100_000_000
  157. let each_producer_count = (total_rows / cpu_count) as i64; 
  158. let mut handles = Vec::with_capacity(cpu_count); 
  159. for _ in 0..cpu_count { 
  160. let thread_tx = tx.clone(); 
  161. handles.push(thread::spawn(move || { 
  162. producer(thread_tx, each_producer_count.clone()) 
  163. })) 
  164. for t in handles { 
  165. t.join().unwrap(); 
  166. drop(tx); 
  167. // wait till consumer is exited 
  168. consumer_handle.join().unwrap(); 

這是性能最好的版本,耗時(shí)約32.37秒。

基準(zhǔn)測(cè)試對(duì)比:

總結(jié)

通過(guò)案例不同任務(wù)實(shí)驗(yàn),總體上可以得到:

  • 通過(guò)SQLite PRAGMA語(yǔ)句優(yōu)化設(shè)置可以提高插入性能。
  • 使用準(zhǔn)備好的語(yǔ)句可以提高性能
  • 進(jìn)行批量插入可以提高性能。
  • PyPy 實(shí)際上比CPython快4倍
  • 線程/異步不一定能提高性能。

 

責(zé)任編輯:趙寧寧 來(lái)源: 蟲(chóng)蟲(chóng)搜奇
相關(guān)推薦

2021-07-19 15:33:27

編程Rust開(kāi)發(fā)

2011-05-25 10:32:19

SQLite

2024-04-15 08:30:53

MySQLORM框架

2024-07-04 13:42:12

2024-05-21 11:34:03

RustPython編譯器

2024-06-24 07:00:00

C++RustGo

2022-11-17 10:23:13

VS CodeCodiumPython

2015-03-03 09:52:02

2022-04-06 14:15:10

Python數(shù)據(jù)

2018-06-21 09:12:01

編程語(yǔ)言Python數(shù)據(jù)分析

2024-01-08 13:31:00

Rust自動(dòng)化測(cè)試

2024-06-04 10:49:05

Rust插件開(kāi)發(fā)工具

2024-04-02 08:30:40

RustUnix信號(hào)服務(wù)器

2022-01-14 10:50:23

PythonRust編程語(yǔ)言

2024-09-20 18:02:42

C#數(shù)據(jù)庫(kù)SQLite

2021-08-22 17:22:31

VS Code容器開(kāi)發(fā)人員

2009-12-04 09:51:32

VS XML注釋

2011-08-04 18:00:47

SQLite數(shù)據(jù)庫(kù)批量數(shù)據(jù)

2023-06-15 17:00:11

Rust循環(huán)

2023-03-07 17:50:03

點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)