自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

深入理解Java線程池,剖析LinkedBlockingQueue源碼實現(xiàn)

開發(fā) 前端
LinkedBlockingQueue初始化的時候,不支持指定是否使用公平鎖,只能使用非公平鎖,而ArrayBlockingQueue是支持指定的。

引言

上篇文章我們講解了ArrayBlockingQueue源碼,這篇文章開始講解LinkedBlockingQueue源碼。從名字上就能看到ArrayBlockingQueue是基于數(shù)組實現(xiàn)的,而LinkedBlockingQueue是基于鏈表實現(xiàn)。

那么,LinkedBlockingQueue底層源碼實現(xiàn)是什么樣的?跟ArrayBlockingQueue有何不同?

LinkedBlockingQueue的應(yīng)用場景跟ArrayBlockingQueue有什么不一樣?

看完這篇文章,可以輕松解答這些問題。

由于LinkedBlockingQueue實現(xiàn)了BlockingQueue接口,而BlockingQueue接口中定義了幾組放數(shù)據(jù)和取數(shù)據(jù)的方法,來滿足不同的場景。

操作

拋出異常

返回特定值

一直阻塞

阻塞指定時間

放數(shù)據(jù)

add()

offer()

put()

offer(e, time, unit)

取數(shù)據(jù)(同時刪除數(shù)據(jù))

remove()

poll()

take()

poll(time, unit)

取數(shù)據(jù)(不刪除)

element()

peek()

不支持

不支持

這四組方法的區(qū)別是:

  1. 當(dāng)隊列滿的時候,再次添加數(shù)據(jù),add()會拋出異常,offer()會返回false,put()會一直阻塞,offer(e, time, unit)會阻塞指定時間,然后返回false。
  2. 當(dāng)隊列為空的時候,再次取數(shù)據(jù),remove()會拋出異常,poll()會返回null,take()會一直阻塞,poll(time, unit)會阻塞指定時間,然后返回null。

LinkedBlockingQueue也會有針對這幾組放數(shù)據(jù)和取數(shù)據(jù)方法的具體實現(xiàn)。 Java線程池中的固定大小線程池就是基于LinkedBlockingQueue實現(xiàn)的:

# 創(chuàng)建固定大小的線程池
ExecutorService executorService = Executors.newFixedThreadPool(10);

對應(yīng)的源碼實現(xiàn):

# 底層使用LinkedBlockingQueue隊列存儲任務(wù)
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

類結(jié)構(gòu)

先看一下LinkedBlockingQueue類里面有哪些屬性:

public class LinkedBlockingQueue<E>
        extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /**
     * 容量大小
     */
    private final int capacity;

    /**
     * 元素個數(shù)
     */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * 頭節(jié)點
     */
    transient Node<E> head;

    /**
     * 尾節(jié)點
     */
    private transient Node<E> last;

    /**
     * 取數(shù)據(jù)的鎖
     */
    private final ReentrantLock takeLock = new ReentrantLock();

    /**
     * 取數(shù)據(jù)的條件(隊列非空)
     */
    private final Condition notEmpty = takeLock.newCondition();

    /**
     * 放數(shù)據(jù)的鎖
     */
    private final ReentrantLock putLock = new ReentrantLock();

    /**
     * 放數(shù)據(jù)的條件(隊列非滿)
     */
    private final Condition notFull = putLock.newCondition();

    /**
     * 鏈表節(jié)點類
     */
    static class Node<E> {
        
        /**
         * 節(jié)點元素
         */
        E item;
        
        /**
         * 后繼節(jié)點
         */
        Node<E> next;

        Node(E x) {
            item = x;
        }
    }
}

圖片圖片

可以看出LinkedBlockingQueue底層是基于鏈表實現(xiàn)的,定義了頭節(jié)點head和尾節(jié)點last,由鏈表節(jié)點類Node可以看出是個單鏈表。 發(fā)現(xiàn)個問題,ArrayBlockingQueue中只使用了一把鎖,入隊出隊操作共用這把鎖。而LinkedBlockingQueue則使用了兩把鎖,分別是出隊鎖takeLock和入隊鎖putLock,為什么要這么設(shè)計呢?

LinkedBlockingQueue把兩把鎖分開,性能更好,為什么ArrayBlockingQueue不這樣設(shè)計呢?

