自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

StackExchange.Redis跑起來,為什么這么溜?

數(shù)據(jù)庫 Redis
在很多的高性能IO庫中,使用的都是管道技術(shù),比如Java的NIO、Windows的IOCP、Linux的epoll,本質(zhì)上都是通過一個類似管道的東西來統(tǒng)籌管理數(shù)據(jù)傳輸,減少不必要的調(diào)用和檢查,達到高效通信的目的。

StackExchange.Redis 是一個高性能的 Redis 客戶端庫,主要用于 .NET 環(huán)境下與 Redis 服務器進行通信,大名鼎鼎的stackoverflow 網(wǎng)站就使用它。它使用異步編程模型,能夠高效處理大量請求。支持 Redis 的絕大部分功能,包括發(fā)布/訂閱、事務、Lua 腳本等。由 StackExchange 團隊維護,質(zhì)量和更新頻率有保障。這篇文章就來給大家分享下 StackExchange.Redis 為什么玩的這么溜。

我將通過分析 StackExchange.Redis 中的同步調(diào)用和異步調(diào)用邏輯,來給大家一步步揭開它的神秘面紗。

同步API

向Redis發(fā)送消息

Redis 客戶端的 Get、Set 等操作都會封裝成為 Message,操作最終會走到這個方法,我們先大致看下代碼:

ConnectionMultiplexer.cs

internal T? ExecuteSyncImpl<T>(Message message, ResultProcessor<T>? processor, ServerEndPoint? server, T? defaultValue = default)
{
   ...
        // 創(chuàng)建一個ResultBox對象,這個對象將會放到Message中用來承載Redis的返回值
        var source = SimpleResultBox<T>.Get();

        WriteResult result;

        // 鎖住ResultBox對象,下邊會有大用
        lock (source)
        {
            // 將Message發(fā)送到Redis服務器
            result = TryPushMessageToBridgeSync(message, processor, source, ref server);
           
            ...
            
            // 調(diào)用 Monitor.Wait 釋放對 ResultBox 對象的鎖,同時讓當前線程停在這里
            if (Monitor.Wait(source, TimeoutMilliseconds))
            {
                Trace("Timely response to " + message);
            }
            ...
        }

        // 最終從 ResultBox 取出結(jié)果
        var val = source.GetResult(out var ex, canRecycle: true);
        ...
        return val;
    ...
}

仔細說一下大概的處理邏輯。

  1. 先構(gòu)造一個ResultBox對象,用來承載Message的執(zhí)行結(jié)果。
  2. 然后嘗試把這個Message推送到Redis服務器,注意程序內(nèi)部會把當前Message和ResultBox做一個綁定。
  3. 等待Redis服務器返回,返回結(jié)果賦值到ResultBox對象上。
  4. 最后從ResultBox對象中取出結(jié)果,返回給調(diào)用方。

注意這里用到了鎖(lock),還使用了Monitor.Wait,這是什么目的呢?

Monitor.Wait 一般和  Monitor.Pulse 搭配使用,用來在線程間通信。

  1. 調(diào)用 Monitor.Wait 時,lock住的ResultBox會被釋放,同時當前線程就會掛起,停在這里。
  2. Redis服務器返回結(jié)果后,把結(jié)果數(shù)據(jù)賦值到ResultBox上。
  3. 其它線程lock住這個ResultBox,調(diào)用Monitor.Pulse,之前被掛起的線程繼續(xù)執(zhí)行。

通過這種方式,我們就達成了一個跨線程的同步調(diào)用效果。

為什么會跨線程呢?直接調(diào)用Redis等著返回結(jié)果不行嗎?

因為 StackExchange.Redis 底層使用了 System.IO.Pipelines 來優(yōu)化網(wǎng)絡IO,這個庫采用了生產(chǎn)者/消費者的異步模式來處理網(wǎng)絡請求和響應,發(fā)送數(shù)據(jù)和接收數(shù)據(jù)很可能是在不同的線程中。

