自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

構(gòu)建一個(gè)即時(shí)消息應(yīng)用(四):消息

開發(fā) 后端
在這篇文章中,我們將對(duì)端點(diǎn)進(jìn)行編碼,以創(chuàng)建一條消息并列出它們,同時(shí)還將編寫一個(gè)端點(diǎn)以更新參與者上次閱讀消息的時(shí)間。

[[345179]]

本文是該系列的第四篇。

在這篇文章中,我們將對(duì)端點(diǎn)進(jìn)行編碼,以創(chuàng)建一條消息并列出它們,同時(shí)還將編寫一個(gè)端點(diǎn)以更新參與者上次閱讀消息的時(shí)間。 首先在 main() 函數(shù)中添加這些路由。

  1. router.HandleFunc("POST", "/api/conversations/:conversationID/messages", requireJSON(guard(createMessage)))
  2. router.HandleFunc("GET", "/api/conversations/:conversationID/messages", guard(getMessages))
  3. router.HandleFunc("POST", "/api/conversations/:conversationID/read_messages", guard(readMessages))

消息會(huì)進(jìn)入對(duì)話,因此端點(diǎn)包含對(duì)話 ID。

創(chuàng)建消息

該端點(diǎn)處理對(duì) /api/conversations/{conversationID}/messages 的 POST 請(qǐng)求,其 JSON 主體僅包含消息內(nèi)容,并返回新創(chuàng)建的消息。它有兩個(gè)副作用:更新對(duì)話 last_message_id 以及更新參與者 messages_read_at。

  1. func createMessage(w http.ResponseWriter, r *http.Request) {
  2. var input struct {
  3. Content string `json:"content"`
  4. }
  5. defer r.Body.Close()
  6. if err := json.NewDecoder(r.Body).Decode(&input); err != nil {
  7. http.Error(w, err.Error(), http.StatusBadRequest)
  8. return
  9. }
  10.  
  11. errs := make(map[string]string)
  12. input.Content = removeSpaces(input.Content)
  13. if input.Content == "" {
  14. errs["content"] = "Message content required"
  15. } else if len([]rune(input.Content)) > 480 {
  16. errs["content"] = "Message too long. 480 max"
  17. }
  18. if len(errs) != 0 {
  19. respond(w, Errors{errs}, http.StatusUnprocessableEntity)
  20. return
  21. }
  22.  
  23. ctx := r.Context()
  24. authUserID := ctx.Value(keyAuthUserID).(string)
  25. conversationID := way.Param(ctx, "conversationID")
  26.  
  27. tx, err := db.BeginTx(ctx, nil)
  28. if err != nil {
  29. respondError(w, fmt.Errorf("could not begin tx: %v", err))
  30. return
  31. }
  32. defer tx.Rollback()
  33.  
  34. isParticipant, err := queryParticipantExistance(ctx, tx, authUserID, conversationID)
  35. if err != nil {
  36. respondError(w, fmt.Errorf("could not query participant existance: %v", err))
  37. return
  38. }
  39.  
  40. if !isParticipant {
  41. http.Error(w, "Conversation not found", http.StatusNotFound)
  42. return
  43. }
  44.  
  45. var message Message
  46. if err := tx.QueryRowContext(ctx, `
  47. INSERT INTO messages (content, user_id, conversation_id) VALUES
  48. ($1, $2, $3)
  49. RETURNING id, created_at
  50. `, input.Content, authUserID, conversationID).Scan(
  51. &message.ID,
  52. &message.CreatedAt,
  53. ); err != nil {
  54. respondError(w, fmt.Errorf("could not insert message: %v", err))
  55. return
  56. }
  57.  
  58. if _, err := tx.ExecContext(ctx, `
  59. UPDATE conversations SET last_message_id = $1
  60. WHERE id = $2
  61. `, message.ID, conversationID); err != nil {
  62. respondError(w, fmt.Errorf("could not update conversation last message ID: %v", err))
  63. return
  64. }
  65.  
  66. if err = tx.Commit(); err != nil {
  67. respondError(w, fmt.Errorf("could not commit tx to create a message: %v", err))
  68. return
  69. }
  70.  
  71. go func() {
  72. if err = updateMessagesReadAt(nil, authUserID, conversationID); err != nil {
  73. log.Printf("could not update messages read at: %v\n", err)
  74. }
  75. }()
  76.  
  77. message.Content = input.Content
  78. message.UserID = authUserID
  79. message.ConversationID = conversationID
  80. // TODO: notify about new message.
  81. message.Mine = true
  82.  
  83. respond(w, message, http.StatusCreated)
  84. }

