Pipeline 與 Valve 的秘密花園
前言
“在上一篇中,我們對 Tomcat 的 Pipeline 和 Valve 組件進(jìn)行了初步的探討,并繪制了其整體架構(gòu)圖。本篇將深入源碼,對這些組件進(jìn)行更加細(xì)致的剖析?!?/p>
Valve
Valve作為業(yè)務(wù)邏輯的執(zhí)行者,在 Tomcat 的請求處理鏈中扮演著基礎(chǔ)而又關(guān)鍵的角色。接下來,我們將深入探究 Valve 接口,了解其所提供的方法。
public interface Valve {
// 獲取下一個(gè)閥門
public Valve getNext();
// 設(shè)置下一個(gè)閥門
public void setNext(Valve valve);
// 后臺執(zhí)行邏輯,主要在類加載上下文中使用到
public void backgroundProcess();
// 執(zhí)行業(yè)務(wù)邏輯
public void invoke(Request request, Response response)
throws IOException, ServletException;
// 是否異步執(zhí)行
public boolean isAsyncSupported();
}
Contained
ValveBase、Pipeline以及其他相關(guān)組件均實(shí)現(xiàn)了Contained接口,用于管理其所屬的容器。Contained 接口提供了簡單而有效的get/set容器操作方法,方便組件在運(yùn)行時(shí)動態(tài)地獲取或設(shè)置其容器引用。
public interface Contained {
/**
* Get the {@link Container} with which this instance is associated.
*
* @return The Container with which this instance is associated or
* <code>null</code> if not associated with a Container
*/
Container getContainer();
/**
* Set the <code>Container</code> with which this instance is associated.
*
* @param container The Container instance with which this instance is to
* be associated, or <code>null</code> to disassociate this instance
* from any Container
*/
void setContainer(Container container);
}
ValveBase
Tomcat 中的 Valve 組件呈現(xiàn)出清晰的繼承體系,絕大多數(shù) Valve 都以ValveBase為基類。為了深入理解 Valve 的工作原理,我們有必要對 ValveBase 這個(gè)抽象類進(jìn)行細(xì)致的分析。
圖片
public abstract class ValveBase extends LifecycleMBeanBase implements Contained, Valve {
// 國際化管理器,可以支持多國語言
protected static final StringManager sm = StringManager.getManager(ValveBase.class);
//------------------------------------------------------ Instance Variables
// 無參構(gòu)造方法,默認(rèn)不支持異步
public ValveBase() {
this(false);
}
// 有參構(gòu)造方法,可傳入異步支持標(biāo)記
public ValveBase(boolean asyncSupported) {
this.asyncSupported = asyncSupported;
}
//------------------------------------------------------ Instance Variables
// 異步標(biāo)記
protected boolean asyncSupported;
// 所屬容器
protected Container container = null;
// 容器日志組件對象
protected Log containerLog = null;
// 下一個(gè)閥門
protected Valve next = null;
//-------------------------------------------------------------- Properties
// 獲取所屬容器
@Override
public Container getContainer() {
return container;
}
// 設(shè)置所屬容器
@Override
public void setContainer(Container container) {
this.container = container;
}
// 是否異步執(zhí)行
@Override
public boolean isAsyncSupported() {
return asyncSupported;
}
// 設(shè)置是否異步執(zhí)行
public void setAsyncSupported(boolean asyncSupported) {
this.asyncSupported = asyncSupported;
}
// 獲取下一個(gè)待執(zhí)行的閥門
@Override
public Valve getNext() {
return next;
}
// 設(shè)置下一個(gè)待執(zhí)行的閥門
@Override
public void setNext(Valve valve) {
this.next = valve;
}
//---------------------------------------------------------- Public Methods
// 后臺執(zhí)行,子類實(shí)現(xiàn)
@Override
public void backgroundProcess() {
// NOOP by default
}
// 初始化邏輯
@Override
protected void initInternal() throws LifecycleException {
super.initInternal();
// 設(shè)置容器日志組件對象到當(dāng)前閥門的containerLog屬性
containerLog = getContainer().getLogger();
}
// 啟動邏輯
@Override
protected synchronized void startInternal() throws LifecycleException {
setState(LifecycleState.STARTING);
}
// 停止邏輯
@Override
protected synchronized void stopInternal() throws LifecycleException {
setState(LifecycleState.STOPPING);
}
// 重寫toString,格式為[${containerName}]
@Override
public String toString() {
StringBuilder sb = new StringBuilder(this.getClass().getName());
sb.append('[');
if (container == null) {
sb.append("Container is null");
} else {
sb.append(container.getName());
}
sb.append(']');
return sb.toString();
}
// -------------------- JMX and Registration --------------------
// 設(shè)置獲取MBean對象的keyProperties,格式如:a=b,c=d,e=f...
@Override
public String getObjectNameKeyProperties() {
StringBuilder name = new StringBuilder("type=Valve");
Container container = getContainer();
name.append(container.getMBeanKeyProperties());
int seq = 0;
// Pipeline may not be present in unit testing
Pipeline p = container.getPipeline();
if (p != null) {
for (Valve valve : p.getValves()) {
// Skip null valves
if (valve == null) {
continue;
}
// Only compare valves in pipeline until we find this valve
if (valve == this) {
break;
}
if (valve.getClass() == this.getClass()) {
// Duplicate valve earlier in pipeline
// increment sequence number
seq ++;
}
}
}
if (seq > 0) {
name.append(",seq=");
name.append(seq);
}
String className = this.getClass().getName();
int period = className.lastIndexOf('.');
if (period >= 0) {
className = className.substring(period + 1);
}
name.append(",name=");
name.append(className);
return name.toString();
}
// 獲取所屬域,從container獲取
@Override
public String getDomainInternal() {
Container c = getContainer();
if (c == null) {
return null;
} else {
return c.getDomain();
}
}
}
Pipeline
Pipeline 可視作一個(gè) Valve 的容器,這些 Valve 在 Pipeline 中首尾相連,形成一條處理鏈。當(dāng)請求抵達(dá) Pipeline 時(shí),它會沿著這條鏈逐個(gè)激活 Valve,觸發(fā)每個(gè) Valve 的 invoke()
方法,從而完成一系列的業(yè)務(wù)處理。
具體代碼如下:
public interface Pipeline {
// ------------------------------------------------------------- Properties
// 獲取基本閥門
public Valve getBasic();
// 設(shè)置基本閥門
public void setBasic(Valve valve);
// --------------------------------------------------------- Public Methods
// 添加閥門
public void addValve(Valve valve);
// 獲取閥門數(shù)組
public Valve[] getValves();
// 刪除閥門
public void removeValve(Valve valve);
// 獲取首個(gè)閥門
public Valve getFirst();
// 管道內(nèi)所有閥門是否異步執(zhí)行
public boolean isAsyncSupported();
// 獲取管道所屬的容器
public Container getContainer();
// 設(shè)置管道所屬的容器
public void setContainer(Container container);
// 查找非異步執(zhí)行的所有閥門,并放置到result參數(shù)中,所以result不允許為null
public void findNonAsyncValves(Set<String> result);
}
StandardPipeline
接下來,我們將目光轉(zhuǎn)向** StandardPipeline,這是 Pipeline 接口的唯一實(shí)現(xiàn)**。盡管代碼規(guī)模較大,但其內(nèi)部的實(shí)現(xiàn)邏輯卻清晰易懂。
public class StandardPipeline extends LifecycleBase
implements Pipeline, Contained {
private static final Log log = LogFactory.getLog(StandardPipeline.class);
// ----------------------------------------------------------- Constructors
// 構(gòu)造一個(gè)沒有所屬容器的管道
public StandardPipeline() {
this(null);
}
// 構(gòu)造一個(gè)有所屬容器的管道
public StandardPipeline(Container container) {
super();
setContainer(container);
}
// ----------------------------------------------------- Instance Variables
/**
* 基本閥門,最后執(zhí)行的閥門
*/
protected Valve basic = null;
/**
* 管道所屬的容器
*/
protected Container container = null;
/**
* 管道里面的首個(gè)執(zhí)行的閥門
*/
protected Valve first = null;
// --------------------------------------------------------- Public Methods
// 是否異步執(zhí)行,如果一個(gè)閥門都沒有,或者所有閥門都是異步執(zhí)行的,才返回true
@Override
public boolean isAsyncSupported() {
Valve valve = (first!=null)?first:basic;
boolean supported = true;
while (supported && valve!=null) {
supported = supported & valve.isAsyncSupported();
valve = valve.getNext();
}
return supported;
}
// 查找所有未異步執(zhí)行的閥門
@Override
public void findNonAsyncValves(Set<String> result) {
Valve valve = (first!=null) ? first : basic;
while (valve != null) {
if (!valve.isAsyncSupported()) {
result.add(valve.getClass().getName());
}
valve = valve.getNext();
}
}
// ------------------------------------------------------ Contained Methods
// 獲取所屬容器
@Override
public Container getContainer() {
return (this.container);
}
// 設(shè)置所屬容器
@Override
public void setContainer(Container container) {
this.container = container;
}
// 初始化邏輯,默認(rèn)沒有任何邏輯
@Override
protected void initInternal() {
// NOOP
}
// 開始邏輯,調(diào)用所有閥門的start方法
@Override
protected synchronized void startInternal() throws LifecycleException {
// Start the Valves in our pipeline (including the basic), if any
Valve current = first;
if (current == null) {
current = basic;
}
while (current != null) {
if (current instanceof Lifecycle)
((Lifecycle) current).start();
current = current.getNext();
}
setState(LifecycleState.STARTING);
}
// 停止邏輯,調(diào)用所有閥門的stop方法
@Override
protected synchronized void stopInternal() throws LifecycleException {
setState(LifecycleState.STOPPING);
// Stop the Valves in our pipeline (including the basic), if any
Valve current = first;
if (current == null) {
current = basic;
}
while (current != null) {
if (current instanceof Lifecycle)
((Lifecycle) current).stop();
current = current.getNext();
}
}
// 銷毀邏輯,移掉所有閥門,調(diào)用removeValve方法
@Override
protected void destroyInternal() {
Valve[] valves = getValves();
for (Valve valve : valves) {
removeValve(valve);
}
}
/**
* 重新toString方法
*/
@Override
public String toString() {
StringBuilder sb = new StringBuilder("Pipeline[");
sb.append(container);
sb.append(']');
return sb.toString();
}
// ------------------------------------------------------- Pipeline Methods
// 獲取基礎(chǔ)閥門
@Override
public Valve getBasic() {
return (this.basic);
}
// 設(shè)置基礎(chǔ)閥門
@Override
public void setBasic(Valve valve) {
// Change components if necessary
Valve oldBasic = this.basic;
if (oldBasic == valve)
return;
// Stop the old component if necessary
// 老的基礎(chǔ)閥門會被調(diào)用stop方法且所屬容器置為null
if (oldBasic != null) {
if (getState().isAvailable() && (oldBasic instanceof Lifecycle)) {
try {
((Lifecycle) oldBasic).stop();
} catch (LifecycleException e) {
log.error("StandardPipeline.setBasic: stop", e);
}
}
if (oldBasic instanceof Contained) {
try {
((Contained) oldBasic).setContainer(null);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
}
}
}
// Start the new component if necessary
// 新的閥門會設(shè)置所屬容器,并調(diào)用start方法
if (valve == null)
return;
if (valve instanceof Contained) {
((Contained) valve).setContainer(this.container);
}
if (getState().isAvailable() && valve instanceof Lifecycle) {
try {
((Lifecycle) valve).start();
} catch (LifecycleException e) {
log.error("StandardPipeline.setBasic: start", e);
return;
}
}
// Update the pipeline
// 替換pipeline中的基礎(chǔ)閥門,就是講基礎(chǔ)閥門的前一個(gè)閥門的next指向當(dāng)前閥門
Valve current = first;
while (current != null) {
if (current.getNext() == oldBasic) {
current.setNext(valve);
break;
}
current = current.getNext();
}
this.basic = valve;
}
// 添加閥門
@Override
public void addValve(Valve valve) {
// Validate that we can add this Valve
// 設(shè)置所屬容器
if (valve instanceof Contained)
((Contained) valve).setContainer(this.container);
// Start the new component if necessary
// 調(diào)用閥門的start方法
if (getState().isAvailable()) {
if (valve instanceof Lifecycle) {
try {
((Lifecycle) valve).start();
} catch (LifecycleException e) {
log.error("StandardPipeline.addValve: start: ", e);
}
}
}
// Add this Valve to the set associated with this Pipeline
// 設(shè)置閥門,將閥門添加到基礎(chǔ)閥門的前一個(gè)
if (first == null) {
first = valve;
valve.setNext(basic);
} else {
Valve current = first;
while (current != null) {
if (current.getNext() == basic) {
current.setNext(valve);
valve.setNext(basic);
break;
}
current = current.getNext();
}
}
container.fireContainerEvent(Container.ADD_VALVE_EVENT, valve);
}
// 獲取閥門數(shù)組
@Override
public Valve[] getValves() {
ArrayList<Valve> valveList = new ArrayList<>();
Valve current = first;
if (current == null) {
current = basic;
}
while (current != null) {
valveList.add(current);
current = current.getNext();
}
return valveList.toArray(new Valve[0]);
}
// JMX方法,在此忽略
public ObjectName[] getValveObjectNames() {
ArrayList<ObjectName> valveList = new ArrayList<>();
Valve current = first;
if (current == null) {
current = basic;
}
while (current != null) {
if (current instanceof JmxEnabled) {
valveList.add(((JmxEnabled) current).getObjectName());
}
current = current.getNext();
}
return valveList.toArray(new ObjectName[0]);
}
// 移除閥門
@Override
public void removeValve(Valve valve) {
Valve current;
if(first == valve) {
// 如果待移出的閥門是首個(gè)閥門,則首個(gè)閥門的下一個(gè)閥門變成首個(gè)閥門
first = first.getNext();
current = null;
} else {
current = first;
}
// 遍歷閥門集合,并進(jìn)行移除
while (current != null) {
if (current.getNext() == valve) {
current.setNext(valve.getNext());
break;
}
current = current.getNext();
}
if (first == basic) first = null;
// 設(shè)置閥門所屬容器為null
if (valve instanceof Contained)
((Contained) valve).setContainer(null);
// 調(diào)用待移除閥門的stop方法和destroy方法,并觸發(fā)移除閥門事件
if (valve instanceof Lifecycle) {
// Stop this valve if necessary
if (getState().isAvailable()) {
try {
((Lifecycle) valve).stop();
} catch (LifecycleException e) {
log.error("StandardPipeline.removeValve: stop: ", e);
}
}
try {
((Lifecycle) valve).destroy();
} catch (LifecycleException e) {
log.error("StandardPipeline.removeValve: destroy: ", e);
}
}
container.fireContainerEvent(Container.REMOVE_VALVE_EVENT, valve);
}
// 獲取首個(gè)閥門,如果閥門列表為null,返回基礎(chǔ)閥門
@Override
public Valve getFirst() {
if (first != null) {
return first;
}
return basic;
}
}
總結(jié)
經(jīng)過對代碼的深入剖析,我們發(fā)現(xiàn)其中蘊(yùn)含著兩種經(jīng)典的設(shè)計(jì)模式:
- 模板方法模式: Pipeline 接口作為抽象類,定義了請求處理的整體流程骨架。StandardPipeline 作為具體子類,通過實(shí)現(xiàn)抽象方法來填充骨架中的細(xì)節(jié),從而定制化請求處理過程。這種模式賦予了系統(tǒng)良好的擴(kuò)展性,使得我們可以通過繼承 StandardPipeline 來創(chuàng)建新的 Pipeline 實(shí)現(xiàn)。
- 責(zé)任鏈模式: Valve 的組織方式體現(xiàn)了責(zé)任鏈模式的精髓。每個(gè) Valve 都持有指向下一個(gè) Valve 的引用,形成一條處理鏈。當(dāng)請求到達(dá)時(shí),它沿著這條鏈依次傳遞,直到被某個(gè) Valve 處理或到達(dá)鏈尾。這種模式將請求的處理過程分解成一系列的環(huán)節(jié),每個(gè)環(huán)節(jié)負(fù)責(zé)處理特定的業(yè)務(wù)邏輯,從而提高了系統(tǒng)的模塊化和可維護(hù)性。