以上就是向Redis服務器發(fā)送消息的一個宏觀理解,但是這里有一個隱藏的問題:

異步情況下怎么把Redis的返回結(jié)果和消息對應上?

我們繼續(xù)跟蹤向 Redis 服務器發(fā)送 Message 的代碼,也就是深入 TryPushMessageToBridgeSync 的內(nèi)部。

一路跟隨,代碼會走到這里:

PhysicalBridge.cs

internal WriteResult WriteMessageTakingWriteLockSync(PhysicalConnection physical, Message message)
{
    ...
    bool gotLock = false;

    try
    {
        ...
        // 獲取單寫鎖,同時只能寫一個Message
        gotLock = _singleWriterMutex.Wait(0);
        if (!gotLock)
        {
            gotLock = _singleWriterMutex.Wait(TimeoutMilliseconds);
            if (!gotLock) return TimedOutBeforeWrite(message);
        }

        ...

        // 繼續(xù)調(diào)用內(nèi)部方法寫數(shù)據(jù)
        WriteMessageInsideLock(physical, message);
        ...

        // 刷新網(wǎng)絡管道,將數(shù)據(jù)通過網(wǎng)絡發(fā)出去
        physical.FlushSync(false, TimeoutMilliseconds);
    }
    catch (Exception ex) { ... }
    finally
    {
        if (gotLock)
        {
            _singleWriterMutex.Release();
        }
    }
}

這里邊用信號量做了一個鎖,保證同時只有一個寫操作。

那么為什么要保證同時只能一個寫操作呢?

我們繼續(xù)跟蹤代碼:

private WriteResult WriteMessageToServerInsideWriteLock(PhysicalConnection connection, Message message)
{
  ...
  // 把消息添加到隊列
  connection.EnqueueInsideWriteLock(message);

  // 把消息寫到網(wǎng)絡接口
  message.WriteTo(connection);
  ...
}

這里有兩個操作,一是將Message添加到隊列,二是向網(wǎng)絡接口寫數(shù)據(jù)。

保證同時只有一個寫操作,或者加鎖的目的,就是讓它倆一起完成,能對應起來,不會錯亂。

那么我們還要繼續(xù)問:寫隊列和寫網(wǎng)絡對應起來有什么用?

這個問題不好回答,我們先來看看這兩個操作都是干什么用的?

為什么要把Message寫入隊列?

同步IO可以直接拿到當前消息的返回結(jié)果,但是 System.IO.Pipelines 底層是異步操作,當處理結(jié)果從Redis返回時,我們需要把它對應到一個Messge上。加入隊列就是為了方便找到對應的消息。至于為什么用隊列,而不用集合,因為隊列能夠很好的滿足這個需求,下邊會有說明。

寫隊列代碼在這里:

PhysicalConnection.cs

internal void EnqueueInsideWriteLock(Message next)
{
    ...

    bool wasEmpty;
    lock (_writtenAwaitingResponse)
    {
        ...
        _writtenAwaitingResponse.Enqueue(next);
    }
    ...
}

入隊列需要先加鎖,因為可能是多線程環(huán)境下操作,Queue自身不是線程安全的。

再看一下把消息寫到網(wǎng)絡接口,這個的目的就是把消息發(fā)送到Redis服務器,看一下代碼:

PhysicalConnection.cs

internal static void WriteUnifiedPrefixedString(PipeWriter? maybeNullWriter, byte[]? prefix, string? value)
{
    ...
    // writer 就是管道的寫入接口
    var span = writer.GetSpan(3 + Format.MaxInt32TextLen);
    span[0] = (byte)'$';
    int bytes = WriteRaw(span, totalLength, offset: 1);
    writer.Advance(bytes);

    if (prefixLength != 0) writer.Write(prefix);
    if (encodedLength != 0) WriteRaw(writer, value, encodedLength);
    WriteCrlf(writer);
   ...
}

源碼最底層是通過 System.IO.Pipelines 中的 PipeWriter 把 Message 命令發(fā)送到Redis服務器的,這段代碼比較復雜,大家先大概知道做什么用的就行了。

