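I Accidentally Became a Contributor to a Well-Known Open Source Project?!
What a pleasant surprise. In an earlier post, "18 Diagrams: A Hands-On Guide to Syncing MySQL Data to ES8 with Canal Adapter (Worth Bookmarking!)", I described how to use Canal Adapter to sync MySQL data to ES8.
In that post I mentioned that the official ES8 adapter sync class did not support ES8's TLS authentication, so we had to turn that security feature off when deploying the ES8 cluster.
But as an engineer I can't stand seeing a feature crippled, so I pulled the source code and modified it to support TLS authentication.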
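After the change I rebuilt the package locally and the feature worked. In the spirit of sharing, I submitted a PR along the way, and to my great surprise I recently found that the PR had been merged into the main branch!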
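Just like that, I accidentally became a contributor to a well-known open source project with tens of thousands of stars. The maintainer even replied "tks" under the PR, and I suddenly realized that the experts are not so far away after all.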
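Problem description
Before the fix, after starting the canal adapter, syncing MySQL data to ES8 produced the error below. The cause is that ES8 enables security by default and ships with its own generated certificate. Canal Adapter's ES8 support did not handle this, so the JVM could not build a trust path to the server certificate and the TLS handshake failed.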
2024-04-13 20:55:39.368 [pool-3-thread-1] ERROR c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - ElasticsearchException[java.util.concurrent.ExecutionException: javax.net.ssl.SSLHandshakeException: General SSLEngine problem]; nested: ExecutionException[javax.net.ssl.SSLHandshakeException: General SSLEngine problem]; nested: SSLHandshakeException[General SSLEngine problem]; nested: SSLHandshakeException[General SSLEngine problem]; nested: ValidatorException[PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target]; nested: SunCertPathBuilderException[unable to find valid certification path to requested target];
java.lang.RuntimeException: ElasticsearchException[java.util.concurrent.ExecutionException: javax.net.ssl.SSLHandshakeException: General SSLEngine problem]; nested: ExecutionException[javax.net.ssl.SSLHandshakeException: General SSLEngine problem]; nested: SSLHandshakeException[General SSLEngine problem]; nested: SSLHandshakeException[General SSLEngine problem]; nested: ValidatorException[PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target]; nested: SunCertPathBuilderException[unable to find valid certification path to requested target];
at com.alibaba.otter.canal.client.adapter.es.core.service.ESSyncService.sync(ESSyncService.java:112)
at com.alibaba.otter.canal.client.adapter.es.core.service.ESSyncService.sync(ESSyncService.java:60)
at com.alibaba.otter.canal.client.adapter.es.core.ESAdapter.sync(ESAdapter.java:104)
at com.alibaba.otter.canal.client.adapter.es.core.ESAdapter.sync(ESAdapter.java:83)
at com.alibaba.otter.canal.client.adapter.ProxyOuterAdapter.sync(ProxyOuterAdapter.java:42)
at com.alibaba.otter.canal.adapter.launcher.loader.AdapterProcessor.batchSync(AdapterProcessor.java:139)
at com.alibaba.otter.canal.adapter.launcher.loader.AdapterProcessor.lambda$null$1(AdapterProcessor.java:97)
at java.util.concurrent.CopyOnWriteArrayList.forEach(CopyOnWriteArrayList.java:890)
at com.alibaba.otter.canal.adapter.launcher.loader.AdapterProcessor.lambda$null$2(AdapterProcessor.java:94)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.elasticsearch.ElasticsearchException: java.util.concurrent.ExecutionException: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
at org.elasticsearch.client.RestHighLevelClient.performClientRequest(RestHighLevelClient.java:2695)
at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:2171)
at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:2154)
at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:2118)
at org.elasticsearch.client.IndicesClient.getMapping(IndicesClient.java:538)
at com.alibaba.otter.canal.client.adapter.es8x.support.ESConnection.getMapping(ESConnection.java:132)
at com.alibaba.otter.canal.client.adapter.es8x.support.ES8xTemplate.getEsType(ES8xTemplate.java:392)
at com.alibaba.otter.canal.client.adapter.es8x.support.ES8xTemplate.getValFromData(ES8xTemplate.java:269)
at com.alibaba.otter.canal.client.adapter.es8x.support.ES8xTemplate.getESDataFromDmlData(ES8xTemplate.java:324)
at com.alibaba.otter.canal.client.adapter.es.core.service.ESSyncService.singleTableSimpleFiledUpdate(ESSyncService.java:814)
at com.alibaba.otter.canal.client.adapter.es.core.service.ESSyncService.update(ESSyncService.java:208)
at com.alibaba.otter.canal.client.adapter.es.core.service.ESSyncService.sync(ESSyncService.java:97)
... 12 common frames omitted
Caused by: java.util.concurrent.ExecutionException: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
at org.elasticsearch.common.util.concurrent.BaseFuture$Sync.getValue(BaseFuture.java:257)
at org.elasticsearch.common.util.concurrent.BaseFuture$Sync.get(BaseFuture.java:244)
at org.elasticsearch.common.util.concurrent.BaseFuture.get(BaseFuture.java:75)
at org.elasticsearch.client.RestHighLevelClient.performClientRequest(RestHighLevelClient.java:2692)
... 23 common frames omitted
Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
at sun.security.ssl.Handshaker.checkThrown(Handshaker.java:1431)
at sun.security.ssl.SSLEngineImpl.checkTaskThrown(SSLEngineImpl.java:535)
at sun.security.ssl.SSLEngineImpl.writeAppRecord(SSLEngineImpl.java:1214)
at sun.security.ssl.SSLEngineImpl.wrap(SSLEngineImpl.java:1186)
at javax.net.ssl.SSLEngine.wrap(SSLEngine.java:469)
at org.apache.http.nio.reactor.ssl.SSLIOSession.doWrap(SSLIOSession.java:270)
at org.apache.http.nio.reactor.ssl.SSLIOSession.doHandshake(SSLIOSession.java:316)
at org.apache.http.nio.reactor.ssl.SSLIOSession.isAppInputReady(SSLIOSession.java:537)
at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:120)
at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
... 1 common frames omitted
Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
at sun.security.ssl.Alerts.getSSLException(Alerts.java:192)
at sun.security.ssl.SSLEngineImpl.fatal(SSLEngineImpl.java:1728)
at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:304)
at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:296)
at sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1509)
at sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:216)
at sun.security.ssl.Handshaker.processLoop(Handshaker.java:979)
at sun.security.ssl.Handshaker$1.run(Handshaker.java:919)
at sun.security.ssl.Handshaker$1.run(Handshaker.java:916)
at java.security.AccessController.doPrivileged(Native Method)
at sun.security.ssl.Handshaker$DelegatedTask.run(Handshaker.java:1369)
at org.apache.http.nio.reactor.ssl.SSLIOSession.doRunTask(SSLIOSession.java:288)
at org.apache.http.nio.reactor.ssl.SSLIOSession.doHandshake(SSLIOSession.java:356)
... 9 common frames omitted
Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:387)
at sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:292)
at sun.security.validator.Validator.validate(Validator.java:260)
at sun.security.ssl.X509TrustManagerImpl.validate(X509TrustManagerImpl.java:324)
at sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:281)
at sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:136)
at sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1496)
... 17 common frames omitted
Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
at sun.security.provider.certpath.SunCertPathBuilder.build(SunCertPathBuilder.java:141)
at sun.security.provider.certpath.SunCertPathBuilder.engineBuild(SunCertPathBuilder.java:126)
at java.security.cert.CertPathBuilder.build(CertPathBuilder.java:280)
at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:382)
... 23 common frames omitted
2024-04-13 20:55:39.370 [Thread-4] ERROR c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - Outer adapter sync failed! Error sync and rollback, execute times: 13
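The trace above is long, but the actionable part is the innermost "Caused by" entry (the PKIX path building failure), not the outer ElasticsearchException. As a rough illustration of how to get at that entry programmatically, here is a minimal sketch that walks an exception's cause chain. The class name RootCauseSketch and the exceptions built in main are made up for the example and are not part of Canal.

```java
import javax.net.ssl.SSLHandshakeException;

class RootCauseSketch {

    /** Follows getCause() links until the innermost exception is reached. */
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        // Stop at the end of the chain; also guard against a self-referencing cause.
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Rebuild a chain shaped like the one in the log (messages copied from it).
        Exception pkix = new Exception("unable to find valid certification path to requested target");
        SSLHandshakeException handshake = new SSLHandshakeException("General SSLEngine problem");
        handshake.initCause(pkix);
        RuntimeException outer = new RuntimeException(handshake);

        // Prints the PKIX message, i.e. the line that actually explains the failure.
        System.out.println(rootCause(outer).getMessage());
    }
}
```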
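Solutions
There are two options:
- When deploying the ES cluster, turn off the security feature by setting xpack.security.enabled to false in elasticsearch.yml. For a Docker deployment, either enter the container to change it or configure it when the container starts.
- Modify the canal adapter source code so that it accepts the certificate.
This post focuses on option 2, because option 1 requires disabling ES8's security features and is not recommended.
Modifying the source code to support the ES8 security configuration
Copying the certificate
When ES8 is installed and deployed with Docker, a certificate has already been created by default. We need to copy it out of the container with the following command: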
docker cp es01:/usr/share/elasticsearch/config/certs/http_ca.crt .
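Here es01 is the container name; replace it with your own. The destination path can be anything you like. Just remember where the file is, because it is needed later.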
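Before wiring the file into the adapter, it can be worth confirming that the copied file really is a parseable X.509 certificate. The following is a minimal sketch using only JDK classes; the class name CaCertInspector and the default file name are assumptions for the example. It prints the subject, the expiry date, and a SHA-256 fingerprint of the certificate.

```java
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;

class CaCertInspector {

    /** Returns the SHA-256 digest of the given bytes as lowercase hex. */
    static String sha256Hex(byte[] data) throws Exception {
        byte[] digest = MessageDigest.getInstance("SHA-256").digest(data);
        StringBuilder sb = new StringBuilder();
        for (byte b : digest) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    public static void main(String[] args) throws Exception {
        // Path of the copied certificate; defaults to the current directory.
        String path = args.length > 0 ? args[0] : "http_ca.crt";
        try (InputStream in = Files.newInputStream(Paths.get(path))) {
            X509Certificate cert = (X509Certificate)
                    CertificateFactory.getInstance("X.509").generateCertificate(in);
            System.out.println("Subject:   " + cert.getSubjectX500Principal().getName());
            System.out.println("Not after: " + cert.getNotAfter());
            System.out.println("SHA-256:   " + sha256Hex(cert.getEncoded()));
        }
    }
}
```

If the file is truncated or is not a certificate, generateCertificate throws a CertificateException, which is a quicker signal than a failed handshake inside the adapter.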
Modifying the code
In the canal adapter source code, find the following class and constructor: com.alibaba.otter.canal.client.adapter.es8x.support.ESConnection#ESConnection
Change its constructor to the following:
// Imports needed by the new TLS code, in addition to those already present in ESConnection
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.KeyStore;
import java.security.cert.Certificate;
import java.security.cert.CertificateFactory;
import javax.net.ssl.SSLContext;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;

public ESConnection(String[] hosts, Map<String, String> properties) throws UnknownHostException {
    // Use the TLS-aware path only when a CA certificate path is configured
    String caPath = properties.get("security.ca.path");
    if (StringUtils.isNotEmpty(caPath)) {
        connectEsWithCa(hosts, properties, caPath);
    } else {
        connectEsWithoutCa(hosts, properties);
    }
}

private void connectEsWithCa(String[] hosts, Map<String, String> properties, String caPath) {
    Path caCertificatePath = Paths.get(caPath);
    try (InputStream is = Files.newInputStream(caCertificatePath)) {
        // Load the CA certificate into an in-memory trust store
        CertificateFactory factory = CertificateFactory.getInstance("X.509");
        Certificate trustedCa = factory.generateCertificate(is);
        KeyStore trustStore = KeyStore.getInstance("pkcs12");
        trustStore.load(null, null);
        trustStore.setCertificateEntry("ca", trustedCa);
        // Build an SSLContext that trusts only that CA
        SSLContextBuilder sslContextBuilder = SSLContexts.custom()
            .loadTrustMaterial(trustStore, null);
        final SSLContext sslContext = sslContextBuilder.build();
        HttpHost[] httpHosts = Arrays.stream(hosts).map(this::createHttpHost).toArray(HttpHost[]::new);
        RestClientBuilder restClientBuilder = RestClient.builder(httpHosts);
        String nameAndPwd = properties.get("security.auth");
        // Note: as written, the SSLContext is only applied when security.auth is set
        if (StringUtils.isNotEmpty(nameAndPwd) && nameAndPwd.contains(":")) {
            String[] nameAndPwdArr = nameAndPwd.split(":");
            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY,
                new UsernamePasswordCredentials(nameAndPwdArr[0], nameAndPwdArr[1]));
            restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
                httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                return httpClientBuilder.setSSLContext(sslContext);
            });
        }
        restHighLevelClient = new RestHighLevelClientBuilder(restClientBuilder.build())
            .setApiCompatibilityMode(true)
            .build();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

// Original constructor body: basic authentication only, no custom trust store
private void connectEsWithoutCa(String[] hosts, Map<String, String> properties) {
    HttpHost[] httpHosts = Arrays.stream(hosts).map(this::createHttpHost).toArray(HttpHost[]::new);
    RestClientBuilder restClientBuilder = RestClient.builder(httpHosts);
    String nameAndPwd = properties.get("security.auth");
    if (StringUtils.isNotEmpty(nameAndPwd) && nameAndPwd.contains(":")) {
        String[] nameAndPwdArr = nameAndPwd.split(":");
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY,
            new UsernamePasswordCredentials(nameAndPwdArr[0], nameAndPwdArr[1]));
        restClientBuilder.setHttpClientConfigCallback(
            httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
    }
    restHighLevelClient = new RestHighLevelClientBuilder(restClientBuilder.build())
        .setApiCompatibilityMode(true)
        .build();
}
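One detail in the code above is worth a note: security.auth is split with split(":"), so a password that itself contains a colon would be cut short, and a value such as "elastic:" would yield a one-element array. The following is a hypothetical variant, not what Canal ships, that splits only at the first colon. The class and method names are invented for the example.

```java
class AuthPropertySketch {

    /**
     * Splits "user:password" at the first ':' only.
     * Returns {user, password}, or null when the value is empty or has no ':'.
     */
    static String[] parse(String nameAndPwd) {
        if (nameAndPwd == null || nameAndPwd.isEmpty()) {
            return null;
        }
        int idx = nameAndPwd.indexOf(':');
        if (idx < 0) {
            return null;
        }
        // Everything after the first ':' belongs to the password, colons included.
        return new String[] { nameAndPwd.substring(0, idx), nameAndPwd.substring(idx + 1) };
    }

    public static void main(String[] args) {
        String[] parts = parse("elastic:pa:ss");
        System.out.println(parts[0] + " / " + parts[1]); // prints "elastic / pa:ss"
    }
}
```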
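Brief explanation
- connectEsWithoutCa contains the original constructor implementation;
- connectEsWithCa is the constructor implementation with certificate support;
- which of the two is used depends on whether the security.ca.path property is configured;
- security.ca.path is configured under the properties of the ES8 entry in the launcher's outerAdapters, at the same level as security.auth.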
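For readers who want to see the trust-store idea in isolation, here is a minimal sketch that does roughly what connectEsWithCa does with SSLContexts.custom().loadTrustMaterial(...), but using only JDK classes and without any Elasticsearch or Apache HttpClient dependency. The class name CaTrustSketch and its method names are assumptions for the example; the default path is the one used later in this post.

```java
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.KeyStore;
import java.security.cert.Certificate;
import java.security.cert.CertificateFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

class CaTrustSketch {

    /** Builds an in-memory trust store that contains only the given CA certificate. */
    static KeyStore trustStoreOf(Certificate ca) throws Exception {
        KeyStore trustStore = KeyStore.getInstance("PKCS12");
        trustStore.load(null, null); // start from an empty store, no file and no password
        trustStore.setCertificateEntry("ca", ca);
        return trustStore;
    }

    /** Creates an SSLContext whose trust decisions come only from the given store. */
    static SSLContext sslContextFor(KeyStore trustStore) throws Exception {
        TrustManagerFactory tmf =
                TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        tmf.init(trustStore);
        SSLContext context = SSLContext.getInstance("TLS");
        context.init(null, tmf.getTrustManagers(), null);
        return context;
    }

    public static void main(String[] args) throws Exception {
        String caPath = args.length > 0 ? args[0] : "/opt/canal/http_ca.crt";
        try (InputStream in = Files.newInputStream(Paths.get(caPath))) {
            Certificate ca = CertificateFactory.getInstance("X.509").generateCertificate(in);
            SSLContext context = sslContextFor(trustStoreOf(ca));
            System.out.println("SSLContext ready, protocol: " + context.getProtocol());
        }
    }
}
```

The design point is that the CA is trusted only by this one client rather than being imported into the JVM-wide cacerts file, which keeps the change local to the adapter.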
That is all for the code change. Next, let's see how to use it.
Repackaging
After changing the code, rebuild with maven to produce the es8 jar.
Copy the built jar into the canal adapter's plugin directory and rename it so that it matches the version you downloaded, for example 1.1.7 in my case.
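In my plugin directory, client-adapter.es8x-1.1.7-jar-with-dependencies.jar.7 is the jar that came with the download, and client-adapter.es8x-1.1.7-jar-with-dependencies.jar is the one I rebuilt.
Modifying the launcher configuration
The code change introduced a property named security.ca.path, so we need to set it to the path of the CA certificate copied earlier, i.e. security.ca.path: /opt/canal/http_ca.crt
The complete configuration is as follows: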
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: -1
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    # kafka.bootstrap.servers: 127.0.0.1:9092
    # kafka.enable.auto.commit: false
    # kafka.auto.commit.interval.ms: 1000
    # kafka.auto.offset.reset: latest
    # kafka.request.timeout.ms: 40000
    # kafka.session.timeout.ms: 30000
    # kafka.isolation.level: read_committed
    # kafka.max.poll.records: 1000
    # rocketMQ consumer
    # rocketmq.namespace:
    # rocketmq.namesrv.addr: 127.0.0.1:9876
    # rocketmq.batch.size: 1000
    # rocketmq.enable.message.trace: false
    # rocketmq.customized.trace.topic:
    # rocketmq.access.channel:
    # rocketmq.subscribe.filter:
    # rabbitMQ consumer
    # rabbitmq.host:
    # rabbitmq.virtual.host:
    # rabbitmq.username:
    # rabbitmq.password:
    # rabbitmq.resource.ownerId:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/database?useUnicode=true
      username: root
      password: 123456
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: es8
        key: es-key
        hosts: https://127.0.0.1:9200 # 127.0.0.1:9200 for rest mode
        properties:
          mode: rest # transport or rest
          security.auth: elastic:password
          security.ca.path: /opt/canal/http_ca.crt
          cluster.name: docker-cluster
      - name: logger
#      - name: rdb
#        key: mysql1
#        properties:
#          jdbc.driverClassName: com.mysql.jdbc.Driver
#          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
#          jdbc.username: root
#          jdbc.password: 121212
#          druid.stat.enable: false
#          druid.stat.slowSqlMillis: 1000
#      - name: rdb
#        key: oracle1
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000
#      - name: hbase
#        properties:
#          hbase.zookeeper.quorum: 127.0.0.1
#          hbase.zookeeper.property.clientPort: 2181
#          zookeeper.znode.parent: /hbase
#      - name: kudu
#        key: kudu
#        properties:
#          kudu.master.address: 127.0.0.1 # ',' split multi address
#      - name: phoenix
#        key: phoenix
#        properties:
#          jdbc.driverClassName: org.apache.phoenix.jdbc.PhoenixDriver
#          jdbc.url: jdbc:phoenix:127.0.0.1:2181:/hbase/db
#          jdbc.username:
#          jdbc.password:
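Once the certificate path is configured, the adapter starts and syncs data normally. For the hands-on steps, see the earlier post "18 Diagrams: A Hands-On Guide to Syncing MySQL Data to ES8 with Canal Adapter (Worth Bookmarking!)"; I won't repeat the demonstration here.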
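If the adapter still fails with the same handshake error after this change, the usual suspects are the three ES8 properties themselves. Here is a small, hypothetical pre-flight check (not part of Canal) that looks for the most likely mistakes: a CA path that is not readable, a host that is not https while a CA is configured, and a security.auth value without a colon. Treating hosts as a comma-separated list is an assumption of this sketch.

```java
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class Es8AdapterConfigCheck {

    /** Returns human-readable problems; an empty list means nothing obvious is wrong. */
    static List<String> check(String hosts, Map<String, String> properties) {
        List<String> problems = new ArrayList<>();
        String caPath = properties.get("security.ca.path");
        boolean hasCa = caPath != null && !caPath.isEmpty();

        if (hasCa && !Files.isReadable(Paths.get(caPath))) {
            problems.add("security.ca.path is not a readable file: " + caPath);
        }
        for (String host : hosts.split(",")) {
            boolean https = host.trim().toLowerCase().startsWith("https://");
            if (hasCa && !https) {
                problems.add("CA configured but host is not https: " + host.trim());
            }
        }
        String auth = properties.get("security.auth");
        if (auth == null || !auth.contains(":")) {
            problems.add("security.auth should look like user:password");
        }
        return problems;
    }

    public static void main(String[] args) {
        // Values taken from the configuration shown in this post.
        Map<String, String> properties = new HashMap<>();
        properties.put("security.auth", "elastic:password");
        properties.put("security.ca.path", "/opt/canal/http_ca.crt");
        check("https://127.0.0.1:9200", properties).forEach(System.out::println);
    }
}
```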
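Summary
I had long wanted to take part in an open source project, and this time I finally reached that small goal. It really was a happy accident: I was just studying canal's data synchronization, ran into this problem, fixed it, and submitted a PR along the way. I did not expect it to actually be merged, and I am still quite excited about it.
The lesson is that once you really use, understand, and get involved with an open source project, there is a real chance to contribute your own code. Even a very small piece counts as doing your modest part for the project.
I have also recently found a few small bugs in another open source project. I will submit a PR for those later and keep moving forward on the open source road.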