開源框架中的責(zé)任鏈模式實(shí)踐
一、責(zé)任鏈介紹
在GoF 的《設(shè)計(jì)模式》一書中對(duì)責(zé)任鏈模定義的:將請(qǐng)求的發(fā)送和接收解耦,讓多個(gè)接收對(duì)象都有機(jī)會(huì)處理這個(gè)請(qǐng)求。將這些接收對(duì)象串成一條鏈,并沿著這條鏈傳遞這個(gè)請(qǐng)求,直到鏈上的某個(gè)接收對(duì)象能夠處理它為止或者所有接收對(duì)象處理一遍。
用通俗的話解釋在責(zé)任鏈模式中,多個(gè)處理器(接收對(duì)象)依次處理同一個(gè)請(qǐng)求。一個(gè)請(qǐng)求先經(jīng)過(guò) A 處理器處理,然后再把請(qǐng)求傳遞給 B 處理器,B 處理器處理完后再傳遞給 C 處理器,以此類推,形成一個(gè)鏈條。鏈條上的每個(gè)處理器各自承擔(dān)各自的處理職責(zé),所以叫作責(zé)任鏈模式。
責(zé)任鏈模式有效地降低了發(fā)送和接收者之間的耦合度,增強(qiáng)了系統(tǒng)的可擴(kuò)展性。在責(zé)任鏈的模式下不僅能夠針對(duì)單個(gè)處理器對(duì)象進(jìn)行定制升級(jí)(每個(gè)處理器對(duì)象關(guān)注各自的任務(wù)),而且能夠?qū)φ麄€(gè)責(zé)任鏈的處理器對(duì)象的順序的調(diào)整以及增刪。
本文約定:責(zé)任鏈上的接收對(duì)象統(tǒng)一稱為處理器;本文中介紹的責(zé)任鏈屬于GOF定義中責(zé)任鏈的變種即責(zé)任鏈上的所有處理器都會(huì)參與任務(wù)的處理。
二、責(zé)任鏈實(shí)現(xiàn)
責(zé)任鏈模式有多種實(shí)現(xiàn)方式,從驅(qū)動(dòng)責(zé)任鏈上處理器方式的角度可以分類兩類,即責(zé)任鏈驅(qū)動(dòng) 和 責(zé)任鏈處理器自驅(qū)動(dòng)。
2.1 處理器自驅(qū)動(dòng)
// 1、定義抽象類
public abstract class AbstractHandler {
protected Handler next = null;
// 綁定處理器
public void setSuccessor(Handler next) {
this.next = next;
}
// 處理器執(zhí)行操作并驅(qū)動(dòng)下一個(gè)處理器
public abstract void handle();
}
// 2、定義處理器A
public class HandlerA extends AbstractHandler {
@Override
public void handle() {
// do something
if (next != null) {
next.handle();
}
}
}
// 3、定義處理器B
public class HandlerB extends AbstractHandler {
@Override
public void handle() {
// do something
if (next != null) {
next.handle();
}
}
}
// 4、構(gòu)建責(zé)任鏈并添加處理器
public class HandlerChain {
// 通過(guò)鏈表的形式保存責(zé)任鏈
private AbstractHandler head = null;
private AbstractHandler tail = null;
public void addHandler(AbstractHandler handler) {
handler.setSuccessor(null);
if (head == null) {
head = handler;
tail = handler;
return;
}
tail.setSuccessor(handler);
tail = handler;
}
public void handle() {
if (head != null) {
head.handle();
}
}
}
// 5、整體構(gòu)建責(zé)任鏈添加處理器并進(jìn)行驅(qū)動(dòng)
public class Application {
public static void main(String[] args) {
// 構(gòu)建責(zé)任鏈并添加處理器
HandlerChain chain = new HandlerChain();
chain.addHandler(new HandlerA());
chain.addHandler(new HandlerB());
// 責(zé)任鏈負(fù)責(zé)觸發(fā)
chain.handle();
}
}
說(shuō)明:
- 責(zé)任鏈上的每個(gè)處理器對(duì)象維護(hù)下一個(gè)處理器對(duì)象,整個(gè)責(zé)任鏈的驅(qū)動(dòng)由每個(gè)處理器對(duì)象自行驅(qū)動(dòng)。
- 每個(gè)處理器對(duì)象Handler中包含下一個(gè)處理器對(duì)象next的變量,通過(guò)鏈表形式維護(hù)責(zé)任鏈的關(guān)系。
2.2 責(zé)任鏈驅(qū)動(dòng)
// 1、定義抽象接口
public interface IHandler {
void doSomething();
}
// 2、定義處理器A
public class HandlerA implements IHandler {
@Override
public void doSomething() {
// do something
}
}
// 3、定義處理器B
public class HandlerB implements IHandler {
@Override
public void doSomething() {
// do something
}
}
// 4、構(gòu)建責(zé)任鏈并添加處理器
public class HandlerChain {
// 通過(guò)數(shù)組的形式保存處理器
private List<IHandler> handlers = new ArrayList<>();
public void addHandler(IHandler handler) {
handlers.add(handler);
}
// 由責(zé)任鏈負(fù)責(zé)遍歷所有的處理器并進(jìn)行調(diào)用
public void handle() {
for (IHandler handler : handlers) {
handler.handle();
}
}
}
// 5、整體構(gòu)建責(zé)任鏈添加處理器并進(jìn)行驅(qū)動(dòng)
public class Application {
public static void main(String[] args) {
HandlerChain chain = new HandlerChain();
chain.addHandler(new HandlerA());
chain.addHandler(new HandlerB());
chain.handle();
}
}
說(shuō)明:
- 責(zé)任鏈對(duì)象本身以數(shù)組的形式維護(hù)處理器對(duì)象,即上述代碼中的handlers 。
- 責(zé)任鏈的處理器的執(zhí)行由責(zé)任鏈對(duì)象循環(huán)調(diào)用處理器對(duì)象驅(qū)動(dòng),即上述代碼中的handle方法。
三、開源框架中責(zé)任鏈應(yīng)用
責(zé)任鏈低耦合高擴(kuò)展的特點(diǎn)讓它在很多開源的框架中被采用,本文選取了開源框架中的Spring Interceptor、Servlet Filter、Dubbo、Sentinel進(jìn)行責(zé)任鏈的實(shí)現(xiàn)介紹,通過(guò)對(duì)常用框架中責(zé)任鏈應(yīng)用的了解能夠更好掌握責(zé)任鏈落地并在日常的開發(fā)中積極的使用。
3.1 Spring Interceptor
3.1.1 Interceptor介紹
圖片
- Spring中的攔截器(Interceptor) 用于攔截控制器方法的執(zhí)行,可以在方法執(zhí)行前后添加自定義邏輯類似于AOP編程思想。
- Inteceptor的作用時(shí)機(jī)是在請(qǐng)求(request)進(jìn)入servlet后,在進(jìn)入Controller之前進(jìn)行預(yù)處理。
- Inteceptor的實(shí)際應(yīng)用包括:認(rèn)證授權(quán)、日志記錄、字符編碼轉(zhuǎn)換,敏感詞過(guò)濾等等。
- Inteceptor中責(zé)任鏈的實(shí)現(xiàn)會(huì)從處理器的介紹,責(zé)任鏈的構(gòu)建以及責(zé)任鏈的執(zhí)行三個(gè)角度進(jìn)行闡述。
3.1.2 處理器介紹
public interface HandlerInterceptor {
boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception;
void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception;
void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception;
}
@Component
public class TimeInterceptor extends HandlerInterceptorAdapter {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
// 前置處理
System.out.println("time interceptor preHandle");
return true;
}
@Override
public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {
// 后置處理
System.out.println("time interceptor postHandle");
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
System.out.println("time interceptor afterCompletion");
}
}
說(shuō)明:
- 處理器Interceptor的接口HandlerInterceptor定義了三個(gè)方法,可在控制器方法執(zhí)行前后添加自定義邏輯。
- 自定義處理器如上的TimeInterceptor需要自定義實(shí)現(xiàn)上述3個(gè)方法實(shí)現(xiàn)自我的邏輯。
- 所有的自定義處理會(huì)串聯(lián)在HandlerExecutionChain類實(shí)現(xiàn)的責(zé)任鏈上。
3.1.3 責(zé)任鏈構(gòu)建
public class HandlerExecutionChain {
private final Object handler;
private HandlerInterceptor[] interceptors;
private List<HandlerInterceptor> interceptorList;
private int interceptorIndex = -1;
public void addInterceptor(HandlerInterceptor interceptor) {
// 添加攔截器
initInterceptorList().add(interceptor);
}
public void addInterceptors(HandlerInterceptor... interceptors) {
if (!ObjectUtils.isEmpty(interceptors)) {
CollectionUtils.mergeArrayIntoCollection(interceptors, initInterceptorList());
}
}
private List<HandlerInterceptor> initInterceptorList() {
if (this.interceptorList == null) {
this.interceptorList = new ArrayList<HandlerInterceptor>();
if (this.interceptors != null) {
// An interceptor array specified through the constructor
CollectionUtils.mergeArrayIntoCollection(this.interceptors, this.interceptorList);
}
}
this.interceptors = null;
return this.interceptorList;
}
}
說(shuō)明:
- HandlerExecutionChain類作為串聯(lián)Interceptor處理器的責(zé)任鏈負(fù)責(zé)責(zé)任鏈的構(gòu)建和執(zhí)行。
- HandlerExecutionChain類通過(guò)集合對(duì)象interceptorList保存所有相關(guān)的處理器對(duì)象。
3.1.4 責(zé)任鏈執(zhí)行
public class DispatcherServlet extends FrameworkServlet {
protected void doDispatch(HttpServletRequest request, HttpServletResponse response) throws Exception {
try {
try {
// mappedHandler代表的是HandlerExecutionChain責(zé)任鏈 mappedHandler = getHandler(processedRequest);
HandlerAdapter ha = getHandlerAdapter(mappedHandler.getHandler());
// 1、執(zhí)行mappedHandler的applyPreHandle方法
if (!mappedHandler.applyPreHandle(processedRequest, response)) {
return;
}
// 2、執(zhí)行controller的執(zhí)行邏輯
mv = ha.handle(processedRequest, response, mappedHandler.getHandler());
if (asyncManager.isConcurrentHandlingStarted()) {
return;
}
applyDefaultViewName(processedRequest, mv);
// 執(zhí)行mappedHandler的applyPostHandle方法
mappedHandler.applyPostHandle(processedRequest, response, mv);
}
catch (Exception ex) {
}
processDispatchResult(processedRequest, response, mappedHandler, mv, dispatchException);
}
catch (Exception ex) {
}
finally {
}
}
}
public class HandlerExecutionChain {
private final Object handler;
private HandlerInterceptor[] interceptors;
private List<HandlerInterceptor> interceptorList;
private int interceptorIndex = -1;
boolean applyPreHandle(HttpServletRequest request, HttpServletResponse response) throws Exception {
HandlerInterceptor[] interceptors = getInterceptors();
if (!ObjectUtils.isEmpty(interceptors)) {
// 責(zé)任鏈從前往后的順序執(zhí)行
for (int i = 0; i < interceptors.length; i++) {
HandlerInterceptor interceptor = interceptors[i];
if (!interceptor.preHandle(request, response, this.handler)) {
triggerAfterCompletion(request, response, null);
return false;
}
this.interceptorIndex = i;
}
}
return true;
}
void applyPostHandle(HttpServletRequest request, HttpServletResponse response, ModelAndView mv) throws Exception {
HandlerInterceptor[] interceptors = getInterceptors();
if (!ObjectUtils.isEmpty(interceptors)) {
// 責(zé)任鏈從后往前的順序執(zhí)行
for (int i = interceptors.length - 1; i >= 0; i--) {
HandlerInterceptor interceptor = interceptors[i];
interceptor.postHandle(request, response, this.handler, mv);
}
}
}
}
說(shuō)明:
- 在servlet的doDispatch方法中依次觸發(fā)責(zé)任鏈的applyPreHandle的前置處理方法、applyPostHandle的后置處理方法。
- 前置處理方法applyPreHandle會(huì)遍歷責(zé)任鏈上的處理器從前往后依次處理,后置處理方法applyPostHandle會(huì)遍歷責(zé)任鏈上的處理器從后往前依次處理。
- 處理器的驅(qū)動(dòng)由責(zé)任鏈對(duì)象負(fù)責(zé)依次觸發(fā),非處理器對(duì)象自驅(qū)執(zhí)行。
3.2 Servlet Filter
3.2.1 Filter介紹
- Servlet過(guò)濾器是在Java Servlet規(guī)范2.3中定義的,它能夠?qū)ervlet容器的請(qǐng)求和響應(yīng)對(duì)象進(jìn)行檢查和修改,是個(gè)典型的責(zé)任鏈。
- 在Servlet被調(diào)用之前檢查Request對(duì)象并支持修改Request Header和Request內(nèi)容。
- 在Servlet被調(diào)用之后檢查Response對(duì)象并支修改Response Header和Response內(nèi)容。
3.2.2 處理器介紹
public interface Filter {
public void init(FilterConfig filterConfig) throws ServletException;
public void doFilter ( ServletRequest request, ServletResponse response, FilterChain chain ) throws IOException, ServletException;
public void destroy();
}
public class TimeFilter implements Filter {
@Override
public void init(FilterConfig filterConfig) throws ServletException {
System.out.println("time filter init");
}
@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
// 1、執(zhí)行處理的邏輯
System.out.println("time filter doFilter");
// 2、執(zhí)行責(zé)任鏈當(dāng)中的下一個(gè) Filter 對(duì)象,等價(jià)于執(zhí)行 FilterChain 的internalDoFilter方法
filterChain.doFilter(servletRequest, servletResponse);
}
}
說(shuō)明:
- Servlet過(guò)濾器類要實(shí)現(xiàn)javax.servlet.Filter接口,該接口定義了通用的3個(gè)方法。
- init方法:負(fù)責(zé)Servlet過(guò)濾器的初始化方法,Servlet容器創(chuàng)建Servlet過(guò)濾器實(shí)例過(guò)程中調(diào)用這個(gè)方法。
- doFilter方法:當(dāng)客戶請(qǐng)求訪問(wèn)與過(guò)濾器關(guān)聯(lián)的URL時(shí),Servlet容器會(huì)調(diào)用該方法。
- destroy方法:Servlet容器在銷毀過(guò)濾器實(shí)例前調(diào)用該方法,可以釋放過(guò)濾器占用的資源。
3.2.3 責(zé)任鏈構(gòu)建
public final class ApplicationFilterChain implements FilterChain {
// 責(zé)任鏈上 Filter 的維護(hù)對(duì)象
private ApplicationFilterConfig[] filters = new ApplicationFilterConfig[0];
//責(zé)任鏈上待執(zhí)行的 Filter 對(duì)象
private int pos = 0;
// 責(zé)任鏈上擁有的 Filter 數(shù)量
private int n = 0;
void addFilter(ApplicationFilterConfig filterConfig) {
// 避免重復(fù)添加Filter
for(ApplicationFilterConfig filter:filters)
if(filter==filterConfig)
return;
// 按需進(jìn)行擴(kuò)容
if (n == filters.length) {
ApplicationFilterConfig[] newFilters =
new ApplicationFilterConfig[n + INCREMENT];
System.arraycopy(filters, 0, newFilters, 0, n);
filters = newFilters;
}
// 保存Filter 對(duì)象
filters[n++] = filterConfig;
}
}
說(shuō)明:
- ApplicationFilterChain作為Filter的責(zé)任鏈,負(fù)責(zé)責(zé)任鏈的構(gòu)建和執(zhí)行。
- 責(zé)任鏈通過(guò)ApplicationFilterConfig類型的數(shù)組對(duì)象filters保存Filter處理器。
- 責(zé)任鏈上處理器的添加通過(guò)保存到數(shù)組filters來(lái)實(shí)現(xiàn)。
3.2.4 責(zé)任鏈執(zhí)行
public final class ApplicationFilterChain implements FilterChain {
// 責(zé)任鏈上 Filter 的維護(hù)對(duì)象
private ApplicationFilterConfig[] filters = new ApplicationFilterConfig[0];
//責(zé)任鏈上待執(zhí)行的 Filter 對(duì)象
private int pos = 0;
// 責(zé)任鏈上擁有的 Filter 數(shù)量
private int n = 0;
// 責(zé)任鏈的執(zhí)行
private void internalDoFilter(ServletRequest request, ServletResponse response)
throws IOException, ServletException {
// 在責(zé)任鏈未執(zhí)行完的情況下執(zhí)行責(zé)任鏈 if (pos < n) {
// 獲取當(dāng)前待執(zhí)行的 Filter,同時(shí)遞增下一次待執(zhí)行責(zé)任鏈的下標(biāo)
ApplicationFilterConfig filterConfig = filters[pos++];
try {
Filter filter = filterConfig.getFilter();
if( Globals.IS_SECURITY_ENABLED ) {
// 省略相關(guān)代碼
} else {
filter.doFilter(request, response, this);
}
} catch (Throwable e) {
}
return;
}
try {
if ((request instanceof HttpServletRequest) &&
(response instanceof HttpServletResponse) &&
Globals.IS_SECURITY_ENABLED ) {
// 執(zhí)行正常的業(yè)務(wù)邏輯
} else {
servlet.service(request, response);
}
} catch (Throwable e) {
e = ExceptionUtils.unwrapInvocationTargetException(e);
throw new ServletException(sm.getString("filterChain.servlet"), e);
}
}
}
說(shuō)明:
- 整個(gè)責(zé)任鏈上Filter處理器的執(zhí)行通過(guò)處理器自驅(qū)進(jìn)行實(shí)現(xiàn),而非由責(zé)任鏈對(duì)象驅(qū)動(dòng)。
- Filter處理器的在處理過(guò)程中除了執(zhí)行自我邏輯,會(huì)通過(guò)filterChain.doFilter
(servletRequest, servletResponse)觸發(fā)下一個(gè)處理器的執(zhí)行。
3.3 Dubbo
3.3.1 Dubbo Filter介紹
- Dubbo的Filter作用時(shí)機(jī)如上圖所示,F(xiàn)ilter實(shí)現(xiàn)是專門為服務(wù)提供方和服務(wù)消費(fèi)方調(diào)用過(guò)程進(jìn)行攔截,Dubbo本身的大多功能均基于此擴(kuò)展點(diǎn)實(shí)現(xiàn),每次遠(yuǎn)程方法執(zhí)行該攔截都會(huì)被執(zhí)行。
- Dubbo官方針對(duì)Filter做了很多的原生支持,目前大致有20來(lái)個(gè)吧,包括我們熟知的RpcContext,accesslog功能都是通過(guò)filter來(lái)實(shí)現(xiàn)了。
- 在實(shí)際業(yè)務(wù)開發(fā)中會(huì)對(duì)Filter接口進(jìn)行擴(kuò)展,在服務(wù)調(diào)用鏈路中嵌入我們自身的處理邏輯,如日志打印、調(diào)用耗時(shí)統(tǒng)計(jì)等。
3.3.2 處理器介紹
@Activate(group = PROVIDER, value = ACCESS_LOG_KEY)
public class AccessLogFilter implements Filter {
@Override
public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
try {
if (ConfigUtils.isNotEmpty(accessLogKey)) {
AccessLogData logData = buildAccessLogData(invoker, inv);
log(accessLogKey, logData);
}
} catch (Throwable t) {
}
// 執(zhí)行下一個(gè)invoker
return invoker.invoke(inv);
}
}
說(shuō)明:
- Dubbo中的自定義Filter需要實(shí)現(xiàn)org.apache.dubbo.rpc.Filter類,內(nèi)部通過(guò)實(shí)現(xiàn)invoke方法來(lái)實(shí)現(xiàn)自定義邏輯。
- 自定義Filter內(nèi)部除了實(shí)現(xiàn)必要的自定義邏輯外,核心的需要通過(guò)invoker.invoke(inv)觸發(fā)下一個(gè)過(guò)濾器的執(zhí)行。
3.3.3 責(zé)任鏈構(gòu)建
public class ProtocolFilterWrapper implements Protocol {
private final Protocol protocol;
public ProtocolFilterWrapper(Protocol protocol) {
this.protocol = protocol;
}
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
// 最后的 Invoker 對(duì)象
Invoker<T> last = invoker;
// 遍歷所有 Filter 對(duì)象,構(gòu)建責(zé)任鏈 List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
// 每個(gè) Filter 封裝成一個(gè) Invoker 對(duì)象,通過(guò) filter.invoke進(jìn)行串聯(lián)
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {
@Override
public Result invoke(Invocation invocation) throws RpcException {
return filter.invoke(next, invocation);
}
};
}
}
return last;
}
}
// 封裝了Filter的invoker對(duì)象
static final class ProtocolFilterWrapper.1 implements Invoker < T > {
final Invoker val$invoker;
final Filter val$filter;
// 指向下一個(gè)Invoker的變量
final Invoker val$next;
public Result invoke(Invocation invocation) throws RpcException {
return this.val$filter.invoke(this.val$next, invocation);
}
ProtocolFilterWrapper.1(Invoker invoker, Filter filter, Invoker invoker2) {
this.val$invoker = invoker;
this.val$filter = filter;
this.val$next = invoker2;
}
}
說(shuō)明:
- ProtocolFilterWrapper通過(guò)
buildInvokerChain構(gòu)建Dubbo Filter的責(zé)任鏈。 - 責(zé)任鏈上的處理器對(duì)象是將Filter封裝的Invoker對(duì)象,每個(gè)Invoker對(duì)象指向下一個(gè)處理器封裝的Invoker對(duì)象。
3.3.4 責(zé)任鏈執(zhí)行
public class FailfastClusterInvoker<T> extends AbstractClusterInvoker<T> {
public FailfastClusterInvoker(Directory<T> directory) {
super(directory);
}
@Override
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
checkInvokers(invokers, invocation);
Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
try {
// 執(zhí)行封裝了Filter的invoker對(duì)象,驅(qū)動(dòng)處理器的執(zhí)行
return invoker.invoke(invocation);
} catch (Throwable e) {
}
}
}
static final class ProtocolFilterWrapper.1 implements Invoker < T > {
final Invoker val$invoker;
final Filter val$filter;
final Invoker val$next;
public Result invoke(Invocation invocation) throws RpcException {
return this.val$filter.invoke(this.val$next, invocation);
}
ProtocolFilterWrapper.1(Invoker invoker, Filter filter, Invoker invoker2) {
this.val$invoker = invoker;
this.val$filter = filter;
this.val$next = invoker2;
}
說(shuō)明:
- 每個(gè)Invoker對(duì)象invoke方法會(huì)執(zhí)行自定義邏輯,并觸發(fā)下一個(gè)處理器的執(zhí)行。
- 整個(gè)責(zé)任鏈上處理器的執(zhí)行通過(guò)Invoker對(duì)象的驅(qū)動(dòng),而非責(zé)任鏈對(duì)象的驅(qū)動(dòng)。
3.4 Sentinel
3.4.1 Sentinel Slot介紹
- Sentinel是面向分布式服務(wù)架構(gòu)的流量治理組件,以流量為切入點(diǎn)提供熔斷限流的功能保證系統(tǒng)的穩(wěn)定性。
- Sentinel 里面以Entry作為限流的資源對(duì)象,每個(gè)Entry創(chuàng)建的同時(shí)會(huì)關(guān)聯(lián)一系列功能插槽(slot chain)。
- Sentinel提供了通用的原生Slot處理不同的邏輯,同時(shí)支持自定義Slot來(lái)定制功能。
3.4.2 處理器介紹
public interface ProcessorSlot<T> {
void entry(Context context, ResourceWrapper resourceWrapper, T param, int count, boolean prioritized,Object... args) throws Throwable;
void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized,Object... args) throws Throwable;
void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);
void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);
}
public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {
private AbstractLinkedProcessorSlot<?> next = null;
@Override
public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable {
// 觸發(fā)下一個(gè)處理器對(duì)象的處理
if (next != null) {
next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
}
}
void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args)
throws Throwable {
T t = (T)o;
// 執(zhí)行具體處理器的邏輯,由具體的處理器自行實(shí)現(xiàn)
entry(context, resourceWrapper, t, count, prioritized, args);
}
public void setNext(AbstractLinkedProcessorSlot<?> next) {
// 綁定下一個(gè)處理器的邏輯
this.next = next;
}
}
public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {
private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable {
// 1、處理器處理本身的邏輯
DefaultNode node = map.get(context.getName());
context.setCurNode(node);
// 2、處理器驅(qū)動(dòng)觸發(fā)下一個(gè)處理器
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
}
說(shuō)明:
- Sentinel中的Slot需要實(shí)現(xiàn)
com.alibaba.csp.sentinel.slotchain.ProcessorSlot的通用接口。 - 自定義Slot一般繼承抽象類AbstractLinkedProcessorSlot且只要改寫entry/exit方法實(shí)現(xiàn)自定義邏輯。
- Slot通過(guò)next變量保存下一個(gè)處理器Slot對(duì)象。
- 在自定義實(shí)現(xiàn)的entry方法中需要通過(guò)fireEntry觸發(fā)下一個(gè)處理器的執(zhí)行,在exit方法中通過(guò)fireExit觸發(fā)下一個(gè)處理器的執(zhí)行。
3.4.3 責(zé)任鏈構(gòu)建
public class DefaultSlotChainBuilder implements SlotChainBuilder {
@Override
public ProcessorSlotChain build() {
// 責(zé)任鏈的頭部對(duì)象ProcessorSlotChain
ProcessorSlotChain chain = new DefaultProcessorSlotChain();
// sortedSlotList獲取所有的處理器對(duì)象
List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted();
for (ProcessorSlot slot : sortedSlotList) {
if (!(slot instanceof AbstractLinkedProcessorSlot)) {
continue;
}
// 通過(guò)尾添法將職責(zé)slot添加到DefaultProcessorSlotChain當(dāng)中
chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
}
return chain;
}
}
public class DefaultProcessorSlotChain extends ProcessorSlotChain {
// 創(chuàng)建DefaultProcessorSlotChain的頭尾節(jié)點(diǎn)first和end
AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
throws Throwable {
super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
super.fireExit(context, resourceWrapper, count, args);
}
};
AbstractLinkedProcessorSlot<?> end = first;
@Override
public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) {
end.setNext(protocolProcessor);
end = protocolProcessor;
}
}
說(shuō)明:
- ProcessorSlotChain作為Slot的責(zé)任鏈,負(fù)責(zé)責(zé)任鏈的構(gòu)建和執(zhí)行。
- 責(zé)任鏈上的處理器對(duì)象
AbstractLinkedProcessorSlot通過(guò)保存指向下一個(gè)處理器的對(duì)象的進(jìn)行關(guān)聯(lián),整體以鏈表的形式進(jìn)行串聯(lián)。 - 責(zé)任鏈上的第一個(gè)處理器對(duì)象first本身不起任何作用,只是保存鏈表的頭部。
3.4.4 責(zé)任鏈執(zhí)行
public class CtSph implements Sph {
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
throws BlockException {
Context context = ContextUtil.getContext();
// 省略相關(guān)代碼
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
Entry e = new CtEntry(resourceWrapper, chain, context);
// 驅(qū)動(dòng)責(zé)任鏈上的第一個(gè)處理器,進(jìn)而由處理器自驅(qū)動(dòng)執(zhí)行下一個(gè)處理器
chain.entry(context, resourceWrapper, null, count, prioritized, args);
return e;
}
}
public class DefaultProcessorSlotChain extends ProcessorSlotChain {
// 創(chuàng)建DefaultProcessorSlotChain的頭尾節(jié)點(diǎn)first和end
AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
throws Throwable {
super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
super.fireExit(context, resourceWrapper, count, args);
}
};
AbstractLinkedProcessorSlot<?> end = first;
@Override
public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) {
end.setNext(protocolProcessor);
end = protocolProcessor;
}
}
public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {
private AbstractLinkedProcessorSlot<?> next = null;
@Override
public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable {
// 觸發(fā)下一個(gè)處理器對(duì)象的處理
if (next != null) {
next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
}
}
void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args)
throws Throwable {
T t = (T)o;
// 執(zhí)行具體處理器的邏輯,由具體的處理器自行實(shí)現(xiàn)
entry(context, resourceWrapper, t, count, prioritized, args);
}
public void setNext(AbstractLinkedProcessorSlot<?> next) {
// 綁定下一個(gè)處理器的邏輯
this.next = next;
}
}
說(shuō)明:
- 整個(gè)責(zé)任鏈上處理器的執(zhí)行通過(guò)Invoker對(duì)象的驅(qū)動(dòng),而非責(zé)任鏈對(duì)象的驅(qū)動(dòng)。
- DefaultProcessorSlotChain的entry首先頭部對(duì)象first,進(jìn)而觸發(fā)處理器的自驅(qū)實(shí)現(xiàn)處理器的執(zhí)行。
- 整體按照entry →fireEntry →
transformEntry→ entry的循環(huán)順序依次觸發(fā)處理器的自驅(qū)。
四、實(shí)踐總結(jié)
在日常項(xiàng)目實(shí)踐中,責(zé)任鏈的設(shè)計(jì)模式會(huì)在很多業(yè)務(wù)場(chǎng)景中落地。
譬如對(duì)于支持用戶生成內(nèi)容(UGC)的應(yīng)用來(lái)說(shuō),用戶生成的內(nèi)容可能包含一些敏感內(nèi)容如敏感言論或者圖片等。針對(duì)這種應(yīng)用場(chǎng)景,可以通過(guò)責(zé)任鏈模式設(shè)置多個(gè)處理器來(lái)處理不同的任務(wù),如文本過(guò)濾器處理敏感詞,圖片過(guò)濾器處理敏感圖片等等。
譬如對(duì)于電商服務(wù)中的下單流程來(lái)說(shuō),一個(gè)下單流程包含訂單拆合單,優(yōu)惠計(jì)算,訂單生成等多個(gè)步驟,我們可以通過(guò)責(zé)任鏈模式設(shè)置多個(gè)處理器來(lái)處理不同的任務(wù)等等。
責(zé)任鏈的應(yīng)用場(chǎng)景非常廣泛,在常見的開源框架中有豐富的落地場(chǎng)景,同樣在業(yè)務(wù)開發(fā)中也可以根據(jù)場(chǎng)景靈活使用。