震撼!通過雙重異步,Excel 10萬行數(shù)據(jù)導(dǎo)入從191秒優(yōu)化到2秒!
在現(xiàn)代的企業(yè)級應(yīng)用開發(fā)中,海量數(shù)據(jù)的處理效率和并發(fā)性能優(yōu)化是一個(gè)非常重要的課題。無論是大規(guī)模數(shù)據(jù)導(dǎo)入、文件解析,還是在分布式系統(tǒng)中處理高并發(fā)任務(wù),如何提升系統(tǒng)的處理速度、合理利用計(jì)算資源、減少線程上下文切換的開銷,這些都是開發(fā)者必須面對的問題。在這一背景下,線程池技術(shù)以及異步編程逐漸成為提升系統(tǒng)性能的利器。
本文將深入探討如何通過合理設(shè)計(jì)線程池和利用異步編程模型,有效優(yōu)化大規(guī)模數(shù)據(jù)的處理性能。我們將結(jié)合 Spring Boot 框架中的 @Async 注解、自定義線程池、以及通過使用 EasyExcel 進(jìn)行大數(shù)據(jù)量的 Excel 解析和異步寫入數(shù)據(jù)庫的場景,詳細(xì)說明如何通過分而治之的策略,減少系統(tǒng)的響應(yīng)時(shí)間、提高并發(fā)處理能力。同時(shí),還將分析如何基于 CPU 和 IO 密集型任務(wù)的特性,來合理設(shè)置線程池的核心線程數(shù)、最大線程數(shù)等參數(shù),以便在實(shí)際項(xiàng)目中能夠充分發(fā)揮硬件資源的性能。
通常我是這樣做的:
- 使用POI讀取需要導(dǎo)入的Excel文件;
- 將文件名作為表名,列標(biāo)題作為列名,并將數(shù)據(jù)拼接成SQL語句;
- 通過JDBC或Mybatis插入到數(shù)據(jù)庫。
圖片
在操作中,如果文件數(shù)量多且數(shù)據(jù)量大,處理過程可能會非常緩慢。
訪問后,感覺程序沒有響應(yīng),但實(shí)際上,它正在讀取并插入數(shù)據(jù),只是速度很慢。
讀取包含10萬行的Excel文件竟然耗時(shí)191秒!
我以為程序卡住了!
private void readXls(String filePath, String filename) throws Exception {
@SuppressWarnings("resource")
XSSFWorkbook xssfWorkbook = new XSSFWorkbook(new FileInputStream(filePath));
// 讀取第一個(gè)工作表
XSSFSheet sheet = xssfWorkbook.getSheetAt(0);
// 獲取總行數(shù)
int maxRow = sheet.getLastRowNum();
StringBuilder insertBuilder = new StringBuilder();
insertBuilder.append("insert into ").append(filename).append(" ( UUID,");
XSSFRow row = sheet.getRow(0);
for (int i = 0; i < row.getPhysicalNumberOfCells(); i++) {
insertBuilder.append(row.getCell(i)).append(",");
}
insertBuilder.deleteCharAt(insertBuilder.length() - 1);
insertBuilder.append(" ) values ( ");
StringBuilder stringBuilder = new StringBuilder();
for (int i = 1; i <= maxRow; i++) {
XSSFRow xssfRow = sheet.getRow(i);
String id = "";
String name = "";
for (int j = 0; j < row.getPhysicalNumberOfCells(); j++) {
if (j == 0) {
id = xssfRow.getCell(j) + "";
} else if (j == 1) {
name = xssfRow.getCell(j) + "";
}
}
boolean flag = isExisted(id, name);
if (!flag) {
stringBuilder.append(insertBuilder);
stringBuilder.append('\'').append(uuid()).append('\'').append(",");
for (int j = 0; j < row.getPhysicalNumberOfCells(); j++) {
stringBuilder.append('\'').append(value).append('\'').append(",");
}
stringBuilder.deleteCharAt(stringBuilder.length() - 1);
stringBuilder.append(" )").append("\n");
}
}
List<String> collect = Arrays.stream(stringBuilder.toString().split("\n")).collect(Collectors.toList());
int sum = JdbcUtil.executeDML(collect);
}
private static boolean isExisted(String id, String name) {
String sql = "select count(1) as num from " + static_TABLE + " where ID = '" + id + "' and NAME = '" + name + "'";
String num = JdbcUtil.executeSelect(sql, "num");
return Integer.valueOf(num) > 0;
}
private static String uuid() {
return UUID.randomUUID().toString().replace("-", "");
}
如何優(yōu)化?
優(yōu)化1:首先,查詢所有數(shù)據(jù),將其緩存到map中,然后在插入前做決策。這樣可以大大提高速度。
優(yōu)化2:如果單個(gè)Excel文件太大,可以考慮使用異步和多線程,分批讀取多行并插入數(shù)據(jù)庫。
圖片
優(yōu)化3:如果文件太多,可以為每個(gè)Excel文件使用一個(gè)異步進(jìn)程,實(shí)現(xiàn)雙重異步讀取和插入。
圖片
使用雙重異步處理后,從191秒優(yōu)化到了2秒,你能相信嗎?
以下是異步讀取Excel文件和批量讀取大Excel文件的關(guān)鍵代碼。
異步讀取緩存的Excel Controller類
@RequestMapping(value = "/readExcelCacheAsync", method = RequestMethod.POST)
@ResponseBody
public String readExcelCacheAsync() {
String path = "G:\\Test\\data\\";
try {
// 讀取Excel之前,緩存所有數(shù)據(jù)
USER_INFO_SET = getUserInfo();
File file = new File(path);
String[] xlsxArr = file.list();
for (int i = 0; i < xlsxArr.length; i++) {
File fileTemp = new File(path + "\\" + xlsxArr[i]);
String filename = fileTemp.getName().replace(".xlsx", "");
readExcelCacheAsyncService.readXls(path + filename + ".xlsx", filename);
}
} catch (Exception e) {
logger.error("|#ReadDBCsv|#Exception: ", e);
return "error";
}
return "success";
}
批量讀取超大Excel文件
@Async("async-executor")
public void readXls(String filePath, String filename) throws Exception {
@SuppressWarnings("resource")
XSSFWorkbook xssfWorkbook = new XSSFWorkbook(new FileInputStream(filePath));
// 讀取第一個(gè)工作表
XSSFSheet sheet = xssfWorkbook.getSheetAt(0);
// 總行數(shù)
int maxRow = sheet.getLastRowNum();
logger.info(filename + ".xlsx,共 " + maxRow + " 行數(shù)據(jù)!");
StringBuilder insertBuilder = new StringBuilder();
insertBuilder.append("insert into ").append(filename).append(" ( UUID,");
XSSFRow row = sheet.getRow(0);
for (int i = 0; i < row.getPhysicalNumberOfCells(); i++) {
insertBuilder.append(row.getCell(i)).append(",");
}
insertBuilder.deleteCharAt(insertBuilder.length() - 1);
insertBuilder.append(" ) values ( ");
int times = maxRow / STEP + 1;
for (int time = 0; time < times; time++) {
int start = STEP * time + 1;
int end = STEP * time + STEP;
if (time == times - 1) {
end = maxRow;
}
if (end + 1 - start > 0) {
readExcelDataAsyncService.readXlsCacheAsyncMybatis(sheet, row, start, end, insertBuilder);
}
}
}
異步批量插入數(shù)據(jù)庫
@Async("async-executor")
public void readXlsCacheAsync(XSSFSheet sheet, XSSFRow row, int start, int end, StringBuilder insertBuilder) {
StringBuilder stringBuilder = new StringBuilder();
for (int i = start; i <= end; i++) {
XSSFRow xssfRow = sheet.getRow(i);
String id = "";
String name = "";
for (int j = 0; j < row.getPhysicalNumberOfCells(); j++) {
if (j == 0) {
id = xssfRow.getCell(j) + "";
} else if (j == 1) {
name = xssfRow.getCell(j) + "";
}
}
// 在讀取Excel之前,先緩存所有數(shù)據(jù),然后做決策
boolean flag = isExisted(id, name);
if (!flag) {
stringBuilder.append(insertBuilder);
stringBuilder.append('\'').append(uuid()).append('\'').append(",");
for (int j = 0; j < row.getPhysicalNumberOfCells(); j++) {
stringBuilder.append('\'').append(value).append('\'').append(",");
}
stringBuilder.deleteCharAt(stringBuilder.length() - 1);
stringBuilder.append(" )").append("\n");
}
}
List<String> collect = Arrays.stream(stringBuilder.toString().split("\n")).collect(Collectors.toList());
if (collect != null && collect.size() > 0) {
int sum = JdbcUtil.executeDML(collect);
}
}
private boolean isExisted(String id, String name) {
return ReadExcelCacheAsyncController.USER_INFO_SET.contains(id + "," + name);
}
異步線程池工具類
@Async 的目的是異步處理任務(wù)。
- 在方法上添加 @Async 表明該方法是異步的。
- 在類上添加 @Async 表示該類中的所有方法都是異步的。
- 使用此注解的類必須由 Spring 管理。
- 必須在啟動(dòng)類或配置類中添加 @EnableAsync 注解,@Async 才能生效。
在使用 @Async 時(shí),如果不指定線程池的名稱,即不自定義線程池,默認(rèn)會使用一個(gè)線程池。這個(gè)默認(rèn)線程池是 Spring 的 SimpleAsyncTaskExecutor。
默認(rèn)線程池的默認(rèn)配置如下:
- 默認(rèn)核心線程數(shù):8。
- 最大線程數(shù):Integer.MAX_VALUE。
- 隊(duì)列類型:LinkedBlockingQueue。
- 容量:Integer.MAX_VALUE。
- 空閑線程保留時(shí)間:60秒。
- 線程池拒絕策略:AbortPolicy。
從最大線程數(shù)可以看出,在并發(fā)情況下,線程會無限制地創(chuàng)建。
你也可以通過 yml 文件重新配置:
spring:
task:
execution:
pool:
max-size: 10
core-size: 5
keep-alive: 3s
queue-capacity: 1000
thread-name-prefix: my-executor
你也可以自定義線程池。以下是使用 @Async 自定義線程池的簡單代碼實(shí)現(xiàn):
@EnableAsync // 支持異步操作
@Configuration
public class AsyncTaskConfig {
/**
* 來自 com.google.guava 的線程池
* @return
*/
@Bean("my-executor")
public Executor firstExecutor() {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("my-executor").build();
// 獲取 CPU 處理器數(shù)量
int curSystemThreads = Runtime.getRuntime().availableProcessors() * 2;
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(curSystemThreads, 100,
200, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(), threadFactory);
threadPool.allowsCoreThreadTimeOut();
return threadPool;
}
/**
* Spring 的線程池
* @return
*/
@Bean("async-executor")
public Executor asyncExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
// 核心線程數(shù)
taskExecutor.setCorePoolSize(24);
// 線程池維護(hù)的最大線程數(shù),超出核心線程數(shù)的線程僅當(dāng)緩沖隊(duì)列滿時(shí)才會創(chuàng)建
taskExecutor.setMaxPoolSize(200);
// 緩沖隊(duì)列
taskExecutor.setQueueCapacity(50);
// 超出核心線程數(shù)的線程空閑時(shí)間,超時(shí)后將被銷毀
taskExecutor.setKeepAliveSeconds(200);
// 異步方法內(nèi)部線程名
taskExecutor.setThreadNamePrefix("async-executor-");
/**
* 當(dāng)線程池的任務(wù)緩存隊(duì)列已滿,且線程池中的線程數(shù)量已達(dá)到最大值時(shí),如果還有任務(wù)到來,將采用任務(wù)拒絕策略。
* 通常有以下四種策略:
* ThreadPoolExecutor.AbortPolicy:拋棄任務(wù)并拋出 RejectedExecutionException 異常。
* ThreadPoolExecutor.DiscardPolicy:拋棄任務(wù),但不拋出異常。
* ThreadPoolExecutor.DiscardOldestPolicy:拋棄隊(duì)列最前面的任務(wù),然后嘗試執(zhí)行當(dāng)前任務(wù)(重復(fù)此過程)。
* ThreadPoolExecutor.CallerRunsPolicy:重試添加當(dāng)前任務(wù),自動(dòng)調(diào)用執(zhí)行方法,直到成功。
*/
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
taskExecutor.initialize();
return taskExecutor;
}
}
異步失效的原因
- 被 @Async 注解的方法不是 public 的;
- 被 @Async 注解的方法的返回值類型只能是 void 或 Future;
- 被 @Async 注解的方法如果是靜態(tài)的也會失效;
- 未添加 @EnableAsync 注解;
- 調(diào)用者和被 @Async 注解的方法不能在同一個(gè)類中;
- 對異步方法使用 @Transactional 是無效的,但對異步方法內(nèi)調(diào)用的方法加上 @Transactional 是有效的。
線程池中設(shè)置核心線程數(shù)的問題
我尚未有時(shí)間詳細(xì)探討:在線程池中設(shè)置 CorePoolSize 和 MaxPoolSize 的最適宜和最高效的數(shù)量是多少。
借此機(jī)會進(jìn)行了一些測試。
我記得有個(gè)關(guān)于 CPU 處理器數(shù)量的說法
將 CorePoolSize 設(shè)置為 CPU 處理器的數(shù)量時(shí),效率最高嗎?
// 獲取 CPU 處理器數(shù)量
int curSystemThreads = Runtime.getRuntime().availableProcessors() * 2;
Runtime.getRuntime().availableProcessors() 會獲取 CPU 核心線程數(shù),代表計(jì)算資源。
- 對于 CPU 密集型任務(wù),線程池的大小設(shè)置為 N,與 CPU 線程數(shù)一致,這可以最大限度地減少線程間的上下文切換。但在實(shí)際開發(fā)中,一般設(shè)置為 N+1,以防止線程由于不可預(yù)見的情況而阻塞。如果發(fā)生阻塞,多出來的線程可以繼續(xù)執(zhí)行任務(wù),保證 CPU 的高效利用。
- 對于 IO 密集型任務(wù),線程池的大小設(shè)置為 2N。這個(gè)數(shù)值是根據(jù)業(yè)務(wù)壓力測試得出的,或者在不涉及業(yè)務(wù)時(shí)使用推薦值。
實(shí)際中,線程池的具體大小需要根據(jù)壓力測試以及機(jī)器的當(dāng)前狀態(tài)進(jìn)行調(diào)整。
如果線程池過大,會導(dǎo)致 CPU 持續(xù)切換,系統(tǒng)整體性能并不會有顯著提高,反而可能會變慢。
我電腦的 CPU 處理器數(shù)量為 24。
那么一次讀取多少行效率最高呢?
測試中,Excel 文件包含 10 萬行數(shù)據(jù)。10 萬 / 24 = 4166,因此我設(shè)置為 4200。這是最有效的設(shè)置嗎?
測試過程中似乎的確如此。
我記得大家習(xí)慣性地將核心線程數(shù)(CorePoolSize)和最大線程數(shù)(MaxPoolSize)設(shè)置為相同的數(shù)值,通常是 200。
這只是隨機(jī)選擇,還是基于經(jīng)驗(yàn)的?
測試發(fā)現(xiàn),當(dāng) CorePoolSize 和 MaxPoolSize 都設(shè)置為 200 時(shí),最初同時(shí)開啟了 150 個(gè)線程工作。
為什么會這樣呢?
經(jīng)過數(shù)十次測試后
- 發(fā)現(xiàn)核心線程數(shù)并沒有太大區(qū)別;
- 關(guān)鍵是每次讀取和存儲的行數(shù),不能太多,存儲速度會逐漸減慢;
- 也不能太少,如果少于 150 個(gè)線程,會導(dǎo)致線程阻塞,反而減慢進(jìn)程。
IV.使用 EasyExcel 讀取并插入數(shù)據(jù)庫
我不會寫 EasyExcel 的雙異步優(yōu)化。大家要記住避免掉進(jìn)低級勤奮的陷阱。
ReadEasyExcelController
@RequestMapping(value = "/readEasyExcel", method = RequestMethod.POST)
@ResponseBody
public String readEasyExcel() {
try {
String path = "G:\\Test\\data\\";
String[] xlsxArr = new File(path).list();
for (int i = 0; i < xlsxArr.length; i++) {
String filePath = path + xlsxArr[i];
File fileTemp = new File(path + xlsxArr[i]);
String fileName = fileTemp.getName().replace(".xlsx", "");
List<UserInfo> list = new ArrayList<>();
EasyExcel.read(filePath, UserInfo.class, new ReadEasyExeclAsyncListener(readEasyExeclService, fileName, batchCount, list)).sheet().doRead();
}
}catch (Exception e){
logger.error("readEasyExcel Exception:",e);
return "error";
}
return "success";
}
ReadEasyExeclAsyncListener
public ReadEasyExeclService readEasyExeclService;
// 表名
public String TABLE_NAME;
// 批量插入閾值
private int BATCH_COUNT;
// 數(shù)據(jù)收集
private List<UserInfo> LIST;
public ReadEasyExeclAsyncListener(ReadEasyExeclService readEasyExeclService, String tableName, int batchCount, List<UserInfo> list) {
this.readEasyExeclService = readEasyExeclService;
this.TABLE_NAME = tableName;
this.BATCH_COUNT = batchCount;
this.LIST = list;
}
@Override
public void invoke(UserInfo data, AnalysisContext analysisContext) {
data.setUuid(uuid());
data.setTableName(TABLE_NAME);
LIST.add(data);
if (LIST.size() >= BATCH_COUNT) {
// 批量入庫
readEasyExeclService.saveDataBatch(LIST);
}
}
@Override
public void doAfterAllAnalysed(AnalysisContext analysisContext) {
if (LIST.size() > 0) {
// 最后一批入庫
readEasyExeclService.saveDataBatch(LIST);
}
}
public static String uuid() {
return UUID.randomUUID().toString().replace("-", "");
}
ReadEasyExeclServiceImpl
@Service
public class ReadEasyExeclServiceImpl implements ReadEasyExeclService {
@Resource
private ReadEasyExeclMapper readEasyExeclMapper;
@Override
public void saveDataBatch(List<UserInfo> list) {
// Insert into the database via mybatis
readEasyExeclMapper.saveDataBatch(list);
// Insert into the database via JDBC
// insertByJdbc(list);
list.clear();
}
private void insertByJdbc(List<UserInfo> list){
List<String> sqlList = new ArrayList<>();
for (UserInfo u : list){
StringBuilder sqlBuilder = new StringBuilder();
sqlBuilder.append("insert into ").append(u.getTableName()).append(" ( UUID,ID,NAME,AGE,ADDRESS,PHONE,OP_TIME ) values ( ");
sqlBuilder.append("'").append(ReadEasyExeclAsyncListener.uuid()).append("',")
.append("'").append(u.getId()).append("',")
.append("'").append(u.getName()).append("',")
.append("'").append(u.getAge()).append("',")
.append("'").append(u.getAddress()).append("',")
.append("'").append(u.getPhone()).append("',")
.append("sysdate )");
sqlList.add(sqlBuilder.toString());
}
JdbcUtil.executeDML(sqlList);
}
}
UserInfo
@Data
public class UserInfo {
private String tableName;
private String uuid;
@ExcelProperty(value = "ID")
private String id;
@ExcelProperty(value = "NAME")
private String name;
@ExcelProperty(value = "AGE")
private String age;
@ExcelProperty(value = "ADDRESS")
private String address;
@ExcelProperty(value = "PHONE")
private String phone;
}
結(jié)語
在處理高并發(fā)、大數(shù)據(jù)導(dǎo)入等場景時(shí),異步編程和線程池技術(shù)提供了一種極具效率的解決方案。通過合理配置線程池的核心線程數(shù)、最大線程數(shù)、隊(duì)列長度等參數(shù),能夠在確保系統(tǒng)穩(wěn)定性的前提下,大幅提升并發(fā)處理能力。而通過異步編程,我們可以有效避免線程阻塞、減少資源浪費(fèi),并讓系統(tǒng)在面對大量請求時(shí)依然能夠保持較高的響應(yīng)速度。
本文的示例通過 Spring Boot 的 @Async 注解和自定義線程池,在實(shí)際的 EasyExcel 大數(shù)據(jù)導(dǎo)入場景下,驗(yàn)證了這種技術(shù)組合的高效性和實(shí)用性。此外,通過對 CPU 密集型任務(wù)和 IO 密集型任務(wù)的深入分析,開發(fā)者能夠根據(jù)自身項(xiàng)目的特點(diǎn),選擇合適的線程池配置策略,最大化資源利用率和性能表現(xiàn)。
在實(shí)際應(yīng)用中,線程池和異步編程不僅適用于大數(shù)據(jù)導(dǎo)入,還可以推廣到包括文件處理、網(wǎng)絡(luò)請求、日志處理等各類需要并發(fā)處理的場景中。因此,掌握并靈活運(yùn)用這些技術(shù),將為我們的系統(tǒng)性能優(yōu)化提供堅(jiān)實(shí)的基礎(chǔ),使我們能夠應(yīng)對更復(fù)雜、更苛刻的業(yè)務(wù)需求。