首先,它將請(qǐng)求正文解碼為包含消息內(nèi)容的結(jié)構(gòu)。然后,它驗(yàn)證內(nèi)容不為空并且少于 480 個(gè)字符。

  1. var rxSpaces = regexp.MustCompile("\\s+")
  2.  
  3. func removeSpaces(s string) string {
  4. if s == "" {
  5. return s
  6. }
  7.  
  8. lines := make([]string, 0)
  9. for _, line := range strings.Split(s, "\n") {
  10. line = rxSpaces.ReplaceAllLiteralString(line, " ")
  11. line = strings.TrimSpace(line)
  12. if line != "" {
  13. lines = append(lines, line)
  14. }
  15. }
  16. return strings.Join(lines, "\n")
  17. }

這是刪除空格的函數(shù)。它遍歷每一行,刪除兩個(gè)以上的連續(xù)空格,然后回非空行。

驗(yàn)證之后,它將啟動(dòng)一個(gè) SQL 事務(wù)。首先,它查詢對(duì)話中的參與者是否存在。

  1. func queryParticipantExistance(ctx context.Context, tx *sql.Tx, userID, conversationID string) (bool, error) {
  2. if ctx == nil {
  3. ctx = context.Background()
  4. }
  5. var exists bool
  6. if err := tx.QueryRowContext(ctx, `SELECT EXISTS (
  7. SELECT 1 FROM participants
  8. WHERE user_id = $1 AND conversation_id = $2
  9. )`, userID, conversationID).Scan(&exists); err != nil {
  10. return false, err
  11. }
  12. return exists, nil
  13. }

我將其提取到一個(gè)函數(shù)中,因?yàn)樯院罂梢灾赜谩?/p>

如果用戶不是對(duì)話參與者,我們將返回一個(gè) 404 NOT Found 錯(cuò)誤。

然后,它插入消息并更新對(duì)話 last_message_id。從這時(shí)起,由于我們不允許刪除消息,因此 last_message_id 不能為 NULL。

接下來提交事務(wù),并在 goroutine 中更新參與者 messages_read_at。

  1. func updateMessagesReadAt(ctx context.Context, userID, conversationID string) error {
  2. if ctx == nil {
  3. ctx = context.Background()
  4. }
  5.  
  6. if _, err := db.ExecContext(ctx, `
  7. UPDATE participants SET messages_read_at = now()
  8. WHERE user_id = $1 AND conversation_id = $2
  9. `, userID, conversationID); err != nil {
  10. return err
  11. }
  12. return nil
  13. }

在回復(fù)這條新消息之前,我們必須通知一下。這是我們將要在下一篇文章中編寫的實(shí)時(shí)部分,因此我在那里留一了個(gè)注釋。

獲取消息