到此,向Redis發(fā)送消息就處理完成了。

現(xiàn)在我們已經(jīng)大概了解向Redis服務器發(fā)送消息的過程:在最上層通過Monitor模擬了同步操作,在最底層使用了高效的異步IO,為了適配同步和異步,寫操作內(nèi)含了兩個子操作:寫隊列和寫網(wǎng)絡。

但是我們?nèi)匀徊荒芑卮鹨粋€問題:寫隊列和寫網(wǎng)絡為什么要放到一個鎖中執(zhí)行?或者說為什么要保證同時只能一個寫操作?

要回答這個問題,我們還得繼續(xù)看程序?qū)edis響應結(jié)果的處理。

處理Redis響應結(jié)果

Redis 客戶端與 Redis 服務器建立連接時,會創(chuàng)建一個死循環(huán),持續(xù)的從 System.IO.Pipelines 的管道中讀取Redis 服務器返回的消息,并進行相應的處理。最上層方法就是這個 ReadFromPipe:

PhysicalConnection.cs

private async Task ReadFromPipe()
{
  ...
  while (true)
  {
      ...
      // 沒有新數(shù)據(jù)從Redis服務器返回時,ReadAsync會等在這里
      readResult = await input.ReadAsync().ForAwait();
      ...
    
      var buffer = readResult.Buffer;
      ...
     
      if (!buffer.IsEmpty)
      {
          // 這里邊解析數(shù)據(jù),并賦值到相關(guān)對象上
          handled = ProcessBuffer(ref buffer);
      }
  }
}

對返回數(shù)據(jù)的處理重點在這個 ProcessBuffer 方法中。它會先對數(shù)據(jù)進行一個簡單的解析,然后再調(diào)用 MatchResult,從字面義上看就是匹配結(jié)果,匹配到那個結(jié)果呢?

private int ProcessBuffer(ref ReadOnlySequence<byte> buffer)
{
  ...
  var reader = new BufferReader(buffer);
  var result = TryParseResult(_protocol >= RedisProtocol.Resp3, _arena, in buffer, ref reader, IncludeDetailInExceptions, this);
  ...
  MatchResult(result);
  ...
}

還記得我們在上邊向Redis發(fā)送Message前,先創(chuàng)建了一個 ResultBox 對象,匹配的就是它。

怎么找到對應的 ResultBox 對象呢?

看下邊的代碼,程序從隊列中取出了一個Message 實例,就是要匹配到這個 Message 實例關(guān)聯(lián)的ResultBox。

private void MatchResult(in RawResult result)
 {
     ...

     // 從隊列中取出最早的一條Redis操作消息
     lock (_writtenAwaitingResponse)
     {
         if (!_writtenAwaitingResponse.TryDequeue(out msg))
         {
             throw new InvalidOperationException("Received response with no message waiting: " + result.ToString());
         }
     }
     ...

     // 將Redis返回的結(jié)果設(shè)置到取出的消息中
     if (msg.ComputeResult(this, result))
     {
         _readStatus = msg.ResultBoxIsAsync ? ReadStatus.CompletePendingMessageAsync : ReadStatus.CompletePendingMessageSync;

         // 完成Redis操作
         msg.Complete();
     }
     ...
  }

為什么從隊列取出的 Message 就一定能對應到 Redis 服務器當前返回的結(jié)果呢?

要破案了,還記得上邊的那個未解問題嗎:為什么要保證同時只能一個寫操作?

我們每次操作Redis都是:先把Message壓入隊列,然后再發(fā)送到Redis服務器,這兩個操作緊密的綁定在一起;而Redis服務器是單線程順序處理的,最先返回的就是最早壓入隊列的。加上每次只有一個寫操作的控制,我們就能保證最先寫入隊列的(也就是最先發(fā)到Redis服務器的)Message,就能對應到最先從Redis服務器返回的數(shù)據(jù)。

