高性能PHP框架Webman服務(wù)端實(shí)現(xiàn)流式輸出有哪些解決方案?
概念
Stream流式輸出是一種數(shù)據(jù)處理方式,它將數(shù)據(jù)以流的形式進(jìn)行傳輸和處理。在這種處理方式中,數(shù)據(jù)不再是集中存儲在某個(gè)地方,而是以分散的方式存儲在各個(gè)節(jié)點(diǎn)上,并不斷流動(dòng)。數(shù)據(jù)流的處理是在流動(dòng)的過程中完成的,因此能夠?qū)崟r(shí)地處理數(shù)據(jù),提高了數(shù)據(jù)處理效率。
流式輸出優(yōu)點(diǎn)
- 實(shí)時(shí)性:Stream流式輸出能夠?qū)崟r(shí)地處理數(shù)據(jù),減少了數(shù)據(jù)處理的延遲,使得數(shù)據(jù)處理的結(jié)果更加及時(shí)。
- 高效性:由于數(shù)據(jù)是分散存儲的,因此可以并行處理數(shù)據(jù),提高了數(shù)據(jù)處理效率。同時(shí),由于數(shù)據(jù)處理是在流動(dòng)的過程中完成的,可以避免數(shù)據(jù)的重復(fù)傳輸和處理。
- 可擴(kuò)展性:Stream流式輸出具有良好的可擴(kuò)展性,當(dāng)數(shù)據(jù)量增加時(shí),可以通過增加節(jié)點(diǎn)來擴(kuò)展系統(tǒng)的處理能力。
- 靈活性:Stream流式輸出可以靈活地處理各種類型的數(shù)據(jù),包括結(jié)構(gòu)化數(shù)據(jù)、半結(jié)構(gòu)化數(shù)據(jù)和非結(jié)構(gòu)化數(shù)據(jù)。
流式輸出應(yīng)用場景
- 實(shí)時(shí)數(shù)據(jù)分析:通過Stream流式輸出,可以對海量數(shù)據(jù)進(jìn)行實(shí)時(shí)分析,從而得到實(shí)時(shí)的分析結(jié)果。例如,在金融領(lǐng)域中,可以對股票交易數(shù)據(jù)進(jìn)行實(shí)時(shí)分析,得到實(shí)時(shí)的股票走勢預(yù)測。
- 實(shí)時(shí)推薦系統(tǒng):通過Stream流式輸出,可以根據(jù)用戶的實(shí)時(shí)行為數(shù)據(jù),推薦個(gè)性化的內(nèi)容。例如,在電商平臺上,可以根據(jù)用戶的瀏覽和購買行為,推薦相關(guān)的商品和活動(dòng)。
- 實(shí)時(shí)監(jiān)控系統(tǒng):通過Stream流式輸出,可以對各種類型的實(shí)時(shí)數(shù)據(jù)進(jìn)行監(jiān)控,如網(wǎng)絡(luò)流量、設(shè)備運(yùn)行狀態(tài)等。這種系統(tǒng)可以及時(shí)發(fā)現(xiàn)異常情況,并采取相應(yīng)的措施進(jìn)行處理。
- 語音識別和自然語言處理:通過Stream流式輸出,可以對語音數(shù)據(jù)進(jìn)行實(shí)時(shí)識別和處理,實(shí)現(xiàn)語音轉(zhuǎn)文字、機(jī)器翻譯等功能。這種技術(shù)可以大大提高語音識別的準(zhǔn)確性和實(shí)時(shí)性。
- 物聯(lián)網(wǎng)數(shù)據(jù)處理:物聯(lián)網(wǎng)設(shè)備會產(chǎn)生大量的實(shí)時(shí)數(shù)據(jù),通過Stream流式輸出可以對這些數(shù)據(jù)進(jìn)行實(shí)時(shí)處理和分析,從而更好地了解設(shè)備的運(yùn)行狀況和環(huán)境情況。
流式輸出實(shí)現(xiàn)解決方案
webman SSE
SSE也就是Server-sent Events,是一種服務(wù)端推送技術(shù)。它的本質(zhì)是客戶端發(fā)送一個(gè)攜帶Accept: text/event-stream 頭的http請求后,連接不關(guān)閉,服務(wù)端可以在這個(gè)連接上不斷的給客戶端推送數(shù)據(jù)。
相關(guān)文檔:https://www.workerman.net/doc/workerman/http/SSE.html
process/EventStreamProcess.php自定義進(jìn)程
<?php
/**
* @desc EventStreamProcess.php
* @author Tinywan(ShaoBo Wan)
* @date 2024/10/31 下午9:12
*/
declare(strict_types=1);
namespace process;
use Workerman\Connection\TcpConnection;
use Workerman\Protocols\Http\Request;
use Workerman\Protocols\Http\ServerSentEvents;
use Workerman\Protocols\Http\Response;
use Workerman\Timer;
class EventStreamProcess
{
/**
* @desc onMessage
* @param TcpConnection $connection
* @param Request $request
* @return void
* @author Tinywan(ShaoBo Wan)
*/
public function onMessage(TcpConnection $connection, Request $request)
{
// 如果Accept頭是text/event-stream則說明是SSE請求
if ($request->header('accept') === 'text/event-stream') {
// 首先發(fā)送一個(gè) Content-Type: text/event-stream 頭的響應(yīng)
$connection->send(new Response(200, [
'Content-Type' => 'text/event-stream',
'Access-Control-Allow-Origin' => '*'
]));
// 定時(shí)向客戶端推送數(shù)據(jù)
$timerId = Timer::add(2, function () use ($connection, &$timerId){
// 連接關(guān)閉的時(shí)候要將定時(shí)器刪除,避免定時(shí)器不斷累積導(dǎo)致內(nèi)存泄漏
if ($connection->getStatus() !== TcpConnection::STATUS_ESTABLISHED) {
Timer::del($timerId);
return;
}
// 發(fā)送message事件,事件攜帶的數(shù)據(jù)為hello,消息id可以不傳
$connection->send(new ServerSentEvents(['event' => 'message', 'data' => '開源技術(shù)小棧 '.date('Y-m-d H:i:s'), 'id'=>time()]));
});
return;
}
$connection->send('ok');
}
}
config/process.php 加入配置。
return [
// ... 其它配置 ...
'event-stream' => [
'listen' => 'http://0.0.0.0:8288',
'handler' => \process\EventStreamProcess::class
]
];
啟動(dòng)webman。
php start.php start
Workerman[start.php] start in DEBUG mode
-------------------------------------------------- WORKERMAN --------------------------------------------------
Workerman version:4.1.15 PHP version:8.2.10 Event-Loop:\Workerman\Events\Event
--------------------------------------------------- WORKERS ---------------------------------------------------
proto user worker listen processes status
tcp root webman http://0.0.0.0:8217 24 [OK]
tcp root monitor none 1 [OK]
tcp root event-stream http://0.0.0.0:8288 1 [OK]
---------------------------------------------------------------------------------------------------------------
Press Ctrl+C to stop. Start success.
前端測試代碼;
<!DOCTYPE html>
<html>
<head>
<title>Title</title>
</head>
<body>
<h2>使用webman創(chuàng)建text/eventstream響應(yīng)</h2>
</body>
<script>
var source = new EventSource('http://127.0.0.1:8288');
source.addEventListener('message', function (event) {
var data = event.data;
console.log(data); // 輸出:開源技術(shù)小棧 2024-10-31 21:40:57
}, false);
</script>
</html>
效果圖;
圖片
圖片
webman Http Chunk
process/HttpChunkStreamProcess.php自定義進(jìn)程;
<?php
/**
* @desc webman Http Chunk 方案
* @author Tinywan(ShaoBo Wan)
*/
declare(strict_types=1);
namespace process;
use Workerman\Connection\TcpConnection;
use Workerman\Protocols\Http\Request;
use Workerman\Protocols\Http\Chunk;
use Workerman\Protocols\Http\Response;
use Workerman\Timer;
class HttpChunkStreamProcess
{
/**
* @param TcpConnection $connection
* @param Request $request
* @return void
* @author Tinywan(ShaoBo Wan)
*/
public function onMessage(TcpConnection $connection, Request $request)
{
// 首先發(fā)送一個(gè)帶Transfer-Encoding: chunked頭的Response響應(yīng)
$total_count = 10;
$connection->send(new Response(200, [
'Transfer-Encoding' => 'chunked',
'Access-Control-Allow-Origin' => '*'
], "開源技術(shù)小棧。共【{$total_count}】段數(shù)據(jù)<br>"));
$timer_id = Timer::add(2, function () use ($connection, &$timer_id, $total_count){
static $count = 0;
// 連接關(guān)閉的時(shí)候要將定時(shí)器刪除,避免定時(shí)器不斷累積導(dǎo)致內(nèi)存泄漏
if ($connection->getStatus() !== TcpConnection::STATUS_ESTABLISHED) {
Timer::del($timer_id);
return;
}
if ($count++ >= $total_count) {
// 發(fā)送一個(gè)空的''代表結(jié)束響應(yīng)
$connection->send(new Chunk(''));
return;
}
// 發(fā)送chunk數(shù)據(jù)
$connection->send(new Chunk("開源技術(shù)小棧。第【{$count}】段數(shù)據(jù)<br>"));
});
}
}
config/process.php 加入配置;
return [
// ... 其它配置 ...
'event-stream' => [
'listen' => 'http://0.0.0.0:8289',
'handler' => \process\HttpChunkStreamProcess::class
]
];
瀏覽器訪問 http://127.0.0.1:8289 頁面會定時(shí)輸出以下數(shù)據(jù);
圖片
相關(guān)文檔:https://www.workerman.net/doc/workerman/http/response.html#%E5%8F%91%E9%80%81http%20chunk%E6%95%B0%E6%8D%AE
webman push插件
官方文檔:https://www.workerman.net/plugin/2