原因是ArrayBlockingQueue是基于數(shù)組實現(xiàn)的,所有數(shù)據(jù)都存儲在同一個數(shù)組對象里面,對同一個對象沒辦法使用兩把鎖,會有數(shù)據(jù)可見性的問題。而LinkedBlockingQueue底層是基于鏈表實現(xiàn)的,從頭節(jié)點刪除,尾節(jié)點插入,頭尾節(jié)點分別是兩個對象,可以分別使用兩把鎖,提升操作性能。

另外也定義了兩個條件notEmpty和notFull,當(dāng)條件滿足的時候才允許放數(shù)據(jù)或者取數(shù)據(jù),下面會詳細(xì)講。

初始化

LinkedBlockingQueue常用的初始化方法有兩個:

  1. 無參構(gòu)造方法
  2. 指定容量大小的有參構(gòu)造方法
/**
 * 無參構(gòu)造方法
 */
BlockingQueue<Integer> blockingQueue1 = new LinkedBlockingQueue<>();

/**
 * 指定容量大小的構(gòu)造方法
 */
BlockingQueue<Integer> blockingQueue2 = new LinkedBlockingQueue<>(10);

再看一下對應(yīng)的源碼實現(xiàn):

/**
 * 無參構(gòu)造方法
 */
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

/**
 * 指定容量大小的構(gòu)造方法
 */
public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) {
        throw new IllegalArgumentException();
    }
    // 設(shè)置容量大小,初始化頭尾結(jié)點
    this.capacity = capacity;
    last = head = new Node<E>(null);
}

可以看出LinkedBlockingQueue的無參構(gòu)造方法使用的鏈表容量是Integer的最大值,存儲大量數(shù)據(jù)的時候,會有內(nèi)存溢出的風(fēng)險,建議使用有參構(gòu)造方法,指定容量大小。

有參構(gòu)造方法還會初始化頭尾節(jié)點,節(jié)點值為null。

LinkedBlockingQueue初始化的時候,不支持指定是否使用公平鎖,只能使用非公平鎖,而ArrayBlockingQueue是支持指定的。

放數(shù)據(jù)源碼

放數(shù)據(jù)的方法有四個:

操作

拋出異常

返回特定值

阻塞

阻塞一段時間

放數(shù)據(jù)

add()

offer()

put()

offer(e, time, unit)

offer方法源碼

先看一下offer()方法源碼,其他放數(shù)據(jù)方法邏輯也是大同小異,都是在鏈表尾部插入。 offer()方法在隊列滿的時候,會直接返回false,表示插入失敗。

/**
 * offer方法入口
 *
 * @param e 元素
 * @return 是否插入成功
 */
public boolean offer(E e) {
    // 1. 判空,傳參不允許為null
    if (e == null) {
        throw new NullPointerException();
    }
    // 2. 如果隊列已滿,則直接返回false,表示插入失敗
    final AtomicInteger count = this.count;
    if (count.get() == capacity) {
        return false;
    }
    int c = -1;
    Node<E> node = new Node<E>(e);
    // 3. 獲取put鎖,并加鎖
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        // 4. 加鎖后,再次判斷隊列是否已滿,如果未滿,則入隊
        if (count.get() < capacity) {
            enqueue(node);
            // 5. 隊列個數(shù)加一
            c = count.getAndIncrement();
            // 6. 如果隊列未滿,則喚醒因為隊列已滿而等待放數(shù)據(jù)的線程(用來補(bǔ)償,不加也行)
            if (c + 1 < capacity) {
                notFull.signal();
            }
        }
    } finally {
        // 7. 釋放鎖
        putLock.unlock();
    }
    // 8. c等于0,表示插入前,隊列為空,是第一次插入,需要喚醒因為隊列為空而等待取數(shù)據(jù)的線程
    if (c == 0) {
        signalNotEmpty();
    }
    // 9. 返回是否插入成功
    return c >= 0;
}

/**
 * 入隊
 *
 * @param node 節(jié)點
 */
private void enqueue(LinkedBlockingQueue.Node<E> node) {
    // 直接追加到鏈表末尾
    last = last.next = node;
}

/**
 * 喚醒因為隊列為空而等待取數(shù)據(jù)的線程
 */
private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

offer()方法邏輯也很簡單,追加元素到鏈表末尾,如果是第一次添加元素,就喚醒因為隊列為空而等待取數(shù)據(jù)的線程。

再看一下另外三個添加元素方法源碼:

add方法源碼

add()方法在數(shù)組滿的時候,會拋出異常,底層基于offer()實現(xiàn)。

/**
 * add方法入口
 *
 * @param e 元素
 * @return 是否添加成功
 */
public boolean add(E e) {
    if (offer(e)) {
        return true;
    } else {
        throw new IllegalStateException("Queue full");
    }
}

