博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Java集合, ArrayBlockingQueue源码解析(常用于并发编程)
阅读量:6720 次
发布时间:2019-06-25

本文共 4596 字,大约阅读时间需要 15 分钟。

  hot3.png

本文主要就其中的一个实现类:ArrayBlockingQueue进行源码分析,分析阻塞队列的阻塞是如何实现的。

概述

ArrayBlockingQueue底层是使用一个数组实现队列的,并且在构造ArrayBlockingQueue时需要指定容量,也就意味着底层数组一旦创建了,容量就不能改变了,因此ArrayBlockingQueue是一个容量限制的阻塞队列。因此,在队列全满时执行入队将会阻塞,在队列为空时出队同样将会阻塞。

源码分析

数据结构

public class ArrayBlockingQueue
extends AbstractQueue
implements BlockingQueue
, java.io.Serializable {

重要字段

ArrayBlockingQueue的重要字段有如下几个:

/** The queued items */    final Object[] items;    /** Main lock guarding all access */    final ReentrantLock lock;    /** Condition for waiting takes */    private final Condition notEmpty;    /** Condition for waiting puts */    private final Condition notFull;

上面代码中的items数组就代表的是队列,ReentrantLock和两个Condition都是用于用于并发的,并且这几个字段都是final的,意味着ArrayBloackingQueue初始化时就必须完成赋值。

ArrayBlockingQueue中有几个int型的字段表示当前操作items数组的索引,如下:

//记录下一个take、remove、peek的索引    int takeIndex;    //记录下一个put、offer、add的索引    int putIndex;    //队列中元素的个数    int count;

构造方法

ArrayBlockingQueue一共有三个构造方法,如下:

//只指定容量    public ArrayBlockingQueue(int capacity) {        this(capacity, false);    }    //指定容量和ReentrantLock是否公平    public ArrayBlockingQueue(int capacity, boolean fair) {        if (capacity <= 0)            throw new IllegalArgumentException();        this.items = new Object[capacity];        lock = new ReentrantLock(fair);        notEmpty = lock.newCondition();        notFull =  lock.newCondition();    }    //将集合中的元素初始化队列的元素    public ArrayBlockingQueue(int capacity, boolean fair,                              Collection
c) { this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); // Lock only for visibility, not mutual exclusion try { int i = 0; try { for (E e : c) { checkNotNull(e); items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); } }

从上面的代码可以看到,构造方法主要使用容量对items数组完成初始化,fair参数用来构造一个公平的或不公平的ReentrantLock。

另外一个构造方法就是使用集合中的元素初始化队列中的元素。

put(E e)方法

put(E e)方法在队列不满的情况下,将会将元素添加到队列尾部,如果队列已满,将会阻塞,直到队列中有剩余空间可以插入。该方法的实现如下:

从上面代码可以看到几点:

1. ArrayBlockingQueue不允许元素为null
2. ArrayBlockingQueue在队列已满时将会调用notFull的await()方法释放锁并处于阻塞状态
3. 一旦ArrayBlockingQueue不为满的状态,就将元素入队

下面首先看一下enqueue方法。

enqueue(E e)方法

enqueue()方法用于将元素插入到队列中,由于有元素进入了队列,所以就通知了为空的Condition,释放了因队列为空而阻塞的线程。代码如下:

private void enqueue(E x) {        // assert lock.getHoldCount() == 1;        // assert items[putIndex] == null;        final Object[] items = this.items;        items[putIndex] = x;        if (++putIndex == items.length)            putIndex = 0;        count++;        notEmpty.signal();    }

从上面的代码可以看到,底层的数组使用的循环插入的方式;当一旦插入一个元素后,将调用notEmpty.signal()。

当调用put()方法时,由于会首先对Lock加锁,然后再执行插入,所以当很多线程一起插入时,是线程安全的;而一旦进入lock块中,当当前队列已满时,该线程就会被阻塞,直到队列不再为满的时候,可以重新获取到锁执行插入;在插入之后,由于新加了一个元素,需要通知因为空而阻塞的线程,所以需要调用notEmpty的signal方法。

E take()方法

take()方法用于取走队头的元素,当队列为空时将会阻塞,直到队列中有元素可取走时将会被释放。其实现如下:

public E take() throws InterruptedException {        final ReentrantLock lock = this.lock;        //首先加锁        lock.lockInterruptibly();        try {            //如果队列为空,阻塞            while (count == 0)                notEmpty.await();            //队列不为空,调用dequeue()出队            return dequeue();        } finally {            //释放锁            lock.unlock();        }    }

从上面可以看到take()流程和put()流程类似,一旦获得了锁之后,如果队列为空,那么将阻塞;否则调用dequeue()出队一个元素。

下面看一下dequeue()方法。

dequeu()方法

private E dequeue() {        // assert lock.getHoldCount() == 1;        // assert items[takeIndex] != null;        final Object[] items = this.items;        @SuppressWarnings("unchecked")        //取走数据        E x = (E) items[takeIndex];        //置为null,以助gc        items[takeIndex] = null;        //循环取        if (++takeIndex == items.length)            takeIndex = 0;        count--;        if (itrs != null)            itrs.elementDequeued();        //通知因队列满而阻塞的线程        notFull.signal();        return x;    }

代码中注释已经说明了dequeu的流程。

下面两个过程联合起来看,如果队列已满,那么调用put时,因为调用了notFull.await(),那么那个线程将会放弃锁进入到阻塞状态,这时一个线程取走了一个数据,调用了notFull.signal(),这时上一个线程有可能就被释放了然后重新获得了锁,调用了enqueue()方法将元素插入到队列中;如果队列为空,执行take(),那么由于调用了notEmpty.await(),该线程将会被阻塞,这时另一个线程执行了put()方法插入了一个元素,然后调用了notEmpty.signal(),这时取走线程被释放了重新获取了锁取走了数据。这基本就是ArrayBlockingQueue的阻塞实现原理。

总结

根据分析源码可知,ArrayBlockingQueue的并发阻塞是通过ReentrantLock和Condition来实现的。

ArrayBlockingQueue内部只有一把锁,意味着同一时刻只有一个线程能进行入队或者出队的操作。

转载于:https://my.oschina.net/90888/blog/1624500

你可能感兴趣的文章
开始学习silverlight
查看>>
php使用递归计算目录大小
查看>>
EF 直接修改数据,不再查询数据库
查看>>
script标签加载js代码的一些知识
查看>>
The builder launch configuration could not be found
查看>>
linux 安装软件的地方
查看>>
bond网卡绑定(centos6.5 + centos 7)
查看>>
Leetcode-Letter Combinations of a Phone Number
查看>>
压测 linux + jexus + mono + asp.net mvc
查看>>
成功交付离岸项目
查看>>
[转]ArcGIS 10.1创建切片缓存方法工具总结
查看>>
垃圾回收与内存泄漏
查看>>
旧博客欢迎莅临
查看>>
深度学习原理与框架-卷积网络细节-迁移学习 1.冻结层数,只进行部分层的训练...
查看>>
C# 中的invoke和begininvoke
查看>>
通用类 IPScanner 利用纯真IP数据库(QQWry.dat)查询IP所在地
查看>>
http请求报文结构
查看>>
人不成熟的五个特征
查看>>
HTTP===返回结果的HTTP状态码
查看>>
How to change the computer name on Ubuntu
查看>>