上面這段程序中的 msg.ComputeResult 就是用來將 Redis 最新返回的數(shù)據(jù)賦值到最新從隊列中拿出來的  Message 實例中。

現(xiàn)在 Message 實例 已經(jīng)獲取到了 Redis返回結(jié)果,還記得之前的發(fā)送線程一直在掛起等待嗎?

下邊的 msg.Complete 就是來讓發(fā)送線程恢復執(zhí)行的,看這個代碼 :

Message.cs(Message)

public void Complete()
 {
     ...
     // ResultBox激活繼續(xù)處理
     currBox?.ActivateContinuations();
 }

還有一層封裝,繼續(xù)看這個代碼:

ResultBox.cs(SimpleResultBox)。

void IResultBox.ActivateContinuations()
 {
     lock (this)
     { 
         // 通知等待Redis響應的線程,Redis返回結(jié)果了,請繼續(xù)你的表演
         Monitor.PulseAll(this);
     }
     ...
 }

Monitor.PulseAll 一出,發(fā)送線程立馬恢復執(zhí)行,向調(diào)用方返回執(zhí)行結(jié)果。

一次同步調(diào)用就這樣完成了。

異步API

異步API和同步API使用相同的通信底層,包括寫隊列和寫網(wǎng)絡管道的處理,只是在處理返回值的方式上存在不同。大家可以看一下異步和同步調(diào)試堆棧的對比圖:

圖片圖片

圖片圖片

執(zhí)行到 PhysicalBridge.WriteMessageInsideLock 這一步時處理就同步了。這一步的代碼上邊也貼過了,這里再給大家看看:其中的主要邏輯就是寫隊列和寫網(wǎng)絡管道。

private WriteResult WriteMessageToServerInsideWriteLock(PhysicalConnection connection, Message message)
{
  ...
  // 把消息添加到隊列
  connection.EnqueueInsideWriteLock(message);

  // 把消息寫到網(wǎng)絡接口
  message.WriteTo(connection);
  ...
}

向Redis發(fā)送消息

我們再簡單看看異步API中是如何發(fā)送消息的,看代碼:

internal Task<T?> ExecuteAsyncImpl<T>(Message? message, ResultProcessor<T>? processor, object? state, ServerEndPoint? server)
{
    ...
    // 創(chuàng)建一個Task執(zhí)行狀態(tài)跟蹤對象
    TaskCompletionSource<T?>? tcs = null;
    
    // 創(chuàng)建一個ResultBox對象,這個對象將會放到Message中用來承載Redis的返回值
    // 異步這里特別將 ResultBox 和 TaskCompletionSource 綁定到了一起
    // 獲取到Redis服務器返回的數(shù)據(jù)后,TaskCompletionSource 的執(zhí)行狀態(tài)將被更新為完成
    IResultBox<T?>? source = null;
    if (!message.IsFireAndForget)
    {
        source = TaskResultBox<T?>.Create(out tcs, state);
    }

    // 將Message消息發(fā)送到 Redis服務器
    var write = TryPushMessageToBridgeAsync(message, processor, source!, ref server);
   
    ...

    // 返回Task,調(diào)用方可以 await
    return tcs.Task;
}

相比同步API,這里多創(chuàng)建了一個 TaskCompletionSource 的實例,它用來跟蹤異步任務的執(zhí)行狀態(tài),程序會在接收到Redis服務器的返回數(shù)據(jù)時,將 TaskCompletionSource 的狀態(tài)更新為完成執(zhí)行。

里邊的代碼我就不展開講了,大家有興趣的可以按照上方我截圖的調(diào)用堆棧去跟蹤下。

處理Redis響應結(jié)果

異步API和同步API使用同一個死循環(huán)方法:ReadFromPipe,程序啟動時也只有這一個死循環(huán)在運行。

代碼上邊都講過了,這里只說下最后“ResultBox激活繼續(xù)處理”的部分,這個 ResultBox 和同步調(diào)用的 ResultBox 略有不同,看代碼:

void IResultBox.ActivateContinuations()
{
   ...
   ActivateContinuationsImpl();
}