put方法源碼

put()方法在數(shù)組滿的時候,會一直阻塞,直到有其他線程取走數(shù)據(jù),空出位置,才能添加成功。

/**
 * put方法入口
 *
 * @param e 元素
 */
public void put(E e) throws InterruptedException {
    // 1. 判空,傳參不允許為null
    if (e == null) {
        throw new NullPointerException();
    }
    int c = -1;
    Node<E> node = new Node<E>(e);
    // 2. 加可中斷的鎖,防止一直阻塞
    final ReentrantLock putLock = this.putLock;
    putLock.lockInterruptibly();
    final AtomicInteger count = this.count;
    try {
        // 3. 如果隊列已滿,就一直阻塞,直到被喚醒
        while (count.get() == capacity) {
            notFull.await();
        }
        // 4. 如果隊列未滿,則直接入隊
        enqueue(node);
        c = count.getAndIncrement();
        // 5. 如果隊列未滿,則喚醒因為隊列已滿而等待放數(shù)據(jù)的線程(用來補(bǔ)償,不加也行)
        if (c + 1 < capacity) {
            notFull.signal();
        }
    } finally {
        // 6. 釋放鎖
        putLock.unlock();
    }
    // 7. c等于0,表示插入前,隊列為空,是第一次插入,需要喚醒因為隊列為空而等待取數(shù)據(jù)的線程
    if (c == 0) {
        signalNotEmpty();
    }
}

offer(e, time, unit)源碼

再看一下offer(e, time, unit)方法源碼,在數(shù)組滿的時候, offer(e, time, unit)方法會阻塞一段時間。

/**
 * offer方法入口
 *
 * @param e       元素
 * @param timeout 超時時間
 * @param unit    時間單位
 * @return 是否添加成功
 */
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
    // 1. 判空,傳參不允許為null
    if (e == null) {
        throw new NullPointerException();
    }
    // 2. 把超時時間轉(zhuǎn)換為納秒
    long nanos = unit.toNanos(timeout);
    int c = -1;
    final AtomicInteger count = this.count;
    // 2. 加可中斷的鎖,防止一直阻塞
    final ReentrantLock putLock = this.putLock;
    putLock.lockInterruptibly();
    try {
        // 4. 循環(huán)判斷隊列是否已滿
        while (count.get() == capacity) {
            if (nanos <= 0) {
                // 6. 如果隊列已滿,且超時時間已過,則返回false
                return false;
            }
            // 5. 如果隊列已滿,則等待指定時間
            nanos = notFull.awaitNanos(nanos);
        }
        // 7. 如果隊列未滿,則入隊
        enqueue(new Node<E>(e));
        // 8. 如果隊列未滿,則喚醒因為隊列已滿而等待放數(shù)據(jù)的線程(用來補(bǔ)償,不加也行)
        c = count.getAndIncrement();
        if (c + 1 < capacity) {
            notFull.signal();
        }
    } finally {
        // 9. 釋放鎖
        putLock.unlock();
    }
    // 10. c等于0,表示插入前,隊列為空,是第一次插入,需要喚醒因為隊列為空而等待取數(shù)據(jù)的線程
    if (c == 0) {
        signalNotEmpty();
    }
    return true;
}

彈出數(shù)據(jù)源碼

彈出數(shù)據(jù)(取出數(shù)據(jù)并刪除)的方法有四個:

操作

拋出異常

返回特定值

阻塞

阻塞一段時間

取數(shù)據(jù)(同時刪除數(shù)據(jù))

remove()

poll()

take()

poll(time, unit)

poll方法源碼

看一下poll()方法源碼,其他方取數(shù)據(jù)法邏輯大同小異,都是從鏈表頭部彈出元素。 poll()方法在彈出元素的時候,如果隊列為空,直接返回null,表示彈出失敗。

/**
 * poll方法入口
 */
public E poll() {
    // 如果隊列為空,則返回null
    final AtomicInteger count = this.count;
    if (count.get() == 0) {
        return null;
    }
    E x = null;
    int c = -1;
    // 2. 加鎖
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        // 3. 如果隊列不為空,則取出隊頭元素
        if (count.get() > 0) {
            x = dequeue();
            // 4. 元素個數(shù)減一
            c = count.getAndDecrement();
            // 5. 如果隊列不為空,則喚醒因為隊列為空而等待取數(shù)據(jù)的線程
            if (c > 1) {
                notEmpty.signal();
            }
        }
    } finally {
        // 6. 釋放鎖
        takeLock.unlock();
    }
    // 7. 如果取數(shù)據(jù)之前,隊列已滿,取數(shù)據(jù)之后隊列肯定不滿了,則喚醒因為隊列已滿而等待放數(shù)據(jù)的線程
    if (c == capacity) {
        signalNotFull();
    }
    return x;
}

