10萬條數(shù)據(jù)批量插入,到底怎么做才快?
上周松哥轉(zhuǎn)載了一個數(shù)據(jù)批量插入的文章,里邊和大家聊了一下數(shù)據(jù)批量插入的問題,批量插入到底怎么做才快。
有個小伙伴看了文章后提出了不同的意見:
松哥認(rèn)真和 BUG 同學(xué)聊了下,基本上明白了這個小伙伴的意思,于是我自己也寫了個測試案例,重新整理了今天這篇文章,希望和小伙伴們一起探討這個問題,也歡迎小伙伴們提出更好的方案。
1. 思路分析
批量插入這個問題,我們用 JDBC 操作,其實就是兩種思路吧:
- 用一個 for 循環(huán),把數(shù)據(jù)一條一條的插入(這種需要開啟批處理)。
- 生成一條插入 sql,類似這種 insert into user(username,address) values('aa','bb'),('cc','dd')...。
到底哪種快呢?
我們從兩方面來考慮這個問題:
- 插入 SQL 本身執(zhí)行的效率。
- 網(wǎng)絡(luò) I/O。
先說第一種方案,就是用 for 循環(huán)循環(huán)插入:
- 這種方案的優(yōu)勢在于,JDBC 中的 PreparedStatement 有預(yù)編譯功能,預(yù)編譯之后會緩存起來,后面的 SQL 執(zhí)行會比較快并且 JDBC 可以開啟批處理,這個批處理執(zhí)行非常給力。
- 劣勢在于,很多時候我們的 SQL 服務(wù)器和應(yīng)用服務(wù)器可能并不是同一臺,所以必須要考慮網(wǎng)絡(luò) IO,如果網(wǎng)絡(luò) IO 比較費時間的話,那么可能會拖慢 SQL 執(zhí)行的速度。
再來說第二種方案,就是生成一條 SQL 插入:
- 這種方案的優(yōu)勢在于只有一次網(wǎng)絡(luò) IO,即使分片處理也只是數(shù)次網(wǎng)絡(luò) IO,所以這種方案不會在網(wǎng)絡(luò) IO 上花費太多時間。
- 當(dāng)然這種方案有好幾個劣勢,一是 SQL 太長了,甚至可能需要分片后批量處理;二是無法充分發(fā)揮 PreparedStatement 預(yù)編譯的優(yōu)勢,SQL 要重新解析且無法復(fù)用;三是最終生成的 SQL 太長了,數(shù)據(jù)庫管理器解析這么長的 SQL 也需要時間。
所以我們最終要考慮的就是我們在網(wǎng)絡(luò) IO 上花費的時間,是否超過了 SQL 插入的時間?這是我們要考慮的核心問題。
2. 數(shù)據(jù)測試
接下來我們來做一個簡單的測試,批量插入 5 萬條數(shù)據(jù)看下。
首先準(zhǔn)備一個簡單的測試表:
- CREATE TABLE `user` (
- `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
- `username` varchar(255) DEFAULT NULL,
- `address` varchar(255) DEFAULT NULL,
- `password` varchar(255) DEFAULT NULL,
- PRIMARY KEY (`id`)
- ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
接下來創(chuàng)建一個 Spring Boot 工程,引入 MyBatis 依賴和 MySQL 驅(qū)動,然后 application.properties 中配置一下數(shù)據(jù)庫連接信息:
- spring.datasource.username=root
- spring.datasource.password=123
- spring.datasource.url=jdbc:mysql:///batch_insert?serverTimezone=Asia/Shanghai&rewriteBatchedStatements=true
大家需要注意,這個數(shù)據(jù)庫連接 URL 地址中多了一個參數(shù) rewriteBatchedStatements,這是核心。
MySQL JDBC 驅(qū)動在默認(rèn)情況下會無視 executeBatch() 語句,把我們期望批量執(zhí)行的一組 sql 語句拆散,一條一條地發(fā)給 MySQL 數(shù)據(jù)庫,批量插入實際上是單條插入,直接造成較低的性能。將 rewriteBatchedStatements 參數(shù)置為 true, 數(shù)據(jù)庫驅(qū)動才會幫我們批量執(zhí)行 SQL。
OK,這樣準(zhǔn)備工作就做好了。
2.1 方案一測試
首先我們來看方案一的測試,即一條一條的插入(實際上是批處理)。
首先創(chuàng)建相應(yīng)的 mapper,如下:
- @Mapper
- public interface UserMapper {
- Integer addUserOneByOne(User user);
- }
對應(yīng)的 XML 文件如下:
- <insert id="addUserOneByOne">
- insert into user (username,address,password) values (#{username},#{address},#{password})
- </insert>
service 如下:
- @Service
- public class UserService extends ServiceImpl<UserMapper, User> implements IUserService {
- private static final Logger logger = LoggerFactory.getLogger(UserService.class);
- @Autowired
- UserMapper userMapper;
- @Autowired
- SqlSessionFactory sqlSessionFactory;
- @Transactional(rollbackFor = Exception.class)
- public void addUserOneByOne(List<User> users) {
- SqlSession session = sqlSessionFactory.openSession(ExecutorType.BATCH);
- UserMapper um = session.getMapper(UserMapper.class);
- long startTime = System.currentTimeMillis();
- for (User user : users) {
- um.addUserOneByOne(user);
- }
- session.commit();
- long endTime = System.currentTimeMillis();
- logger.info("一條條插入 SQL 耗費時間 {}", (endTime - startTime));
- }
- }
這里我要說一下:
雖然是一條一條的插入,但是我們要開啟批處理模式(BATCH),這樣前前后后就只用這一個 SqlSession,如果不采用批處理模式,反反復(fù)復(fù)的獲取 Connection 以及釋放 Connection 會耗費大量時間,效率奇低,這種效率奇低的方式松哥就不給大家測試了。
接下來寫一個簡單的測試接口看下:
- @RestController
- public class HelloController {
- private static final Logger logger = getLogger(HelloController.class);
- @Autowired
- UserService userService;
- /**
- * 一條一條插入
- */
- @GetMapping("/user2")
- public void user2() {
- List<User> users = new ArrayList<>();
- for (int i = 0; i < 50000; i++) {
- User u = new User();
- u.setAddress("廣州:" + i);
- u.setUsername("張三:" + i);
- u.setPassword("123:" + i);
- users.add(u);
- }
- userService.addUserOneByOne(users);
- }
- }
寫個簡單的單元測試:
- /**
- *
- * 單元測試加事務(wù)的目的是為了插入之后自動回滾,避免影響下一次測試結(jié)果
- * 一條一條插入
- */
- @Test
- @Transactional
- void addUserOneByOne() {
- List<User> users = new ArrayList<>();
- for (int i = 0; i < 50000; i++) {
- User u = new User();
- u.setAddress("廣州:" + i);
- u.setUsername("張三:" + i);
- u.setPassword("123:" + i);
- users.add(u);
- }
- userService.addUserOneByOne(users);
- }
可以看到,耗時 901 毫秒,5w 條數(shù)據(jù)插入不到 1 秒。
2.2 方案二測試
方案二是生成一條 SQL,然后插入。
mapper 如下:
- @Mapper
- public interface UserMapper {
- void addByOneSQL(@Param("users") List<User> users);
- }
對應(yīng)的 SQL 如下:
- <insert id="addByOneSQL">
- insert into user (username,address,password) values
- <foreach collection="users" item="user" separator=",">
- (#{user.username},#{user.address},#{user.password})
- </foreach>
- </insert>
service 如下:
- @Service
- public class UserService extends ServiceImpl<UserMapper, User> implements IUserService {
- private static final Logger logger = LoggerFactory.getLogger(UserService.class);
- @Autowired
- UserMapper userMapper;
- @Autowired
- SqlSessionFactory sqlSessionFactory;
- @Transactional(rollbackFor = Exception.class)
- public void addByOneSQL(List<User> users) {
- long startTime = System.currentTimeMillis();
- userMapper.addByOneSQL(users);
- long endTime = System.currentTimeMillis();
- logger.info("合并成一條 SQL 插入耗費時間 {}", (endTime - startTime));
- }
- }
然后在單元測試中調(diào)一下這個方法:
- /**
- * 合并成一條 SQL 插入
- */
- @Test
- @Transactional
- void addByOneSQL() {
- List<User> users = new ArrayList<>();
- for (int i = 0; i < 50000; i++) {
- User u = new User();
- u.setAddress("廣州:" + i);
- u.setUsername("張三:" + i);
- u.setPassword("123:" + i);
- users.add(u);
- }
- userService.addByOneSQL(users);
- }
可以看到插入 5 萬條數(shù)據(jù)耗時 1805 毫秒。
可以看到,生成一條 SQL 的執(zhí)行效率還是要差一點。
另外還需要注意,第二種方案還有一個問題,就是當(dāng)數(shù)據(jù)量大的時候,生成的 SQL 將特別的長,MySQL 可能一次性處理不了這么大的 SQL,這個時候就需要修改 MySQL 的配置或者對待插入的數(shù)據(jù)進行分片處理了,這些操作又會導(dǎo)致插入時間更長。
2.3 對比分析
很明顯,方案一更具優(yōu)勢。當(dāng)批量插入十萬、二十萬數(shù)據(jù)的時候,方案一的優(yōu)勢會更加明顯(方案二則需要修改 MySQL 配置或者對待插入數(shù)據(jù)進行分片)。
3. MP 怎么做的?
小伙伴們知道,其實 MyBatis Plus 里邊也有一個批量插入的方法 saveBatch,我們來看看它的實現(xiàn)源碼:
- @Transactional(rollbackFor = Exception.class)
- @Override
- public boolean saveBatch(Collection<T> entityList, int batchSize) {
- String sqlStatement = getSqlStatement(SqlMethod.INSERT_ONE);
- return executeBatch(entityList, batchSize, (sqlSession, entity) -> sqlSession.insert(sqlStatement, entity));
- }
可以看到,這里拿到的 sqlStatement 就是一個 INSERT_ONE,即一條一條插入。
再來看 executeBatch 方法,如下:
- public static <E> boolean executeBatch(Class<?> entityClass, Log log, Collection<E> list, int batchSize, BiConsumer<SqlSession, E> consumer) {
- Assert.isFalse(batchSize < 1, "batchSize must not be less than one");
- return !CollectionUtils.isEmpty(list) && executeBatch(entityClass, log, sqlSession -> {
- int size = list.size();
- int i = 1;
- for (E element : list) {
- consumer.accept(sqlSession, element);
- if ((i % batchSize == 0) || i == size) {
- sqlSession.flushStatements();
- }
- i++;
- }
- });
- }
這里注意 return 中的第三個參數(shù),是一個 lambda 表達式,這也是 MP 中批量插入的核心邏輯,可以看到,MP 先對數(shù)據(jù)進行分片(默認(rèn)分片大小是 1000),分片完成之后,也是一條一條的插入。繼續(xù)查看 executeBatch 方法,就會發(fā)現(xiàn)這里的 sqlSession 其實也是一個批處理的 sqlSession,并非普通的 sqlSession。
綜上,MP 中的批量插入方案跟我們 2.1 小節(jié)的批量插入思路其實是一樣的。
4. 小結(jié)
好啦,經(jīng)過上面的分析,現(xiàn)在小伙伴們知道了批量插入該怎么做了吧?
本文轉(zhuǎn)載自微信公眾號「江南一點雨」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請聯(lián)系江南一點雨公眾號。