private void ActivateContinuationsImpl()
{
    var val = _value;
    ...
    TrySetResult(val);
    ...
}
public bool TrySetResult(TResult result)
{
    // 設(shè)置異步任務執(zhí)行完成
    bool rval = _task.TrySetResult(result);
    ...
    return rval;
}

最重要的就是 _task.TrySetResult 這句,這里的 _task 就是發(fā)起異步調(diào)用時創(chuàng)建的 TaskCompletionSource 實例,TrySetResult 的作用就是設(shè)置異步任務執(zhí)行完成,對應的 await 代碼就可以繼續(xù)向下執(zhí)行了。

await client.SetAsync("hello", "fireflysoft.net");

// 繼續(xù)執(zhí)行下邊的代碼
...

總結(jié)

總體執(zhí)行邏輯

通過對同步API、異步API的執(zhí)行邏輯分析,我這里總結(jié)了一張圖,可以讓大家快速的理清其中的處理邏輯。

圖片圖片

我再用文字描述下這個執(zhí)行邏輯:

1、無論是同步調(diào)用還是異步調(diào)用,StackExchange.Redis 底層都是先會創(chuàng)建一個 Message 對象;每個 Message 對象都會關(guān)聯(lián)一個ResultBox對象(同步和異步調(diào)用對應的ResultBox對象略有不同),這個對象用來承載Redis執(zhí)行結(jié)果;

2、然后程序會把Message存入隊列、發(fā)送到網(wǎng)絡IO管道,寫隊列和寫網(wǎng)絡IO放到了一個互斥鎖中,同時只有一個Message寫入,這是為了保證收到Redis響應時正好對應隊列中的第一條數(shù)據(jù)。

執(zhí)行完這些操作后,API會等待,但是同步調(diào)用和異步調(diào)用等待的方式不同,同步會掛起線程等待其它線程同步結(jié)果,異步會使用await等待Task執(zhí)行結(jié)果;

3、Redis 命令被發(fā)送到網(wǎng)絡,抵達Redis服務器

4、接收到Redis服務器的響應數(shù)據(jù),這些數(shù)據(jù)會放到網(wǎng)絡IO管道中。

5、有一個線程持續(xù)監(jiān)聽IO管道中收到的數(shù)據(jù),一旦拿到數(shù)據(jù),就去隊列中取出一個Message,把服務器返回的數(shù)據(jù)寫到這個Message的ResultBox中。

給ResultBox賦值完,程序還會通知等待的API繼續(xù)執(zhí)行,同步調(diào)用是通過線程通信的方式通知,異步調(diào)用是通過更新Task的執(zhí)行結(jié)果狀態(tài)來通知。

最后API從ResultBox中取出數(shù)據(jù)返回給調(diào)用方。

管道技術(shù)

無論是同步調(diào)用還是異步調(diào)用,它們的底層通信方式都統(tǒng)一到了管道技術(shù),這是 StackExchange.Redis 性能出類拔萃的根基,這部分就專門來介紹下。

這里說的管道技術(shù)指的是使用System.IO.Pipelines庫,這個庫提供了一種高效的方式來優(yōu)化流式數(shù)據(jù)處理,具備更高的吞吐量、更低的延遲。具體用途:網(wǎng)絡上,可以用來構(gòu)建高性能的TCP或UDP服務器;對于大文件的讀寫操作,使用Pipelines可以減少內(nèi)存占用,提高處理速度。

PipeWriter和PipeReader是System.IO.Pipelines中的核心組件,它們用于構(gòu)建管道處理數(shù)據(jù)流。這里分享個例子:

using System;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

class Program
{
    static async Task Main(string[] args)
    {
        // 創(chuàng)建一個管道
        var pipe = new Pipe();

        // 啟動一個任務來寫入數(shù)據(jù)
        var writing = FillPipeAsync(pipe.Writer);

        // 啟動一個任務來讀取數(shù)據(jù)
        var reading = ReadPipeAsync(pipe.Reader);

        await Task.WhenAll(reading, writing);
    }

