Elasticsearch: Troubleshooting a BulkProcessor Deadlock in Bulk Updates
I. Overview of the Affected System
1. The system listens for product-change MQ messages, queries the latest product information, and uses BulkProcessor to batch-update product fields in the ES cluster.
2. Because there is a huge amount of product data, it is stored in an ES cluster divided into 256 shards, with shard routing based on each product's third-level category ID.
For example, when a SKU's product name changes, we receive a change MQ message for that SKU, call the product API to fetch the latest name, route by the SKU's third-level category ID to the corresponding ES shard, and update the product-name field there.
Because the volume of product-change MQ messages is huge, the system uses BulkProcessor for asynchronous batch updates, both to improve ES update throughput and to prevent MQ message backlog.
ES客戶(hù)端版本如下:
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>6.5.3</version>
</dependency>
The BulkProcessor configuration pseudocode is as follows:
// build() constructs the bulkProcessor; under the hood it uses bulk's asynchronous operation
this.fullDataBulkProcessor = BulkProcessor.builder((request, bulkListener) ->
        fullDataEsClient.getClient().bulkAsync(request, RequestOptions.DEFAULT, bulkListener), listener)
        // execute a bulk once 1000 requests have accumulated
        .setBulkActions(1000)
        // flush a bulk once 5 MB of data has accumulated
        .setBulkSize(new ByteSizeValue(5L, ByteSizeUnit.MB))
        // number of concurrent requests: 0 = no concurrency (synchronous), 1 = one concurrent bulk allowed
        .setConcurrentRequests(1)
        // flush at a fixed interval of 1 s regardless of size
        .setFlushInterval(TimeValue.timeValueSeconds(1L))
        // retry up to 5 times, 1 s apart
        .setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 5))
        .build();
II. How the Problem Was Discovered
1. After the 618 promotion started, product-change MQ messages became extremely frequent, with daily volume several times the normal level, and many products also had their third-level category IDs changed.
2. When the system updated SKUs whose third-level category ID had changed, the update against the shard routed by the new category ID failed, and even after 5 retries it still did not succeed.
3. Because the newly routed shard holds no index document for that product, these update requests can never succeed, and the system's log files recorded large numbers of retry exceptions.
4. Backlog alarms began firing for the product-change MQ topic; consumption was clearly falling behind production.
5. The consumer's UMP monitoring showed stable performance with no obvious fluctuation, but some time after the system started consuming, the call count fell off a cliff, dropping from tens of thousands of calls per minute to single digits.
6. After an application restart the system resumed consuming and the UMP call count returned to normal, but after running for a while it paused again, as if every consumer thread had been suspended.
III. Detailed Troubleshooting
首先找一臺(tái)暫停消費(fèi)MQ消息的容器,查看應(yīng)用進(jìn)程ID,使用jstack命令dump應(yīng)用進(jìn)程的整個(gè)線(xiàn)程堆棧信息,將導(dǎo)出的線(xiàn)程堆棧信息打包上傳到 https://fastthread.io/ 進(jìn)行線(xiàn)程狀態(tài)分析。分析報(bào)告如下:
通過(guò)分析報(bào)告發(fā)現(xiàn)有124個(gè)處于BLOCKED狀態(tài)的線(xiàn)程,然后可以點(diǎn)擊查看各線(xiàn)程的詳細(xì)堆棧信息,堆棧信息如下:
連續(xù)查看多個(gè)線(xiàn)程的詳細(xì)堆棧信息,MQ消費(fèi)線(xiàn)程都是在waiting to lock <0x00000005eb781b10> (a
org.elasticsearch.action.bulk.BulkProcessor),然后根據(jù)0x00000005eb781b10去搜索發(fā)現(xiàn),這個(gè)對(duì)象鎖正在被另外一個(gè)線(xiàn)程占用,占用線(xiàn)程堆棧信息如下:
這個(gè)線(xiàn)程狀態(tài)此時(shí)正處于WAITING狀態(tài),通過(guò)線(xiàn)程名稱(chēng)發(fā)現(xiàn),該線(xiàn)程應(yīng)該是ES客戶(hù)端內(nèi)部線(xiàn)程。正是該線(xiàn)程搶占了業(yè)務(wù)線(xiàn)程的鎖,然后又在等待其他條件觸發(fā)該線(xiàn)程執(zhí)行,所以導(dǎo)致了所有的MQ消費(fèi)業(yè)務(wù)線(xiàn)程一直無(wú)法獲取BulkProcessor內(nèi)部的鎖,導(dǎo)致出現(xiàn)了消費(fèi)暫停問(wèn)題。
但是這個(gè)線(xiàn)程elasticsearch[scheduler][T#1]為啥不能執(zhí)行? 它是什么時(shí)候啟動(dòng)的? 又有什么作用?
就需要我們對(duì)BulkProcessor進(jìn)行深入分析,由于BulkProcessor是通過(guò)builder模塊進(jìn)行創(chuàng)建的,所以深入builder源碼,了解一下BulkProcessor的創(chuàng)建過(guò)程。
public static Builder builder(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, Listener listener) {
    Objects.requireNonNull(consumer, "consumer");
    Objects.requireNonNull(listener, "listener");
    final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = Scheduler.initScheduler(Settings.EMPTY);
    return new Builder(consumer, listener,
            (delay, executor, command) -> scheduledThreadPoolExecutor.schedule(command, delay.millis(), TimeUnit.MILLISECONDS),
            () -> Scheduler.terminate(scheduledThreadPoolExecutor, 10, TimeUnit.SECONDS));
}
內(nèi)部創(chuàng)建了一個(gè)時(shí)間調(diào)度執(zhí)行線(xiàn)程池,線(xiàn)程命名規(guī)則和上述持有鎖的線(xiàn)程名稱(chēng)相似,具體代碼如下:
static ScheduledThreadPoolExecutor initScheduler(Settings settings) {
    ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1,
            EsExecutors.daemonThreadFactory(settings, "scheduler"), new EsAbortPolicy());
    scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
    scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
    scheduler.setRemoveOnCancelPolicy(true);
    return scheduler;
}
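The thread name elasticsearch[scheduler][T#1] seen in the dump comes from this single-thread pool. The sketch below approximates how such a factory produces that name; the factory implementation here is illustrative, not the real EsExecutors.daemonThreadFactory source:

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SchedulerThreadNameDemo {
    // Approximates EsExecutors.daemonThreadFactory(settings, "scheduler"):
    // daemon threads named like "elasticsearch[scheduler][T#1]", which is
    // exactly the name of the lock-holding thread in the jstack dump.
    static ThreadFactory daemonThreadFactory(String namePrefix) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> {
            Thread t = new Thread(runnable,
                    "elasticsearch[" + namePrefix + "][T#" + counter.incrementAndGet() + "]");
            t.setDaemon(true);
            return t;
        };
    }

    // Schedules a task on a 1-thread pool and reports the executing thread's name
    static String firstThreadName() throws Exception {
        ScheduledThreadPoolExecutor scheduler =
                new ScheduledThreadPoolExecutor(1, daemonThreadFactory("scheduler"));
        String name = scheduler.schedule(() -> Thread.currentThread().getName(),
                0, TimeUnit.MILLISECONDS).get();
        scheduler.shutdownNow();
        return name;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(firstThreadName()); // elasticsearch[scheduler][T#1]
    }
}
```

The key point is the pool size of 1: every task scheduled through this Scheduler, whatever its purpose, must run on that single thread.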
最后在build方法內(nèi)部執(zhí)行了BulkProcessor的內(nèi)部有參構(gòu)造方法,在構(gòu)造方法內(nèi)部啟動(dòng)了一個(gè)周期性執(zhí)行的flushing任務(wù),代碼如下
BulkProcessor(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BackoffPolicy backoffPolicy, Listener listener,
              int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval,
              Scheduler scheduler, Runnable onClose) {
    this.bulkActions = bulkActions;
    this.bulkSize = bulkSize.getBytes();
    this.bulkRequest = new BulkRequest();
    this.scheduler = scheduler;
    this.bulkRequestHandler = new BulkRequestHandler(consumer, backoffPolicy, listener, scheduler, concurrentRequests);
    // Start period flushing task after everything is setup
    this.cancellableFlushTask = startFlushTask(flushInterval, scheduler);
    this.onClose = onClose;
}

private Scheduler.Cancellable startFlushTask(TimeValue flushInterval, Scheduler scheduler) {
    if (flushInterval == null) {
        return new Scheduler.Cancellable() {
            @Override
            public void cancel() {}

            @Override
            public boolean isCancelled() {
                return true;
            }
        };
    }
    final Runnable flushRunnable = scheduler.preserveContext(new Flush());
    return scheduler.scheduleWithFixedDelay(flushRunnable, flushInterval, ThreadPool.Names.GENERIC);
}

class Flush implements Runnable {
    @Override
    public void run() {
        synchronized (BulkProcessor.this) {
            if (closed) {
                return;
            }
            if (bulkRequest.numberOfActions() == 0) {
                return;
            }
            execute();
        }
    }
}
通過(guò)源代碼發(fā)現(xiàn),該flush任務(wù)就是在創(chuàng)建BulkProcessor對(duì)象時(shí)設(shè)置的固定時(shí)間flush邏輯,當(dāng)setFlushInterval方法參數(shù)生效,就會(huì)啟動(dòng)一個(gè)后臺(tái)定時(shí)flush任務(wù)。flush間隔,由setFlushInterval方法參數(shù)定義。該flush任務(wù)在運(yùn)行期間,也會(huì)搶占BulkProcessor對(duì)象鎖,搶到鎖后,才會(huì)執(zhí)行execute方法。具體的方法調(diào)用關(guān)系源代碼如下:
/**
 * Adds the data from the bytes to be processed by the bulk processor
 */
public synchronized BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType,
                                      @Nullable String defaultPipeline, @Nullable Object payload, XContentType xContentType) throws Exception {
    bulkRequest.add(data, defaultIndex, defaultType, null, null, null, defaultPipeline, payload, true, xContentType);
    executeIfNeeded();
    return this;
}

private void executeIfNeeded() {
    ensureOpen();
    if (!isOverTheLimit()) {
        return;
    }
    execute();
}

// (currently) needs to be executed under a lock
private void execute() {
    final BulkRequest bulkRequest = this.bulkRequest;
    final long executionId = executionIdGen.incrementAndGet();
    this.bulkRequest = new BulkRequest();
    this.bulkRequestHandler.execute(bulkRequest, executionId);
}
The add method above is the one called by the MQ consumer business threads, and it is likewise marked synchronized, so the consumer threads and the flush task thread compete directly for the same lock. The MQ consumer pseudocode looks like this:
@Override
public void upsertCommonSku(CommonSkuEntity commonSkuEntity) {
    String source = JsonUtil.toString(commonSkuEntity);
    UpdateRequest updateRequest = new UpdateRequest(Constants.INDEX_NAME_SPU, Constants.INDEX_TYPE, commonSkuEntity.getSkuId().toString());
    updateRequest.doc(source, XContentType.JSON);
    IndexRequest indexRequest = new IndexRequest(Constants.INDEX_NAME_SPU, Constants.INDEX_TYPE, commonSkuEntity.getSkuId().toString());
    indexRequest.source(source, XContentType.JSON);
    updateRequest.upsert(indexRequest);
    updateRequest.routing(commonSkuEntity.getCat3().toString());
    fullbulkProcessor.add(updateRequest);
}
通過(guò)以上對(duì)線(xiàn)程堆棧分析,發(fā)現(xiàn)所有的業(yè)務(wù)線(xiàn)程都在等待elasticsearch[scheduler][T#1]線(xiàn)程釋放BulkProcessor對(duì)象鎖,但是該線(xiàn)程確一直沒(méi)有釋放該對(duì)象鎖,從而出現(xiàn)了業(yè)務(wù)線(xiàn)程的死鎖問(wèn)題。
結(jié)合應(yīng)用日志文件中出現(xiàn)的大量異常重試日志,可能與BulkProcessor的異常重試策略有關(guān),然后進(jìn)一步了解BulkProcessor的異常重試代碼邏輯。由于業(yè)務(wù)線(xiàn)程中提交BulkRequest請(qǐng)求都統(tǒng)一提交到了BulkRequestHandler對(duì)象中的execute方法內(nèi)部進(jìn)行處理,代碼如下:
public final class BulkRequestHandler {
    private final Logger logger;
    private final BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer;
    private final BulkProcessor.Listener listener;
    private final Semaphore semaphore;
    private final Retry retry;
    private final int concurrentRequests;

    BulkRequestHandler(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BackoffPolicy backoffPolicy,
                       BulkProcessor.Listener listener, Scheduler scheduler, int concurrentRequests) {
        assert concurrentRequests >= 0;
        this.logger = Loggers.getLogger(getClass());
        this.consumer = consumer;
        this.listener = listener;
        this.concurrentRequests = concurrentRequests;
        this.retry = new Retry(backoffPolicy, scheduler);
        this.semaphore = new Semaphore(concurrentRequests > 0 ? concurrentRequests : 1);
    }

    public void execute(BulkRequest bulkRequest, long executionId) {
        Runnable toRelease = () -> {};
        boolean bulkRequestSetupSuccessful = false;
        try {
            listener.beforeBulk(executionId, bulkRequest);
            semaphore.acquire();
            toRelease = semaphore::release;
            CountDownLatch latch = new CountDownLatch(1);
            retry.withBackoff(consumer, bulkRequest, new ActionListener<BulkResponse>() {
                @Override
                public void onResponse(BulkResponse response) {
                    try {
                        listener.afterBulk(executionId, bulkRequest, response);
                    } finally {
                        semaphore.release();
                        latch.countDown();
                    }
                }

                @Override
                public void onFailure(Exception e) {
                    try {
                        listener.afterBulk(executionId, bulkRequest, e);
                    } finally {
                        semaphore.release();
                        latch.countDown();
                    }
                }
            });
            bulkRequestSetupSuccessful = true;
            if (concurrentRequests == 0) {
                latch.await();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.info(() -> new ParameterizedMessage("Bulk request {} has been cancelled.", executionId), e);
            listener.afterBulk(executionId, bulkRequest, e);
        } catch (Exception e) {
            logger.warn(() -> new ParameterizedMessage("Failed to execute bulk request {}.", executionId), e);
            listener.afterBulk(executionId, bulkRequest, e);
        } finally {
            if (bulkRequestSetupSuccessful == false) { // if we fail on client.bulk() release the semaphore
                toRelease.run();
            }
        }
    }

    boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
        if (semaphore.tryAcquire(this.concurrentRequests, timeout, unit)) {
            semaphore.release(this.concurrentRequests);
            return true;
        }
        return false;
    }
}
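Before following the retry path, it helps to see the Semaphore gating used by execute() in isolation. The sketch below is self-contained and illustrative (the class and method names are not from the ES client): with N permits, at most N bulks can be in flight at once, and a permit is only returned when a request finishes.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SemaphoreConcurrencyDemo {
    // Mirrors BulkRequestHandler's `new Semaphore(concurrentRequests)`:
    // the permit count caps how many simulated bulks run concurrently.
    static int maxObservedConcurrency(int permits, int tasks) throws Exception {
        Semaphore semaphore = new Semaphore(permits);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(tasks);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    semaphore.acquire();               // blocks once permits are exhausted
                    try {
                        int now = inFlight.incrementAndGet();
                        maxSeen.accumulateAndGet(now, Math::max);
                        Thread.sleep(20);              // simulated bulk round-trip
                        inFlight.decrementAndGet();
                    } finally {
                        semaphore.release();           // real client releases in the ActionListener
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return maxSeen.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(maxObservedConcurrency(1, 8)); // never exceeds 1
    }
}
```

With setConcurrentRequests(1) as in the configuration above, the permit count is 1, so an MQ consumer thread and the flush thread can never have two bulks in flight at the same time. Note one difference from this sketch: in the real client the permit is released inside the ActionListener, i.e. only after the bulk (including all of its retries) completes.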
The BulkRequestHandler constructor initializes a Retry object and hands it a Scheduler, and this is the same pool used by the flush task, a pool that maintains only a single fixed thread. The execute method first limits concurrency through a Semaphore whose permit count is the concurrentRequests parameter given when building the BulkProcessor; the configuration above sets it to 1. So only one thread at a time, whether an MQ consumer business thread or the flush task thread, can be executing this method. Next, look at how the retry task runs:
public void withBackoff(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BulkRequest bulkRequest,
                        ActionListener<BulkResponse> listener) {
    RetryHandler r = new RetryHandler(backoffPolicy, consumer, listener, scheduler);
    r.execute(bulkRequest);
}
RetryHandler內(nèi)部會(huì)執(zhí)行提交bulkRequest請(qǐng)求,同時(shí)也會(huì)監(jiān)聽(tīng)bulkRequest執(zhí)行異常狀態(tài),然后執(zhí)行任務(wù)重試邏輯,重試代碼如下:
private void retry(BulkRequest bulkRequestForRetry) {
    assert backoff.hasNext();
    TimeValue next = backoff.next();
    logger.trace("Retry of bulk request scheduled in {} ms.", next.millis());
    Runnable command = scheduler.preserveContext(() -> this.execute(bulkRequestForRetry));
    scheduledRequestFuture = scheduler.schedule(next, ThreadPool.Names.SAME, command);
}
RetryHandler將執(zhí)行失敗的bulk請(qǐng)求重新交給了內(nèi)部scheduler線(xiàn)程池去執(zhí)行,通過(guò)以上代碼了解,該線(xiàn)程池內(nèi)部只維護(hù)了一個(gè)固定線(xiàn)程,同時(shí)該線(xiàn)程池可能還會(huì)被另一個(gè)flush任務(wù)去占用執(zhí)行。所以如果重試邏輯正在執(zhí)行的時(shí)候,此時(shí)線(xiàn)程池內(nèi)的唯一線(xiàn)程正在執(zhí)行flush任務(wù),則會(huì)阻塞重試邏輯執(zhí)行,重試邏輯不能執(zhí)行完成,則不會(huì)釋放Semaphore,但是由于并發(fā)數(shù)量配置的是1,所以flush任務(wù)線(xiàn)程需要等待其他線(xiàn)程釋放一個(gè)Semaphore許可后才能繼續(xù)執(zhí)行。所以此處形成了循環(huán)等待,導(dǎo)致Semaphore和BulkProcessor對(duì)象鎖都無(wú)法釋放,從而使得所有的MQ消費(fèi)業(yè)務(wù)線(xiàn)程都阻塞在獲取BulkProcessor鎖之前。
同時(shí),在GitHub的ES客戶(hù)端源碼客戶(hù)端上也能搜索到類(lèi)似問(wèn)題,例如:
https://github.com/elastic/elasticsearch/issues/47599 ,所以更加印證了之前的猜想,就是因?yàn)閎ulk的不斷重試從而引發(fā)了BulkProcessor內(nèi)部的死鎖問(wèn)題。
IV. How to Fix It
With the root cause understood, several solutions are available:
1. Upgrade the ES client to the 7.6 GA release or later. Later versions physically separate the retry task thread pool from the flush task thread pool, eliminating the contention; version compatibility needs to be evaluated.
2. Since the deadlock is driven by the large volume of retries, remove the client-side retry logic where that does not affect business correctness. This avoids a client upgrade, but the business impact must be assessed, and failed requests should then be retried through some other channel at the business level.
Author: Cao Zhifei, JD Retail
Source: JD Cloud Developer Community