Java实现自定义阻塞队列

今天重温了下 java 多线程中的 notify() 方法以及 wait() 方法,一时兴起,决定通过这俩个方法,实现一个简易的自定义阻塞队列。

阻塞队列是什么,与普通队列的区别是什么?
阻塞队列与普通队列的区别在于,当队列是空的时,从队列中获取元素的操作将会被阻塞,或者当队列是满时,往队列里添加元素的操作会被阻塞。试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列插入新的元素。同样,试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他的线程使队列重新变得空闲起来。

1.新建一个 MyQueue.java 类

import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicInteger;

import com.xiaoleilu.hutool.util.StrUtil;

/**
 * 使用 notify() 和 wait() 实现自定义阻塞队列
 *
 * @author Yangkai.Shen
 * @version 1.0
 * @date 2017.08.02 at 11:51:14
 */
public class MyQueue {

	// 1. 承载数据的容器
	private LinkedList<Object> queue = new LinkedList<Object>();

	// 2. 计数器,用于判定边界
	private AtomicInteger count = new AtomicInteger(0);
	private final int minSize = 0;

	// 3. 初始化一个对象,用于加锁
	private final Object lock = new Object();
	private final int maxSize;

	public MyQueue(int maxSize) {
		this.maxSize = maxSize;
	}

	/**
	 * 添加一个元素到队列中,如果队列元素已满,则调用此方法的线程被阻塞,直到存在多余空间了,再进行添加
	 *
	 * @param obj 添加 obj 到队列尾部
	 */
	public void put(Object obj) {
		synchronized (lock) {
			// 1.没有多余空间,就阻塞线程
			while (count.get() == this.maxSize) {
				try {
					lock.wait();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			// 2.添加元素
			queue.add(obj);
			// 3.计数器累加
			count.incrementAndGet();
			System.out.println(StrUtil.format("新加入的元素为:{}", obj));
			// 4.唤醒其他线程(若本来元素为空,有线程调用 get 方法,那么原本被阻塞的,需要在此时被唤醒)
			lock.notify();
		}
	}

	/**
	 * 获取一个元素,如果队列元素为空,则调用此方法的线程被阻塞,直到添加新元素了,再进行获取
	 *
	 * @return 返回队列的第一个元素
	 */
	public Object get() {
		Object ret = null;

		synchronized (lock) {
			// 1.没有元素,就阻塞线程
			while (count.get() == this.minSize) {
				try {
					lock.wait();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			// 2.取第一个元素
			ret = queue.removeFirst();
			// 3.计数器递减
			count.decrementAndGet();
			System.out.println(StrUtil.format("移除的元素为:{}", ret));
			// 4.唤醒其他线程(若元素本来已满,有线程调用 put 方法,那么原本被阻塞的,需要在此时被唤醒)
			lock.notify();
		}

		return ret;
	}

	public int getSize() {
		return this.count.get();
	}

}

2.新建一个测试类 MyQueueTest.java,测试类中,我们初始化一个队列,并将元素填满,然后启动一个线程 t1,去插入数据,中间休眠 2s,再去启动一个线程 t2 取数据。

import com.xiaoleilu.hutool.util.StrUtil;

import java.util.concurrent.TimeUnit;

public class MyQueueTest {
	public static void main(String[] args) {
		final MyQueue queue = new MyQueue(5);

		queue.put("a");
		queue.put("b");
		queue.put("c");
		queue.put("d");
		queue.put("e");

		System.out.println(StrUtil.format("当前队列的长度: {}", queue.getSize()));
		Thread t1 = new Thread(() -> {
			queue.put("f");
			queue.put("g");
			queue.put("h");
		}, "t1");
 Thread t2 = new Thread(() -> {
  queue.get();
  queue.get();
 });

 t1.start();
 try {
  TimeUnit.SECONDS.sleep(2);
 } catch (InterruptedException e) {
  e.printStackTrace();
 }
 t2.start();
}

3.启动测试类,查看运行结果。控制台如果应该出现的效果是,队列先初始化完成,然后休眠 2s,接下来先取数据,再插入数据,则证明阻塞队列生效。下面是控制台运行的效果:

  • 初始化队列

Java实现自定义阻塞队列

  • 休眠 2s 后取队首元素,再插入队尾元素

Java实现自定义阻塞队列

扫一扫手机访问