Practical Tips: Importing Data into Elasticsearch with Java Multithreading
Introduction

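I was recently asked to rework MTE (mysqlToEs), an existing small tool that imports data from MySQL into Elasticsearch. The old version imported with a single thread, so loading on the order of a hundred billion rows took about two weeks, which was far too slow. I spent three days rewriting MTE around a FixedThreadPool created with Java's Executors thread-pool framework. On a single server the import is now more than ten times faster, and tuning the thread count appropriately raises throughput further.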
Key technology stack
- Elasticsearch
- JDBC
- ExecutorService / Thread
- SQL
Tool walkthrough
Maven dependencies
```xml
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>${mysql.version}</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>${elasticsearch.version}</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>${elasticsearch.version}</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>${lombok.version}</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>${fastjson.version}</version>
</dependency>
```
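Java thread pool settings
Each pool has 21 threads by default, and the size can be adjusted. POR is the pool that processes workflow items that have been handled, and ROR is the pool that processes workflow items that have been read.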
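```java
// Fields of MteExecutor (the producers submit work via MteExecutor.POR / MteExecutor.ROR)
private static final int THREADS = 21;
public static final ExecutorService POR = Executors.newFixedThreadPool(THREADS); // handled-item consumers
public static final ExecutorService ROR = Executors.newFixedThreadPool(THREADS); // read-item consumers
```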
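According to its Javadoc, `Executors.newFixedThreadPool(n)` creates n threads that operate off a shared unbounded queue. If the producers submit pages faster than the consumers can index them, pending pages pile up in memory. The `Thread.sleep(2000)` in the producers is one way to hold this back.

Below is a minimal sketch of an alternative. It assumes you are willing to replace the factory method with an explicit `ThreadPoolExecutor` that has a bounded queue and `CallerRunsPolicy`. When the queue is full, the submitting producer runs the task itself and therefore slows down. The class name `BoundedPools` and the queue capacity are illustrative choices, not part of MTE.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: a fixed-size pool whose work queue is bounded.
class BoundedPools {
    /**
     * Same thread count as Executors.newFixedThreadPool(threads), but at most
     * queueCapacity tasks may wait. When the queue is full, CallerRunsPolicy runs
     * the task on the submitting thread, which throttles the producer.
     */
    static ExecutorService newBoundedFixedPool(int threads, int queueCapacity) {
        return new ThreadPoolExecutor(
                threads, threads,            // core == max: fixed size, as in newFixedThreadPool
                0L, TimeUnit.MILLISECONDS,   // keep-alive does not matter when core == max
                new ArrayBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }
}
```

Usage would be something like `POR = BoundedPools.newBoundedFixedPool(THREADS, 100)`. One caveat from the `CallerRunsPolicy` documentation: once the executor has been shut down, rejected tasks are silently discarded rather than run.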
Defining the producer threads for handled and read items: ZlPendProducer / ZlReadProducer
```java
public class ZlPendProducer implements Runnable {
    ...
    @Override
    public void run() {
        System.out.println(threadName + "::started...");
        // One iteration per sharded handled-item table
        for (int j = 0; j < Const.TBL.TBL_PEND_COUNT; j++) {
            try {
                ....
                int size = 1000; // rows per page, i.e. per consumer task
                for (int i = 0; i < count; i += size) {
                    if (i + size > count) {
                        // Last page: if fewer than `size` rows remain, fetch only the remaining rows
                        size = count - i;
                    }
                    String sql = "select * from " + tableName + " limit " + i + ", " + size;
                    System.out.println(tableName + "::sql::" + sql);
                    rs = statement.executeQuery(sql);
                    List<HistPendingEntity> lst = new ArrayList<>();
                    while (rs.next()) {
                        HistPendingEntity p = PendUtils.getHistPendingEntity(rs);
                        lst.add(p);
                    }
                    // Hand the page to the handled-item pool; the consumer writes it to ES
                    MteExecutor.POR.submit(new ZlPendConsumer(lst));
                    // Pause between pages to throttle submissions to the pool
                    Thread.sleep(2000);
                }
                ....
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

public class ZlReadProducer implements Runnable {
    ... // same logic as ZlPendProducer, applied to the read-item tables
}
```
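The paging arithmetic in the producer can be isolated and tested without a database. It reads fixed pages of `size` rows, with a shorter last page.

The sketch below stands in for the `select ... limit offset, size` query with an in-memory reader. The names `PagedProducer` and `PageReader` are hypothetical. `Math.min` plays the role of the `if` that shrinks the last page, so `size` never has to be reassigned inside the loop.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;

// Hypothetical stand-in for "select * from <table> limit offset, size".
interface PageReader<T> {
    List<T> read(int offset, int size);
}

class PagedProducer {
    /**
     * Reads count rows in pages of at most pageSize rows and submits one
     * consumer task per page to the pool. Returns the number of pages submitted.
     */
    static <T> int produce(int count, int pageSize, PageReader<T> reader,
                           ExecutorService pool, Consumer<List<T>> consumer) {
        int pages = 0;
        for (int offset = 0; offset < count; offset += pageSize) {
            // Same effect as "if (i + size > count) size = count - i;"
            int size = Math.min(pageSize, count - offset);
            List<T> page = reader.read(offset, size);
            pool.submit(() -> consumer.accept(page)); // consumer runs on a pool thread
            pages++;
        }
        return pages;
    }
}
```

Two caveats apply to the real query:

- Without an `ORDER BY` on a unique column, MySQL does not guarantee that consecutive `LIMIT` pages are disjoint and complete.
- A large offset makes the server read and discard every preceding row, so pages deep in a big table get slower.

Paging on an indexed unique key avoids both problems, for example `WHERE id > ? ORDER BY id LIMIT ?`, where `id` is a hypothetical column name.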
Defining the consumer threads for handled and read items: ZlPendConsumer / ZlReadConsumer
```java
import com.alibaba.fastjson.JSON;
import java.util.List;

public class ZlPendConsumer implements Runnable {
    private String threadName;
    private final List<HistPendingEntity> lst;

    public ZlPendConsumer(List<HistPendingEntity> lst) {
        this.lst = lst;
    }

    @Override
    public void run() {
        ...
        lst.forEach(v -> {
            try {
                // Serialize with fastjson, the JSON library declared in the pom
                String json = JSON.toJSONString(v);
                // EsClient.addDataInJSON is the tool's own helper; pendingId is passed as the document key
                EsClient.addDataInJSON(json, Const.ES.HistPendDB_Index, Const.ES.HistPendDB_type, v.getPendingId(), null);
                Const.COUNTER.LD_P.incrementAndGet(); // progress counter read by Monitor
            } catch (Exception e) {
                e.printStackTrace();
                System.out.println("err::PendingId::" + v.getPendingId());
            }
        });
        ...
    }
}

public class ZlReadConsumer implements Runnable {
    // same logic as ZlPendConsumer, applied to read items
}
```
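The consumer calls `EsClient.addDataInJSON` once per record. Assuming that helper issues one index request per call, this costs one round trip per document. Elasticsearch's bulk API accepts many documents per request; with the TransportClient, the entry point is `client.prepareBulk()`.

The sketch below shows only the batching and counting logic. A hypothetical `BulkSink` interface stands in for the real client call, so the code runs without a cluster. `BulkConsumer`, `BulkSink` and the chunk size are all assumptions.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

// Hypothetical stand-in for one bulk request; returns the ids that failed.
interface BulkSink {
    List<String> index(List<Map.Entry<String, String>> idAndJson);
}

// Sketch of a consumer that indexes its page in chunks instead of one request per row.
class BulkConsumer<T> implements Runnable {
    private final List<T> rows;
    private final int chunkSize;
    private final Function<T, String> idOf;   // e.g. v -> String.valueOf(v.getPendingId())
    private final Function<T, String> toJson; // e.g. v -> JSON.toJSONString(v) with fastjson
    private final BulkSink sink;
    private final AtomicLong indexed;         // plays the role of Const.COUNTER.LD_P
    private final List<String> failedIds = new ArrayList<>();

    BulkConsumer(List<T> rows, int chunkSize, Function<T, String> idOf,
                 Function<T, String> toJson, BulkSink sink, AtomicLong indexed) {
        this.rows = rows;
        this.chunkSize = chunkSize;
        this.idOf = idOf;
        this.toJson = toJson;
        this.sink = sink;
        this.indexed = indexed;
    }

    @Override
    public void run() {
        for (int from = 0; from < rows.size(); from += chunkSize) {
            int to = Math.min(from + chunkSize, rows.size());
            List<Map.Entry<String, String>> chunk = new ArrayList<>();
            for (T row : rows.subList(from, to)) {
                chunk.add(new AbstractMap.SimpleEntry<>(idOf.apply(row), toJson.apply(row)));
            }
            List<String> failed = sink.index(chunk);
            failedIds.addAll(failed);
            indexed.addAndGet(chunk.size() - failed.size()); // count successes only
        }
    }

    // Read only after run() has finished (e.g. after the pool has terminated).
    List<String> failedIds() {
        return failedIds;
    }
}
```

Collecting failed IDs instead of only printing them makes it possible to retry them after the run.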
Defining the thread that monitors the Elasticsearch import: Monitor
The Monitor thread computes how many records are written to Elasticsearch per minute. Watching this rate tells you how to size the thread pools so that the multithreaded import runs as fast as possible.
```java
// ldPrevPendCount, ldPrevReadCount and start are fields of Monitor (the lambda reassigns them)
public void monitorToES() {
    new Thread(() -> {
        while (true) {
            StringBuilder sb = new StringBuilder();
            sb.append("handled tables::").append(Const.TBL.TBL_PEND_COUNT)
              .append("::handled total::").append(Const.COUNTER.LD_P_TOTAL)
              .append("::handled indexed::").append(Const.COUNTER.LD_P);
            sb.append("~~~~read tables::").append(Const.TBL.TBL_READ_COUNT);
            sb.append("::read total::").append(Const.COUNTER.LD_R_TOTAL)
              .append("::read indexed::").append(Const.COUNTER.LD_R);
            if (ldPrevPendCount == 0 && ldPrevReadCount == 0) {
                // No baseline yet: remember the current counts and start the one-minute window
                ldPrevPendCount = Const.COUNTER.LD_P.get();
                ldPrevReadCount = Const.COUNTER.LD_R.get();
                start = System.currentTimeMillis();
            } else {
                long end = System.currentTimeMillis();
                if ((end - start) / 1000 >= 60) {
                    // At least 60 s since the last report: print rows indexed in that window
                    start = end;
                    sb.append("\n#########################################\n");
                    sb.append("handled per minute::" + (Const.COUNTER.LD_P.get() - ldPrevPendCount) + " rows");
                    sb.append("::read per minute::" + (Const.COUNTER.LD_R.get() - ldPrevReadCount) + " rows");
                    ldPrevPendCount = Const.COUNTER.LD_P.get();
                    ldPrevReadCount = Const.COUNTER.LD_R.get();
                }
            }
            System.out.println(sb.toString());
            try {
                Thread.sleep(3000); // refresh the status line every 3 seconds
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag and stop monitoring
                return;
            }
        }
    }).start();
}
```
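The Monitor derives its per-minute figure from a simple rule: remember the counter value at the last report and subtract it from the current value.

Below is a sketch of the same idea that uses a `ScheduledExecutorService` instead of a `while (true)` loop with `Thread.sleep`. It reads the counter once per report, so no increments can slip in between two `get()` calls. `ThroughputMeter` is a hypothetical name, and the usage below assumes `LD_P` is an `AtomicLong`.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper: reports how much a counter grew since the previous report.
class ThroughputMeter {
    private final AtomicLong counter;
    private long previous;

    ThroughputMeter(AtomicLong counter) {
        this.counter = counter;
        this.previous = counter.get(); // baseline
    }

    /** Returns the increase since the last call and moves the baseline forward. */
    synchronized long delta() {
        long current = counter.get(); // single read, so nothing is lost between reads
        long d = current - previous;
        previous = current;
        return d;
    }

    /** Prints the delta once per period on a daemon scheduler thread. */
    static ScheduledExecutorService report(String label, ThroughputMeter meter, long period, TimeUnit unit) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "mte-monitor");
            t.setDaemon(true); // does not keep the JVM alive on its own
            return t;
        });
        scheduler.scheduleAtFixedRate(
                () -> System.out.println(label + "::" + meter.delta() + " rows in the last period"),
                period, period, unit);
        return scheduler;
    }
}
```

For example, `ThroughputMeter.report("handled", new ThroughputMeter(Const.COUNTER.LD_P), 1, TimeUnit.MINUTES)` prints the handled-item count once a minute.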
Initializing the Elasticsearch client: EsClient
```java
import java.net.InetAddress;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

String cName = meta.get("cName");     // Elasticsearch cluster name
String esNodes = meta.get("esNodes"); // cluster nodes, comma-separated host:port entries
Settings esSetting = Settings.builder()
        .put("cluster.name", cName)
        .put("client.transport.sniff", true) // enable sniffing so the client discovers the rest of the cluster
        .put("thread_pool.search.size", 5)   // search thread pool size, set to 5 for now
        .build();
String[] nodes = esNodes.split(",");
client = new PreBuiltTransportClient(esSetting);
for (String node : nodes) {
    if (node.length() > 0) {
        String[] hostPort = node.split(":");
        client.addTransportAddress(new TransportAddress(InetAddress.getByName(hostPort[0]), Integer.parseInt(hostPort[1])));
    }
}
```
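The loop above splits `esNodes` on commas and each entry on a colon. Below is a slightly more defensive version of that parsing, built only from JDK types so it can be checked on its own:

- It trims whitespace.
- It skips empty entries.
- It fails with a clear message on a malformed entry.

`NodeList.parseNodes` is a hypothetical helper. The `InetSocketAddress` values it returns are unresolved, so you would still build each `TransportAddress` from host and port as the original code does. That would be `new TransportAddress(InetAddress.getByName(a.getHostString()), a.getPort())`.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

class NodeList {
    /** Parses "host1:9300,host2:9300" into unresolved host/port pairs. */
    static List<InetSocketAddress> parseNodes(String esNodes) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String node : esNodes.split(",")) {
            String entry = node.trim();
            if (entry.isEmpty()) {
                continue; // tolerate "a:9300,,b:9300" and trailing commas
            }
            int colon = entry.lastIndexOf(':');
            if (colon <= 0 || colon == entry.length() - 1) {
                throw new IllegalArgumentException("expected host:port but got '" + entry + "'");
            }
            String host = entry.substring(0, colon);
            int port = Integer.parseInt(entry.substring(colon + 1));
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }
}
```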
Initializing the database connection
```java
// url, user, password: the JDBC URL and MySQL credentials passed on the command line
conn = DriverManager.getConnection(url, user, password);
```
Startup arguments
```bash
nohup java -jar mte.jar ES-Cluster2019 node1:9300,node2:9300,node3:9300 root 123456! jdbc:mysql://ip:3306/mte 130 130 >> ./mte.log 2>&1 &
```
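Argument description
ES-Cluster2019 is the name of the Elasticsearch cluster.
node1:9300,node2:9300,node3:9300 are the addresses of the Elasticsearch nodes (host:transport port).
root and 123456! are the MySQL user name and password.
jdbc:mysql://ip:3306/mte is the JDBC URL of the source MySQL database.
130 130 are the numbers of sharded tables holding handled items and read items, respectively.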
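Because the arguments are positional, it can help to collect them into one object in `MteMain` and check the count up front.

The following is a minimal sketch. It assumes the seven-argument order of the command above. `MteArgs` and its field names are hypothetical, not taken from the MTE source.

```java
// Hypothetical holder for the positional command-line arguments.
class MteArgs {
    final String clusterName;  // e.g. ES-Cluster2019
    final String esNodes;      // e.g. node1:9300,node2:9300,node3:9300
    final String dbUser;       // e.g. root
    final String dbPassword;
    final String jdbcUrl;      // e.g. jdbc:mysql://ip:3306/mte
    final int pendTableCount;  // number of handled-item tables, e.g. 130
    final int readTableCount;  // number of read-item tables, e.g. 130

    private MteArgs(String[] a) {
        clusterName = a[0];
        esNodes = a[1];
        dbUser = a[2];
        dbPassword = a[3];
        jdbcUrl = a[4];
        pendTableCount = Integer.parseInt(a[5]);
        readTableCount = Integer.parseInt(a[6]);
    }

    static MteArgs parse(String[] args) {
        if (args.length != 7) {
            throw new IllegalArgumentException("usage: java -jar mte.jar <clusterName> <host:port,...> "
                    + "<dbUser> <dbPassword> <jdbcUrl> <pendTables> <readTables>");
        }
        return new MteArgs(args);
    }
}
```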
Program entry point: MteMain

```java
// Monitor thread
Monitor monitorService = new Monitor();
monitorService.monitorToES();
// Handled-item producer thread (both producers share the single JDBC connection conn)
Thread pendProducerThread = new Thread(new ZlPendProducer(conn, "ZlPendProducer"));
pendProducerThread.start();
// Read-item producer thread
Thread readProducerThread = new Thread(new ZlReadProducer(conn, "ZlReadProducer"));
readProducerThread.start();
```
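As written, `MteMain` starts the producers and returns. The fixed pools are never shut down, and their worker threads are non-daemon, so the JVM keeps running after the last page has been indexed. The Monitor's `while (true)` thread has the same effect.

Below is a sketch of an orderly finish, assuming you want the process to exit once everything is indexed:

1. Wait for the producer threads to finish.
2. Shut the pools down.
3. Wait for the queued consumer tasks to drain.

`ImportLifecycle.runAndDrain` is a hypothetical helper. The monitor thread would also have to be stopped or made a daemon for the JVM to exit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for an orderly end of the import.
class ImportLifecycle {
    /**
     * Starts each producer on its own thread, waits for all of them, then shuts
     * the consumer pools down and waits for queued tasks to finish.
     * Returns true if every pool terminated within its timeout.
     */
    static boolean runAndDrain(List<Runnable> producers, List<ExecutorService> pools,
                               long timeoutPerPool, TimeUnit unit) throws InterruptedException {
        List<Thread> threads = new ArrayList<>();
        for (Runnable producer : producers) {
            Thread t = new Thread(producer);
            t.start();
            threads.add(t);
        }
        // 1. After this loop no producer will submit any more pages.
        for (Thread t : threads) {
            t.join();
        }
        // 2. Reject new tasks but let already-queued consumer tasks run to completion.
        for (ExecutorService pool : pools) {
            pool.shutdown();
        }
        // 3. Wait for the queues to drain.
        boolean allDone = true;
        for (ExecutorService pool : pools) {
            allDone &= pool.awaitTermination(timeoutPerPool, unit);
        }
        return allDone;
    }
}
```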