GO 實現(xiàn)高并發(fā)高可用分布式系統(tǒng):Log微服務(wù)的實現(xiàn)
本文轉(zhuǎn)載自微信公眾號「Coding迪斯尼」,作者陳屹。轉(zhuǎn)載本文請聯(lián)系Coding迪斯尼公眾號。
在大數(shù)據(jù)時代,具備高并發(fā),高可用,理解微服務(wù)系統(tǒng)設(shè)計的人員需求很大,如果你想從事后臺開發(fā),在JD的描述中最常見的要求就是有所謂的“高并發(fā)”系統(tǒng)開發(fā)經(jīng)驗。但我發(fā)現(xiàn)在市面上并沒有直接針對“高并發(fā)”,“高可用”的教程,你搜到的資料往往都是只言片語,要不就是闡述那些令人摸不著頭腦的理論。但是技術(shù)的掌握必須從實踐中來,我找了很久發(fā)現(xiàn)很少有指導(dǎo)人動手實踐基于微服務(wù)的高并發(fā)系統(tǒng)開發(fā),因此我希望結(jié)合自己的學(xué)習(xí)和實踐經(jīng)驗跟大家分享一下這方面的技術(shù),特別是要強調(diào)具體的動手實踐來理解和掌握分布式系統(tǒng)設(shè)計的理論和技術(shù)。
所謂“微服務(wù)”其實沒什么神奇的地方,它只不過是把我們原來聚合在一起的模塊分解成多個獨立的,基于服務(wù)器程序存在的形式,假設(shè)我們開發(fā)的后臺系統(tǒng)分為日志,存儲,業(yè)務(wù)邏輯,算法邏輯等模塊,以前這些模塊會聚合成一個整體形成一個復(fù)雜龐大的應(yīng)用程序:
這種方式存在很多問題,第一是過多模塊糅合在一起會使得系統(tǒng)設(shè)計過于復(fù)雜,因為模塊直接存在各種邏輯耦合,這使得隨著時間的推移,系統(tǒng)的開發(fā)和維護變得越來越困難。第二是系統(tǒng)越來越脆弱,只要其中一個模塊發(fā)送錯誤或奔潰,整個系統(tǒng)可能就會垮塌。第三是可擴展性不強,系統(tǒng)很難通過硬件性能的增強而實現(xiàn)相應(yīng)擴展。
要實現(xiàn)高并發(fā),高可用,其基本思路就是將模塊拆解,然后讓他們成為獨立運行的服務(wù)器程序,各個模塊之間通過消息發(fā)送的方式完成配合:
這種模式的好處在于:1,模塊之間解耦合,一個模塊出問題對整個系統(tǒng)影響很小。2,可擴展,高可用,我們可以將模塊部署到不同服務(wù)器上,當(dāng)流量增加,我們只要簡單的增加服務(wù)器數(shù)量就能使得系統(tǒng)的響應(yīng)能力實現(xiàn)同等擴展。3,魯棒性增強,由于模塊能備份多個,其中一個模塊出問題,請求可以重定向到其他同樣模塊,于是系統(tǒng)的可靠性能大大增強。
當(dāng)然任何收益都有對應(yīng)代價,分布式系統(tǒng)的設(shè)計開發(fā)相比于原來的聚合性系統(tǒng)會多出很多難點。例如負(fù)載均衡,服務(wù)發(fā)現(xiàn),模塊協(xié)商,共識達(dá)成等,分布式算法強調(diào)的就是這些問題的解決,但是理論總是抽象難以理解,倘若不能動手實現(xiàn)一個高可用高并發(fā)系統(tǒng),你看多少理論都是霧里看花,越看越糊涂,所以我們必須通過動手實踐來理解和掌握理論,首先我們從最簡單的服務(wù)入手,那就是日志服務(wù),我們將使用GO來實現(xiàn)。
首先創(chuàng)建根目錄,可以命名為go_distributed_system,后面所有服務(wù)模塊都實現(xiàn)在該目錄下,然后創(chuàng)建子目錄proglog,進(jìn)去后我們再創(chuàng)建子目錄internel/server/在這里我們實現(xiàn)日志服務(wù)的邏輯模塊,首先在internel/server下面執(zhí)行初始化命令:
- go mod init internal/server
這里開發(fā)的模塊會被其他模塊引用,所以我們需要創(chuàng)建mod文件。首先我們需要完成日志系統(tǒng)所需的底層數(shù)據(jù)結(jié)構(gòu),創(chuàng)建log.go文件,相應(yīng)代碼如下:
- package server
- import (
- "fmt"
- "sync"
- )
- type Log struct {
- mu sync.Mutex
- records [] Record
- }
- func NewLog() *Log {
- return &Log{ch : make(chan Record),}
- }
- func(c *Log) Append(record Record) (uint64, error) {
- c.mu.Lock()
- defer c.mu.Unlock()
- record.Offset = uint64(len(c.records))
- c.records = append(c.records, record)
- return record.Offset, nil
- }
- func (c *Log) Read(offset uint64)(Record, error) {
- c.mu.Lock()
- defer c.mu.Unlock()
- if offset >= uint64(len(c.records)) {
- return Record{}, ErrOffsetNotFound
- }
- return c.records[offset], nil
- }
- type Record struct {
- Value []byte `json:"value"`
- Offset uint64 `json:"offset"`
- }
- var ErrOffsetNotFound = fmt.Errorf("offset not found")
由于我們的日志服務(wù)將以http服務(wù)器程序的方式接收日志讀寫請求,因此多個讀或?qū)懻埱髸瑫r執(zhí)行,所以我們需要對records數(shù)組進(jìn)行互斥操作,因此使用了互斥鎖,在每次讀取records數(shù)組前先獲得鎖,這樣能防止服務(wù)在同時接收多個讀寫請求時破壞掉數(shù)據(jù)的一致性。
所有的日志讀寫請求會以http POST 和 GET的方式發(fā)起,數(shù)據(jù)通過json來封裝,所以我們下面將創(chuàng)建一個http服務(wù)器對象,新建文件http.go,完成如下代碼:
- package server
- import (
- "encoding/json"
- "net/http"
- "github.com/gorilla/mux"
- )
- func NewHttpServer(addr string) *http.Server {
- httpsrv := newHttpServer()
- r := mux.NewRouter()
- r.HandleFunc("/", httpsrv.handleLogWrite).Methods("POST")
- r.HandleFunc("/", httpsrv.hadnleLogRead).Methods("GET")
- return &http.Server{
- Addr : addr,
- Handler: r,
- }
- }
- type httpServer struct{
- Log *Log
- }
- func newHttpServer() *httpServer {
- return &httpServer {
- Log: NewLog(),
- }
- }
- type WriteRequest struct {
- Record Record `json:"record"`
- }
- type WriteResponse struct {
- Offset uint64 `json:"offset"`
- }
- type ReadRequest struct {
- Offset uint64 `json:"offset"`
- }
- type ReadResponse struct {
- Record Record `json:"record"`
- }
- func (s *httpServer) handleLogWrite(w http.ResponseWriter, r * http.Request) {
- var req WriteRequest
- //服務(wù)以json格式接收請求
- err := json.NewDecoder(r.Body).Decode(&req)
- if err != nil {
- http.Error(w, err.Error(), http.StatusBadRequest)
- return
- }
- off, err := s.Log.Append(req.Record)
- if err != nil {
- http.Error(w, err.Error(), http.StatusInternalServerError)
- return
- }
- res := WriteResponse{Offset: off}
- //服務(wù)以json格式返回結(jié)果
- err = json.NewEncoder(w).Encode(res)
- if err != nil {
- http.Error(w, err.Error(), http.StatusInternalServerError)
- return
- }
- }
- func (s *httpServer) hadnleLogRead(w http.ResponseWriter, r *http.Request) {
- var req ReadRequest
- err := json.NewDecoder(r.Body).Decode(&req)
- if err != nil {
- http.Error(w, err.Error(), http.StatusBadRequest)
- return
- }
- record, err := s.Log.Read(req.Offset)
- if err == ErrOffsetNotFound {
- http.Error(w, err.Error(), http.StatusNotFound)
- return
- }
- if err != nil {
- http.Error(w, err.Error(), http.StatusInternalServerError)
- return
- }
- res := ReadResponse{Record: record}
- err = json.NewEncoder(w).Encode(res)
- if err != nil {
- http.Error(w, err.Error(), http.StatusInternalServerError)
- return
- }
- }
上面代碼顯示出“分布式”,“微服務(wù)”的特點。相應(yīng)的功能代碼以單獨服務(wù)器的形式運行,通過網(wǎng)絡(luò)來接收服務(wù)請求,這對應(yīng)“分布式”,每個獨立模塊只完成一個特定任務(wù),這就對應(yīng)“微服務(wù)”,由于這種方式可以同時在不同的機器上運行,于是展示了“可擴展性”。
同時服務(wù)既然以http 服務(wù)器的形式存在,因此服務(wù)的請求和返回也要走Http形式,同時數(shù)據(jù)以Json方式進(jìn)行封裝。同時實現(xiàn)的邏輯很簡單,但有日志寫請求時,我們把請求解析成Record結(jié)構(gòu)體后加入到隊列末尾,當(dāng)有讀取日志的請求時,我們獲得客戶端發(fā)來的讀取偏移,然后取出對應(yīng)的記錄,封裝成json格式后返回給客戶。
完成了服務(wù)器的代碼后,我們需要將服務(wù)器運行起來,為了達(dá)到模塊化的目的,我們把服務(wù)器的啟動放置在另一個地方,在proglog根目錄下創(chuàng)建cmd/server,在里面添加main.go:
- package main
- import (
- "log"
- "internal/server"
- )
- func main() {
- srv := server.NewHttpServer(":8080")
- log.Fatal(srv.ListenAndServe())
- }
同時為了能夠引用internal/server下面的模塊,我們需要在cmd/server下先通過go mod init cmd/server進(jìn)行初始化,然后在go.mod文件中添加如下一行:
- replace internal/server => ../../internal/server
然后執(zhí)行命令 go mod tidy,這樣本地模塊就知道根據(jù)給定的目錄轉(zhuǎn)換去引用模塊,最后使用go run main.go啟動日志服務(wù),現(xiàn)在我們要做的是測試服務(wù)器的可用性,我們同樣在目錄下創(chuàng)建server_test.go,然后編寫測試代碼,基本邏輯就是想服務(wù)器發(fā)送日志寫請求,然后再發(fā)送讀請求,并比較讀到的數(shù)據(jù)是否和我們寫入的數(shù)據(jù)一致,代碼如下:
- package main
- import(
- "encoding/json"
- "net/http"
- "internal/server"
- "bytes"
- "testing"
- "io/ioutil"
- )
- func TestServerLogWrite(t *testing.T) {
- var tests = []struct {
- request server.WriteRequest
- want_response server.WriteResponse
- } {
- {request: server.WriteRequest{server.Record{[]byte(`this is log request 1`), 0}},
- want_response: server.WriteResponse{Offset: 0, },},
- {request: server.WriteRequest{server.Record{[]byte(`this is log request 2`), 0}},
- want_response: server.WriteResponse{Offset: 1, },},
- {request: server.WriteRequest{server.Record{[]byte(`this is log request 3`), 0}},
- want_response: server.WriteResponse{Offset: 2, },},
- }
- for _, test := range tests {
- //將請求轉(zhuǎn)換成json格式并post給日志服務(wù)
- request := &test.request
- request_json, err := json.Marshal(request)
- if err != nil {
- t.Errorf("convert request to json fail")
- return
- }
- resp, err := http.Post("http://localhost:8080", "application/json",bytes.NewBuffer(request_json))
- defer resp.Body.Close()
- if err != nil {
- t.Errorf("http post request fail: %v", err)
- return
- }
- //解析日志服務(wù)返回結(jié)果
- body, err := ioutil.ReadAll(resp.Body)
- var response server.WriteResponse
- err = json.Unmarshal([]byte(body), &response)
- if err != nil {
- t.Errorf("Unmarshal write response fail: %v", err)
- }
- //檢測結(jié)果是否與預(yù)期一致
- if response.Offset != test.want_response.Offset {
- t.Errorf("got offset: %d, but want offset: %d", response.Offset, test.want_response.Offset)
- }
- }
- var read_tests = []struct {
- request server.ReadRequest
- want server.ReadResponse
- } {
- {request: server.ReadRequest{Offset : 0,},
- want: server.ReadResponse{server.Record{[]byte(`this is log request 1`), 0}} },
- {request: server.ReadRequest{Offset : 1,},
- want: server.ReadResponse{server.Record{[]byte(`this is log request 2`), 0}} },
- {request: server.ReadRequest{Offset : 2,},
- want: server.ReadResponse{server.Record{[]byte(`this is log request 3`), 0}} },
- }
- for _, test := range read_tests {
- request := test.request
- request_json , err := json.Marshal(request)
- if err != nil {
- t.Errorf("convert read request to json fail")
- return
- }
- //將請求轉(zhuǎn)換為json并放入GET請求體
- client := &http.Client{}
- req, err := http.NewRequest(http.MethodGet, "http://localhost:8080", bytes.NewBuffer(request_json))
- req.Header.Set("Content-Type", "application/json")
- resp, err := client.Do(req)
- if err != nil {
- t.Errorf("read request fail: %v", err)
- return
- }
- //解析讀請求返回的結(jié)果
- defer resp.Body.Close()
- body, err := ioutil.ReadAll(resp.Body)
- var response server.ReadResponse
- err = json.Unmarshal([]byte(body), &response)
- if err != nil {
- t.Errorf("Unmarshal read response fail: %v", err)
- return
- }
- res := bytes.Compare(response.Record.Value, test.want.Record.Value)
- if res != 0 {
- t.Errorf("got value: %q, but want value : %q", response.Record.Value, test.want.Record.Value)
- }
- }
- }
完成上面代碼后,使用go test運行,結(jié)果如下圖所示:
從結(jié)果看到,我們的測試能通過,也就是無論是向日志服務(wù)提交寫入請求還是讀取請求,所得的結(jié)果跟我們預(yù)想的一致??偨Y(jié)一下,本節(jié)我們設(shè)計了一個簡單的JSON/HTTP 日志服務(wù),它能夠接收基于JSON的http寫請求和讀請求,后面我們還會研究基于gPRC技術(shù)的微服務(wù)開發(fā)技術(shù).
代碼獲取
https://github.com/wycl16514/golang_distribute_system_log_service.git