使用雙異步后,從 191s 優(yōu)化到 2s
在開發(fā)中,我們經(jīng)常會遇到這樣的需求,將Excel的數(shù)據(jù)導入數(shù)據(jù)庫中。
一、一般我會這樣做:
- 通過POI讀取需要導入的Excel。
- 以文件名為表名、列頭為列名、并將數(shù)據(jù)拼接成sql。
- 通過JDBC或mybatis插入數(shù)據(jù)庫。
操作起來,如果文件比較多,數(shù)據(jù)量都很大的時候,會非常慢。
訪問之后,感覺沒什么反應,實際上已經(jīng)在讀取 + 入庫了,只是比較慢而已。
讀取一個10萬行的Excel,居然用了191s,我還以為它卡死了呢!
private void readXls(String filePath, String filename) throws Exception {
@SuppressWarnings("resource")
XSSFWorkbook xssfWorkbook = new XSSFWorkbook(new FileInputStream(filePath));
// 讀取第一個工作表
XSSFSheet sheet = xssfWorkbook.getSheetAt(0);
// 總行數(shù)
int maxRow = sheet.getLastRowNum();
StringBuilder insertBuilder = new StringBuilder();
insertBuilder.append("insert into ").append(filename).append(" ( UUID,");
XSSFRow row = sheet.getRow(0);
for (int i = 0; i < row.getPhysicalNumberOfCells(); i++) {
insertBuilder.append(row.getCell(i)).append(",");
}
insertBuilder.deleteCharAt(insertBuilder.length() - 1);
insertBuilder.append(" ) values ( ");
StringBuilder stringBuilder = new StringBuilder();
for (int i = 1; i <= maxRow; i++) {
XSSFRow xssfRow = sheet.getRow(i);
String id = "";
String name = "";
for (int j = 0; j < row.getPhysicalNumberOfCells(); j++) {
if (j == 0) {
id = xssfRow.getCell(j) + "";
} else if (j == 1) {
name = xssfRow.getCell(j) + "";
}
}
boolean flag = isExisted(id, name);
if (!flag) {
stringBuilder.append(insertBuilder);
stringBuilder.append('\'').append(uuid()).append('\'').append(",");
for (int j = 0; j < row.getPhysicalNumberOfCells(); j++) {
stringBuilder.append('\'').append(value).append('\'').append(",");
}
stringBuilder.deleteCharAt(stringBuilder.length() - 1);
stringBuilder.append(" )").append("\n");
}
}
List<String> collect = Arrays.stream(stringBuilder.toString().split("\n")).collect(Collectors.toList());
int sum = JdbcUtil.executeDML(collect);
}
private static boolean isExisted(String id, String name) {
String sql = "select count(1) as num from " + static_TABLE + " where ID = '" + id + "' and NAME = '" + name + "'";
String num = JdbcUtil.executeSelect(sql, "num");
return Integer.valueOf(num) > 0;
}
private static String uuid() {
return UUID.randomUUID().toString().replace("-", "");
}
二、誰寫的?拖出去,斬了!
- 優(yōu)化1:先查詢?nèi)繑?shù)據(jù),緩存到map中,插入前再進行判斷,速度快了很多。
- 優(yōu)化2:如果單個Excel文件過大,可以采用 異步 + 多線程 讀取若干行,分批入庫。
使用雙異步后,從 191s 優(yōu)化到 2s,你敢信?
下面貼出異步讀取Excel文件、并分批讀取大Excel文件的關(guān)鍵代碼
1、readExcelCacheAsync控制類
@RequestMapping(value = "/readExcelCacheAsync", method = RequestMethod.POST)
@ResponseBody
public String readExcelCacheAsync() {
String path = "G:\\測試\\data\\";
try {
// 在讀取Excel之前,緩存所有數(shù)據(jù)
USER_INFO_SET = getUserInfo();
File file = new File(path);
String[] xlsxArr = file.list();
for (int i = 0; i < xlsxArr.length; i++) {
File fileTemp = new File(path + "\\" + xlsxArr[i]);
String filename = fileTemp.getName().replace(".xlsx", "");
readExcelCacheAsyncService.readXls(path + filename + ".xlsx", filename);
}
} catch (Exception e) {
logger.error("|#ReadDBCsv|#異常: ", e);
return "error";
}
return "success";
}
2、分批讀取超大Excel文件
@Async("async-executor")
public void readXls(String filePath, String filename) throws Exception {
@SuppressWarnings("resource")
XSSFWorkbook xssfWorkbook = new XSSFWorkbook(new FileInputStream(filePath));
// 讀取第一個工作表
XSSFSheet sheet = xssfWorkbook.getSheetAt(0);
// 總行數(shù)
int maxRow = sheet.getLastRowNum();
logger.info(filename + ".xlsx,一共" + maxRow + "行數(shù)據(jù)!");
StringBuilder insertBuilder = new StringBuilder();
insertBuilder.append("insert into ").append(filename).append(" ( UUID,");
XSSFRow row = sheet.getRow(0);
for (int i = 0; i < row.getPhysicalNumberOfCells(); i++) {
insertBuilder.append(row.getCell(i)).append(",");
}
insertBuilder.deleteCharAt(insertBuilder.length() - 1);
insertBuilder.append(" ) values ( ");
int times = maxRow / STEP + 1;
//logger.info("將" + maxRow + "行數(shù)據(jù)分" + times + "次插入數(shù)據(jù)庫!");
for (int time = 0; time < times; time++) {
int start = STEP * time + 1;
int end = STEP * time + STEP;
if (time == times - 1) {
end = maxRow;
}
if(end + 1 - start > 0){
//logger.info("第" + (time + 1) + "次插入數(shù)據(jù)庫!" + "準備插入" + (end + 1 - start) + "條數(shù)據(jù)!");
//readExcelDataAsyncService.readXlsCacheAsync(sheet, row, start, end, insertBuilder);
readExcelDataAsyncService.readXlsCacheAsyncMybatis(sheet, row, start, end, insertBuilder);
}
}
}
3、異步批量入庫
@Async("async-executor")
public void readXlsCacheAsync(XSSFSheet sheet, XSSFRow row, int start, int end, StringBuilder insertBuilder) {
StringBuilder stringBuilder = new StringBuilder();
for (int i = start; i <= end; i++) {
XSSFRow xssfRow = sheet.getRow(i);
String id = "";
String name = "";
for (int j = 0; j < row.getPhysicalNumberOfCells(); j++) {
if (j == 0) {
id = xssfRow.getCell(j) + "";
} else if (j == 1) {
name = xssfRow.getCell(j) + "";
}
}
// 先在讀取Excel之前,緩存所有數(shù)據(jù),再做判斷
boolean flag = isExisted(id, name);
if (!flag) {
stringBuilder.append(insertBuilder);
stringBuilder.append('\'').append(uuid()).append('\'').append(",");
for (int j = 0; j < row.getPhysicalNumberOfCells(); j++) {
stringBuilder.append('\'').append(value).append('\'').append(",");
}
stringBuilder.deleteCharAt(stringBuilder.length() - 1);
stringBuilder.append(" )").append("\n");
}
}
List<String> collect = Arrays.stream(stringBuilder.toString().split("\n")).collect(Collectors.toList());
if (collect != null && collect.size() > 0) {
int sum = JdbcUtil.executeDML(collect);
}
}
private boolean isExisted(String id, String name) {
return ReadExcelCacheAsyncController.USER_INFO_SET.contains(id + "," + name);
}
4、異步線程池工具類
@Async的作用就是異步處理任務。
- 在方法上添加@Async,表示此方法是異步方法;
- 在類上添加@Async,表示類中的所有方法都是異步方法;
- 使用此注解的類,必須是Spring管理的類;
- 需要在啟動類或配置類中加入@EnableAsync注解,@Async才會生效;
在使用@Async時,如果不指定線程池的名稱,也就是不自定義線程池,@Async是有默認線程池的,使用的是Spring默認的線程池SimpleAsyncTaskExecutor。
默認線程池的默認配置如下:
- 默認核心線程數(shù):8;
- 最大線程數(shù):Integet.MAX_VALUE;
- 隊列使用LinkedBlockingQueue;
- 容量是:Integet.MAX_VALUE;
- 空閑線程保留時間:60s;
- 線程池拒絕策略:AbortPolicy;
從最大線程數(shù)可以看出,在并發(fā)情況下,會無限制的創(chuàng)建線程,我勒個嗎啊。
也可以通過yml重新配置:
spring:
task:
execution:
pool:
max-size: 10
core-size: 5
keep-alive: 3s
queue-capacity: 1000
thread-name-prefix: my-executor
也可以自定義線程池,下面通過簡單的代碼來實現(xiàn)以下@Async自定義線程池。
@EnableAsync// 支持異步操作
@Configuration
public class AsyncTaskConfig {
/**
* com.google.guava中的線程池
* @return
*/
@Bean("my-executor")
public Executor firstExecutor() {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("my-executor").build();
// 獲取CPU的處理器數(shù)量
int curSystemThreads = Runtime.getRuntime().availableProcessors() * 2;
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(curSystemThreads, 100,
200, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(), threadFactory);
threadPool.allowsCoreThreadTimeOut();
return threadPool;
}
/**
* Spring線程池
* @return
*/
@Bean("async-executor")
public Executor asyncExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
// 核心線程數(shù)
taskExecutor.setCorePoolSize(24);
// 線程池維護線程的最大數(shù)量,只有在緩沖隊列滿了之后才會申請超過核心線程數(shù)的線程
taskExecutor.setMaxPoolSize(200);
// 緩存隊列
taskExecutor.setQueueCapacity(50);
// 空閑時間,當超過了核心線程數(shù)之外的線程在空閑時間到達之后會被銷毀
taskExecutor.setKeepAliveSeconds(200);
// 異步方法內(nèi)部線程名稱
taskExecutor.setThreadNamePrefix("async-executor-");
/**
* 當線程池的任務緩存隊列已滿并且線程池中的線程數(shù)目達到maximumPoolSize,如果還有任務到來就會采取任務拒絕策略
* 通常有以下四種策略:
* ThreadPoolExecutor.AbortPolicy:丟棄任務并拋出RejectedExecutionException異常。
* ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不拋出異常。
* ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,然后重新嘗試執(zhí)行任務(重復此過程)
* ThreadPoolExecutor.CallerRunsPolicy:重試添加當前的任務,自動重復調(diào)用 execute() 方法,直到成功
*/
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
taskExecutor.initialize();
return taskExecutor;
}
}
5、異步失效的原因
- 注解@Async的方法不是public方法。
- 注解@Async的返回值只能為void或Future。
- 注解@Async方法使用static修飾也會失效。
- 沒加@EnableAsync注解。
- 調(diào)用方和@Async不能在一個類中。
- 在Async方法上標注@Transactional是沒用的,但在Async方法調(diào)用的方法上標注@Transcational是有效的。
三、線程池中的核心線程數(shù)設置問題
有一個問題,一直沒時間摸索,線程池中的核心線程數(shù)CorePoolSize、最大線程數(shù)MaxPoolSize,設置成多少,最合適,效率最高。
借著這個機會,測試一下。
1、我記得有這樣一個說法,CPU的處理器數(shù)量
將核心線程數(shù)CorePoolSize設置成CPU的處理器數(shù)量,是不是效率最高的?
// 獲取CPU的處理器數(shù)量
int curSystemThreads = Runtime.getRuntime().availableProcessors() * 2;
Runtime.getRuntime().availableProcessors()獲取的是CPU核心線程數(shù),也就是計算資源。
- CPU密集型,線程池大小設置為N,也就是和cpu的線程數(shù)相同,可以盡可能地避免線程間上下文切換,但在實際開發(fā)中,一般會設置為N+1,為了防止意外情況出現(xiàn)線程阻塞,如果出現(xiàn)阻塞,多出來的線程會繼續(xù)執(zhí)行任務,保證CPU的利用效率。
- IO密集型,線程池大小設置為2N,這個數(shù)是根據(jù)業(yè)務壓測出來的,如果不涉及業(yè)務就使用推薦。
在實際中,需要對具體的線程池大小進行調(diào)整,可以通過壓測及機器設備現(xiàn)狀,進行調(diào)整大小。
如果線程池太大,則會造成CPU不斷的切換,對整個系統(tǒng)性能也不會有太大的提升,反而會導致系統(tǒng)緩慢。
我的電腦的CPU的處理器數(shù)量是24。
那么一次讀取多少行最合適呢?
測試的Excel中含有10萬條數(shù)據(jù),10萬/24 = 4166,那么我設置成4200,是不是效率最佳呢?
測試的過程中發(fā)現(xiàn),好像真的是這樣的。
2、我記得大家都習慣性的將核心線程數(shù)CorePoolSize和最大線程數(shù)MaxPoolSize設置成一樣的,都愛設置成200
是隨便寫的,還是經(jīng)驗而為之?
測試發(fā)現(xiàn),當你將核心線程數(shù)CorePoolSize和最大線程數(shù)MaxPoolSize都設置為200的時候,第一次它會同時開啟150個線程,來進行工作。
這個是為什么?
3、經(jīng)過數(shù)十次的測試
- 發(fā)現(xiàn)核心線程數(shù)好像差別不大。
- 每次讀取和入庫的數(shù)量是關(guān)鍵,不能太多,因為每次入庫會變慢。
- 也不能太少,如果太少,超過了150個線程,就會造成線程阻塞,也會變慢。
四、通過EasyExcel讀取并插入數(shù)據(jù)庫
EasyExcel的方式,我就不寫雙異步優(yōu)化了,大家切記陷入低水平勤奮的怪圈。
1、ReadEasyExcelController
@RequestMapping(value = "/readEasyExcel", method = RequestMethod.POST)
@ResponseBody
public String readEasyExcel() {
try {
String path = "G:\\測試\\data\\";
String[] xlsxArr = new File(path).list();
for (int i = 0; i < xlsxArr.length; i++) {
String filePath = path + xlsxArr[i];
File fileTemp = new File(path + xlsxArr[i]);
String fileName = fileTemp.getName().replace(".xlsx", "");
List<UserInfo> list = new ArrayList<>();
EasyExcel.read(filePath, UserInfo.class, new ReadEasyExeclAsyncListener(readEasyExeclService, fileName, batchCount, list)).sheet().doRead();
}
}catch (Exception e){
logger.error("readEasyExcel 異常:",e);
return "error";
}
return "suceess";
}
2、ReadEasyExeclAsyncListener
public ReadEasyExeclService readEasyExeclService;
// 表名
public String TABLE_NAME;
// 批量插入閾值
private int BATCH_COUNT;
// 數(shù)據(jù)集合
private List<UserInfo> LIST;
public ReadEasyExeclAsyncListener(ReadEasyExeclService readEasyExeclService, String tableName, int batchCount, List<UserInfo> list) {
this.readEasyExeclService = readEasyExeclService;
this.TABLE_NAME = tableName;
this.BATCH_COUNT = batchCount;
this.LIST = list;
}
@Override
public void invoke(UserInfo data, AnalysisContext analysisContext) {
data.setUuid(uuid());
data.setTableName(TABLE_NAME);
LIST.add(data);
if(LIST.size() >= BATCH_COUNT){
// 批量入庫
readEasyExeclService.saveDataBatch(LIST);
}
}
@Override
public void doAfterAllAnalysed(AnalysisContext analysisContext) {
if(LIST.size() > 0){
// 最后一批入庫
readEasyExeclService.saveDataBatch(LIST);
}
}
public static String uuid() {
return UUID.randomUUID().toString().replace("-", "");
}
}
3、ReadEasyExeclServiceImpl
@Service
public class ReadEasyExeclServiceImpl implements ReadEasyExeclService {
@Resource
private ReadEasyExeclMapper readEasyExeclMapper;
@Override
public void saveDataBatch(List<UserInfo> list) {
// 通過mybatis入庫
readEasyExeclMapper.saveDataBatch(list);
// 通過JDBC入庫
// insertByJdbc(list);
list.clear();
}
private void insertByJdbc(List<UserInfo> list){
List<String> sqlList = new ArrayList<>();
for (UserInfo u : list){
StringBuilder sqlBuilder = new StringBuilder();
sqlBuilder.append("insert into ").append(u.getTableName()).append(" ( UUID,ID,NAME,AGE,ADDRESS,PHONE,OP_TIME ) values ( ");
sqlBuilder.append("'").append(ReadEasyExeclAsyncListener.uuid()).append("',")
.append("'").append(u.getId()).append("',")
.append("'").append(u.getName()).append("',")
.append("'").append(u.getAge()).append("',")
.append("'").append(u.getAddress()).append("',")
.append("'").append(u.getPhone()).append("',")
.append("sysdate )");
sqlList.add(sqlBuilder.toString());
}
JdbcUtil.executeDML(sqlList);
}
}
4、UserInfo
@Data
public class UserInfo {
private String tableName;
private String uuid;
@ExcelProperty(value = "ID")
private String id;
@ExcelProperty(value = "NAME")
private String name;
@ExcelProperty(value = "AGE")
private String age;
@ExcelProperty(value = "ADDRESS")
private String address;
@ExcelProperty(value = "PHONE")
private String phone;
}