RocketMQ消息回溯實踐與解析
1 問題背景
前段時間,小A公司的短信服務(wù)出現(xiàn)了問題,導(dǎo)致一段時間內(nèi)的短信沒有發(fā)出去,等服務(wù)修復(fù)后,需要重新補發(fā)這批數(shù)據(jù)。
由于短信服務(wù)是直接通過RocketMQ觸發(fā),因此在修復(fù)這些數(shù)據(jù)的時候,小A犯了難,于是就有了以下對話
領(lǐng)導(dǎo):小A呀,這數(shù)據(jù)這么多,你準備怎么修呀?
小A:頭大呀領(lǐng)導(dǎo),一般業(yè)務(wù)我們都有一個本地消息表來做冪等,我只需要把數(shù)據(jù)庫表的狀態(tài)重置,然后把數(shù)據(jù)撈出來重新循環(huán)執(zhí)行就可以啦,但是短信服務(wù)我們沒有本地表呀!
領(lǐng)導(dǎo):那你有什么想法嗎?
小A:簡單的話,那就讓上游重發(fā)吧,我們再消費一遍就好了。
領(lǐng)導(dǎo):這樣問題就更嚴重了呀,你想,上游重發(fā)一遍,那是不是所有的消費者組都要重新消費一遍,到時候其他業(yè)務(wù)同學(xué)就要來找你了。
小A:那就不好辦了。。。
領(lǐng)導(dǎo):其實RocketMQ有專門的消息回溯的能力,你可以試試
小A:這么神奇?我研究研究。。。
2 驗證
2.1 生產(chǎn)者啟動
準備一個新的topic,并發(fā)送1W條消息
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 10000; i++) {
try {
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
}
2.2 消費者啟動
準備一個新的消費者組,消費topic下數(shù)據(jù)并記錄總條數(shù)
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
final AtomicInteger count = new AtomicInteger();
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
count.incrementAndGet();
System.out.printf("%s Receive New Messages End: %s %n", Thread.currentThread().getName(), msgs);
System.out.println(count.get());
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
消費者消息記錄
2.3 執(zhí)行回溯
命令行執(zhí)行
mqadmin.cmd resetOffsetByTime -n 127.0.0.1:9876 -t TopicTest -g please_rename_unique_group_name_4 -s 1722240069000
以下為mqadmin.cmd的內(nèi)容,因此也可以直接通過調(diào)用MQAdminStartup的main方法執(zhí)行
MQAdminStartup手動執(zhí)行
代碼執(zhí)行:
public static void main(String[] args) {
String[] params = new String[]{"resetOffsetByTime","-n","127.0.0.1:9876","-t", "TopicTest", "-g", "please_rename_unique_group_name_4", "-s", "1722240069000"};
MQAdminStartup.main(params);
}
2.4 結(jié)果驗證
客戶端重置成功記錄
消費者重新消費記錄
2.5 驗證小結(jié)
從結(jié)果上來看,消費者offset被重置到了指定的時間戳位置,由于指定時間戳早于最早消息的創(chuàng)建時間,因此重新消費了所有未被刪除的消息。
那rocketmq究竟做了什么呢?
2.5.1 分析參數(shù)
動作標(biāo)識:resetOffsetByTime
額外參數(shù):
-n nameserver的地址
-t 指定topic名稱
-g 指定消費者組名稱
-s 指定回溯時間
2.5.2 思考
消息回溯思考
3 分析
以下源碼部分均出自4.2.0版本,展示代碼有所精簡。
3.1 策略模式,解析命令行
org.apache.rocketmq.tools.command.MQAdminStartup#main
/*根據(jù)動作標(biāo)識解析除對應(yīng)的處理類,我們本次請求實際處理策略類:ResetOffsetByTimeCommand*/
SubCommand cmd = findSubCommand(args[0]);
/*解析命令行*/
Options options = ServerUtil.buildCommandlineOptions(new Options());
CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options),
new PosixParser());
/*提交請求執(zhí)行*/
cmd.execute(commandLine, options, rpcHook);
3.2 創(chuàng)建客戶端,與服務(wù)端交互
org.apache.rocketmq.tools.command.offset.ResetOffsetByTimeCommand#execute
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
String group = commandLine.getOptionValue("g").trim();//消費者組
String topic = commandLine.getOptionValue("t").trim();//主題
String timeStampStr = commandLine.getOptionValue("s").trim();//重置時間戳
long timestamp = timeStampStr.equals("now") ? System.currentTimeMillis() : Long.parseLong(timeStampStr);//重置時間戳
boolean isC = false;//是否C客戶端
boolean force = true;//是否強制重置,這里提前解釋一下,有可能時間戳對應(yīng)的offset比當(dāng)前消費進度要大,強制的話會出現(xiàn)部分消息消費不到
if (commandLine.hasOption('f')) {
force = Boolean.valueOf(commandLine.getOptionValue("f").trim());
}
/*與nameserver以及broker交互的客戶端啟動*/
defaultMQAdminExt.start();
/*正式執(zhí)行命令*/
Map<MessageQueue, Long> offsetTable = defaultMQAdminExt.resetOffsetByTimestamp(topic, group, timestamp, force, isC);
}
3.3 獲取topic對應(yīng)的broker地址,提交重置請求
org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl#resetOffsetByTimestamp
public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce,
boolean isC)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
/*從nameserver處獲取broker地址*/
TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
/*由于消息數(shù)據(jù)分區(qū)分片,topic下的messagequeue可能存在多個broker上,因此這是個列表*/
List<BrokerData> brokerDatas = topicRouteData.getBrokerDatas();
Map<MessageQueue, Long> allOffsetTable = new HashMap<MessageQueue, Long>();
if (brokerDatas != null) {
for (BrokerData brokerData : brokerDatas) {
String addr = brokerData.selectBrokerAddr();
if (addr != null) {
/*循環(huán)與各個broker交互,執(zhí)行重置操作*/
Map<MessageQueue, Long> offsetTable =
this.mqClientInstance.getMQClientAPIImpl().invokeBrokerToResetOffset(addr, topic, group, timestamp, isForce,
timeoutMillis, isC);
if (offsetTable != null) {
allOffsetTable.putAll(offsetTable);
}
}
}
}
return allOffsetTable;
}
3.4 與 nameserver交互獲取broker地址
org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl#examineTopicRouteInfo
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
requestHeader.setTopic(topic);
/*同樣的組裝參數(shù),請求碼:105*/
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);
/*創(chuàng)建請求與nameserver交互*/
RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
byte[] body = response.getBody();
if (body != null) {
return TopicRouteData.decode(body, TopicRouteData.class);
}
}
3.4.1 nameserver收到請求,獲取路由信息并返回
org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getRouteInfoByTopic
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final GetRouteInfoRequestHeader requestHeader =
(GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
/*nameserver內(nèi)部存儲topic的路由信息*/
TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
byte[] content = topicRouteData.encode();
response.setBody(content);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
3.4.2 RouteInfoManager的核心屬性
//topic路由信息,根據(jù)這個做負載均衡,QueueData里面記錄brokerName
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
//broke基本信息 名稱 所在集群信息 主備broke地址 brokerId=0表示master >0表示slave
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
//集群信息,包含集群所有的broke信息
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
//存活的broke信息,以及對應(yīng)的channel
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
//broke的過濾類信息
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
3.5 與broker交互,執(zhí)行重置操作
org.apache.rocketmq.client.impl.MQClientAPIImpl#invokeBrokerToResetOffset
public Map<MessageQueue, Long> invokeBrokerToResetOffset(final String addr, final String topic, final String group,
final long timestamp, final boolean isForce, final long timeoutMillis, boolean isC)
throws RemotingException, MQClientException, InterruptedException {
ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setGroup(group);
requestHeader.setTimestamp(timestamp);
requestHeader.setForce(isForce);
/*同樣的組裝參數(shù),請求碼:222*/
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_RESET_OFFSET, requestHeader);
if (isC) {
request.setLanguage(LanguageCode.CPP);
}
/*創(chuàng)建請求與broker交互,注意這里是同步invokeSync*/
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
if (response.getBody() != null) {
ResetOffsetBody body = ResetOffsetBody.decode(response.getBody(), ResetOffsetBody.class);
return body.getOffsetTable();
}
}
broker收到請求,開始處理;
org.apache.rocketmq.broker.client.net.Broker2Client#resetOffset
public RemotingCommand resetOffset(String topic, String group, long timeStamp, boolean isForce,
boolean isC) {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
/*記錄下該消費者組消費topic下的隊列要重置到哪條offset*/
Map<MessageQueue/*隊列*/, Long/*offser*/> offsetTable = new HashMap<MessageQueue, Long>();
for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) {
MessageQueue mq = new MessageQueue();
mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
mq.setTopic(topic);
mq.setQueueId(i);
/*broker可以獲取該topic下的consumergroup下的某個隊列的offset*/
long consumerOffset =
this.brokerController.getConsumerOffsetManager().queryOffset(group, topic, i);//消費者組當(dāng)前已經(jīng)消費的offset
if (-1 == consumerOffset) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(String.format("THe consumer group <%s> not exist", group));
return response;
}
long timeStampOffset;
if (timeStamp == -1) {
//沒有指定表示當(dāng)前隊列最大的offset
timeStampOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);
} else {
//根據(jù)時間戳查到隊列下對應(yīng)的offset
timeStampOffset = this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, i, timeStamp);
}
if (timeStampOffset < 0) {
//<0表示消息已經(jīng)被刪掉了
log.warn("reset offset is invalid. topic={}, queueId={}, timeStampOffset={}", topic, i, timeStampOffset);
timeStampOffset = 0;
}
/*如果isForce=false,則要重置的offset<當(dāng)前正在消費的offset才會重置。也過來,也就是說重置不僅會回溯,消費進度過慢也可以往后撥,加快消費進度*/
if (isForce || timeStampOffset < consumerOffset) {
offsetTable.put(mq, timeStampOffset);
} else {
offsetTable.put(mq, consumerOffset);
}
}
/*確定了要先重置的offset之后開始與客戶端交互,準備客戶端重置,請求碼220*/
ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setGroup(group);
requestHeader.setTimestamp(timeStamp);
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, requestHeader);
if (isC) {
// c++ language
ResetOffsetBodyForC body = new ResetOffsetBodyForC();
List<MessageQueueForC> offsetList = convertOffsetTable2OffsetList(offsetTable);
body.setOffsetTable(offsetList);
request.setBody(body.encode());
} else {
// other language
ResetOffsetBody body = new ResetOffsetBody();
body.setOffsetTable(offsetTable);
request.setBody(body.encode());
}
/*拿到與當(dāng)前broker建立連接的消費者組客戶端信息*/
ConsumerGroupInfo consumerGroupInfo =
this.brokerController.getConsumerManager().getConsumerGroupInfo(group);
if (consumerGroupInfo != null && !consumerGroupInfo.getAllChannel().isEmpty()) {
//獲取長連接channel
ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
consumerGroupInfo.getChannelInfoTable();
for (Map.Entry<Channel, ClientChannelInfo> entry : channelInfoTable.entrySet()) {
int version = entry.getValue().getVersion();
/*這里版本可以判斷,只有客戶端版本>3.0.7才支持重置*/
if (version >= MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()) {
try {
/*注意這里是只管發(fā)不管收,可以簡單理解為異步了invokeOneway*/
this.brokerController.getRemotingServer().invokeOneway(entry.getKey(), request, 5000);
log.info("[reset-offset] reset offset success. topic={}, group={}, clientId={}",
topic, group, entry.getValue().getClientId());
} catch (Exception e) {
log.error("[reset-offset] reset offset exception. topic={}, group={}",
new Object[] {topic, group}, e);
}
} else {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("the client does not support this feature. versinotallow="
+ MQVersion.getVersionDesc(version));
log.warn("[reset-offset] the client does not support this feature. versinotallow={}",
RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version));
return response;
}
}
} else {
String errorInfo =
String.format("Consumer not online, so can not reset offset, Group: %s Topic: %s Timestamp: %d",
requestHeader.getGroup(),
requestHeader.getTopic(),
requestHeader.getTimestamp());
log.error(errorInfo);
response.setCode(ResponseCode.CONSUMER_NOT_ONLINE);
response.setRemark(errorInfo);
return response;
}
response.setCode(ResponseCode.SUCCESS);
ResetOffsetBody resBody = new ResetOffsetBody();
resBody.setOffsetTable(offsetTable);
response.setBody(resBody.encode());
return response;
}
3.6 消費客戶端收到請求,開始處理
org.apache.rocketmq.client.impl.factory.MQClientInstance#resetOffset
public void resetOffset(String topic, String group, Map<MessageQueue, Long> offsetTable) {
DefaultMQPushConsumerImpl consumer = null;
try {
/*根據(jù)消費者組找到對應(yīng)的消費實現(xiàn),即我們熟悉的DefaultMQPushConsumerImpl或者DefaultMQPullConsumerImpl*/
MQConsumerInner impl = this.consumerTable.get(group);
if (impl != null && impl instanceof DefaultMQPushConsumerImpl) {
consumer = (DefaultMQPushConsumerImpl) impl;
} else {
//由于PullConsumer消費進度自己控制,因此直接返回
log.info("[reset-offset] consumer dose not exist. group={}", group);
return;
}
consumer.suspend();//暫停消費
/*暫停消息拉取,以及待處理的消息緩存都清掉*/
ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = consumer.getRebalanceImpl().getProcessQueueTable();
for (Map.Entry<MessageQueue, ProcessQueue> entry : processQueueTable.entrySet()) {
MessageQueue mq = entry.getKey();
if (topic.equals(mq.getTopic()) && offsetTable.containsKey(mq)) {
ProcessQueue pq = entry.getValue();
pq.setDropped(true);
pq.clear();
}
}
/*這里的等待實現(xiàn)比較簡單,與broker交互是同步,broker與consumer交互是異步,因此這里阻塞10秒是為了保證所有的consumer都在這里存儲offset并觸發(fā)reblance*/
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
}
Iterator<MessageQueue> iterator = processQueueTable.keySet().iterator();
while (iterator.hasNext()) {
MessageQueue mq = iterator.next();
//獲取messagequeue應(yīng)該被重置的offset
Long offset = offsetTable.get(mq);
if (topic.equals(mq.getTopic()) && offset != null) {
try {
/*更新更新本地offset,這里注意集群模式是先修改本地,然后定時任務(wù)每五秒上報broker,而廣播模式offset在本地存儲,因此只需要修改消費者本地的offset即可*/
consumer.updateConsumeOffset(mq, offset);
consumer.getRebalanceImpl().removeUnnecessaryMessageQueue(mq, processQueueTable.get(mq));
iterator.remove();
} catch (Exception e) {
log.warn("reset offset failed. group={}, {}", group, mq, e);
}
}
}
} finally {
if (consumer != null) {
/*重新觸發(fā)reblance,由于broker已經(jīng)重置的該消費者組的offset,因此重分配后以broker為準*/
consumer.resume();
}
}
}
4 核心流程
消息回溯全流程
5 總結(jié)
消息回溯功能是 RocketMQ 提供給業(yè)務(wù)方的定心丸,業(yè)務(wù)在出現(xiàn)任何無法恢復(fù)的問題后,都可以及時通過消息回溯來恢復(fù)業(yè)務(wù)或者訂正數(shù)據(jù)。特別是在流或者批計算的場景,重跑數(shù)據(jù)往往是常態(tài)。
RocketMQ 能實現(xiàn)消息回溯功能得益于其簡單的位點管理機制,可以很容易通過 mqadmin 工具重置位點。但要注意,由于topic的消息實際都是存儲在broker上,且有一定的刪除機制,因此首先要確認需要消息回溯的集群broker不能下線節(jié)點或者回溯數(shù)據(jù)被刪除之前的時間點,確保消息不會丟失。
6 延申
通過消息回溯的功能,我們可以任意向前或者向后撥動offset,那當(dāng)我們想要指定一個區(qū)間進行消費,這個時候怎么辦呢。比如當(dāng)消費進度過慢,我們選擇向后撥動offset,那就會有一部分未消費的消息出現(xiàn),針對這部分消息,我們應(yīng)該在空余時間把他消費完成,就需要指定區(qū)間來消費了。
其實通過上面代碼org.apache.rocketmq.client.impl.factory.MQClientInstance#resetOffset中我們可以看到,對于DefaultMQPullConsumerImpl類型的消費者,消息重置是不生效的,這是因為DefaultMQPullConsumerImpl的消費進度完全由消費者來控制,那我們就可以采用拉模式來進行消費。
示例代碼:
public class PullConsumerLocalTest {
private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();
private static final Map<MessageQueue, Pair<Long/*最小offset*/,Long/*最大offset*/>> QUEUE_OFFSE_SECTION_TABLE = new HashMap<>();
private static final Long MIN_TIMESTAMP = 1722240069000L;//最小時間戳
private static final Long MAX_TIMESTAMP = 1722240160000L;//最大時間戳
public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.start();
/*初始化待處理的offset*/
String topic = "TopicTest";
init(consumer, topic);
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(topic);
for (MessageQueue mq : mqs) {
System.out.printf("Consume from the queue: %s%n", mq);
SINGLE_MQ:
while (true) {
try {
PullResult pullResult =
consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
System.out.printf("%s%n", pullResult);
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
switch (pullResult.getPullStatus()) {
case FOUND:
//check max offset and dosomething...
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
break SINGLE_MQ;
case OFFSET_ILLEGAL:
break;
default:
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
consumer.shutdown();
}
private static void init(DefaultMQPullConsumer consumer, String topic) throws MQClientException {
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(topic);
for (MessageQueue mq : mqs) {
long minOffset = consumer.searchOffset(mq, MIN_TIMESTAMP);
long maxOffset = consumer.searchOffset(mq, MAX_TIMESTAMP);
//記錄區(qū)間內(nèi)范圍內(nèi)最小以及最大的offset
QUEUE_OFFSE_SECTION_TABLE.put(mq, new Pair<>(minOffset, maxOffset));
//將最小offset寫為下次消費的初始offset
OFFSE_TABLE.put(mq, minOffset);
}
}
private static long getMessageQueueOffset(MessageQueue mq) {
Long offset = OFFSE_TABLE.get(mq);
if (offset != null)
return offset;
return 0;
}
private static void putMessageQueueOffset(MessageQueue mq, long offset) {
OFFSE_TABLE.put(mq, offset);
}
}
7 對比
方式 | 優(yōu)點 | 缺點 |
消費者本地消息表 | 業(yè)務(wù)完全可控 | 額外存儲開銷,重復(fù)消費需要單獨開發(fā) |
消息重置 | 無需業(yè)務(wù)修改,支持廣播/集群,順序/無序消息(有冪等操作的需要重置狀態(tài)) | 低版本3.0.7之前不支持 |
pull手動控制 | 消費進度完全可控 | 需要考慮offset維護,復(fù)雜度較高 |