Guava并發(fā):ListenableFuture與RateLimiter示例
概念
ListenableFuture顧名思義就是可以監(jiān)聽的Future,它是對java原生Future的擴(kuò)展增強(qiáng)。我們知道Future表示一個(gè)異步計(jì)算任務(wù),當(dāng)任務(wù)完成時(shí)可以得到計(jì)算結(jié)果。如果我們希望一旦計(jì)算完成就拿到結(jié)果展示給用戶或者做另外的計(jì)算,就必須使用另一個(gè)線程不斷的查詢計(jì)算狀態(tài)。這樣做,代碼復(fù)雜,而且效率低下。使用ListenableFuture Guava幫我們檢測Future是否完成了,如果完成就自動調(diào)用回調(diào)函數(shù),這樣可以減少并發(fā)程序的復(fù)雜度。
推薦使用第二種方法,因?yàn)榈诙N方法可以直接得到Future的返回值,或者處理錯(cuò)誤情況。本質(zhì)上第二種方法是通過調(diào)動***種方法實(shí)現(xiàn)的,做了進(jìn)一步的封裝。
另外ListenableFuture還有其他幾種內(nèi)置實(shí)現(xiàn):
1.SettableFuture:不需要實(shí)現(xiàn)一個(gè)方法來計(jì)算返回值,而只需要返回一個(gè)固定值來做為返回值,可以通過程序設(shè)置此Future的返回值或者異常信息。
2.CheckedFuture: 這是一個(gè)繼承自ListenableFuture接口,他提供了checkedGet()方法,此方法在Future執(zhí)行發(fā)生異常時(shí),可以拋出指定類型的異常。
RateLimiter類似于JDK的信號量Semphore,他用來限制對資源并發(fā)訪問的線程數(shù),本文介紹RateLimiter使用。
代碼示例
- import java.util.concurrent.Callable;
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.Executors;
- import java.util.concurrent.TimeUnit;
- import com.google.common.util.concurrent.FutureCallback;
- import com.google.common.util.concurrent.Futures;
- import com.google.common.util.concurrent.ListenableFuture;
- import com.google.common.util.concurrent.ListeningExecutorService;
- import com.google.common.util.concurrent.MoreExecutors;
- import com.google.common.util.concurrent.RateLimiter;
- public class ListenableFutureDemo {
- public static void main(String[] args) {
- testRateLimiter();
- testListenableFuture();
- }
- /**
- * RateLimiter類似于JDK的信號量Semphore,他用來限制對資源并發(fā)訪問的線程數(shù)
- */
- public static void testRateLimiter() {
- ListeningExecutorService executorService = MoreExecutors
- .listeningDecorator(Executors.newCachedThreadPool());
- RateLimiter limiter = RateLimiter.create(5.0); // 每秒不超過4個(gè)任務(wù)被提交
- for (int i = 0; i < 10; i++) {
- limiter.acquire(); // 請求RateLimiter, 超過permits會被阻塞
- final ListenableFuture<Integer> listenableFuture = executorService
- .submit(new Task("is "+ i));
- }
- }
- public static void testListenableFuture() {
- ListeningExecutorService executorService = MoreExecutors
- .listeningDecorator(Executors.newCachedThreadPool());
- final ListenableFuture<Integer> listenableFuture = executorService
- .submit(new Task("testListenableFuture"));
- //同步獲取調(diào)用結(jié)果
- try {
- System.out.println(listenableFuture.get());
- } catch (InterruptedException e1) {
- e1.printStackTrace();
- } catch (ExecutionException e1) {
- e1.printStackTrace();
- }
- //***種方式
- listenableFuture.addListener(new Runnable() {
- @Override
- public void run() {
- try {
- System.out.println("get listenable future's result "
- + listenableFuture.get());
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (ExecutionException e) {
- e.printStackTrace();
- }
- }
- }, executorService);
- //第二種方式
- Futures.addCallback(listenableFuture, new FutureCallback<Integer>() {
- @Override
- public void onSuccess(Integer result) {
- System.out
- .println("get listenable future's result with callback "
- + result);
- }
- @Override
- public void onFailure(Throwable t) {
- t.printStackTrace();
- }
- });
- }
- }
- class Task implements Callable<Integer> {
- String str;
- public Task(String str){
- this.str = str;
- }
- @Override
- public Integer call() throws Exception {
- System.out.println("call execute.." + str);
- TimeUnit.SECONDS.sleep(1);
- return 7;
- }
- }
Guava版本
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>14.0.1</version>
- </dependency>