這個(gè)端點(diǎn)處理對(duì) /api/conversations/{conversationID}/messages 的 GET 請(qǐng)求。 它用一個(gè)包含會(huì)話中所有消息的 JSON 數(shù)組進(jìn)行響應(yīng)。它還具有更新參與者 messages_read_at 的副作用。

  1. func getMessages(w http.ResponseWriter, r *http.Request) {
  2. ctx := r.Context()
  3. authUserID := ctx.Value(keyAuthUserID).(string)
  4. conversationID := way.Param(ctx, "conversationID")
  5.  
  6. tx, err := db.BeginTx(ctx, &sql.TxOptions{ReadOnly: true})
  7. if err != nil {
  8. respondError(w, fmt.Errorf("could not begin tx: %v", err))
  9. return
  10. }
  11. defer tx.Rollback()
  12.  
  13. isParticipant, err := queryParticipantExistance(ctx, tx, authUserID, conversationID)
  14. if err != nil {
  15. respondError(w, fmt.Errorf("could not query participant existance: %v", err))
  16. return
  17. }
  18.  
  19. if !isParticipant {
  20. http.Error(w, "Conversation not found", http.StatusNotFound)
  21. return
  22. }
  23.  
  24. rows, err := tx.QueryContext(ctx, `
  25. SELECT
  26. id,
  27. content,
  28. created_at,
  29. user_id = $1 AS mine
  30. FROM messages
  31. WHERE messages.conversation_id = $2
  32. ORDER BY messages.created_at DESC
  33. `, authUserID, conversationID)
  34. if err != nil {
  35. respondError(w, fmt.Errorf("could not query messages: %v", err))
  36. return
  37. }
  38. defer rows.Close()
  39.  
  40. messages := make([]Message, 0)
  41. for rows.Next() {
  42. var message Message
  43. if err = rows.Scan(
  44. &message.ID,
  45. &message.Content,
  46. &message.CreatedAt,
  47. &message.Mine,
  48. ); err != nil {
  49. respondError(w, fmt.Errorf("could not scan message: %v", err))
  50. return
  51. }
  52.  
  53. messages = append(messages, message)
  54. }
  55.  
  56. if err = rows.Err(); err != nil {
  57. respondError(w, fmt.Errorf("could not iterate over messages: %v", err))
  58. return
  59. }
  60.  
  61. if err = tx.Commit(); err != nil {
  62. respondError(w, fmt.Errorf("could not commit tx to get messages: %v", err))
  63. return
  64. }
  65.  
  66. go func() {
  67. if err = updateMessagesReadAt(nil, authUserID, conversationID); err != nil {
  68. log.Printf("could not update messages read at: %v\n", err)
  69. }
  70. }()
  71.  
  72. respond(w, messages, http.StatusOK)
  73. }

首先,它以只讀模式開始一個(gè) SQL 事務(wù)。檢查參與者是否存在,并查詢所有消息。在每條消息中,我們使用當(dāng)前經(jīng)過身份驗(yàn)證的用戶 ID 來了解用戶是否擁有該消息(mine)。 然后,它提交事務(wù),在 goroutine 中更新參與者 messages_read_at 并以消息響應(yīng)。

讀取消息

該端點(diǎn)處理對(duì) /api/conversations/{conversationID}/read_messages 的 POST 請(qǐng)求。 沒有任何請(qǐng)求或響應(yīng)主體。 在前端,每次有新消息到達(dá)實(shí)時(shí)流時(shí),我們都會(huì)發(fā)出此請(qǐng)求。

  1. func readMessages(w http.ResponseWriter, r *http.Request) {
  2. ctx := r.Context()
  3. authUserID := ctx.Value(keyAuthUserID).(string)
  4. conversationID := way.Param(ctx, "conversationID")
  5.  
  6. if err := updateMessagesReadAt(ctx, authUserID, conversationID); err != nil {
  7. respondError(w, fmt.Errorf("could not update messages read at: %v", err))
  8. return
  9. }
  10.  
  11. w.WriteHeader(http.StatusNoContent)
  12. }

它使用了與更新參與者 messages_read_at 相同的函數(shù)。


到此為止。實(shí)時(shí)消息是后臺(tái)僅剩的部分了。請(qǐng)等待下一篇文章。

 

責(zé)任編輯:龐桂玉 來源: 51CTO
相關(guān)推薦

2020-10-09 15:00:56

實(shí)時(shí)消息編程語言

2019-09-29 15:25:13

CockroachDBGoJavaScript

2019-10-28 20:12:40

OAuthGuard中間件編程語言

2020-03-31 12:21:20

JSON即時(shí)消息編程語言

2020-10-12 09:20:13

即時(shí)消息Access頁面編程語言

2020-10-19 16:20:38

即時(shí)消息Conversatio編程語言

2020-10-16 14:40:20

即時(shí)消息Home頁面編程語言

2020-10-10 20:51:10

即時(shí)消息編程語言

2021-03-25 08:29:33

SpringBootWebSocket即時(shí)消息

2023-08-14 08:01:12

websocket8g用戶

2015-03-18 15:37:19

社交APP場景

2011-10-19 09:30:23

jQuery

2023-03-27 08:33:32

2021-12-03 00:02:01

通訊工具即時(shí)

2010-05-24 09:51:37

System Cent

2021-05-10 15:05:18

消息通信本地網(wǎng)絡(luò)

2024-04-24 11:42:21

Redis延遲消息數(shù)據(jù)庫

2022-08-30 11:41:53

網(wǎng)絡(luò)攻擊木馬

2009-06-29 09:06:42

微軟Web版MSN

2025-01-02 09:23:05

點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)