自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

GO 實現(xiàn)高并發(fā)高可用分布式系統(tǒng):Log微服務(wù)的實現(xiàn)

開發(fā) 架構(gòu) 分布式
在大數(shù)據(jù)時代,具備高并發(fā),高可用,理解微服務(wù)系統(tǒng)設(shè)計的人員需求很大,如果你想從事后臺開發(fā),在JD的描述中最常見的要求就是有所謂的“高并發(fā)”系統(tǒng)開發(fā)經(jīng)驗。

本文轉(zhuǎn)載自微信公眾號「Coding迪斯尼」,作者陳屹。轉(zhuǎn)載本文請聯(lián)系Coding迪斯尼公眾號。

在大數(shù)據(jù)時代,具備高并發(fā),高可用,理解微服務(wù)系統(tǒng)設(shè)計的人員需求很大,如果你想從事后臺開發(fā),在JD的描述中最常見的要求就是有所謂的“高并發(fā)”系統(tǒng)開發(fā)經(jīng)驗。但我發(fā)現(xiàn)在市面上并沒有直接針對“高并發(fā)”,“高可用”的教程,你搜到的資料往往都是只言片語,要不就是闡述那些令人摸不著頭腦的理論。但是技術(shù)的掌握必須從實踐中來,我找了很久發(fā)現(xiàn)很少有指導(dǎo)人動手實踐基于微服務(wù)的高并發(fā)系統(tǒng)開發(fā),因此我希望結(jié)合自己的學(xué)習(xí)和實踐經(jīng)驗跟大家分享一下這方面的技術(shù),特別是要強調(diào)具體的動手實踐來理解和掌握分布式系統(tǒng)設(shè)計的理論和技術(shù)。

所謂“微服務(wù)”其實沒什么神奇的地方,它只不過是把我們原來聚合在一起的模塊分解成多個獨立的,基于服務(wù)器程序存在的形式,假設(shè)我們開發(fā)的后臺系統(tǒng)分為日志,存儲,業(yè)務(wù)邏輯,算法邏輯等模塊,以前這些模塊會聚合成一個整體形成一個復(fù)雜龐大的應(yīng)用程序:

這種方式存在很多問題,第一是過多模塊糅合在一起會使得系統(tǒng)設(shè)計過于復(fù)雜,因為模塊直接存在各種邏輯耦合,這使得隨著時間的推移,系統(tǒng)的開發(fā)和維護變得越來越困難。第二是系統(tǒng)越來越脆弱,只要其中一個模塊發(fā)送錯誤或奔潰,整個系統(tǒng)可能就會垮塌。第三是可擴展性不強,系統(tǒng)很難通過硬件性能的增強而實現(xiàn)相應(yīng)擴展。

要實現(xiàn)高并發(fā),高可用,其基本思路就是將模塊拆解,然后讓他們成為獨立運行的服務(wù)器程序,各個模塊之間通過消息發(fā)送的方式完成配合:

這種模式的好處在于:1,模塊之間解耦合,一個模塊出問題對整個系統(tǒng)影響很小。2,可擴展,高可用,我們可以將模塊部署到不同服務(wù)器上,當(dāng)流量增加,我們只要簡單的增加服務(wù)器數(shù)量就能使得系統(tǒng)的響應(yīng)能力實現(xiàn)同等擴展。3,魯棒性增強,由于模塊能備份多個,其中一個模塊出問題,請求可以重定向到其他同樣模塊,于是系統(tǒng)的可靠性能大大增強。

當(dāng)然任何收益都有對應(yīng)代價,分布式系統(tǒng)的設(shè)計開發(fā)相比于原來的聚合性系統(tǒng)會多出很多難點。例如負(fù)載均衡,服務(wù)發(fā)現(xiàn),模塊協(xié)商,共識達(dá)成等,分布式算法強調(diào)的就是這些問題的解決,但是理論總是抽象難以理解,倘若不能動手實現(xiàn)一個高可用高并發(fā)系統(tǒng),你看多少理論都是霧里看花,越看越糊涂,所以我們必須通過動手實踐來理解和掌握理論,首先我們從最簡單的服務(wù)入手,那就是日志服務(wù),我們將使用GO來實現(xiàn)。

首先創(chuàng)建根目錄,可以命名為go_distributed_system,后面所有服務(wù)模塊都實現(xiàn)在該目錄下,然后創(chuàng)建子目錄proglog,進(jìn)去后我們再創(chuàng)建子目錄internel/server/在這里我們實現(xiàn)日志服務(wù)的邏輯模塊,首先在internel/server下面執(zhí)行初始化命令:

  1. go mod init internal/server 