/**
 * 取出隊頭元素
 */
private E dequeue() {
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h;
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}

/**
 * 喚醒因為隊列已滿而等待放數(shù)據(jù)的線程
 */
private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}

remove方法源碼

再看一下remove()方法源碼,如果隊列為空,remove()會拋出異常。

/**
 * remove方法入口
 */
public E remove() {
    // 1. 直接調(diào)用poll方法
    E x = poll();
    // 2. 如果取到數(shù)據(jù),直接返回,否則拋出異常
    if (x != null) {
        return x;
    } else {
        throw new NoSuchElementException();
    }
}

take方法源碼

再看一下take()方法源碼,如果隊列為空,take()方法就一直阻塞,直到被喚醒。

/**
 * take方法入口
 */
public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    // 1. 加可中斷的鎖,防止一直阻塞
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        // 2. 如果隊列為空,就一直阻塞,直到被喚醒
        while (count.get() == 0) {
            notEmpty.await();
        }
        // 3. 如果隊列不為空,則取出隊頭元素
        x = dequeue();
        // 4. 隊列元素個數(shù)減一
        c = count.getAndDecrement();
        // 5. 如果隊列不為空,則喚醒因為隊列為空而等待取數(shù)據(jù)的線程
        if (c > 1) {
            notEmpty.signal();
        }
    } finally {
        // 6. 釋放鎖
        takeLock.unlock();
    }
    // 7. 如果取數(shù)據(jù)之前,隊列已滿,取數(shù)據(jù)之后隊列肯定不滿了,則喚醒因為隊列已滿而等待放數(shù)據(jù)的線程
    if (c == capacity) {
        signalNotFull();
    }
    return x;
}

poll(time, unit)源碼

再看一下poll(time, unit)方法源碼,在隊列滿的時候, poll(time, unit)方法會阻塞指定時間,然后然后null。

/**
 * poll方法入口
 *
 * @param timeout 超時時間
 * @param unit    時間單位
 * @return 元素
 */
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E x = null;
    int c = -1;
    // 1. 把超時時間轉(zhuǎn)換成納秒
    long nanos = unit.toNanos(timeout);
    final AtomicInteger count = this.count;
    // 2. 加可中斷的鎖,防止一直阻塞
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        // 3. 循環(huán)判斷隊列是否為空
        while (count.get() == 0) {
            if (nanos <= 0) {
                // 5. 如果隊列為空,且超時時間已過,則返回null
                return null;
            }
            // 4. 阻塞到到指定時間
            nanos = notEmpty.awaitNanos(nanos);
        }
        // 6. 如果隊列不為空,則取出隊頭元素
        x = dequeue();
        // 7. 隊列元素個數(shù)減一
        c = count.getAndDecrement();
        // 8. 如果隊列不為空,則喚醒因為隊列為空而等待取數(shù)據(jù)的線程
        if (c > 1) {
            notEmpty.signal();
        }
    } finally {
        // 9. 釋放鎖
        takeLock.unlock();
    }
    // 7. 如果取數(shù)據(jù)之前,隊列已滿,取數(shù)據(jù)之后隊列肯定不滿了,則喚醒因為隊列已滿而等待放數(shù)據(jù)的線程
    if (c == capacity) {
        signalNotFull();
    }
    return x;
}

查看數(shù)據(jù)源碼

再看一下查看數(shù)據(jù)源碼,查看數(shù)據(jù),并不刪除數(shù)據(jù)。

操作

拋出異常

返回特定值

阻塞

阻塞一段時間

取數(shù)據(jù)(不刪除)

element()

peek()

不支持

不支持

peek方法源碼

先看一下peek()方法源碼,如果數(shù)組為空,直接返回null。

/**
 * peek方法入口
 */
public E peek() {
    // 1. 如果隊列為空,則返回null
    if (count.get() == 0) {
        return null;
    }
    // 2. 加鎖
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        // 3. 取出隊頭元素
        Node<E> first = head.next;
        if (first == null) {
            return null;
        } else {
            return first.item;
        }
    } finally {
        // 4. 釋放鎖
        takeLock.unlock();
    }
}

element方法源碼

再看一下element()方法源碼,如果隊列為空,則拋出異常。

