構(gòu)建一個(gè)即時(shí)消息應(yīng)用(四):消息
本文是該系列的第四篇。
在這篇文章中,我們將對(duì)端點(diǎn)進(jìn)行編碼,以創(chuàng)建一條消息并列出它們,同時(shí)還將編寫一個(gè)端點(diǎn)以更新參與者上次閱讀消息的時(shí)間。 首先在 main()
函數(shù)中添加這些路由。
router.HandleFunc("POST", "/api/conversations/:conversationID/messages", requireJSON(guard(createMessage)))
router.HandleFunc("GET", "/api/conversations/:conversationID/messages", guard(getMessages))
router.HandleFunc("POST", "/api/conversations/:conversationID/read_messages", guard(readMessages))
消息會(huì)進(jìn)入對(duì)話,因此端點(diǎn)包含對(duì)話 ID。
創(chuàng)建消息
該端點(diǎn)處理對(duì) /api/conversations/{conversationID}/messages
的 POST 請(qǐng)求,其 JSON 主體僅包含消息內(nèi)容,并返回新創(chuàng)建的消息。它有兩個(gè)副作用:更新對(duì)話 last_message_id
以及更新參與者 messages_read_at
。
func createMessage(w http.ResponseWriter, r *http.Request) {
var input struct {
Content string `json:"content"`
}
defer r.Body.Close()
if err := json.NewDecoder(r.Body).Decode(&input); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
errs := make(map[string]string)
input.Content = removeSpaces(input.Content)
if input.Content == "" {
errs["content"] = "Message content required"
} else if len([]rune(input.Content)) > 480 {
errs["content"] = "Message too long. 480 max"
}
if len(errs) != 0 {
respond(w, Errors{errs}, http.StatusUnprocessableEntity)
return
}
ctx := r.Context()
authUserID := ctx.Value(keyAuthUserID).(string)
conversationID := way.Param(ctx, "conversationID")
tx, err := db.BeginTx(ctx, nil)
if err != nil {
respondError(w, fmt.Errorf("could not begin tx: %v", err))
return
}
defer tx.Rollback()
isParticipant, err := queryParticipantExistance(ctx, tx, authUserID, conversationID)
if err != nil {
respondError(w, fmt.Errorf("could not query participant existance: %v", err))
return
}
if !isParticipant {
http.Error(w, "Conversation not found", http.StatusNotFound)
return
}
var message Message
if err := tx.QueryRowContext(ctx, `
INSERT INTO messages (content, user_id, conversation_id) VALUES
($1, $2, $3)
RETURNING id, created_at
`, input.Content, authUserID, conversationID).Scan(
&message.ID,
&message.CreatedAt,
); err != nil {
respondError(w, fmt.Errorf("could not insert message: %v", err))
return
}
if _, err := tx.ExecContext(ctx, `
UPDATE conversations SET last_message_id = $1
WHERE id = $2
`, message.ID, conversationID); err != nil {
respondError(w, fmt.Errorf("could not update conversation last message ID: %v", err))
return
}
if err = tx.Commit(); err != nil {
respondError(w, fmt.Errorf("could not commit tx to create a message: %v", err))
return
}
go func() {
if err = updateMessagesReadAt(nil, authUserID, conversationID); err != nil {
log.Printf("could not update messages read at: %v\n", err)
}
}()
message.Content = input.Content
message.UserID = authUserID
message.ConversationID = conversationID
// TODO: notify about new message.
message.Mine = true
respond(w, message, http.StatusCreated)
}
首先,它將請(qǐng)求正文解碼為包含消息內(nèi)容的結(jié)構(gòu)。然后,它驗(yàn)證內(nèi)容不為空并且少于 480 個(gè)字符。
var rxSpaces = regexp.MustCompile("\\s+")
func removeSpaces(s string) string {
if s == "" {
return s
}
lines := make([]string, 0)
for _, line := range strings.Split(s, "\n") {
line = rxSpaces.ReplaceAllLiteralString(line, " ")
line = strings.TrimSpace(line)
if line != "" {
lines = append(lines, line)
}
}
return strings.Join(lines, "\n")
}
這是刪除空格的函數(shù)。它遍歷每一行,刪除兩個(gè)以上的連續(xù)空格,然后回非空行。
驗(yàn)證之后,它將啟動(dòng)一個(gè) SQL 事務(wù)。首先,它查詢對(duì)話中的參與者是否存在。
func queryParticipantExistance(ctx context.Context, tx *sql.Tx, userID, conversationID string) (bool, error) {
if ctx == nil {
ctx = context.Background()
}
var exists bool
if err := tx.QueryRowContext(ctx, `SELECT EXISTS (
SELECT 1 FROM participants
WHERE user_id = $1 AND conversation_id = $2
)`, userID, conversationID).Scan(&exists); err != nil {
return false, err
}
return exists, nil
}
我將其提取到一個(gè)函數(shù)中,因?yàn)樯院罂梢灾赜谩?/p>
如果用戶不是對(duì)話參與者,我們將返回一個(gè) 404 NOT Found
錯(cuò)誤。
然后,它插入消息并更新對(duì)話 last_message_id
。從這時(shí)起,由于我們不允許刪除消息,因此 last_message_id
不能為 NULL
。
接下來提交事務(wù),并在 goroutine 中更新參與者 messages_read_at
。
func updateMessagesReadAt(ctx context.Context, userID, conversationID string) error {
if ctx == nil {
ctx = context.Background()
}
if _, err := db.ExecContext(ctx, `
UPDATE participants SET messages_read_at = now()
WHERE user_id = $1 AND conversation_id = $2
`, userID, conversationID); err != nil {
return err
}
return nil
}
在回復(fù)這條新消息之前,我們必須通知一下。這是我們將要在下一篇文章中編寫的實(shí)時(shí)部分,因此我在那里留一了個(gè)注釋。
獲取消息
這個(gè)端點(diǎn)處理對(duì) /api/conversations/{conversationID}/messages
的 GET 請(qǐng)求。 它用一個(gè)包含會(huì)話中所有消息的 JSON 數(shù)組進(jìn)行響應(yīng)。它還具有更新參與者 messages_read_at
的副作用。
func getMessages(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
authUserID := ctx.Value(keyAuthUserID).(string)
conversationID := way.Param(ctx, "conversationID")
tx, err := db.BeginTx(ctx, &sql.TxOptions{ReadOnly: true})
if err != nil {
respondError(w, fmt.Errorf("could not begin tx: %v", err))
return
}
defer tx.Rollback()
isParticipant, err := queryParticipantExistance(ctx, tx, authUserID, conversationID)
if err != nil {
respondError(w, fmt.Errorf("could not query participant existance: %v", err))
return
}
if !isParticipant {
http.Error(w, "Conversation not found", http.StatusNotFound)
return
}
rows, err := tx.QueryContext(ctx, `
SELECT
id,
content,
created_at,
user_id = $1 AS mine
FROM messages
WHERE messages.conversation_id = $2
ORDER BY messages.created_at DESC
`, authUserID, conversationID)
if err != nil {
respondError(w, fmt.Errorf("could not query messages: %v", err))
return
}
defer rows.Close()
messages := make([]Message, 0)
for rows.Next() {
var message Message
if err = rows.Scan(
&message.ID,
&message.Content,
&message.CreatedAt,
&message.Mine,
); err != nil {
respondError(w, fmt.Errorf("could not scan message: %v", err))
return
}
messages = append(messages, message)
}
if err = rows.Err(); err != nil {
respondError(w, fmt.Errorf("could not iterate over messages: %v", err))
return
}
if err = tx.Commit(); err != nil {
respondError(w, fmt.Errorf("could not commit tx to get messages: %v", err))
return
}
go func() {
if err = updateMessagesReadAt(nil, authUserID, conversationID); err != nil {
log.Printf("could not update messages read at: %v\n", err)
}
}()
respond(w, messages, http.StatusOK)
}
首先,它以只讀模式開始一個(gè) SQL 事務(wù)。檢查參與者是否存在,并查詢所有消息。在每條消息中,我們使用當(dāng)前經(jīng)過身份驗(yàn)證的用戶 ID 來了解用戶是否擁有該消息(mine
)。 然后,它提交事務(wù),在 goroutine 中更新參與者 messages_read_at
并以消息響應(yīng)。
讀取消息
該端點(diǎn)處理對(duì) /api/conversations/{conversationID}/read_messages
的 POST 請(qǐng)求。 沒有任何請(qǐng)求或響應(yīng)主體。 在前端,每次有新消息到達(dá)實(shí)時(shí)流時(shí),我們都會(huì)發(fā)出此請(qǐng)求。
func readMessages(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
authUserID := ctx.Value(keyAuthUserID).(string)
conversationID := way.Param(ctx, "conversationID")
if err := updateMessagesReadAt(ctx, authUserID, conversationID); err != nil {
respondError(w, fmt.Errorf("could not update messages read at: %v", err))
return
}
w.WriteHeader(http.StatusNoContent)
}
它使用了與更新參與者 messages_read_at
相同的函數(shù)。
到此為止。實(shí)時(shí)消息是后臺(tái)僅剩的部分了。請(qǐng)等待下一篇文章。