任务串行-非阻塞队列+线程锁和阻塞队列

前言

最近公司项目涉及到硬件,需要给机器发送指令,过程是这样的:

  1. 首先是发送指令不能并发执行,要等到该指令响应结束才能发起第二条指令
  2. 两台设备一起执行任务,会有一台丢失响应数据,所以过个设备需要逐个执行任务
    上述两种情况都再引导我要使用队列了

并行和并发区别

1、并行是指两者同时执行一件事,比如赛跑,两个人都在不停的往前跑;
2、并发是指资源有限的情况下,两者交替轮流使用资源,比如一段路(单核CPU资源)同时只能过一个人,A走一段后,让给B,B用完继续给A ,交替使用,目的是提高效率

队列属性

队列实现的是一种先入先出(FIFO)策略,队列的先入先出特性如同生活中的排队现象一样。因为队列既有队头,又有队尾,所以在实现是需要一个数组和两个分别指向对头和队尾的属性。关于队列属性,现有以下说明:

  1. 队头属性指向队头元素所在位置,队尾属性指向队尾元素的下一个位置,即下一个元素即将插入的位置。
  2. 为防止队列的队头移动会引起数组前端空间的浪费,一般设置队头和队尾两属性循环移动。
  3. 当队尾元素加一取模等于队头元素时,说明此队列存储空间已满。
  4. 当队头元素和队尾元素相等时,说明此队列为空。

队列实现

ConcurrentLinkedQueue

无界限线程安全, 非阻塞,基于链接节点,元素按FIFO原则进行排序,不允许使用null元素,否则抛NullPointerException,常用函数:

  1. add(E e): 内部执行的是offer()函数
  2. offer(E e): 在队列的尾部插入指定的元素。由于队列是无界的,此方法将永远不会返回false
  3. poll(): 从队列取出元素并且删除该元素
  4. peek(): 获取但不移除此队列的头
  5. remove(Object o): 从队列中移除制定的元素,如果存在返回true 反之false
  6. isEmpty():
  7. size():
  8. contains(Object o): 如果此队列包含指定元素,则返回 true

详细原理参考: http://ifeve.com/concurrentlinkedqueue/

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95

/**
* Created by lkk on 2018/3/28.
*/

public class QueueLinkedJob {

QueueJobListener listener;

private volatile boolean isRunning = false;
/**
* 任务队列
*/
private final Queue<Object> sWriteQueue = new ConcurrentLinkedQueue<>();

/**
* 线程锁
*/
private final Object lock = new Object();

public QueueLinkedJob(QueueJobListener listener) {
this.listener = listener;

}

/**
* 添加指令任务
*/
public void addJobHex(Object... hexs) {
for (Object hex : hexs) {
sWriteQueue.offer(hex);
}
}

/**
* 按顺序(先进先出)
* 取出指令并且删除
*/
public void pollJobHex(String hex) {
sWriteQueue.poll();
}

/**
* 取出某一项指令并且删除
*/
public void removeJobHex(String hex) {
sWriteQueue.remove(hex);
}

public boolean isRunning() {
return isRunning;
}

public void startJob() {
if (!isRunning) {
ThreadPool.execute(new Runnable() {
@Override
public void run() {
isRunning = true;
try {
while (true) {
Thread.sleep(200);
Object hex = sWriteQueue.poll();
if (hex != null) {
listener.executeJobHex(hex);
synchronized (lock) {
lock.wait();
}
} else {
break;
}
}

} catch (InterruptedException e) {
e.printStackTrace();
}
isRunning = false;
}
});
}
}


/**
* 完成被执行完成任务
* 唤醒线程 执行下一条任务
* 注: 任务完成必须调用该函数否则将无法执行下一条指令
*/
public void completeJob() {
synchronized (lock) {
lock.notify();
}
}

}

LinkedBlockingQueue
线程安全,阻塞队列,可以指定容量,也可以不指定,不指定的话,默认最大是Integer.MAX_VALUE

阻塞队列概要:

  1. 阻塞添加-所谓的阻塞添加是指当阻塞队列元素已满时,队列会阻塞加入元素的线程,直队列元素不满时才重新唤醒线程执行元素加入操作。
  2. 阻塞删除-阻塞删除是指在队列元素为空时,删除队列元素的线程将被阻塞,直到队列不为空再执行删除操作(一般都会返回被删除的元素)

操作函数:

  1. add(E e) : 添加成功返回true,失败抛IllegalStateException异常
  2. offer(E e) : 成功返回 true,如果此队列已满,则返回 false。
  3. put(E e) :将元素插入此队列的尾部,如果该队列已满,则一直阻塞
  4. remove(Object o) :移除指定元素,成功返回true,失败返回false
  5. poll() : 获取并移除此队列的头元素,若队列为空,则返回 null
  6. take():获取并移除此队列头元素,若没有元素则一直阻塞。
  7. element() :获取但不移除此队列的头元素,没有元素则抛异常
  8. peek() :获取但不移除此队列的头;若队列为空,则返回 null。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52

public class QueueLinkedTest {

private static final LinkedBlockingQueue<String> sWriteQueue = new LinkedBlockingQueue<String>(1);

public static void main(String[] args) {

new Thread(){
public void run() {
try {
boolean result = false;
for(int i = 0; i < 10; i++){
// hread.sleep(500);
// result = sWriteQueue.offer("job: " + i);
sWriteQueue.put("job: " + i);
System.out.println("-------->result: " + i);
}

} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

};
}.start();

new Thread(){
public void run() {
try {

while(true){
Thread.sleep(2000);
String job = sWriteQueue.poll();
if(null != job){
System.out.println("-------->job: " + job);
}else{
System.out.println("-------->job-break");
break;
}

}

} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

};
}.start();

}
}

打印结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
-------->result: 0
-------->job: job: 0
-------->result: 1
-------->job: job: 1
-------->result: 2
-------->job: job: 2
-------->result: 3
-------->job: job: 3
-------->result: 4
-------->job: job: 4
-------->result: 5
-------->job: job: 5
-------->result: 6
-------->job: job: 6
-------->result: 7
-------->job: job: 7
-------->result: 8
-------->job: job: 8
-------->result: 9
-------->job: job: 9
-------->job-break

还有以下实例我都加入了API超链接,就不一一举例

ArrayBlockingQueue
DelayQueue
PriorityBlockingQueue
SynchronousQueue
上述都继承自AbstractQueue