DDD死黨:內存Join——將復用和擴展用到極致
1. 為什么"內存Join"是個無法繞過的話題
首先,我們先簡單解釋下,什么是“內存Join”。
相信大家對關系數(shù)據(jù)庫的 join 語句肯定不陌生,其作用就是通過關聯(lián)關系從多個表中查詢數(shù)據(jù),關聯(lián)條件和數(shù)據(jù)聚合全部由 數(shù)據(jù)庫服務完成。
而 內存 Join,簡單來說就是把原本數(shù)據(jù)庫幫我們完成的數(shù)據(jù)聚合操作遷移到應用服務,在應用服務的內存中完成。
數(shù)據(jù)庫join非常簡單,但隨著系統(tǒng)的發(fā)展,內存join變得越來越重要,其核心驅動力有:
- 微服務。微服務要求“數(shù)據(jù)資產(chǎn)私有化”,也就是說每個服務的數(shù)據(jù)庫是私有資產(chǎn),不允許其他服務的直接訪問。如果需要訪問,只能通過服務所提供的接口完成
- 分庫分表的限制。當數(shù)據(jù)量超過 MySQL 單實例承載能力時,通常會通過“分庫分表”這一技術手段來解決,分庫分表后,數(shù)據(jù)被分散到多個分區(qū)中,導致 join 語句失效
- 性能瓶頸。在高并發(fā)情況下,join 存在一定的性能問題,高并發(fā)、高性能端場景不適合使用。很多公司規(guī)范中對 join 的使用做出了明確的限制
2. 課程先導
發(fā)現(xiàn)變化,封裝變化,管理變化,是開發(fā)人員的必備技能。
本篇文章從查詢訂單這個業(yè)務場景為入口,針對數(shù)據(jù)的內存join進行多次抽象和封裝,最終實現(xiàn)“內存Join聲明化”。
首先,先看下最終的效果,從直觀上感受下“抽象”帶來的效率提升。
通過抽象,可以達到如下效果:
- 左邊一坨“模板代碼” 等價于右邊一個注解
- 模型需要綁定 UserVO 數(shù)據(jù),只需使用 @JoinUserVOOnId 注解進行聲明配置即可
- @JoinInMemoryConfig 注解的 PARALLEL 配置將開啟多線程并行處理,以提供性能
神秘背后的本質便是“抽象”。讓我們以訂單查詢?yōu)榫€索,層層遞進,最終實現(xiàn)“能力聲明化”。
能力聲明化,是抽象的一種高級表現(xiàn),無需編寫代碼,通過配置的方式為特定組件進行能力加強。
在正式開始之前,可以先了解下整體的推演流程:
3.【案例分析】訂單查詢
假設,我們是訂單中心的一位研發(fā)伙伴,需要開發(fā) “我的訂單” 模塊,其核心接口包括:
- 我的訂單,查詢用戶的全部訂單,包括 訂單信息、用戶信息、郵寄地址信息、商品信息等;
- 訂單詳情,查詢某個訂單的詳細信息,包括 訂單信息、用戶信息、郵寄地址信息、商品信息、支付信息等;
根據(jù)需求定義 OrderService 接口如下:
public interface OrderService {
// 我的訂單
List<OrderListVO> getByUserId(Long userId);
// 訂單詳情
OrderDetailVO getDetailByOrderId(Long orderId);
}
// 為配合多種實現(xiàn)策略,使用抽象類進行統(tǒng)一
public abstract class OrderListVO {
public abstract OrderVO getOrder();
public abstract UserVO getUser();
public abstract AddressVO getAddress();
public abstract ProductVO getProduct();
}
// 為配合多種實現(xiàn)策略,使用抽象類進行統(tǒng)一
public abstract class OrderDetailVO {
public abstract OrderVO getOrder();
public abstract UserVO getUser();
public abstract AddressVO getAddress();
public abstract ProductVO getProduct();
public abstract List<PayInfoVO> getPayInfo();
}
3.1. Foreach + 單條抓取方案
這么簡單的需求,那不是信手拈來,很快就提供了一版
代碼具體如下:
@Service
public class OrderServiceCodingV1 implements OrderService {
@Autowired
private OrderRepository orderRepository;
@Autowired
private AddressRepository addressRepository;
@Autowired
private ProductRepository productRepository;
@Autowired
private UserRepository userRepository;
@Autowired
private PayInfoRepository payInfoRepository;
@Override
public List<OrderListVO> getByUserId(Long userId) {
// 獲取用戶訂單
List<Order> orders = this.orderRepository.getByUserId(userId);
// 依次進行數(shù)據(jù)綁定
return orders.stream()
.map(order -> convertToOrderListVO(order))
.collect(toList());
}
private OrderListVOCodingV1 convertToOrderListVO(Order order) {
OrderVO orderVO = OrderVO.apply(order);
OrderListVOCodingV1 orderDetailVO = new OrderListVOCodingV1(orderVO);
// 綁定地址信息
Address address = this.addressRepository.getById(order.getAddressId());
AddressVO addressVO = AddressVO.apply(address);
orderDetailVO.setAddress(addressVO);
// 綁定用戶信息
User user = this.userRepository.getById(order.getUserId());
UserVO userVO = UserVO.apply(user);
orderDetailVO.setUser(userVO);
// 綁定商品信息
Product product = this.productRepository.getById(order.getProductId());
ProductVO productVO = ProductVO.apply(product);
orderDetailVO.setProduct(productVO);
return orderDetailVO;
}
@Override
public OrderDetailVO getDetailByOrderId(Long orderId) {
// 暫時忽略
Order order = this.orderRepository.getById(orderId);
return convertToOrderDetailVO(order);
}
private OrderDetailVO convertToOrderDetailVO(Order order) {
OrderDetailVOCodingV1 orderDetail = new OrderDetailVOCodingV1(OrderVO.apply(order));
// 獲取地址并進行綁定
Address address = this.addressRepository.getById(order.getAddressId());
AddressVO addressVO = AddressVO.apply(address);
orderDetail.setAddress(addressVO);
// 獲取用戶并進行綁定
User user = this.userRepository.getById(order.getUserId());
UserVO userVO = UserVO.apply(user);
orderDetail.setUser(userVO);
// 獲取商品并進行綁定
Product product = this.productRepository.getById(order.getProductId());
ProductVO productVO = ProductVO.apply(product);
orderDetail.setProduct(productVO);
// 獲取支付信息并進行綁定
List<PayInfo> payInfos = this.payInfoRepository.getByOrderId(order.getId());
List<PayInfoVO> payInfoVOList = payInfos.stream()
.map(PayInfoVO::apply)
.collect(toList());
orderDetail.setPayInfo(payInfoVOList);
return orderDetail;
}
}
如果真的這樣實現(xiàn),那你離“被跑路”不遠了。
為什么會這么說呢?因為 ==“我的訂單”這個接口存在嚴重的性能問題!==
“我的訂單”接口具體實現(xiàn)如下:
- 查詢 order 信息
- 依次對其進行數(shù)據(jù)抓取
- 完成數(shù)據(jù)綁定并返回結果
單個用戶請求,數(shù)據(jù)庫訪問總次數(shù) = 1(獲取用戶訂單)+ N(訂單數(shù)量) * 3(需要抓取的關聯(lián)數(shù)據(jù))
其中,N(訂單數(shù)量) * 3(關聯(lián)數(shù)據(jù)數(shù)量) 存在性能隱患,存在嚴重的==讀放大效應==。一旦遇到忠實用戶,存在成百上千訂單,除了超時別無辦法。
“訂單詳情”接口實現(xiàn),目前問題不大,最大的問題為:“訂單詳情”與“我的訂單”兩個接口存在大量的重復邏輯!
3.2. 批量查詢 + 內存Join
首先,我們先來解決 “我的訂單”接口的性能問題。從之前的分析可知,性能低下的根本原因在于 “讀放大效應”,數(shù)據(jù)庫請求次數(shù)與用戶訂單數(shù)成正比,為了更好的保障性能,最好將數(shù)據(jù)庫操作控制在一個常量。
整體思路為:先批量獲取要綁定的數(shù)據(jù),然后遍歷每一個訂單,在內存中完成數(shù)據(jù)綁定。
實現(xiàn)代碼如下:
@Service
public class OrderServiceCodingV2 implements OrderService {
@Autowired
private OrderRepository orderRepository;
@Autowired
private AddressRepository addressRepository;
@Autowired
private ProductRepository productRepository;
@Autowired
private UserRepository userRepository;
@Autowired
private PayInfoRepository payInfoRepository;
@Override
public List<OrderListVO> getByUserId(Long userId) {
List<Order> orders = this.orderRepository.getByUserId(userId);
List<OrderListVOCodingV2> orderDetailVOS = orders.stream()
.map(order -> new OrderListVOCodingV2(OrderVO.apply(order)))
.collect(toList());
// 批量獲取用戶,并依次進行綁定
List<Long> userIds = orders.stream()
.map(Order::getUserId)
.collect(toList());
List<User> users = this.userRepository.getByIds(userIds);
Map<Long, User> userMap = users.stream()
.collect(toMap(User::getId, Function.identity(), (a, b) -> a));
for (OrderListVOCodingV2 orderDetailVO : orderDetailVOS){
User user = userMap.get(orderDetailVO.getOrder().getUserId());
UserVO userVO = UserVO.apply(user);
orderDetailVO.setUser(userVO);
}
// 批量獲取地址,并依次進行綁定
List<Long> addressIds = orders.stream()
.map(Order::getAddressId)
.collect(toList());
List<Address> addresses = this.addressRepository.getByIds(addressIds);
Map<Long, Address> addressMap = addresses.stream()
.collect(toMap(Address::getId, Function.identity(), (a, b) -> a));
for (OrderListVOCodingV2 orderDetailVO : orderDetailVOS){
Address address = addressMap.get(orderDetailVO.getOrder().getAddressId());
AddressVO addressVO = AddressVO.apply(address);
orderDetailVO.setAddress(addressVO);
}
// 批量獲取商品,并依次進行綁定
List<Long> productIds = orders.stream()
.map(Order::getProductId)
.collect(toList());
List<Product> products = this.productRepository.getByIds(productIds);
Map<Long, Product> productMap = products.stream()
.collect(toMap(Product::getId, Function.identity(), (a, b) -> a));
for (OrderListVOCodingV2 orderDetailVO : orderDetailVOS){
Product product = productMap.get(orderDetailVO.getOrder().getProductId());
ProductVO productVO = ProductVO.apply(product);
orderDetailVO.setProduct(productVO);
}
return orderDetailVOS.stream()
.collect(toList());
}
@Override
public OrderDetailVO getDetailByOrderId(Long orderId) {
// 暫時忽略
Order order = this.orderRepository.getById(orderId);
return convertToOrderDetailVO(order);
}
private OrderDetailVO convertToOrderDetailVO(Order order) {
// 暫時忽略
return orderDetail;
}
}
調整之后,對于“我的訂單”接口,單個用戶請求==數(shù)據(jù)庫的訪問次數(shù)變成了常量(4)==。
如果你是這么實現(xiàn)的,那恭喜你,你已步入合格程序員行列。
3.3. 并行批量查詢 + 內存Join
批量查詢+內存Join 方案能滿足大部分場景,如果要抓取的數(shù)據(jù)太多,也就是數(shù)據(jù)庫訪問這個==常量變大==時,性能也會越來越差。
原因很簡單,由于串行執(zhí)行,整體耗時 = 獲取訂單耗時 + sum(抓取數(shù)據(jù)耗時)
聰明的同學早就躍躍欲試,這個我會:多線程并行執(zhí)行唄。
是的,基于 Future 的實現(xiàn)如下(還有很多版本,比如 CountDownLatch)
整體設計如下:
示例代碼如下:
@Service
public class OrderServiceCodingV3 implements OrderService {
private ExecutorService executorService;
@Autowired
private OrderRepository orderRepository;
@Autowired
private AddressRepository addressRepository;
@Autowired
private ProductRepository productRepository;
@Autowired
private UserRepository userRepository;
@Autowired
private PayInfoRepository payInfoRepository;
@PostConstruct
public void init(){
// 初始化線程池(不要使用Executors,這里只是演示,需要對資源進行評估)
this.executorService = Executors.newFixedThreadPool(20);
}
@SneakyThrows
@Override
public List<OrderListVO> getByUserId(Long userId) {
List<Order> orders = this.orderRepository.getByUserId(userId);
List<OrderListVOCodingV2> orderDetailVOS = orders.stream()
.map(order -> new OrderListVOCodingV2(OrderVO.apply(order)))
.collect(toList());
List<Callable<Void>> callables = Lists.newArrayListWithCapacity(3);
// 創(chuàng)建異步任務
callables.add(() -> {
// 批量獲取用戶,并依次進行綁定
List<Long> userIds = orders.stream()
.map(Order::getUserId)
.collect(toList());
List<User> users = this.userRepository.getByIds(userIds);
Map<Long, User> userMap = users.stream()
.collect(toMap(User::getId, Function.identity(), (a, b) -> a));
for (OrderListVOCodingV2 orderDetailVO : orderDetailVOS){
User user = userMap.get(orderDetailVO.getOrder().getUserId());
UserVO userVO = UserVO.apply(user);
orderDetailVO.setUser(userVO);
}
return null;
});
// 創(chuàng)建異步任務
callables.add(() ->{
// 批量獲取地址,并依次進行綁定
List<Long> addressIds = orders.stream()
.map(Order::getAddressId)
.collect(toList());
List<Address> addresses = this.addressRepository.getByIds(addressIds);
Map<Long, Address> addressMap = addresses.stream()
.collect(toMap(Address::getId, Function.identity(), (a, b) -> a));
for (OrderListVOCodingV2 orderDetailVO : orderDetailVOS){
Address address = addressMap.get(orderDetailVO.getOrder().getAddressId());
AddressVO addressVO = AddressVO.apply(address);
orderDetailVO.setAddress(addressVO);
}
return null;
});
// 創(chuàng)建異步任務
callables.add(() -> {
// 批量獲取商品,并依次進行綁定
List<Long> productIds = orders.stream()
.map(Order::getProductId)
.collect(toList());
List<Product> products = this.productRepository.getByIds(productIds);
Map<Long, Product> productMap = products.stream()
.collect(toMap(Product::getId, Function.identity(), (a, b) -> a));
for (OrderListVOCodingV2 orderDetailVO : orderDetailVOS){
Product product = productMap.get(orderDetailVO.getOrder().getProductId());
ProductVO productVO = ProductVO.apply(product);
orderDetailVO.setProduct(productVO);
}
return null;
});
// 執(zhí)行異步任務
this.executorService.invokeAll(callables);
return orderDetailVOS.stream()
.collect(toList());
}
@Override
public OrderDetailVO getDetailByOrderId(Long orderId) {
// 暫時忽略
Order order = this.orderRepository.getById(orderId);
return convertToOrderDetailVO(order);
}
private OrderDetailVO convertToOrderDetailVO(Order order) {
// 暫時忽略
}
}
多線程并發(fā)執(zhí)行,整體耗時 = 獲取訂單耗時 + max(抓取數(shù)據(jù)耗時)
如果你能夠這樣實現(xiàn)的,那恭喜你,你已步入高級程序員行列。
然后呢,到此為止了?NO,接下來才是高潮?。?!
讓我們打開認知,開啟“抽象+封裝”之旅。
4. Fetcher封裝
仔細研究上述代碼,尋找里面的==“變與不變”==,你會發(fā)現(xiàn):
- 由于“我的訂單” 和 “訂單詳情” 返回的是不同的 VO,導致在實現(xiàn)綁定操作時寫了兩套基本一樣的邏輯;
- Address、User、Product 的綁定邏輯骨架是一樣的,一些細節(jié)操作存在差異;
找到邏輯中的變化點,接下來便是有針對性的進行封裝。
4.1. 消除方法中的重復代碼
對于 “我的訂單” 和 “訂單詳情” 返回==不同的 VO==,該怎么處理呢?
非常簡單,思路如下:
- 【不變】抽象出“行為接口” Fetcher,統(tǒng)一操作行為
- 【變化】基于多態(tài),不同的 VO 派生自相同的接口,但可以自己定義實現(xiàn),從而實現(xiàn)個性化變化
整體設計如下:
簡單示例如下:
// 以 UserVO 為例,ProductVO、AddressVO,PayInfoVO 基本一致,不在贅述
public interface UserVOFetcherV1 {
Long getUserId();
void setUser(UserVO user);
}
// OrderDetailVO 實現(xiàn)對應的接口,為了突出重點暫時忽略具體實現(xiàn)
public class OrderDetailVOFetcherV1 extends OrderDetailVO
implements AddressVOFetcherV1,
ProductVOFetcherV1,
UserVOFetcherV1,
PayInfoVOFetcherV1{
}
// OrderListVO 實現(xiàn)對應接口,為了突出重點暫時忽略具體實現(xiàn)
public class OrderListVOFetcherV1 extends OrderListVO
implements AddressVOFetcherV1,
ProductVOFetcherV1,
UserVOFetcherV1 {
}
有了統(tǒng)一的操作接口,接下來便是抽取具體的綁定邏輯,以 UserVOFetcherExecutor 為例:
@Component
public class UserVOFetcherExecutorV1 {
@Autowired
private UserRepository userRepository;
public void fetch(List<? extends UserVOFetcherV1> fetchers){
List<Long> ids = fetchers.stream()
.map(UserVOFetcherV1::getUserId)
.distinct()
.collect(Collectors.toList());
List<User> users = userRepository.getByIds(ids);
Map<Long, User> userMap = users.stream()
.collect(toMap(user -> user.getId(), Function.identity()));
fetchers.forEach(fetcher -> {
Long userId = fetcher.getUserId();
User user = userMap.get(userId);
if (user != null){
UserVO userVO = UserVO.apply(user);
fetcher.setUser(userVO);
}
});
}
}
實現(xiàn)邏輯沒有變化,最重要的變化在于“入?yún)㈩愋汀保辉谑蔷唧w的 VO,而是抽象的 UserVOFetcher 接口。
AddressVOFetcherExecutor、ProductVOFetcherExecutor、PayInfoVOFetcherExecutor 與 UserVOFetcherExecutorV1 邏輯基本一致,篇幅問題不在贅述。
這樣一個小小的調整,會給使用方帶來什么便利?一起看下使用方的變化:
@Service
public class OrderServiceFetcherV1 implements OrderService {
@Autowired
private OrderRepository orderRepository;
@Autowired
private AddressVOFetcherExecutorV1 addressVOFetcherExecutorV1;
@Autowired
private ProductVOFetcherExecutorV1 productVOFetcherExecutorV1;
@Autowired
private UserVOFetcherExecutorV1 userVOFetcherExecutorV1;
@Autowired
private PayInfoVOFetcherExecutorV1 payInfoVOFetcherExecutorV1;
@Override
public List<OrderListVO> getByUserId(Long userId) {
List<Order> orders = this.orderRepository.getByUserId(userId);
List<OrderListVOFetcherV1> orderDetailVOS = orders.stream()
.map(order -> new OrderListVOFetcherV1(OrderVO.apply(order)))
.collect(toList());
// 直接使用 FetcherExecutor 完成數(shù)據(jù)綁定
this.addressVOFetcherExecutorV1.fetch(orderDetailVOS);
this.productVOFetcherExecutorV1.fetch(orderDetailVOS);
this.userVOFetcherExecutorV1.fetch(orderDetailVOS);
return orderDetailVOS.stream()
.collect(toList());
}
@Override
public OrderDetailVO getDetailByOrderId(Long orderId) {
Order order = this.orderRepository.getById(orderId);
OrderDetailVOFetcherV1 orderDetail = new OrderDetailVOFetcherV1(OrderVO.apply(order));
List<OrderDetailVOFetcherV1> orderDetailVOS = Arrays.asList(orderDetail);
// 直接使用 FetcherExecutor 完成數(shù)據(jù)綁定
this.addressVOFetcherExecutorV1.fetch(orderDetailVOS);
this.productVOFetcherExecutorV1.fetch(orderDetailVOS);
this.userVOFetcherExecutorV1.fetch(orderDetailVOS);
this.payInfoVOFetcherExecutorV1.fetch(orderDetailVOS);
return orderDetail;
}
}
兩個方法直接使用 FetcherExecutor 完成數(shù)據(jù)抓取和綁定,實現(xiàn)了==綁定邏輯的復用==。
如果再有 VO 需要進行數(shù)據(jù)綁定,只需:
- VO 實現(xiàn) XXXFetcher 接口,實現(xiàn)對應方法,提供關聯(lián)數(shù)據(jù)并完成數(shù)據(jù)綁定
- 使用 XXXFetcherExecutor 完成數(shù)據(jù)綁定
至此,面對新業(yè)務基本上與“綁定邏輯”說再見了。
4.2. 重構綁定邏輯
接下來讓我們一起聚焦于綁定邏輯,先對比下上述的UserVOFetcherExecutor 與下面的 AddressVOFetcherExecutor, 找到里面的變化與不變:
@Component
public class AddressVOFetcherExecutorV1 {
@Autowired
private AddressRepository addressRepository;
public void fetch(List<? extends AddressVOFetcherV1> fetchers){
// 獲取關聯(lián)信息
List<Long> ids = fetchers.stream()
.map(AddressVOFetcherV1::getAddressId)
.distinct()
.collect(Collectors.toList());
// 查詢關聯(lián)數(shù)據(jù)
List<Address> addresses = addressRepository.getByIds(ids);
// 轉為為 Map
Map<Long, Address> addressMap = addresses.stream()
.collect(toMap(address -> address.getId(), Function.identity()));
// 依次進行數(shù)據(jù)綁定
fetchers.forEach(fetcher -> {
Long addressId = fetcher.getAddressId();
Address address = addressMap.get(addressId);
if (address != null){
// 轉換為 VO
AddressVO addressVO = AddressVO.apply(address);
// 將數(shù)據(jù)寫回到結果
fetcher.setAddress(addressVO);
}
});
}
}
仔細觀察,會發(fā)現(xiàn):
【不變】邏輯骨架基本一致,基本是由:
- 獲取關聯(lián)信息
- 查詢關聯(lián)數(shù)據(jù)
- 將其轉換為 Map
- 講數(shù)據(jù)轉化為 VO
- 將 VO 綁定到結果對象
【變化】實現(xiàn)細節(jié)存在差異;
- 從什么接口中獲取關聯(lián)信息
- 如何查詢關聯(lián)數(shù)據(jù)
- 轉換為 Map 的鍵是什么
- 如何將數(shù)據(jù)轉換為 VO
- 如何完成數(shù)據(jù)的綁定
熟悉設計模式的伙伴是否眼前一亮?停頓一下好好回想一下,哪種模式就是用來處理這種問題的?
答案便是:模板方法模式
整體思想為:
- 將不變的邏輯骨架封裝在父類方法
- 將變化的實現(xiàn)細節(jié)放在子類中進行擴展
整體設計如下:
抽取公共父類如下:
abstract class BaseItemFetcherExecutor<FETCHER extends ItemFetcher, DATA, RESULT>
implements ItemFetcherExecutor<FETCHER>{
@Override
public void fetch(List<FETCHER> fetchers) {
// 獲取關聯(lián)信息
List<Long> ids = fetchers.stream()
.map(this::getFetchId)
.distinct()
.collect(Collectors.toList());
// 查詢關聯(lián)數(shù)據(jù)
List<DATA> datas = loadData(ids);
// 轉為為 Map
Map<Long, List<DATA>> dataMap = datas.stream()
.collect(groupingBy(this::getDataId));
// 依次進行數(shù)據(jù)綁定
fetchers.forEach(fetcher -> {
Long id = getFetchId(fetcher);
List<DATA> ds = dataMap.get(id);
if (ds != null){
// 轉換為 VO
List<RESULT> result = ds.stream()
.map( data -> convertToVo(data))
.collect(Collectors.toList());
// 將數(shù)據(jù)寫回到結果
setResult(fetcher, result);
}
});
}
protected abstract Long getFetchId(FETCHER fetcher);
protected abstract List<DATA> loadData(List<Long> ids);
protected abstract Long getDataId(DATA data);
protected abstract RESULT convertToVo(DATA data);
protected abstract void setResult(FETCHER fetcher, List<RESULT> result);
}
基于 BaseItemFetcherExecutor 的 UserFetcherExecutor 如下:
@Component
public class UserVOFetcherExecutorV2
extends BaseItemFetcherExecutor<UserVOFetcherV2, User, UserVO>{
@Autowired
private UserRepository userRepository;
@Override
protected Long getFetchId(UserVOFetcherV2 fetcher) {
return fetcher.getUserId();
}
@Override
protected List<User> loadData(List<Long> ids) {
return this.userRepository.getByIds(ids);
}
@Override
protected Long getDataId(User user) {
return user.getId();
}
@Override
protected UserVO convertToVo(User user) {
return UserVO.apply(user);
}
@Override
protected void setResult(UserVOFetcherV2 fetcher, List<UserVO> userVO) {
if (CollectionUtils.isNotEmpty(userVO)) {
fetcher.setUser(userVO.get(0));
}
}
@Override
public boolean support(Class<UserVOFetcherV2> cls) {
// 暫時忽略,稍后會細講
return UserVOFetcherV2.class.isAssignableFrom(cls);
}
}
UserVOFetcherExecutor究竟發(fā)生什么變化呢?好像變得更復雜了:
- 從代碼量角度(行數(shù))變得更多了,因為類函數(shù)明顯變大
- 從復雜度角度(邏輯)變得更加簡單,每個方法基本都是一兩句語句
那我們究竟得到了什么好處?可以花幾分鐘好好思考一下!??!
在說結果之前,讓我們看下另一個變化點。回想下 FetcherExecutor 的執(zhí)行點,如下:
@Override
public List<OrderListVO> getByUserId(Long userId) {
List<Order> orders = this.orderRepository.getByUserId(userId);
List<OrderListVOFetcherV1> orderDetailVOS = orders.stream()
.map(order -> new OrderListVOFetcherV1(OrderVO.apply(order)))
.collect(toList());
// 手工調用,OrderListVO 實現(xiàn)新接口,需要增加新的依賴和調用
this.addressVOFetcherExecutorV1.fetch(orderDetailVOS);
this.productVOFetcherExecutorV1.fetch(orderDetailVOS);
this.userVOFetcherExecutorV1.fetch(orderDetailVOS);
return orderDetailVOS.stream()
.collect(toList());
}
@Override
public OrderDetailVO getDetailByOrderId(Long orderId) {
Order order = this.orderRepository.getById(orderId);
OrderDetailVOFetcherV1 orderDetail = new OrderDetailVOFetcherV1(OrderVO.apply(order));
List<OrderDetailVOFetcherV1> orderDetailVOS = Arrays.asList(orderDetail);
// 手工調用,OrderDetailVO 實現(xiàn)新接口,需要增加新的依賴和調用
this.addressVOFetcherExecutorV1.fetch(orderDetailVOS);
this.productVOFetcherExecutorV1.fetch(orderDetailVOS);
this.userVOFetcherExecutorV1.fetch(orderDetailVOS);
this.payInfoVOFetcherExecutorV1.fetch(orderDetailVOS);
return orderDetail;
}
其實,需要調用哪些 FetcherExecutor 完全可以由 VO 實現(xiàn)的接口來確定。也就是說,需要綁定新數(shù)據(jù),只需 VO 繼承并實現(xiàn)新的 Fetcher 接口即可。
對此,我們需要:
- 一個統(tǒng)一的訪問入口,對外提供訪問
- 每個 FetcherExecutor 能夠識別 VO 并執(zhí)行綁定邏輯
哪個設計模式是用來解決這個問題?花幾分鐘好好思考一下!
答案是:責任鏈模型
標準的責任鏈模式用起來比較繁瑣,在 Spring 實現(xiàn)中大量使用他的一種變現(xiàn),及提供一個驗證接口,由組件自身完成判斷,用于決定是否執(zhí)行自身邏輯。
整體設計如下:
首先,為了統(tǒng)一 FetcherExecutor 的行為,抽取通用接口:
public interface ItemFetcherExecutor<F extends ItemFetcher> {
/**
* 該組件是否能處理 cls 類型
* @param cls
* @return
*/
boolean support(Class<F> cls);
/**
* 執(zhí)行真正的數(shù)據(jù)綁定
* @param fetchers
*/
void fetch(List<F> fetchers);
}
具體的實現(xiàn),可以見 UserVOFetcherExecutorV2 的 support 方法:
@Override
public boolean support(Class<UserVOFetcherV2> cls) {
return UserVOFetcherV2.class.isAssignableFrom(cls);
}
實現(xiàn)邏輯非常簡單,只是判斷 cls 是否實現(xiàn)了 UserVOFetcherV2 接口。
有了 FetcherExecutor 組件后,接下來就是為其提供統(tǒng)一的訪問入口:
@Service
public class FetcherService {
@Autowired
private List<ItemFetcherExecutor> itemFetcherExecutors;
public <F extends ItemFetcher> void fetch(Class<F> cls, List<F> fetchers){
if (CollectionUtils.isNotEmpty(fetchers)){
this.itemFetcherExecutors.stream()
// 是否能處理該類型
.filter(itemFetcherExecutor -> itemFetcherExecutor.support(cls))
// 執(zhí)行真正的綁定
.forEach(itemFetcherExecutor -> itemFetcherExecutor.fetch(fetchers));
}
}
}
邏輯即為簡單,依次遍歷 FetcherExecutor,根據(jù) support 執(zhí)行結果,執(zhí)行 fetch 邏輯。
【小常識】Spring 可以將容器中的全部實現(xiàn)直接注入到 List<Bean>。在上述代碼中,將會把所有的 ItemFetcherExecutor 實現(xiàn)注入到 itemFetcherExecutors 屬性。因此,在新增 FetcherExecutor 時,只需將其聲明為 Spring Bean,無需調整代碼邏輯。
OK,我們有了 FetcherService 提供統(tǒng)一的數(shù)據(jù)綁定能力,原來 OrderServiceFetcher 中 fetch 操作的變化點轉移到 FetcherService,自身變得非常穩(wěn)定。具體如下:
@Service
public class OrderServiceFetcherV2 implements OrderService {
@Autowired
private OrderRepository orderRepository;
@Autowired
private FetcherService fetcherService;
@Override
public List<OrderListVO> getByUserId(Long userId) {
List<Order> orders = this.orderRepository.getByUserId(userId);
List<OrderListVOFetcherV2> orderDetailVOS = orders.stream()
.map(order -> new OrderListVOFetcherV2(OrderVO.apply(order)))
.collect(toList());
// VO 數(shù)據(jù)綁定發(fā)生變化,只需調整 VO 實現(xiàn)接口,此處無需變化
fetcherService.fetch(OrderListVOFetcherV2.class, orderDetailVOS);
return orderDetailVOS.stream()
.collect(toList());
}
@Override
public OrderDetailVO getDetailByOrderId(Long orderId) {
Order order = this.orderRepository.getById(orderId);
OrderDetailVOFetcherV2 orderDetail = new OrderDetailVOFetcherV2(OrderVO.apply(order));
// VO 數(shù)據(jù)綁定發(fā)生變化,只需調整 VO 實現(xiàn)接口,此處無需變化
fetcherService.fetch(OrderDetailVOFetcherV2.class, Arrays.asList(orderDetail));
return orderDetail;
}
}
終于,我們將變化收斂到 VO 內,VO 需要綁定新的數(shù)據(jù),只需實現(xiàn)對應接口即可。
4.3. 并發(fā)綁定
經(jīng)過重構,代碼結構變得非常清晰,如果想通過多線程并發(fā)方式提供性能,需要調整哪些組件呢?好好想想?。?!
只需對FetcherService進行調整,讓我們來一個并發(fā)版本,具體如下:
@Service
public class ConcurrentFetcherService {
private ExecutorService executorService;
@Autowired
private List<ItemFetcherExecutor> itemFetcherExecutors;
@PostConstruct
public void init(){
this.executorService = Executors.newFixedThreadPool(20);
}
@SneakyThrows
public <F extends ItemFetcher> void fetch(Class<F> cls, List<F> fetchers){
if (CollectionUtils.isNotEmpty(fetchers)){
// 創(chuàng)建異步執(zhí)行任務
List<Callable<Void>> callables = this.itemFetcherExecutors.stream()
.filter(itemFetcherExecutor -> itemFetcherExecutor.support(cls))
.map(itemFetcherExecutor -> (Callable<Void>) () -> {
itemFetcherExecutor.fetch(fetchers);
return null;
}).collect(Collectors.toList());
// 線程池中并行執(zhí)行
this.executorService.invokeAll(callables);
}
}
}
OrderServiceFetcherV3 只需使用 ConcurrentFetcherService 替代 原來的 FetcherService 并擁有了并發(fā)能力。
5. 注解方案
5.1. 復雜配置 @JoinInMemory 來幫忙
縱觀整個 Fetcher 封裝,雖然結構清晰,但細節(jié)過于繁瑣,特別是:
- 待抓取數(shù)據(jù)需要抽取 Fetcher 接口
- 需要提供自己的 FetcherExecutor 實現(xiàn)
- VO 需要實現(xiàn)多個 Fetcher 接口
這些不便將成為落地最大的阻礙,那有沒有辦法進行進一步簡化?
這需要思考下這些設計背后的深層需求:
Fetcher接口目的包括
- 提供綁定信息
- 設置綁定結果
- 被 FetcherExecutor 識別并進行處理
FetcherExecutor設計的目標包括:
- 識別待處理的 Fetcher
- 定制個性化流程
所有這些需求是否可用 ==注解== 的方式實現(xiàn)?
- 在 VO 屬性上增加注解,說明綁定結果寫回到該屬性上
- 注解配置來源屬性,提供綁定信息
- 注解配置流程屬性,完成 FetcherExecutor 的個性化定制
- 每個注解背后是一個 FetcherExecutor 實現(xiàn),完成 FetcherExecutor 與 “Fetcher” 綁定
根據(jù)上述分析,注解可完成全部任務,新建注解如下:
@Target({ElementType.FIELD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface JoinInMemory {
/**
* 從 sourceData 中提取 key
* @return
*/
String keyFromSourceData();
/**
* 從 joinData 中提取 key
* @return
*/
String keyFromJoinData();
/**
* 批量數(shù)據(jù)抓取
* @return
*/
String loader();
/**
* 結果轉換器
* @return
*/
String joinDataConverter() default "";
/**
* 運行級別,同一級別的 join 可 并行執(zhí)行
* @return
*/
int runLevel() default 10;
}
乍一看,需要配置的信息真多,其實大多數(shù)配置全部與 FetcherExecutor 實現(xiàn)相關。
abstract class AbstractJoinItemExecutor<SOURCE_DATA, JOIN_KEY, JOIN_DATA, JOIN_RESULT> implements JoinItemExecutor<SOURCE_DATA> {
/**
* 從原始數(shù)據(jù)中生成 JoinKey
* @param data
* @return
*/
protected abstract JOIN_KEY createJoinKeyFromSourceData(SOURCE_DATA data);
/**
* 根據(jù) JoinKey 批量獲取 JoinData
* @param joinKeys
* @return
*/
protected abstract List<JOIN_DATA> getJoinDataByJoinKeys(List<JOIN_KEY> joinKeys);
/**
* 從 JoinData 中獲取 JoinKey
* @param joinData
* @return
*/
protected abstract JOIN_KEY createJoinKeyFromJoinData(JOIN_DATA joinData);
/**
* 將 JoinData 轉換為 JoinResult
* @param joinData
* @return
*/
protected abstract JOIN_RESULT convertToResult(JOIN_DATA joinData);
/**
* 將 JoinResult 寫回至 SourceData
* @param data
* @param JoinResults
*/
protected abstract void onFound(SOURCE_DATA data, List<JOIN_RESULT> JoinResults);
/**
* 未找到對應的 JoinData
* @param data
* @param joinKey
*/
protected abstract void onNotFound(SOURCE_DATA data, JOIN_KEY joinKey);
@Override
public void execute(List<SOURCE_DATA> sourceDatas) {
// 從源數(shù)據(jù)中提取 JoinKey
List<JOIN_KEY> joinKeys = sourceDatas.stream()
.filter(Objects::nonNull)
.map(this::createJoinKeyFromSourceData)
.filter(Objects::nonNull)
.distinct()
.collect(toList());
log.debug("get join key {} from source data {}", joinKeys, sourceDatas);
// 根據(jù) JoinKey 獲取 JoinData
List<JOIN_DATA> allJoinDatas = getJoinDataByJoinKeys(joinKeys);
log.debug("get join data {} by join key {}", allJoinDatas, joinKeys);
// 將 JoinData 以 Map 形式進行組織
Map<JOIN_KEY, List<JOIN_DATA>> joinDataMap = allJoinDatas.stream()
.filter(Objects::nonNull)
.collect(groupingBy(this::createJoinKeyFromJoinData));
log.debug("group by join key, result is {}", joinDataMap);
// 處理每一條 SourceData
for (SOURCE_DATA data : sourceDatas){
// 從 SourceData 中 獲取 JoinKey
JOIN_KEY joinKey = createJoinKeyFromSourceData(data);
if (joinKey == null){
log.warn("join key from join data {} is null", data);
continue;
}
// 根據(jù) JoinKey 獲取 JoinData
List<JOIN_DATA> joinDatasByKey = joinDataMap.get(joinKey);
if (CollectionUtils.isNotEmpty(joinDatasByKey)){
// 獲取到 JoinData, 轉換為 JoinResult,進行數(shù)據(jù)寫回
List<JOIN_RESULT> joinResults = joinDatasByKey.stream()
.filter(Objects::nonNull)
.map(joinData -> convertToResult(joinData))
.collect(toList());
log.debug("success to convert join data {} to join result {}", joinDatasByKey, joinResults);
onFound(data, joinResults);
log.debug("success to write join result {} to source data {}", joinResults, data);
}else {
log.warn("join data lost by join key {} for source data {}", joinKey, data);
// 為獲取到 JoinData,進行 notFound 回調
onNotFound(data, joinKey);
}
}
}
}
JoinInMemory 注解屬性和AbstractJoinItemExecutor基本一致,在此就不做贅述,我們先看下具體的使用方式:
@Data
public class OrderDetailVOAnnV1 extends OrderDetailVO {
private final OrderVO order;
@JoinInMemory(keyFromSourceData = "#{order.userId}",
keyFromJoinData = "#{id}",
loader = "#{@userRepository.getByIds(#root)}",
joinDataConverter = "#{T(com.geekhalo.lego.joininmemory.order.UserVO).apply(#root)}"
)
private UserVO user;
// 其他暫時忽略
}
@Data
public class OrderListVOAnnV1 extends OrderListVO {
private final OrderVO order;
@JoinInMemory(keyFromSourceData = "#{order.userId}",
keyFromJoinData = "#{id}",
loader = "#{@userRepository.getByIds(#root)}",
joinDataConverter = "#{T(com.geekhalo.lego.joininmemory.order.UserVO).apply(#root)}"
)
private UserVO user;
// 其他暫時忽略
}
我們以 UserVO user 屬性為例
@JoinInMemory 注解中大量使用 SpEL,不熟悉的伙伴可以自行網(wǎng)上進行檢索。
其他部分不變,定義 OrderService 如下:
@Service
public class OrderServiceAnnV1 implements OrderService {
@Autowired
private OrderRepository orderRepository;
@Autowired
private JoinService joinService;
@Override
public List<OrderListVO> getByUserId(Long userId) {
List<Order> orders = this.orderRepository.getByUserId(userId);
List<OrderListVOAnnV1> orderDetailVOS = orders.stream()
.map(order -> new OrderListVOAnnV1(OrderVO.apply(order)))
.collect(toList());
this.joinService.joinInMemory(OrderListVOAnnV1.class, orderDetailVOS);
return orderDetailVOS.stream()
.collect(toList());
}
@Override
public OrderDetailVO getDetailByOrderId(Long orderId) {
Order order = this.orderRepository.getById(orderId);
OrderDetailVOAnnV1 orderDetail = new OrderDetailVOAnnV1(OrderVO.apply(order));
this.joinService.joinInMemory(OrderDetailVOAnnV1.class, Arrays.asList(orderDetail));
return orderDetail;
}
}
相對于 Fetcher 抽象,我們將 Fetcher、FetcherExecutor 全部配置化,并通過 注解的方式進行呈現(xiàn),相對于 Coding 方案,注解方案更加靈活,工作量也更小。
5.2. 復雜配置 @Alias 來幫忙
相對于 Fetcher 封裝,一個 @JoinInMemory 成功干掉了兩個組件,但觀其自身配置起來還是非常繁瑣。比如,在訂單查詢這個場景,在 OrderListVO 和 OrderDetailVO 中都需要對 UserVO 進行數(shù)據(jù)綁定,觀察兩個注解,我們會發(fā)現(xiàn)很多重復配置:
//OrderListVO
@JoinInMemory(keyFromSourceData = "#{order.userId}",
keyFromJoinData = "#{id}",
loader = "#{@userRepository.getByIds(#root)}",
joinDataConverter = "#{T(com.geekhalo.lego.joininmemory.order.UserVO).apply(#root)}"
)
private UserVO user;
// OrderDetailVO
@JoinInMemory(keyFromSourceData = "#{order.userId}",
keyFromJoinData = "#{id}",
loader = "#{@userRepository.getByIds(#root)}",
joinDataConverter = "#{T(com.geekhalo.lego.joininmemory.order.UserVO).apply(#root)}"
)
private UserVO user;
兩個配置完全一樣,細品之后會發(fā)現(xiàn):
【變化】入?yún)⒆兓?,讀取的屬性不同,只是本次恰巧相同而已
- OrderListVO 指的是 OrderListVO 屬性 order 的id值
- OrderDetailVO 指的是 OrderDetailVO 屬性 order 的值
【不變】處理邏輯不變
- keyFromJoinData 指的是 user對象的 id
- loader 指的是通過 userRepository 的 getByIds 加載數(shù)據(jù)
- joinDataConverter 指的是將 user 轉換為 UserVO
【不變】
- 將綁定結果 UserVO 綁定到屬性上(屬性名不同沒有影響)
對于不變部分如何進行統(tǒng)一管理?
自定義注解 結合 Spring @AliasFor 便可以解決這個問題,以 UserVO 為例:
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
// 管理通用屬性
@JoinInMemory(keyFromSourceData = "",
keyFromJoinData = "#{id}",
loader = "#{@userRepository.getByIds(#root)}",
joinDataConverter = "#{T(com.geekhalo.lego.joininmemory.order.UserVO).apply(#root)}"
)
public @interface JoinUserVOOnId {
// 使用別名將 keyFromSourceData 的配置暴露出來
@AliasFor(
annotation = JoinInMemory.class
)
String keyFromSourceData();
}
新注解有如下幾個特點:
- 在注解上使用 @JoinInMemory 注解完成對通用屬性的配置
- 在自定義注解 JoinUserVOOnId 的 keyFromSourceData 屬性上,添加 @AliasFor 注解,將配置暴露給使用方
有了自定義注解,使用變的非常方便:
@Data
public class OrderListVOAnnV2 extends OrderListVO {
private final OrderVO order;
// 只需配置參數(shù)即可,其他配置由 JoinUserVOOnId 進行管理
@JoinUserVOOnId(keyFromSourceData = "#{order.userId}")
private UserVO user;
}
@Data
public class OrderDetailVOAnnV2 extends OrderDetailVO {
private final OrderVO order;
// 只需配置參數(shù)即可,其他配置由 JoinUserVOOnId 進行管理
@JoinUserVOOnId(keyFromSourceData = "#{order.userId}")
private UserVO user;
}
其他使用方式不變,但實現(xiàn)了邏輯簡化:
- 新增綁定數(shù)據(jù),只需自定義綁定注解
- VO 需新的綁定數(shù)據(jù),只需在屬性上添加綁定注解
5.3. 開啟并發(fā) @JoinInMemoryConfig 來幫忙
如果擔心性能,可以一鍵開啟并發(fā)綁定,示例如下:
@Data
@JoinInMemoryConfig(executorType = JoinInMemeoryExecutorType.PARALLEL)
public class OrderListVOAnnV3 extends OrderListVO {
private final OrderVO order;
@JoinUserVOOnId(keyFromSourceData = "#{order.userId}")
private UserVO user;
@JoinAddressVOOnId(keyFromSourceData = "#{order.addressId}")
private AddressVO address;
@JoinProductVOOnId(keyFromSourceData = "#{order.productId}")
private ProductVO product;
}
JoinInMemoryConfig 配置如下:
6. 最佳實踐
6.1.將定義注解視為最佳實踐
@JoinInMemory 注解上配置的信息太多,如果直接在業(yè)務代碼中使用,非常難以維護,當每個配置發(fā)生變化后,很難一次性修改到位。所以,建議只將他作為“原注解”使用。
整體思路詳見:
6.2. 注意線程池隔離
對于不同的數(shù)據(jù)綁定需求,建議使用不同的線程池,從資源層面對不同功能進行隔離,從而將由于依賴接口發(fā)生阻塞導致線程耗盡所造成的影響控制在最小范圍。
@JoinInMemoryConfig 的 executorName 屬性配置的便是執(zhí)行器名稱,不配置直接使用 “defaultExecutor”,具體代碼如下:
@Bean
public ExecutorService defaultExecutor(){
BasicThreadFactory basicThreadFactory = new BasicThreadFactory.Builder()
.namingPattern("JoinInMemory-Thread-%d")
.daemon(true)
.build();
int maxSize = Runtime.getRuntime().availableProcessors() * 3;
return new ThreadPoolExecutor(0, maxSize,
60L, TimeUnit.SECONDS,
new SynchronousQueue<>(),
basicThreadFactory,
new ThreadPoolExecutor.CallerRunsPolicy());
}
如需使用自定義線程池需:
- 自定義線程池,并將其注冊到Spring 容器
- @JoinInMemoryConfig executorName 設置為線程池的 bean name
7. 小結
推導邏輯有點長不知道你get到多少,先簡單回顧一下:
- 今天面對的問題是:如何在應用成進行數(shù)據(jù) Join 操作;
- 我們以我的訂單和訂單詳情兩個接口為業(yè)務切入點,層層進行抽象,發(fā)現(xiàn)變化、封裝變化、管理變化
- 首先是手寫代碼,包括 foreach+單條抓取,批量查詢+內存Join,并行查詢 + 內存Join。在這個層次基本沒有抽象可言,存在大量重復代碼,系統(tǒng)擴展性低
- 其次是 Fetcher方案,為了分離“變化”與“不變”抽取出 Fetcher 和 FetcherExecutor 兩個接口,并使用模板方法和責任鏈模式對其進行抽象,提升系統(tǒng)的擴展性,但實現(xiàn)過于繁瑣不便于推廣
- 最后是注解方案,使用 @JoinInMemory 注解完成繁瑣的配置工作,將通用配置保留在自定義注解進行統(tǒng)一管理,基于 @AliasFor 完成入?yún)⒌呐渲?,還可以使用 @JoinInMemoryConfig 開啟并發(fā)處理