SpringBoot與Curator整合,實(shí)現(xiàn)票務(wù)預(yù)訂系統(tǒng)
在票務(wù)預(yù)訂系統(tǒng)中,多個(gè)服務(wù)實(shí)例需要競(jìng)爭(zhēng)性地處理票務(wù)預(yù)訂請(qǐng)求,以確保每個(gè)座位只被預(yù)訂一次。為了滿足這個(gè)基本的要求,我們決定使用 Apache Curator 實(shí)現(xiàn)一個(gè)分布式鎖來(lái)控制對(duì)票務(wù)資源的訪問(wèn)。
一、我們?yōu)槭裁催x擇Curator?
1. 簡(jiǎn)化ZooKeeper的使用
Apache Curator 是一個(gè)高級(jí)的 Java 客戶端庫(kù),專(zhuān)門(mén)用于簡(jiǎn)化對(duì) ZooKeeper 的操作。它提供了許多實(shí)用的功能和抽象,使得我們更容易地與 ZooKeeper 進(jìn)行交互,而無(wú)需處理底層的復(fù)雜細(xì)節(jié)。
- 自動(dòng)重試機(jī)制:Curator 提供了多種重試策略(如指數(shù)退避),確保在連接中斷時(shí)能夠自動(dòng)重試。
- 路徑管理:Curator 自動(dòng)處理節(jié)點(diǎn)的創(chuàng)建、刪除等操作,簡(jiǎn)化了路徑管理的工作。
- 事件監(jiān)聽(tīng):Curator 提供了更簡(jiǎn)潔的事件監(jiān)聽(tīng)接口,方便處理各種 ZooKeeper 事件。
2. 強(qiáng)大的分布式鎖實(shí)現(xiàn)
Curator Recipes 是 Curator 提供的一組高級(jí)功能模塊,其中包括了實(shí)現(xiàn)分布式鎖所需的工具。特別是InterProcessMutex類(lèi),它是基于 ZooKeeper 實(shí)現(xiàn)的一個(gè)可重入的分布式互斥鎖。
- 可重入性:InterProcessMutex 支持線程級(jí)別的可重入性,確保同一個(gè)線程多次獲取鎖不會(huì)導(dǎo)致死鎖。
- 公平性:InterProcessMutex 默認(rèn)是公平鎖,確保按照請(qǐng)求順序依次獲取鎖。
- 靈活性:可以通過(guò)配置不同的參數(shù)來(lái)滿足不同的需求,例如超時(shí)時(shí)間、重試次數(shù)等。
二、哪些公司使用了Curator?
- 歐洲核子研究組織(CERN)使用 Curator 來(lái)管理其高性能計(jì)算環(huán)境中的協(xié)調(diào)任務(wù)。
- Uber 使用 Curator 來(lái)實(shí)現(xiàn)其微服務(wù)架構(gòu)中的服務(wù)發(fā)現(xiàn)和協(xié)調(diào)。
- Twitter 使用 Curator 來(lái)管理其微服務(wù)架構(gòu)中的配置和服務(wù)協(xié)調(diào)。
- eBay 使用 Curator 來(lái)管理其分布式系統(tǒng)的配置和服務(wù)協(xié)調(diào)。
- LinkedIn 使用 Curator 來(lái)實(shí)現(xiàn)集群管理和協(xié)調(diào)任務(wù)調(diào)度。
- Airbnb 使用 Curator 來(lái)處理其基礎(chǔ)設(shè)施中的各種協(xié)調(diào)任務(wù)。
- Yahoo! 使用 Curator 來(lái)管理其大規(guī)模分布式系統(tǒng)的配置和服務(wù)發(fā)現(xiàn)。
- Facebook 使用 Curator 來(lái)管理其分布式系統(tǒng)的配置和服務(wù)協(xié)調(diào)。
三、代碼實(shí)操
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>ticket-booking-system</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>Ticket Booking System</name>
<description>A simple example of using Apache Curator for distributed locking in a Spring Boot application</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.5</version>
<relativePath/><!-- lookup parent from repository -->
</parent>
<properties>
<java.version>11</java.version>
<curator.version>5.3.0</curator.version>
</properties>
<dependencies>
<!-- Spring Boot Starter Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Apache Curator Framework -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>${curator.version}</version>
</dependency>
<!-- Apache Curator Recipes (for InterProcessMutex) -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>${curator.version}</version>
</dependency>
<!-- SLF4J Logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<!-- Test Dependencies -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
四、application.properties
zookeeper.connect-string=localhost:2181
server.port=8080
initial.tickets=100
五、配置 Curator 客戶端
package com.example.ticketbooking.config;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class CuratorConfig {
@Value("${zookeeper.connect-string}")
private String zookeeperConnectString;
/**
* 配置并返回一個(gè) CuratorFramework 實(shí)例,用于連接 ZooKeeper。
* 使用指數(shù)退避重試策略(ExponentialBackoffRetry)來(lái)處理連接失敗的情況。
*
* @return 初始化并啟動(dòng)的 CuratorFramework 實(shí)例
*/
@Bean(initMethod = "start", destroyMethod = "close")
public CuratorFramework curatorFramework() {
return CuratorFrameworkFactory.newClient(zookeeperConnectString,
new ExponentialBackoffRetry(1000, 3));
}
}
六、Service
package com.example.ticketbooking.service;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
@Service
public class BookingService {
private static final Logger logger = LoggerFactory.getLogger(BookingService.class);
@Autowired
private CuratorFramework client;
@Value("${initial.tickets}")
private int initialTickets;
private static final String TICKETS_PATH = "/tickets";
/**
* 在服務(wù)啟動(dòng)時(shí)初始化票數(shù)。
* 如果 ZooKeeper 中不存在 /tickets 節(jié)點(diǎn),則創(chuàng)建該節(jié)點(diǎn)并設(shè)置初始票數(shù)。
*
* @throws Exception 如果操作過(guò)程中發(fā)生異常
*/
@PostConstruct
public void initializeTickets() throws Exception {
if (client.checkExists().forPath(TICKETS_PATH) == null) {
client.create().creatingParentsIfNeeded()
.forPath(TICKETS_PATH, String.valueOf(initialTickets).getBytes(StandardCharsets.UTF_8));
}
}
/**
* 獲取當(dāng)前可用的票數(shù)。
*
* @return 當(dāng)前可用的票數(shù)
* @throws Exception 如果操作過(guò)程中發(fā)生異常
*/
public synchronized int getCurrentTickets() throws Exception {
byte[] data = client.getData().forPath(TICKETS_PATH);
return Integer.parseInt(new String(data, StandardCharsets.UTF_8));
}
/**
* 嘗試預(yù)訂指定數(shù)量的票。
* 使用分布式鎖確保同一時(shí)間只有一個(gè)實(shí)例能夠更新票數(shù)。
*
* @param quantity 需要預(yù)訂的票數(shù)
* @return 如果預(yù)訂成功則返回 true,否則返回 false
* @throws Exception 如果操作過(guò)程中發(fā)生異常
*/
public boolean bookTicket(int quantity) throws Exception {
InterProcessMutex lock = new InterProcessMutex(client, "/locks/ticket-booking");
try {
// 嘗試在 10 秒內(nèi)獲取鎖
if (lock.acquire(10, TimeUnit.SECONDS)) {
try {
int currentTickets = getCurrentTickets();
if (currentTickets >= quantity) {
int newTickets = currentTickets - quantity;
client.setData().forPath(TICKETS_PATH, String.valueOf(newTickets).getBytes(StandardCharsets.UTF_8));
logger.info("Booked {} tickets. Remaining: {}", quantity, newTickets);
returntrue;
} else {
logger.warn("Insufficient tickets. Current: {}, Requested: {}", currentTickets, quantity);
returnfalse;
}
} finally {
// 確保釋放鎖
lock.release();
}
} else {
logger.warn("Failed to acquire lock for booking tickets.");
returnfalse;
}
} catch (Exception e) {
logger.error("Error booking tickets", e);
throw e;
}
}
}
七、Controller
package com.example.ticketbooking.controller;
import com.example.ticketbooking.service.BookingService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class BookingController {
private static final Logger logger = LoggerFactory.getLogger(BookingController.class);
@Autowired
private BookingService bookingService;
/**
* 獲取當(dāng)前可用的票數(shù)。
*
* @return 當(dāng)前可用的票數(shù)
*/
@GetMapping("/get-tickets")
public String getTickets() {
try {
int currentTickets = bookingService.getCurrentTickets();
return"Current Tickets Available: " + currentTickets;
} catch (Exception e) {
logger.error("Error getting ticket count", e);
return"An error occurred while getting ticket count";
}
}
/**
* 處理預(yù)訂票務(wù)的請(qǐng)求。
* 使用分布式鎖確保同一時(shí)間只有一個(gè)實(shí)例能夠處理預(yù)訂請(qǐng)求。
*
* @param quantity 需要預(yù)訂的票數(shù)
* @return 預(yù)訂結(jié)果
*/
@GetMapping("/book-ticket")
public String bookTicket(@RequestParam int quantity) {
try {
boolean success = bookingService.bookTicket(quantity);
if (success) {
return"Successfully booked " + quantity + " tickets";
} else {
return"Failed to book " + quantity + " tickets";
}
} catch (Exception e) {
logger.error("Error booking tickets", e);
return"An error occurred while booking tickets";
}
}
}
八、Application
package com.example.ticketbooking;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class TicketBookingApplication {
public static void main(String[] args) {
SpringApplication.run(TicketBookingApplication.class, args);
}
}
九、測(cè)試
確保 ZooKeeper 服務(wù)器正在運(yùn)行。
1. 終端 1
(1) 獲取當(dāng)前票數(shù):
curl http://localhost:8080/get-tickets
Respons:
Current Tickets Available: 100
(2) 預(yù)訂 10 張票:
curl http://localhost:8080/book-ticket?quantity=10
Respons:
Successfully booked 10 tickets
(3) 再次獲取當(dāng)前票數(shù):
curl http://localhost:8080/get-tickets
Respons:
Current Tickets Available: 90
2. 終端 2
(1) 在 Terminal 1 正在處理預(yù)訂請(qǐng)求的同時(shí),在 Terminal 2 中嘗試預(yù)訂 10 張票:
curl http://localhost:8080/book-ticket?quantity=10
Respons:
Successfully booked 10 tickets
(2) 再次獲取當(dāng)前票數(shù):
curl http://localhost:8080/get-tickets
Respons:
Current Tickets Available: 80
3. 嘗試預(yù)訂超過(guò)剩余票數(shù)
(1) 嘗試預(yù)訂超過(guò)剩余票數(shù)的情況:
curl http://localhost:8080/book-ticket?quantity=100
Respons:
Failed to book 100 tickets
(2) 再次獲取當(dāng)前票數(shù):
curl http://localhost:8080/get-tickets
Respons:
Current Tickets Available: 80