    private static async Task FillPipeAsync(PipeWriter writer)
    {
        for (int i = 0; i < 5; i++)
        {
            // 寫入一些數(shù)據(jù)到管道中
            string message = $"Message {i}";
            byte[] messageBytes = Encoding.UTF8.GetBytes(message);

            // 將數(shù)據(jù)寫入管道
            Memory<byte> memory = writer.GetMemory(messageBytes.Length);
            messageBytes.CopyTo(memory);
            writer.Advance(messageBytes.Length);

            // 通知管道有數(shù)據(jù)寫入
            FlushResult result = await writer.FlushAsync();

            if (result.IsCompleted)
            {
                break;
            }

            // 模擬一些延遲
            await Task.Delay(500);
        }

        // 告訴管道我們已經(jīng)完成寫入
        await writer.CompleteAsync();
    }

    private static async Task ReadPipeAsync(PipeReader reader)
    {
        while (true)
        {
            // 讀取管道中的數(shù)據(jù)
            ReadResult result = await reader.ReadAsync();
            var buffer = result.Buffer;

            // 處理讀取到的數(shù)據(jù)
            foreach (var segment in buffer)
            {
                string message = Encoding.UTF8.GetString(segment.Span);
                Console.WriteLine($"Read: {message}");
            }

            // 告訴管道我們已經(jīng)處理了這些數(shù)據(jù)
            reader.AdvanceTo(buffer.End);

            // 如果沒有更多數(shù)據(jù)可以讀取,退出循環(huán)
            if (result.IsCompleted)
            {
                break;
            }
        }

        // 告訴管道我們已經(jīng)完成讀取
        await reader.CompleteAsync();
    }
}

在這個示例中,我們創(chuàng)建了一個 Pipe 對象,并分別啟動了兩個任務來寫入和讀取數(shù)據(jù):

  1. FillPipeAsync 方法中,使用 PipeWriter 寫入數(shù)據(jù)到管道。
  2. ReadPipeAsync 方法中,使用 PipeReader 從管道中讀取數(shù)據(jù)并處理。

通過這種方式,我們可以高效地處理流式數(shù)據(jù),同時利用管道的優(yōu)勢來提高吞吐量和降低延遲。

其實在很多的高性能IO庫中,使用的都是管道技術(shù),比如Java的NIO、Windows的IOCP、Linux的epoll,本質(zhì)上都是通過一個類似管道的東西來統(tǒng)籌管理數(shù)據(jù)傳輸,減少不必要的調(diào)用和檢查,達到高效通信的目的。

責任編輯:武曉燕 來源: 螢火架構(gòu)
相關(guān)推薦

2009-04-29 14:40:17

2023-01-31 07:42:29

代碼JDKMaven

2023-03-02 23:09:53

Node.jsC++JS

2010-07-13 09:31:08

RubyRuby on Rai

2021-01-22 14:03:34

Flutter系統(tǒng)鴻蒙

2011-05-04 11:26:47

優(yōu)化

2021-01-12 11:12:58

大數(shù)據(jù)智慧交通

2023-08-03 09:02:32

LangChain開發(fā)GLM

2022-12-06 09:03:44

代碼fork系統(tǒng)

2015-08-04 17:46:19

戴爾anycloud云計算

2022-01-10 10:23:07

瀏覽器Vitenode

2012-05-15 13:29:20

HTML5

2017-11-17 15:25:02

Java線程安全

2015-05-18 13:49:37

OpenStack云計算應用

2021-11-10 10:00:48

鴻蒙HarmonyOS應用

2014-04-18 17:12:00

樂跑手環(huán)

2023-03-21 08:02:36

Redis6.0IO多線程

2024-03-01 17:01:15

GraphQL后端

2019-12-18 09:42:19

技術(shù) Linux網(wǎng)絡

2020-08-14 09:11:29

RedisQPS數(shù)據(jù)庫
點贊
收藏

51CTO技術(shù)棧公眾號