招行一面:Java 的線程如何通信?
在 Java中,線程是執(zhí)行的最小單元,那么線程之間是如何通信的呢?這篇文章我們一起來分析五種常用的方式。
- 使用 wait()、notify() 和 notifyAll()
- 使用 BlockingQueue
- Exchanger
- 使用 Locks 和 Condition
- 使用 Semaphore
1. 使用 wait()、notify() 和 notifyAll()
Java的 Object 類提供了 wait()、notify() 和 notifyAll() 方法,這些方法可以用來實(shí)現(xiàn)線程之間的通信,這些方法必須在同步塊或同步方法中調(diào)用。
- **wait()**:使當(dāng)前線程進(jìn)入等待狀態(tài),直到其他線程調(diào)用 notify() 或 notifyAll()。
- **notify()**:喚醒在該對象監(jiān)視器上等待的單個(gè)線程。
- **notifyAll()**:喚醒在該對象監(jiān)視器上等待的所有線程。
示例代碼:
class SharedResource {
private int data;
private boolean hasData = false;
public synchronized void produce(int value) throws InterruptedException {
while (hasData) {
wait();
}
this.data = value;
hasData = true;
notify();
}
public synchronized int consume() throws InterruptedException {
while (!hasData) {
wait();
}
hasData = false;
notify();
return data;
}
}
public class ProducerConsumerExample {
public static void main(String[] args) {
SharedResource resource = new SharedResource();
Thread producer = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
resource.produce(i);
System.out.println("Produced: " + i);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
Thread consumer = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
int data = resource.consume();
System.out.println("Consumed: " + data);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
}
}
2. 使用 BlockingQueue
BlockingQueue 是Java中一個(gè)強(qiáng)大的接口,提供了線程安全的隊(duì)列操作,并且可以在生產(chǎn)者-消費(fèi)者模式中使用。BlockingQueue 不需要顯式地使用同步機(jī)制,它內(nèi)部已經(jīng)處理好了線程同步問題。
示例代碼:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BlockingQueueExample {
public static void main(String[] args) {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
Thread producer = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
queue.put(i);
System.out.println("Produced: " + i);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
Thread consumer = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
int data = queue.take();
System.out.println("Consumed: " + data);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
}
}
3. 使用 Locks 和 Condition
Java提供了 java.util.concurrent.locks 包,其中包含了 Lock 接口和 Condition 接口。Condition 提供了類似于 wait()、notify() 和 notifyAll() 的方法,但它們與 Lock 對象一起使用,提供了更靈活的線程通信機(jī)制。
示例代碼:
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class SharedResourceWithLock {
private int data;
private boolean hasData = false;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void produce(int value) throws InterruptedException {
lock.lock();
try {
while (hasData) {
condition.await();
}
this.data = value;
hasData = true;
condition.signal();
} finally {
lock.unlock();
}
}
public int consume() throws InterruptedException {
lock.lock();
try {
while (!hasData) {
condition.await();
}
hasData = false;
condition.signal();
return data;
} finally {
lock.unlock();
}
}
}
public class LockConditionExample {
public static void main(String[] args) {
SharedResourceWithLock resource = new SharedResourceWithLock();
Thread producer = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
resource.produce(i);
System.out.println("Produced: " + i);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
Thread consumer = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
int data = resource.consume();
System.out.println("Consumed: " + data);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
}
}
4. 使用 Exchanger
Exchanger 是一個(gè)用于線程間交換數(shù)據(jù)的同步點(diǎn)。兩個(gè)線程可以在此同步點(diǎn)交換數(shù)據(jù),Exchanger 的 exchange() 方法用于在兩個(gè)線程之間交換數(shù)據(jù)。
示例代碼:
import java.util.concurrent.Exchanger;
public class ExchangerExample {
public static void main(String[] args) {
Exchanger<Integer> exchanger = new Exchanger<>();
Thread producer = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
System.out.println("Produced: " + i);
exchanger.exchange(i);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
Thread consumer = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
int data = exchanger.exchange(null);
System.out.println("Consumed: " + data);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
}
}
5. 使用 Semaphore
Semaphore 是一個(gè)計(jì)數(shù)信號量,通常用于限制對某些資源的訪問。它可以用于控制線程訪問共享資源的數(shù)量,這在某些情況下也可以用作線程間通信的機(jī)制。
示例代碼:
import java.util.concurrent.Semaphore;
class SemaphoreSharedResource {
private int data;
private Semaphore semaphore = new Semaphore(1);
public void produce(int value) throws InterruptedException {
semaphore.acquire();
try {
this.data = value;
System.out.println("Produced: " + value);
} finally {
semaphore.release();
}
}
public int consume() throws InterruptedException {
semaphore.acquire();
try {
System.out.println("Consumed: " + data);
return data;
} finally {
semaphore.release();
}
}
}
public class SemaphoreExample {
public static void main(String[] args) {
SemaphoreSharedResource resource = new SemaphoreSharedResource();
Thread producer = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
resource.produce(i);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
Thread consumer = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
resource.consume();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
}
}
結(jié)論
本文,我們分析了 Java線程通信的5種常見方式:
- wait()/notify() 是一種低級別的同步機(jī)制,適合需要精細(xì)控制的場合;
- BlockingQueue 和 Exchanger 提供了更高層次的抽象,簡化了線程間的數(shù)據(jù)交換;
- Locks 和 Condition 提供了更靈活的鎖機(jī)制,適合復(fù)雜的同步場景;
- Semaphore 則用于控制資源訪問。
在實(shí)際應(yīng)用中,需要選擇哪種方式取決于具體的應(yīng)用場景和需求。如何你有好的通信方式,歡迎評論區(qū)留言。