/**
 * element方法入口
 */
public E element() {
    // 1. 調(diào)用peek方法查詢數(shù)據(jù)
    E x = peek();
    // 2. 如果查到數(shù)據(jù),直接返回
    if (x != null) {
        return x;
    } else {
        // 3. 如果沒找到,則拋出異常
        throw new NoSuchElementException();
    }
}

總結(jié)

這篇文章講解了LinkedBlockingQueue阻塞隊列的核心源碼,了解到LinkedBlockingQueue隊列具有以下特點:

  1. LinkedBlockingQueue實現(xiàn)了BlockingQueue接口,提供了四組放數(shù)據(jù)和讀數(shù)據(jù)的方法,來滿足不同的場景。
  2. LinkedBlockingQueue底層基于鏈表實現(xiàn),支持從頭部彈出數(shù)據(jù),從尾部添加數(shù)據(jù)。
  3. LinkedBlockingQueue初始化的時候,如果不指定隊列長度,默認(rèn)長度是Integer最大值,有內(nèi)存溢出風(fēng)險,建議初始化的時候指定隊列長度。
  4. LinkedBlockingQueue的方法是線程安全的,分別使用了讀寫兩把鎖,比ArrayBlockingQueue性能更好。

那么ArrayBlockingQueue與LinkedBlockingQueue區(qū)別是什么?相同點:

  1. 都是繼承自AbstractQueue抽象類,并實現(xiàn)了BlockingQueue接口,所以兩者擁有相同的讀寫方法,出現(xiàn)的地方可以相互替換。

不同點:

  1. 底層結(jié)構(gòu)不同,ArrayBlockingQueue底層基于數(shù)組實現(xiàn),初始化的時候必須指定數(shù)組長度,無法擴(kuò)容。LinkedBlockingQueue底層基于鏈表實現(xiàn),鏈表最大長度是Integer最大值。
  2. 占用內(nèi)存大小不同,ArrayBlockingQueue一旦初始化,數(shù)組長度就確定了,不會隨著元素增加而改變。LinkedBlockingQueue會隨著元素越多,鏈表越長,占用內(nèi)存越大。
  3. 性能不同,ArrayBlockingQueue的入隊和出隊共用一把鎖,并發(fā)較低。LinkedBlockingQueue入隊和出隊使用兩把獨立的鎖,并發(fā)情況下性能更高。
  4. 公平鎖選項,ArrayBlockingQueue初始化的時候,可以指定使用公平鎖或者非公平鎖,公平鎖模式下,可以按照線程等待的順序來操作隊列。LinkedBlockingQueue只支持非公平鎖。
  5. 適用場景不同,ArrayBlockingQueue適用于明確限制隊列大小的場景,防止生產(chǎn)速度大于消費速度的時候,造成內(nèi)存溢出、資源耗盡。LinkedBlockingQueue適用于業(yè)務(wù)高峰期可以自動擴(kuò)展消費速度的場景。

今天一起分析了LinkedBlockingQueue隊列的源碼,可以看到LinkedBlockingQueue的源碼非常簡單,沒有什么神秘復(fù)雜的東西,下篇文章再一起接著分析其他的阻塞隊列源碼。

責(zé)任編輯:武曉燕 來源: 一燈架構(gòu)
相關(guān)推薦

2022-11-11 10:48:55

AQS源碼架構(gòu)

2022-09-05 22:22:00

Stream操作對象

2018-10-31 15:54:47

Java線程池源碼

2016-10-26 20:49:24

ReactJavascript前端

2021-01-06 14:15:42

線程池Java代碼

2024-03-04 15:05:37

2024-12-31 09:00:12

Java線程狀態(tài)

2021-05-26 11:30:24

Java線程池代碼

2025-02-24 08:00:00

線程池Java開發(fā)

2024-01-09 08:28:44

應(yīng)用多線程技術(shù)

2024-06-06 09:58:13

2017-12-18 16:33:55

多線程對象模型

2018-03-14 15:20:05

Java多線程勘誤

2010-06-01 15:25:27

JavaCLASSPATH

2016-12-08 15:36:59

HashMap數(shù)據(jù)結(jié)構(gòu)hash函數(shù)

2020-07-21 08:26:08

SpringSecurity過濾器

2021-09-18 06:56:01

JavaCAS機(jī)制

2023-09-19 22:47:39

Java內(nèi)存

2009-06-19 14:10:42

Java多態(tài)性

2013-09-22 14:57:19

AtWood
點贊
收藏

51CTO技術(shù)棧公眾號