這里開發(fā)的模塊會被其他模塊引用,所以我們需要創(chuàng)建mod文件。首先我們需要完成日志系統(tǒng)所需的底層數(shù)據(jù)結(jié)構(gòu),創(chuàng)建log.go文件,相應(yīng)代碼如下:

  1. package server 
  2.  
  3. import ( 
  4.     "fmt" 
  5.     "sync" 
  6.  
  7. type Log struct { 
  8.     mu sync.Mutex 
  9.     records [] Record  
  10.  
  11. func NewLog() *Log { 
  12.     return &Log{ch : make(chan Record),}  
  13.  
  14. func(c *Log) Append(record Record) (uint64, error) { 
  15.      c.mu.Lock() 
  16.     defer c.mu.Unlock() 
  17.     record.Offset = uint64(len(c.records)) 
  18.     c.records = append(c.records, record) 
  19.     return record.Offset, nil  
  20.  
  21. func (c *Log) Read(offset uint64)(Record, error) { 
  22.     c.mu.Lock() 
  23.     defer c.mu.Unlock() 
  24.     if offset >= uint64(len(c.records)) { 
  25.         return Record{}, ErrOffsetNotFound  
  26.     } 
  27.  
  28.     return c.records[offset], nil  
  29.  
  30. type Record struct { 
  31.     Value []byte `json:"value"
  32.     Offset uint64 `json:"offset"
  33.  
  34. var ErrOffsetNotFound = fmt.Errorf("offset not found"

由于我們的日志服務(wù)將以http服務(wù)器程序的方式接收日志讀寫請求,因此多個讀或?qū)懻埱髸瑫r執(zhí)行,所以我們需要對records數(shù)組進(jìn)行互斥操作,因此使用了互斥鎖,在每次讀取records數(shù)組前先獲得鎖,這樣能防止服務(wù)在同時接收多個讀寫請求時破壞掉數(shù)據(jù)的一致性。

所有的日志讀寫請求會以http POST 和 GET的方式發(fā)起,數(shù)據(jù)通過json來封裝,所以我們下面將創(chuàng)建一個http服務(wù)器對象,新建文件http.go,完成如下代碼:

  1. package server  
  2.  
  3. import ( 
  4.     "encoding/json" 
  5.     "net/http" 
  6.     "github.com/gorilla/mux" 
  7.  
  8. func NewHttpServer(addr string) *http.Server { 
  9.     httpsrv := newHttpServer() 
  10.     r := mux.NewRouter() 
  11.     r.HandleFunc("/", httpsrv.handleLogWrite).Methods("POST"
  12.     r.HandleFunc("/", httpsrv.hadnleLogRead).Methods("GET"
  13.  
  14.     return &http.Server{ 
  15.         Addr : addr, 
  16.         Handler: r, 
  17.     } 
  18.  
  19. type httpServer struct{ 
  20.     Log *Log  
  21.  
  22. func newHttpServer() *httpServer { 
  23.     return &httpServer { 
  24.         Log: NewLog(), 
  25.     } 
  26.  
  27. type WriteRequest struct { 
  28.     Record Record `json:"record"
  29.  
  30. type WriteResponse struct { 
  31.     Offset uint64 `json:"offset"
  32.  
  33. type ReadRequest struct { 
  34.     Offset uint64 `json:"offset"
  35.  
  36. type ReadResponse struct { 
  37.     Record Record `json:"record"
  38.  
  39. func (s *httpServer) handleLogWrite(w http.ResponseWriter, r * http.Request) { 
  40.     var req WriteRequest  
  41.     //服務(wù)以json格式接收請求 
  42.     err := json.NewDecoder(r.Body).Decode(&req) 
  43.     if err != nil { 
  44.         http.Error(w, err.Error(), http.StatusBadRequest) 
  45.         return  
  46.     } 
  47.  
  48.     off, err := s.Log.Append(req.Record) 
  49.     if err != nil { 
  50.         http.Error(w, err.Error(), http.StatusInternalServerError) 
  51.         return  
  52.     } 
  53.  
  54.     res := WriteResponse{Offset: off
  55.     //服務(wù)以json格式返回結(jié)果 
  56.     err = json.NewEncoder(w).Encode(res) 
  57.     if err != nil { 
  58.         http.Error(w, err.Error(), http.StatusInternalServerError) 
  59.         return  
  60.     } 
  61.  
  62. func (s *httpServer) hadnleLogRead(w http.ResponseWriter, r *http.Request) { 
  63.     var req ReadRequest  
  64.     err := json.NewDecoder(r.Body).Decode(&req) 
  65.     if err != nil { 
  66.         http.Error(w, err.Error(), http.StatusBadRequest) 
  67.         return  
  68.     } 
  69.  
  70.     record, err := s.Log.Read(req.Offset) 
  71.     if err == ErrOffsetNotFound { 
  72.         http.Error(w, err.Error(), http.StatusNotFound) 
  73.         return 
  74.     } 
  75.  
  76.     if err != nil { 
  77.         http.Error(w, err.Error(), http.StatusInternalServerError) 
  78.         return  
  79.     } 
  80.  
  81.     res := ReadResponse{Record: record} 
  82.     err = json.NewEncoder(w).Encode(res) 
  83.     if err != nil { 
  84.         http.Error(w, err.Error(), http.StatusInternalServerError) 
  85.         return 
  86.     } 

上面代碼顯示出“分布式”,“微服務(wù)”的特點。相應(yīng)的功能代碼以單獨服務(wù)器的形式運行,通過網(wǎng)絡(luò)來接收服務(wù)請求,這對應(yīng)“分布式”,每個獨立模塊只完成一個特定任務(wù),這就對應(yīng)“微服務(wù)”,由于這種方式可以同時在不同的機器上運行,于是展示了“可擴展性”。

同時服務(wù)既然以http 服務(wù)器的形式存在,因此服務(wù)的請求和返回也要走Http形式,同時數(shù)據(jù)以Json方式進(jìn)行封裝。同時實現(xiàn)的邏輯很簡單,但有日志寫請求時,我們把請求解析成Record結(jié)構(gòu)體后加入到隊列末尾,當(dāng)有讀取日志的請求時,我們獲得客戶端發(fā)來的讀取偏移,然后取出對應(yīng)的記錄,封裝成json格式后返回給客戶。

完成了服務(wù)器的代碼后,我們需要將服務(wù)器運行起來,為了達(dá)到模塊化的目的,我們把服務(wù)器的啟動放置在另一個地方,在proglog根目錄下創(chuàng)建cmd/server,在里面添加main.go:

  1. package main  
  2.  
  3. import ( 
  4.     "log" 
  5.     "internal/server" 
  6.  
  7. func main() { 
  8.     srv := server.NewHttpServer(":8080"
  9.     log.Fatal(srv.ListenAndServe()) 

同時為了能夠引用internal/server下面的模塊,我們需要在cmd/server下先通過go mod init cmd/server進(jìn)行初始化,然后在go.mod文件中添加如下一行:

  1. replace internal/server => ../../internal/server 

然后執(zhí)行命令 go mod tidy,這樣本地模塊就知道根據(jù)給定的目錄轉(zhuǎn)換去引用模塊,最后使用go run main.go啟動日志服務(wù),現(xiàn)在我們要做的是測試服務(wù)器的可用性,我們同樣在目錄下創(chuàng)建server_test.go,然后編寫測試代碼,基本邏輯就是想服務(wù)器發(fā)送日志寫請求,然后再發(fā)送讀請求,并比較讀到的數(shù)據(jù)是否和我們寫入的數(shù)據(jù)一致,代碼如下:

  1. package main 
  2.  
  3. import( 
  4.     "encoding/json" 
  5.     "net/http" 
  6.     "internal/server" 
  7.     "bytes" 
  8.     "testing" 
  9.     "io/ioutil" 
  10.  
  11. func TestServerLogWrite(t *testing.T) { 
  12.     var tests = []struct { 
  13.         request server.WriteRequest 
  14.         want_response server.WriteResponse  
  15.     } { 
  16.         {request: server.WriteRequest{server.Record{[]byte(`this is log request 1`), 0}},  
  17.          want_response:  server.WriteResponse{Offset: 0, },}, 
  18.          {request: server.WriteRequest{server.Record{[]byte(`this is log request 2`), 0}},  
  19.          want_response:  server.WriteResponse{Offset: 1, },}, 
  20.          {request: server.WriteRequest{server.Record{[]byte(`this is log request 3`), 0}},  
  21.          want_response:  server.WriteResponse{Offset: 2, },}, 
  22.     } 
  23.  
  24.     for _, test := range tests { 
  25.         //將請求轉(zhuǎn)換成json格式并post給日志服務(wù) 
  26.         request := &test.request  
  27.         request_json, err := json.Marshal(request) 
  28.         if err != nil { 
  29.             t.Errorf("convert request to json fail"
  30.             return  
  31.         } 
  32.  
  33.         resp, err := http.Post("http://localhost:8080""application/json",bytes.NewBuffer(request_json)) 
  34.         defer resp.Body.Close() 
  35.         if err != nil { 
  36.             t.Errorf("http post request fail: %v", err) 
  37.             return 
  38.         } 
  39.  
  40.         //解析日志服務(wù)返回結(jié)果 
  41.         body, err := ioutil.ReadAll(resp.Body) 
  42.         var response server.WriteResponse  
  43.         err = json.Unmarshal([]byte(body), &response) 
  44.         if err != nil { 
  45.             t.Errorf("Unmarshal write response fail: %v", err) 
  46.         } 
  47.  
  48.         //檢測結(jié)果是否與預(yù)期一致 
  49.         if response.Offset != test.want_response.Offset { 
  50.             t.Errorf("got offset: %d, but want offset: %d", response.Offset, test.want_response.Offset) 
  51.         } 
  52.  
  53.     } 
  54.  
  55.     var read_tests = []struct { 
  56.         request server.ReadRequest  
  57.         want server.ReadResponse  
  58.     } { 
  59.         {request: server.ReadRequest{Offset : 0,},  
  60.         want: server.ReadResponse{server.Record{[]byte(`this is log request 1`), 0}} }, 
  61.         {request: server.ReadRequest{Offset : 1,},  
  62.         want: server.ReadResponse{server.Record{[]byte(`this is log request 2`), 0}} }, 
  63.         {request: server.ReadRequest{Offset : 2,},  
  64.         want: server.ReadResponse{server.Record{[]byte(`this is log request 3`), 0}} }, 
  65.     } 
  66.  
  67.     for _, test := range read_tests { 
  68.         request := test.request  
  69.         request_json , err := json.Marshal(request) 
  70.         if err != nil { 
  71.             t.Errorf("convert read request to json fail"
  72.             return  
  73.         } 
  74.  
  75.         //將請求轉(zhuǎn)換為json并放入GET請求體 
  76.         client := &http.Client{} 
  77.         req, err := http.NewRequest(http.MethodGet, "http://localhost:8080", bytes.NewBuffer(request_json)) 
  78.         req.Header.Set("Content-Type""application/json"
  79.         resp, err := client.Do(req) 
  80.         if err != nil { 
  81.             t.Errorf("read request fail: %v", err) 
  82.             return  
  83.         } 
  84.  
  85.         //解析讀請求返回的結(jié)果 
  86.         defer resp.Body.Close() 
  87.         body, err := ioutil.ReadAll(resp.Body) 
  88.         var response server.ReadResponse 
  89.         err = json.Unmarshal([]byte(body), &response) 
  90.         if err != nil { 
  91.             t.Errorf("Unmarshal read response fail: %v", err) 
  92.             return  
  93.         } 
  94.  
  95.         res := bytes.Compare(response.Record.Value, test.want.Record.Value) 
  96.         if res != 0 { 
  97.             t.Errorf("got value: %q, but want value : %q", response.Record.Value, test.want.Record.Value) 
  98.         } 
  99.     } 
  100.  

完成上面代碼后,使用go test運行,結(jié)果如下圖所示:

從結(jié)果看到,我們的測試能通過,也就是無論是向日志服務(wù)提交寫入請求還是讀取請求,所得的結(jié)果跟我們預(yù)想的一致??偨Y(jié)一下,本節(jié)我們設(shè)計了一個簡單的JSON/HTTP 日志服務(wù),它能夠接收基于JSON的http寫請求和讀請求,后面我們還會研究基于gPRC技術(shù)的微服務(wù)開發(fā)技術(shù).

代碼獲取

https://github.com/wycl16514/golang_distribute_system_log_service.git

 

責(zé)任編輯:武曉燕 來源: Coding迪斯尼
相關(guān)推薦

2022-01-26 00:03:00

高可用gRPC微服務(wù)

2022-05-11 13:55:18

高可用性分布式彈性

2017-12-12 14:51:15

分布式緩存設(shè)計

2023-08-25 16:26:49

微服務(wù)架構(gòu)

2019-09-25 09:01:53

高并發(fā)架構(gòu)分布式

2019-12-17 11:18:37

高并發(fā)分布式架構(gòu)

2020-02-10 19:16:52

服務(wù)端高并發(fā)架構(gòu)

2024-09-02 22:49:33

2020-10-13 07:44:45

理解分布式

2021-12-03 10:30:25

WOT技術(shù)峰會技術(shù)

2021-01-25 15:00:44

微服務(wù)分布式日志

2017-04-17 09:54:34

分布式數(shù)據(jù)庫PhxSQL

2019-10-16 10:34:33

數(shù)據(jù)庫大數(shù)據(jù)腳本語言

2024-10-08 11:21:11

2020-09-23 22:36:27

分布式架構(gòu)系統(tǒng)

2020-11-26 09:38:19

分布式架構(gòu)系統(tǒng)

2021-09-23 12:14:50

Redis分布式優(yōu)化

2021-05-24 09:28:41

軟件開發(fā) 技術(shù)

2022-11-25 17:29:27

分布式事務(wù)

2021-12-09 10:45:19

分布式事務(wù)框架
點贊
收藏

51CTO技術(shù)棧公眾號