必收藏技能!為Java多線程應(yīng)用程序優(yōu)化數(shù)據(jù)存儲庫
數(shù)據(jù)存儲庫通常是超高要求系統(tǒng)的瓶頸。在這些系統(tǒng)中,正在執(zhí)行的查詢數(shù)量非常大。DelayedBatchExecutor是一個用于減少所需查詢數(shù)量的組件,通過在Java多線程應(yīng)用程序中對所需查詢進行批處理。
1個參數(shù)的n個查詢Vs. n個參數(shù)的1個查詢
假設(shè)有一個對關(guān)系數(shù)據(jù)庫執(zhí)行查詢的Java應(yīng)用程序,以便在給定其唯一標識符(id)的情況下檢索Product實體(row)。
查詢?nèi)缦滤荆?/p>
SELECT * FROM PRODUCT WHERE ID =
現(xiàn)在,檢索n個Products,有如下兩種方法:
- 執(zhí)行1個參數(shù)的n個獨立查詢:
- SELECT * FROM PRODUCT WHERE ID =
- SELECT * FROM PRODUCT WHERE ID =
- ...
- SELECT * FROM PRODUCT WHERE ID =
- 使用IN運算符或ORs的串聯(lián),對n個參數(shù)執(zhí)行1個查詢以便同時檢索n個 Products
- -- Example using IN OPERATOR
- SELECT * FROM PRODUCT WHERE ID IN (, , ..., )
后者在網(wǎng)絡(luò)流量和數(shù)據(jù)庫服務(wù)器資源(CPU和磁盤)方面更為有效,因為:
- 往返數(shù)據(jù)庫的次數(shù)為1,而不是n。
- 數(shù)據(jù)庫引擎優(yōu)化了n個參數(shù)的數(shù)據(jù)遍歷過程,即每個表格可能只需要掃描1次,而不是n次。
這不僅適用于SELECT操作,而且適用于其他操作,例如 INSERTs,UPDATEs和DELETEs。實際上,JDBC API包括上述操作的批量處理操作。
同樣的情況也適用于NoSQL存儲庫,其中大多都明確提供BULK操作。
DelayedBatchExecutor
需要從數(shù)據(jù)庫中檢索數(shù)據(jù)的Java應(yīng)用程序,如REST微服務(wù)或異步消息處理器,通常以多線程應(yīng)用程序(*1)實現(xiàn),其中:
- 每個線程在其執(zhí)行的某個時刻執(zhí)行相同的查詢(每個查詢具有不同的參數(shù))。
- 并發(fā)線程數(shù)很高(每秒數(shù)十或數(shù)百)。
在這種場景下,數(shù)據(jù)庫很可能在較短的時間間隔內(nèi)多次執(zhí)行相同的查詢。
如前所述,如果將1個參數(shù)的n個查詢替換為具有n個參數(shù)的單個等效查詢,那么則應(yīng)用程序?qū)⑹褂幂^少的數(shù)據(jù)庫服務(wù)器和網(wǎng)絡(luò)資源。
好消息是它可以通過timewindows(時間窗口)的機制來實現(xiàn),如下所示:
第一個嘗試執(zhí)行查詢的線程會打開一個時間窗口,因此其參數(shù)被存儲在一個列表中,同時該線程被暫停。在時間窗口內(nèi)執(zhí)行相同查詢的其余線程會將其參數(shù)添加到列表中,并且也會被暫停。此時,數(shù)據(jù)庫上未執(zhí)行任何查詢。
時間窗口結(jié)束或列表已滿(先前已定義最大容量限制)后,將使用列表中存儲的所有參數(shù)執(zhí)行單個查詢。最后,一旦數(shù)據(jù)庫提供了該查詢的結(jié)果,每個線程將接收相應(yīng)的結(jié)果,同時所有線程將自動恢復(fù)。
筆者構(gòu)建了一個簡單而輕量級的應(yīng)用機制(DelayedBatchExecutor),很容易在新的或現(xiàn)有的應(yīng)用程序中使用。它基于Reactor庫,并且為參數(shù)列表使用超時的Flux緩沖發(fā)布器。
運用DelayedBatchExecutor的吞吐量和延遲分析
假設(shè)針對Products的REST微服務(wù)公開了一個端點,用于檢索數(shù)據(jù)庫中給定的 productId的Product數(shù)據(jù)。在沒有DelayedBatchExecutor的情況下,如果每秒對端點命中200次,則數(shù)據(jù)庫每秒執(zhí)行200個查詢。如果端點使用的DelayedBatchExecutor 配置了50毫秒的時間窗口且最大容量=10個參數(shù),數(shù)據(jù)庫每秒鐘將只執(zhí)行10個參數(shù)的20個查詢,代價是每執(zhí)行一個線程,最多在50毫秒內(nèi)增加延時(*2)。
換句話說,為了將延時增加50毫秒(* 2),數(shù)據(jù)庫每秒接收的查詢減少了10倍,然而保持了系統(tǒng)的整體吞吐量。還不錯!!
其他有趣的配置:
- 窗口時間= 100毫秒,最大容量= 20個參數(shù)→20個參數(shù)的10個查詢(查詢減少20倍)
- 窗口時間= 500毫秒,最大容量= 100個參數(shù)→2個查詢100個參數(shù)(查詢減少100倍)
執(zhí)行中的DelayedBatchExecutor
深入研究Product微服務(wù)示例。假設(shè)對于每個傳入的HTTP請求,微服務(wù)的控制器都要求檢索已有id的Product(Java Bean),因此將調(diào)用以下方法:
DAO組件(ProductDAO)的public Product getProductById(IntegerproductId) .
以下分別是有和沒有 DelayedBatchExecutor的DAO執(zhí)行。
沒有 DelayedBatchExecutor
- public classProductDAO {
- public Product getProductById(Integer id) {
- Product product= ...// execute the query SELECT * FROM PRODUCT WHERE ID=
- // using your favourite API: JDBC, JPA, Hibernate...
- return product;
- }
- ...
- }
有DelayedBatchExecutor
- // Singleton
- publicclass ProductDAO {
- DelayedBatchExecutor2 delayedBatchExecutorProductById =
- DelayedBatchExecutor.define(Duration.ofMillis(50), 10, this::retrieveProductsByIds);
- public Product getProductById(Integer id) {
- Product product = delayedBatchExecutorProductById.execute(id);
- return product;
- }
- private List retrieveProductsByIds(List idList) {
- List productList = ...// execute query:SELECT * FROM PRODUCT WHERE ID IN (idList.get(0), ..., idList.get(n));
- // using your favourite API: JDBC, JPA, Hibernate...
- // The positions of the elements of the list to return must match the ones in the parameters list.
- // For instance, the first Product of the list to be returned must be the one with
- // the Id in the first position of productIdsList and so on...
- // NOTE: null could be used as value, meaning that no Product exist for the given productId
- return productList;
- }
- ...
- }
首先,必須在DAO中創(chuàng)建一個DelayedBatchExecutor實例,在本例中為 delayedBatchExecutorProductById。需要以下三個參數(shù):
- 時間窗口(在此示例中為50毫秒)
- 參數(shù)列表的最大容量(在此示例中為10個參數(shù))
- 將使用參數(shù)列表調(diào)用的方法(詳細信息見后文)。在此示例中,方法為retrieveProductsByIds
其次,已經(jīng)重構(gòu)了DAO方法 publicProduct getProductById(Integer productId),以簡單調(diào)用delayedBatchExecutorProductById 實例的execute 方法。所有的“magic”都是由 DelayedBatchExecutor完成的。
之所以delayedBatchExecutorProductById是DelayedBatchExecutor2
如果execute方法需要接收兩個參數(shù)(例如,一個 Integer和一個String)并返回Product實例,則定義為 DelayedBatchExecutor3
最終,retrieveProductsByIds方法必須返回List
如果使用的是 DelayedBatchExecutor3
就是這樣。
一旦運行,執(zhí)行控制器邏輯的并發(fā)線程會在某時刻調(diào)用方法 getProductById(Integerid) ,并且此方法將返回對應(yīng)的Product。并發(fā)線程不知自己已經(jīng)被 DelayedBatchExecutor暫停并恢復(fù)了。
由數(shù)據(jù)存儲庫延伸的“題外話”
盡管本文與數(shù)據(jù)存儲庫有關(guān),但 DelayedBatchExecutor也可以用在其他地方,例如:對REST進行微服務(wù)請求。再說,用1個參數(shù)啟動n個GET請求要比使用n個參數(shù)啟動1個GET昂貴得多。
DelayedBatchExecutor的優(yōu)化
筆者創(chuàng)建了 DelayedBatchExecutor并使用了一段時間,有效地解決了個人項目中并發(fā)線程啟動的多個查詢的執(zhí)行問題。因此相信它對其他人可能也有用處,所以決定將其公開。
話雖如此,DelayedBatchExecutor改進和功能擴展的空間還很大。最有趣的是能夠根據(jù)執(zhí)行的特定條件動態(tài)更改DelayedBatchExecutor參數(shù)(窗口時間和最大容量)的功能,以便在利用帶有n個參數(shù)的查詢時很大程度地減少延時。