聊聊保證線程安全的幾個(gè)小技巧
前言
對(duì)于從事后端開發(fā)的同學(xué)來說,線程安全問題是我們每天都需要考慮的問題。
線程安全問題通俗的講:主要是在多線程的環(huán)境下,不同線程同時(shí)讀和寫公共資源(臨界資源),導(dǎo)致的數(shù)據(jù)異常問題。
比如:變量a=0,線程1給該變量+1,線程2也給該變量+1。此時(shí),線程3獲取a的值有可能不是2,而是1。線程3這不就獲取了錯(cuò)誤的數(shù)據(jù)?
線程安全問題會(huì)直接導(dǎo)致數(shù)據(jù)異常,從而影響業(yè)務(wù)功能的正常使用,所以這個(gè)問題還是非常嚴(yán)重的。
那么,如何解決線程安全問題呢?
今天跟大家一起聊聊,保證線程安全的10個(gè)小技巧,希望對(duì)你有所幫助。
1. 無狀態(tài)
我們都知道只有多個(gè)線程訪問公共資源的時(shí)候,才可能出現(xiàn)數(shù)據(jù)安全問題,那么如果我們沒有公共資源,是不是就沒有這個(gè)問題呢?
例如:
public class NoStatusService {
public void add(String status) {
System.out.println("add status:" + status);
}
public void update(String status) {
System.out.println("update status:" + status);
}
}
這個(gè)例子中NoStatusService沒有定義公共資源,換句話說是無狀態(tài)的。
這種場(chǎng)景中,NoStatusService類肯定是線程安全的。
2. 不可變
如果多個(gè)線程訪問的公共資源是不可變的,也不會(huì)出現(xiàn)數(shù)據(jù)的安全性問題。
例如:
public class NoChangeService {
public static final String DEFAULT_NAME = "abc";
public void add(String status) {
System.out.println(DEFAULT_NAME);
}
}
DEFAULT_NAME被定義成了static final的常量,在多線程中環(huán)境中不會(huì)被修改,所以這種情況,也不會(huì)出現(xiàn)線程安全問題。
3. 無修改權(quán)限
有時(shí)候,我們定義了公共資源,但是該資源只暴露了讀取的權(quán)限,沒有暴露修改的權(quán)限,這樣也是線程安全的。
例如:
public class SafePublishService {
private String name;
public String getName() {
return name;
}
public void add(String status) {
System.out.println("add status:" + status);
}
}
這個(gè)例子中,沒有對(duì)外暴露修改name字段的入口,所以不存在線程安全問題。
4. synchronized
使用JDK內(nèi)部提供的同步機(jī)制,這也是使用比較多的手段,分為:同步方法 和 同步代碼塊。
我們優(yōu)先使用同步代碼塊,因?yàn)橥椒椒ǖ牧6仁钦麄€(gè)方法,范圍太大,相對(duì)來說,更消耗代碼的性能。
其實(shí),每個(gè)對(duì)象內(nèi)部都有一把鎖,只有搶到那把鎖的線程,才被允許進(jìn)入對(duì)應(yīng)的代碼塊執(zhí)行相應(yīng)的代碼。
當(dāng)代碼塊執(zhí)行完之后,JVM底層會(huì)自動(dòng)釋放那把鎖。
例如:
public class SyncService {
private int age = 1;
private Object object = new Object();
//同步方法
public synchronized void add(int i) {
age = age + i;
System.out.println("age:" + age);
}
public void update(int i) {
//同步代碼塊,對(duì)象鎖
synchronized (object) {
age = age + i;
System.out.println("age:" + age);
}
}
public void update(int i) {
//同步代碼塊,類鎖
synchronized (SyncService.class) {
age = age + i;
System.out.println("age:" + age);
}
}
}
5. Lock
除了使用synchronized關(guān)鍵字實(shí)現(xiàn)同步功能之外,JDK還提供了Lock接口,這種顯示鎖的方式。
通常我們會(huì)使用Lock接口的實(shí)現(xiàn)類:ReentrantLock,它包含了:公平鎖、非公平鎖、可重入鎖、讀寫鎖 等更多更強(qiáng)大的功能。
例如:
public class LockService {
private ReentrantLock reentrantLock = new ReentrantLock();
public int age = 1;
public void add(int i) {
try {
reentrantLock.lock();
age = age + i;
System.out.println("age:" + age);
} finally {
reentrantLock.unlock();
}
}
}
但如果使用ReentrantLock,它也帶來了有個(gè)小問題就是:需要在finally代碼塊中手動(dòng)釋放鎖。
不過說句實(shí)話,在使用Lock顯示鎖的方式,解決線程安全問題,給開發(fā)人員提供了更多的靈活性。
6. 分布式鎖
如果是在單機(jī)的情況下,使用synchronized和Lock保證線程安全是沒有問題的。
但如果在分布式的環(huán)境中,即某個(gè)應(yīng)用如果部署了多個(gè)節(jié)點(diǎn),每一個(gè)節(jié)點(diǎn)使用可以synchronized和Lock保證線程安全,但不同的節(jié)點(diǎn)之間,沒法保證線程安全。
這就需要使用:分布式鎖了。
分布式鎖有很多種,比如:數(shù)據(jù)庫(kù)分布式鎖,zookeeper分布式鎖,redis分布式鎖等。
其中我個(gè)人更推薦使用redis分布式鎖,其效率相對(duì)來說更高一些。
使用redis分布式鎖的偽代碼如下:
try{
String result = jedis.set(lockKey, requestId, "NX", "PX", expireTime);
if ("OK".equals(result)) {
return true;
}
return false;
} finally {
unlock(lockKey);
}
同樣需要在finally代碼塊中釋放鎖。
如果你對(duì)redis分布式鎖的用法和常見的坑,比較感興趣的話,可以看看我的另一篇文章《聊聊redis分布式鎖的8大坑》,里面有更詳細(xì)的介紹。
7. volatile
有時(shí)候,我們有這樣的需求:如果在多個(gè)線程中,有任意一個(gè)線程,把某個(gè)開關(guān)的狀態(tài)設(shè)置為false,則整個(gè)功能停止。
簡(jiǎn)單的需求分析之后發(fā)現(xiàn):只要求多個(gè)線程間的可見性,不要求原子性。
如果一個(gè)線程修改了狀態(tài),其他的所有線程都能獲取到最新的狀態(tài)值。
這樣一分析這就好辦了,使用volatile就能快速滿足需求。
例如:
@Service
public CanalService {
private volatile boolean running = false;
private Thread thread;
@Autowired
private CanalConnector canalConnector;
public void handle() {
//連接canal
while(running) {
//業(yè)務(wù)處理
}
}
public void start() {
thread = new Thread(this::handle, "name");
running = true;
thread.start();
}
public void stop() {
if(!running) {
return;
}
running = false;
}
}
需要特別注意的地方是:volatile不能用于計(jì)數(shù)和統(tǒng)計(jì)等業(yè)務(wù)場(chǎng)景。因?yàn)関olatile不能保證操作的原子性,可能會(huì)導(dǎo)致數(shù)據(jù)異常。
8. ThreadLocal
除了上面幾種解決思路之外,JDK還提供了另外一種用空間換時(shí)間的新思路:ThreadLocal。
當(dāng)然ThreadLocal并不能完全取代鎖,特別是在一些秒殺更新庫(kù)存中,必須使用鎖。
ThreadLocal的核心思想是:共享變量在每個(gè)線程都有一個(gè)副本,每個(gè)線程操作的都是自己的副本,對(duì)另外的線程沒有影響。
溫馨提醒一下:我們平常在使用ThreadLocal時(shí),如果使用完之后,一定要記得在finally代碼塊中,調(diào)用它的remove方法清空數(shù)據(jù),不然可能會(huì)出現(xiàn)內(nèi)存泄露問題。
例如:
public class ThreadLocalService {
private ThreadLocal<Integer> threadLocal = new ThreadLocal<>();
public void add(int i) {
Integer integer = threadLocal.get();
threadLocal.set(integer == null ? 0 : integer + i);
}
}
如果對(duì)ThreadLocal感興趣的小伙伴,可以看看我的另一篇文章《ThreadLocal奪命11連問》,里面有對(duì)ThreadLocal的原理、用法和坑,有非常詳細(xì)的介紹。
9. 線程安全集合
有時(shí)候,我們需要使用的公共資源放在某個(gè)集合當(dāng)中,比如:ArrayList、HashMap、HashSet等。
如果在多線程環(huán)境中,有線程往這些集合中寫數(shù)據(jù),另外的線程從集合中讀數(shù)據(jù),就可能會(huì)出現(xiàn)線程安全問題。
為了解決集合的線程安全問題,JDK專門給我們提供了能夠保證線程安全的集合。
比如:CopyOnWriteArrayList、ConcurrentHashMap、CopyOnWriteArraySet、ArrayBlockingQueue等等。
例如:
public class HashMapTest {
private static ConcurrentHashMap<String, Object> hashMap = new ConcurrentHashMap<>();
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
hashMap.put("key1", "value1");
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
hashMap.put("key2", "value2");
}
}).start();
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(hashMap);
}
}
在JDK底層,或者spring框架當(dāng)中,使用ConcurrentHashMap保存加載配置參數(shù)的場(chǎng)景非常多。
比較出名的是spring的refresh方法中,會(huì)讀取配置文件,把配置放到很多的ConcurrentHashMap緩存起來。
10. CAS
JDK除了使用鎖的機(jī)制解決多線程情況下數(shù)據(jù)安全問題之外,還提供了CAS機(jī)制。
這種機(jī)制是使用CPU中比較和交換指令的原子性,JDK里面是通過Unsafe類實(shí)現(xiàn)的。
CAS內(nèi)部包含了四個(gè)值:舊數(shù)據(jù)、期望數(shù)據(jù)、新數(shù)據(jù) 和 地址,比較舊數(shù)據(jù) 和 期望的數(shù)據(jù),如果一樣的話,就把舊數(shù)據(jù)改成新數(shù)據(jù)。如果不一樣的話,當(dāng)前線程不斷自旋,一直到成功為止。
不過,使用CAS保證線程安全,可能會(huì)出現(xiàn)ABA問題,需要使用AtomicStampedReference增加版本號(hào)解決。
其實(shí),實(shí)際工作中很少直接使用Unsafe類的,一般用atomic包下面的類即可。
public class AtomicService {
private AtomicInteger atomicInteger = new AtomicInteger();
public int add(int i) {
return atomicInteger.getAndAdd(i);
}
}
11. 數(shù)據(jù)隔離
有時(shí)候,我們?cè)诓僮骷蠑?shù)據(jù)時(shí),可以通過數(shù)據(jù)隔離,來保證線程安全。
例如:
public class ThreadPoolTest {
public static void main(String[] args) {
ExecutorService threadPool = new ThreadPoolExecutor(8, //corePoolSize線程池中核心線程數(shù)
10, //maximumPoolSize 線程池中最大線程數(shù)
60, //線程池中線程的最大空閑時(shí)間,超過這個(gè)時(shí)間空閑線程將被回收
TimeUnit.SECONDS,//時(shí)間單位
new ArrayBlockingQueue(500), //隊(duì)列
new ThreadPoolExecutor.CallerRunsPolicy()); //拒絕策略
List<User> userList = Lists.newArrayList(
new User(1L, "蘇三", 18, "成都"),
new User(2L, "蘇三說技術(shù)", 20, "四川"),
new User(3L, "技術(shù)", 25, "云南"));
for (User user : userList) {
threadPool.submit(new Work(user));
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(userList);
}
static class Work implements Runnable {
private User user;
public Work(User user) {
this.user = user;
}
@Override
public void run() {
user.setName(user.getName() + "測(cè)試");
}
}
}
這個(gè)例子中,使用線程池處理用戶信息。
每個(gè)用戶只被線程池中的一個(gè)線程處理,不存在多個(gè)線程同時(shí)處理一個(gè)用戶的情況。所以這種人為的數(shù)據(jù)隔離機(jī)制,也能保證線程安全。
數(shù)據(jù)隔離還有另外一種場(chǎng)景:kafka生產(chǎn)者把同一個(gè)訂單的消息,發(fā)送到同一個(gè)partion中。每一個(gè)partion都部署一個(gè)消費(fèi)者,在kafka消費(fèi)者中,使用單線程接收消息,并且做業(yè)務(wù)處理。
這種場(chǎng)景下,從整體上看,不同的partion是用多線程處理數(shù)據(jù)的,但同一個(gè)partion則是用單線程處理的,所以也能解決線程安全問題。