map.md 46.7 KB
Newer Older
沉默王二's avatar
沉默王二 已提交
1
---
沉默王二's avatar
沉默王二 已提交
2 3 4
title: 聊聊Java的并发集合容器ConcurrentHashMap、阻塞队列和 CopyOnWrite 容器
shortTitle: Java的并发容器
description: Java 的并发集合容器提供了在多线程环境中高效访问和操作的数据结构。这些容器通过内部的同步机制实现了线程安全,使得开发者无需显式同步代码就能在并发环境下安全使用。
沉默王二's avatar
沉默王二 已提交
5 6 7
category:
  - Java核心
tag:
沉默王二's avatar
沉默王二 已提交
8 9 10 11 12
  - Java并发编程
head:
  - - meta
    - name: keywords
      content: Java,并发编程,多线程,Thread,并发集合容器
沉默王二's avatar
沉默王二 已提交
13 14
---

沉默王二's avatar
沉默王二 已提交
15
# 14.19 Java的并发容器
沉默王二's avatar
沉默王二 已提交
16

沉默王二's avatar
沉默王二 已提交
17
java.util 包下提供了一些容器类([集合框架](https://javabetter.cn/collection/gailan.html)),其中 Vector 和 Hashtable 是线程安全的,实现方式比较粗暴,通过在方法上加「[sychronized](https://javabetter.cn/thread/synchronized-1.html)」关键字实现。
沉默王二's avatar
沉默王二 已提交
18

沉默王二's avatar
沉默王二 已提交
19
但即便是 Vector 这样线程安全的类,在应对多线程的复合操作时也需要在客户端继续加锁以保证原子性。来看下面的例子:
沉默王二's avatar
沉默王二 已提交
20 21 22 23 24 25 26 27 28 29

```java
public class TestVector {
	private Vector<String> vector;

	//方法一
	public  Object getLast(Vector vector) {
	    int lastIndex = vector.size() - 1;
	    return vector.get(lastIndex);
	}
沉默王二's avatar
沉默王二 已提交
30

沉默王二's avatar
沉默王二 已提交
31 32 33 34 35
	//方法二
	public  void deleteLast(Vector vector) {
	    int lastIndex = vector.size() - 1;
	    vector.remove(lastIndex);
	}
沉默王二's avatar
沉默王二 已提交
36

沉默王二's avatar
沉默王二 已提交
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
	//方法三
	public  Object getLastSysnchronized(Vector vector) {
		synchronized(vector){
			int lastIndex = vector.size() - 1;
			return vector.get(lastIndex);
		}
	}

	//方法四
	public  void deleteLastSysnchronized(Vector vector) {
		synchronized (vector){
			int lastIndex = vector.size() - 1;
			vector.remove(lastIndex);
		}
	}
沉默王二's avatar
沉默王二 已提交
52

沉默王二's avatar
沉默王二 已提交
53 54 55
}
```

沉默王二's avatar
沉默王二 已提交
56
如果方法一和方法二是一个组合的话,那么当方法一获取到了`vector`的 size 之后,方法二已经执行完毕,这样就会导致程序出现错误。
沉默王二's avatar
沉默王二 已提交
57

沉默王二's avatar
沉默王二 已提交
58
如果方法三与方法四组合的话,就还需在内部加锁来保证 `vector` 上的原子性操作。
沉默王二's avatar
沉默王二 已提交
59

沉默王二's avatar
沉默王二 已提交
60
于是并发容器就应用而生了,它们是线程安全的,可以在多线程环境下高效地访问和操作数据,而不需要额外的同步措施。
沉默王二's avatar
沉默王二 已提交
61

沉默王二's avatar
沉默王二 已提交
62
### 并发容器类
沉默王二's avatar
沉默王二 已提交
63

沉默王二's avatar
沉默王二 已提交
64
整体架构如下图所示:
沉默王二's avatar
沉默王二 已提交
65

沉默王二's avatar
沉默王二 已提交
66
![](https://cdn.tobebetterjavaer.com/tobebetterjavaer/images/thread/map-a6a020a3-4573-4cf8-b5ae-1541ae45801c.png)
沉默王二's avatar
沉默王二 已提交
67

沉默王二's avatar
沉默王二 已提交
68
### 并发 Map
沉默王二's avatar
沉默王二 已提交
69

沉默王二's avatar
沉默王二 已提交
70
#### ConcurrentMap 接口
沉默王二's avatar
沉默王二 已提交
71

沉默王二's avatar
沉默王二 已提交
72
ConcurrentMap 接口继承了 Map 接口,在 Map 接口的基础上又定义了四个方法:
沉默王二's avatar
沉默王二 已提交
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87

```java
public interface ConcurrentMap<K, V> extends Map<K, V> {

    //插入元素
    V putIfAbsent(K key, V value);

    //移除元素
    boolean remove(Object key, Object value);

    //替换元素
    boolean replace(K key, V oldValue, V newValue);

    //替换元素
    V replace(K key, V value);
沉默王二's avatar
沉默王二 已提交
88

沉默王二's avatar
沉默王二 已提交
89 90 91
}
```

沉默王二's avatar
沉默王二 已提交
92
**putIfAbsent:** 与原有 put 方法不同的是,putIfAbsent 如果插入的 key 相同,则不替换原有的 value 值;
沉默王二's avatar
沉默王二 已提交
93

沉默王二's avatar
沉默王二 已提交
94
**remove:** 与原有 remove 方法不同的是,新 remove 方法中增加了对 value 的判断,如果要删除的 key-value 不能与 Map 中原有的 key-value 对应上,则不会删除该元素;
沉默王二's avatar
沉默王二 已提交
95

沉默王二's avatar
沉默王二 已提交
96
**replace(K,V,V):** 增加了对 value 值的判断,如果 key-oldValue 能与 Map 中原有的 key-value 对应上,才进行替换操作;
沉默王二's avatar
沉默王二 已提交
97

沉默王二's avatar
沉默王二 已提交
98
**replace(K,V):** 与上面的 replace 不同的是,此 replace 不会对 Map 中原有的 key-value 进行比较,如果 key 存在则直接替换;
沉默王二's avatar
沉默王二 已提交
99

沉默王二's avatar
沉默王二 已提交
100
#### ConcurrentHashMap
沉默王二's avatar
沉默王二 已提交
101

沉默王二's avatar
沉默王二 已提交
102
ConcurrentHashMap 同 [HashMap](https://javabetter.cn/collection/hashmap.html) 一样,也是基于散列表的 map,但是它提供了一种与 Hashtable 完全不同的加锁策略,提供了更高效的并发性和伸缩性。
沉默王二's avatar
沉默王二 已提交
103

沉默王二's avatar
沉默王二 已提交
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210
简单分析一下 JDK1.8 中的 ConcurrentHashMap 的 put 方法:

```java
public V put(K key, V value) {
    return putVal(key, value, false);
}

final V putVal(K key, V value, boolean onlyIfAbsent) {
    // key和value都不能为null
    if (key == null || value == null) throw new NullPointerException();
    // 计算hash值
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            // 如果tab未初始化或者个数为0,则初始化node数组
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null,
                    new Node<K,V>(hash, key, value, null)))
                // 如果使用CAS插入元素时,发现已经有元素了,则进入下一次循环,重新操作
                // 如果使用CAS插入元素成功,则break跳出循环,流程结束
                break;                   // no lock when adding to empty bin
        }
        else if ((fh = f.hash) == MOVED)
            // 如果要插入的元素所在的tab的第一个元素的hash是MOVED,则当前线程帮忙一起迁移元素
            tab = helpTransfer(tab, f);
        else {
            // 如果这个tab不为空且不在迁移元素,则锁住这个tab(分段锁)
            // 并查找要插入的元素是否在这个tab中
            // 存在,则替换值(onlyIfAbsent=false)
            // 不存在,则插入到链表结尾或插入树中
            V oldVal = null;
            synchronized (f) {
                // 再次检测第一个元素是否有变化,如果有变化则进入下一次循环,从头来过
                if (tabAt(tab, i) == f) {
                    // 如果第一个元素的hash值大于等于0(说明不是在迁移,也不是树)
                    // 那就是tab中的元素使用的是链表方式存储
                    if (fh >= 0) {
                        // tab中元素个数赋值为1
                        binCount = 1;
                        // 遍历整个tab,每次结束binCount加1
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                            (ek != null && key.equals(ek)))) {
                                // 如果找到了这个元素,则赋值了新值(onlyIfAbsent=false)
                                // 并退出循环
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                // 如果到链表尾部还没有找到元素
                                // 就把它插入到链表结尾并退出循环
                                pred.next = new Node<K,V>(hash, key,
                                        value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) {
                        // 如果第一个元素是树节点
                        Node<K,V> p;
                        // tab中元素个数赋值为2
                        binCount = 2;
                        // 调用红黑树的插入方法插入元素
                        // 如果成功插入则返回null
                        // 否则返回寻找到的节点
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                value)) != null) {
                            // 如果找到了这个元素,则赋值了新值(onlyIfAbsent=false)
                            // 并退出循环
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            // 如果binCount不为0,说明成功插入了元素或者寻找到了元素
            if (binCount != 0) {
                // 如果链表元素个数达到了8,则尝试树化
                // 因为上面把元素插入到树中时,binCount只赋值了2,并没有计算整个树中元素的个数
                // 所以不会重复树化
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                // 如果要插入的元素已经存在,则返回旧值
                if (oldVal != null)
                    return oldVal;
                // 退出外层大循环,流程结束
                break;
            }
        }
    }
    // 成功插入元素,元素个数加1(是否要扩容在这个里面)
    addCount(1L, binCount);
    // 成功插入元素返回null
    return null;
}
```

当进行 put 操作时,流程大概可以分如下几个步骤:
沉默王二's avatar
沉默王二 已提交
211

沉默王二's avatar
沉默王二 已提交
212 213 214 215 216 217 218
- 首先会判断 key、value 是否为空,如果为空就抛异常;
- 接着会判断容器数组是否为空,如果为空就初始化数组;
- 进一步判断,要插入的元素 f,在当前数组下标是否第一次插入,如果是就通过 CAS 方式插入;
- 再接着判断`f.hash == -1`是否成立,如果成立,说明当前 f 是 ForwardingNode 节点,表示有其它线程正在扩容,则一起进行扩容操作;
- 其他的情况,就是把新的 Node 节点按链表或红黑树的方式插入到合适的位置;
- 节点插入完成之后,接着判断链表长度是否超过 8,如果超过 8 个,就将链表转化为红黑树结构;
- 最后,插入完成之后,进行扩容判断。
沉默王二's avatar
沉默王二 已提交
219

沉默王二's avatar
沉默王二 已提交
220
initTable 初始化数组
沉默王二's avatar
沉默王二 已提交
221

沉默王二's avatar
沉默王二 已提交
222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259
```java
private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        if ((sc = sizeCtl) < 0)
            // 如果sizeCtl<0说明正在初始化或者扩容,让出CPU
            Thread.yield(); // lost initialization race; just spin
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            // 如果把sizeCtl原子更新为-1成功,则当前线程进入初始化
            // 如果原子更新失败则说明有其它线程先一步进入初始化了,则进入下一次循环
            // 如果下一次循环时还没初始化完毕,则sizeCtl<0进入上面if的逻辑让出CPU
            // 如果下一次循环更新完毕了,则table.length!=0,退出循环
            try {
                // 再次检查table是否为空,防止ABA问题
                if ((tab = table) == null || tab.length == 0) {
                    // 如果sc为0则使用默认值16
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    // 新建数组
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    // 把tab数组赋值给table
                    table = tab = nt;
                    // 设置sc为数组长度的0.75倍
                    // n - (n >>> 2) = n - n/4 = 0.75n
                    // 可见这里装载因子和扩容门槛都是写死了的
                    // 这也正是没有threshold和loadFactor属性的原因
                    sc = n - (n >>> 2);
                }
            } finally {
                // 把sc赋值给sizeCtl,这时存储的是扩容门槛
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}
```
沉默王二's avatar
沉默王二 已提交
260

沉默王二's avatar
沉默王二 已提交
261 262 263
- 使用 CAS 锁控制只有一个线程初始化 tab 数组;
- sizeCtl 在初始化后存储的是扩容门槛;
- 扩容门槛写死的是 tab 数组大小的 0.75 倍,tab 数组大小即 map 的容量,也就是最多存储多少个元素。
沉默王二's avatar
沉默王二 已提交
264

沉默王二's avatar
沉默王二 已提交
265
helpTransfer 协助扩容
沉默王二's avatar
沉默王二 已提交
266

沉默王二's avatar
沉默王二 已提交
267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293
```java
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;
    // 如果tab数组不为空,并且当前tab第一个元素为ForwardingNode类型,并且nextTab不为空
    // 说明当前tab已经迁移完毕了,才去帮忙迁移其它tab的元素
    // 扩容时会把旧tab的第一个元素置为ForwardingNode,并让其nextTab指向新tab数组
    if (tab != null && (f instanceof ForwardingNode) &&
            (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
        int rs = resizeStamp(tab.length);
        // sizeCtl<0,说明正在扩容
        while (nextTab == nextTable && table == tab &&
                (sc = sizeCtl) < 0) {
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || transferIndex <= 0)
                break;
            // 扩容线程数加1
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                // 当前线程帮忙迁移元素
                transfer(tab, nextTab);
                break;
            }
        }
        return nextTab;
    }
    return table;
}
```
沉默王二's avatar
沉默王二 已提交
294

沉默王二's avatar
沉默王二 已提交
295
操作步骤如下:
沉默王二's avatar
沉默王二 已提交
296

沉默王二's avatar
沉默王二 已提交
297 298 299 300
- 第 1 步,对 table、node 节点、node 节点的 nextTable,进行数据校验;
- 第 2 步,根据数组的 length 得到一个标识符号;
- 第 3 步,进一步校验 nextTab、tab、sizeCtl 值,如果 nextTab 没有被并发修改并且 tab 也没有被并发修改,同时 `sizeCtl < 0`,说明还在扩容;
- 第 4 步,对 sizeCtl 参数值进行分析判断,如果不满足任何一个判断,将`sizeCtl + 1`, 增加了一个线程帮助其扩容。
沉默王二's avatar
沉默王二 已提交
301

沉默王二's avatar
沉默王二 已提交
302
addCount 扩容判断
沉默王二's avatar
沉默王二 已提交
303

沉默王二's avatar
沉默王二 已提交
304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365
```java
private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    // 先尝试把数量加到baseCount上,如果失败再加到分段的CounterCell上
    if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                        U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        // 计算元素个数
        s = sumCount();
    }
    //检查是否需要扩容,默认check=1,需要检查
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        // 如果元素个数达到了扩容门槛,则进行扩容
        // 注意,正常情况下sizeCtl存储的是扩容门槛,即容量的0.75倍
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                (n = tab.length) < MAXIMUM_CAPACITY) {
            // rs是扩容时的一个标识
            int rs = resizeStamp(n);
            if (sc < 0) {
                // sc<0说明正在扩容中
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                    // 扩容已经完成了,退出循环
                    break;
                // 扩容未完成,则当前线程加入迁移元素中
                // 并把扩容线程数加1
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                    (rs << RESIZE_STAMP_SHIFT) + 2))
                // 进入迁移元素
                transfer(tab, null);
            // 重新计算元素个数
            s = sumCount();
        }
    }
}
```

操作步骤如下:

- 第 1 步,利用 CAS 将方法更新 baseCount 的值
- 第 2 步,检查是否需要扩容,默认 check = 1,需要检查;
- 第 3 步,如果满足扩容条件,判断当前是否正在扩容,如果是正在扩容就一起扩容;
- 第 4 步,如果不在扩容,将 sizeCtl 更新为负数,并进行扩容处理。

以上就是整个 put 方法的流程,可以从中发现,里面大量的使用了 CAS 方法,CAS 表示比较与替换,里面有 3 个参数,分别是目标内存地址、旧值、新值,每次判断的时候,会将旧值与目标内存地址中的值进行比较,如果相等,就将新值更新到内存地址里,如果不相等,就继续循环,直到操作成功为止!

get 方法
沉默王二's avatar
沉默王二 已提交
366 367

```java
沉默王二's avatar
沉默王二 已提交
368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392
public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    // 计算hash
    int h = spread(key.hashCode());
    // 判断数组是否为空,通过key定位到数组下标是否为空;
    if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
        // 如果第一个元素就是要找的元素,直接返回
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        else if (eh < 0)
            // hash小于0,说明是树或者正在扩容
            // 使用find寻找元素,find的寻找方式依据Node的不同子类有不同的实现方式
            return (p = e.find(h, key)) != null ? p.val : null;
        // 遍历整个链表寻找元素
        while ((e = e.next) != null) {
            if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}
沉默王二's avatar
沉默王二 已提交
393 394
```

沉默王二's avatar
沉默王二 已提交
395
步骤如下:
沉默王二's avatar
沉默王二 已提交
396

沉默王二's avatar
沉默王二 已提交
397 398 399 400
- 第 1 步,判断数组是否为空,通过 key 定位到数组下标是否为空;
- 第 2 步,判断 node 节点第一个元素是不是要找到,如果是直接返回;
- 第 3 步,如果是红黑树结构,就从红黑树里面查询;
- 第 4 步,如果是链表结构,循环遍历判断。
沉默王二's avatar
沉默王二 已提交
401

沉默王二's avatar
沉默王二 已提交
402
remove 方法
沉默王二's avatar
沉默王二 已提交
403

沉默王二's avatar
沉默王二 已提交
404 405 406 407 408
```java
public V remove(Object key) {
    // 调用替换节点方法
    return replaceNode(key, null, null);
}
沉默王二's avatar
沉默王二 已提交
409

沉默王二's avatar
沉默王二 已提交
410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505
final V replaceNode(Object key, V value, Object cv) {
    // 计算hash
    int hash = spread(key.hashCode());
    // 循环遍历数组
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        //校验参数
        if (tab == null || (n = tab.length) == 0 ||
                (f = tabAt(tab, i = (n - 1) & hash)) == null)
            break;
        else if ((fh = f.hash) == MOVED)
            // 如果正在扩容中,协助扩容
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            // 标记是否处理过
            boolean validated = false;
            //用 synchronized 同步锁,保证并发时元素移除安全
            synchronized (f) {
                // 再次验证当前tab元素是否被修改过
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        // fh>=0表示是链表节点
                        validated = true;
                        // 遍历链表寻找目标节点
                        for (Node<K,V> e = f, pred = null;;) {
                            K ek;
                            if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                            (ek != null && key.equals(ek)))) {
                                V ev = e.val;
                                if (cv == null || cv == ev ||
                                        (ev != null && cv.equals(ev))) {
                                    oldVal = ev;
                                    if (value != null)
                                        e.val = value;
                                    else if (pred != null)
                                        pred.next = e.next;
                                    else
                                        setTabAt(tab, i, e.next);
                                }
                                break;
                            }
                            pred = e;
                            // 遍历到链表尾部还没找到元素,跳出循环
                            if ((e = e.next) == null)
                                break;
                        }
                    }
                    else if (f instanceof TreeBin) {
                        // 如果是树节点
                        validated = true;
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> r, p;
                        // 遍历树找到了目标节点
                        if ((r = t.root) != null &&
                                (p = r.findTreeNode(hash, key, null)) != null) {
                            V pv = p.val;
                            if (cv == null || cv == pv ||
                                    (pv != null && cv.equals(pv))) {
                                oldVal = pv;
                                if (value != null)
                                    p.val = value;
                                else if (t.removeTreeNode(p))
                                    setTabAt(tab, i, untreeify(t.first));
                            }
                        }
                    }
                }
            }
            // 如果处理过,不管有没有找到元素都返回
            if (validated) {
                // 如果找到了元素,返回其旧值
                if (oldVal != null) {
                    // 如果要替换的值为空,元素个数减1
                    if (value == null)
                        addCount(-1L, -1);
                    return oldVal;
                }
                break;
            }
        }
    }
    // 没找到元素返回空
    return null;
}
```

步骤如下:

- 第 1 步,循环遍历数组,接着校验参数;
- 第 2 步,判断是否有别的线程正在扩容,如果是一起扩容;
- 第 3 步,用 synchronized 同步锁,保证并发时元素移除安全;
- 第 4 步,因为 `check= -1`,所以不会进行扩容操作,利用 CAS 操作修改 baseCount 值。

后面我们还会单独再开一篇来详细介绍 [ConcurrentHashMap](https://javabetter.cn/thread/ConcurrentHashMap.html),这篇就先分析到这里。
沉默王二's avatar
沉默王二 已提交
506

沉默王二's avatar
沉默王二 已提交
507
#### ConcurrentSkipListMap
沉默王二's avatar
沉默王二 已提交
508

沉默王二's avatar
沉默王二 已提交
509
ConcurrentNavigableMap 接口继承了 NavigableMap 接口,这个接口提供了针对给定搜索目标返回最接近匹配项的导航方法。
沉默王二's avatar
沉默王二 已提交
510

沉默王二's avatar
沉默王二 已提交
511
ConcurrentNavigableMap 接口的主要实现类是 ConcurrentSkipListMap 类。从名字上来看,它的底层使用的是跳表(SkipList)。跳表是一种”空间换时间“的数据结构,可以使用 [CAS](https://javabetter.cn/thread/cas.html) 来保证并发安全性。
沉默王二's avatar
沉默王二 已提交
512

沉默王二's avatar
沉默王二 已提交
513
与 ConcurrentHashMap 的读密集操作相比,ConcurrentSkipListMap 的读和写操作的性能相对较低。这是由其数据结构导致的,因为跳表的插入和删除需要更复杂的指针操作。然而,ConcurrentSkipListMap 提供了有序性,这是 ConcurrentHashMap 所没有的。
沉默王二's avatar
沉默王二 已提交
514

沉默王二's avatar
沉默王二 已提交
515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551
ConcurrentSkipListMap 适用于需要线程安全的同时又需要元素有序的场合。如果不需要有序,ConcurrentHashMap 可能是更好的选择,因为它通常具有更高的性能。

[時光以北这篇 ConcurrentSkipListMap 讲的很不错](https://juejin.cn/post/6844903958499033095),可以学习。

### 并发 Queue

JDK 并没有提供线程安全的 List 类,因为对 List 来说,**很难去开发一个通用并且没有并发瓶颈的线程安全的 List**。因为即使简单的读操作,比如 `contains()`,也需要再搜索的时候锁住整个 list。

所以退一步,JDK 提供了队列和双端队列的线程安全类:ConcurrentLinkedQueue 和 ConcurrentLinkedDeque。因为队列相对于 List 来说,有更多的限制。这两个类是使用 CAS 来实现线程安全的。

我们会在后面单独开一篇来详细介绍[ConcurrentLinkedQueue](https://javabetter.cn/thread/ConcurrentLinkedQueue.html)

### 并发 Set

ConcurrentSkipListSet 是线程安全的有序集合。底层是使用 ConcurrentSkipListMap 来实现。

谷歌的 [Guava](https://javabetter.cn/common-tool/guava.html)实现了一个线程安全的 ConcurrentHashSet:

```java
Set<String> s = Sets.newConcurrentHashSet();
```

### 阻塞队列

我们假设一种场景,[生产者一直生产资源,消费者一直消费资源](https://javabetter.cn/thread/shengchanzhe-xiaofeizhe.html),资源存储在一个缓冲池中,生产者将生产的资源存进缓冲池中,消费者从缓冲池中拿到资源进行消费,这就是大名鼎鼎的**生产者-消费者模式**

该模式能够简化开发过程,一方面消除了生产者类与消费者类之间的代码依赖性,另一方面将生产数据的过程与使用数据的过程解耦简化负载。

我们自己 coding 实现这个模式的时候,因为需要让**多个线程操作共享变量**(即资源),所以很容易引发**线程安全问题**,造成**重复消费****死锁**,尤其是生产者和消费者存在多个的情况。另外,当缓冲池空了,我们需要阻塞消费者,唤醒生产者;当缓冲池满了,我们需要阻塞生产者,唤醒消费者,这些个**等待-唤醒**逻辑都需要自己实现。

这么容易出错的事情,JDK 当然帮我们做啦,这就是阻塞队列(BlockingQueue),**你只管往里面存、取就行,而不用担心多线程环境下存、取共享变量的线程安全问题。**

> BlockingQueue 是 Java util.concurrent 包下重要的数据结构,区别于普通的队列,BlockingQueue 提供了**线程安全的队列访问方式**,并发包下很多高级同步类的实现都是基于 BlockingQueue 实现的。

BlockingQueue 一般用于生产者-消费者模式,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。**BlockingQueue 就是存放元素的容器**

#### BlockingQueue 的操作方法
沉默王二's avatar
沉默王二 已提交
552 553 554 555 556 557 558 559 560

阻塞队列提供了四组不同的方法用于插入、移除、检查元素:

| 方法\处理方式 | 抛出异常  | 返回特殊值 |  一直阻塞  |      超时退出      |
| :-----------: | :-------: | :--------: | :--------: | :----------------: |
|   插入方法    |  add(e)   |  offer(e)  | **put(e)** | offer(e,time,unit) |
|   移除方法    | remove()  |   poll()   | **take()** |  poll(time,unit)   |
|   检查方法    | element() |   peek()   |     -      |         -          |

沉默王二's avatar
沉默王二 已提交
561 562 563 564 565 566
- 抛出异常:如果操作无法立即执行,会抛异常。当阻塞队列满时候,再往队列里插入元素,会抛出 `IllegalStateException(“Queue full”)`异常。当队列为空时,从队列里获取元素时会抛出 NoSuchElementException 异常 。
- 返回特殊值:如果操作无法立即执行,会返回一个特殊值,通常是 true / false。
- 一直阻塞:如果操作无法立即执行,则一直阻塞或者响应中断。
- 超时退出:如果操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功,通常是 true / false。

**注意:**
沉默王二's avatar
沉默王二 已提交
567

沉默王二's avatar
沉默王二 已提交
568 569
- 不能往阻塞队列中插入 null,会抛出空指针异常。
- 可以访问阻塞队列中的任意元素,调用 `remove(o)`可以将队列之中的特定对象移除,但并不高效,尽量避免使用。
沉默王二's avatar
沉默王二 已提交
570

沉默王二's avatar
沉默王二 已提交
571
我们会在后面单独开一篇[BlockingQueue](https://javabetter.cn/thread/BlockingQueue.html)来细讲。
沉默王二's avatar
沉默王二 已提交
572

沉默王二's avatar
沉默王二 已提交
573
#### BlockingQueue 的实现类
沉默王二's avatar
沉默王二 已提交
574 575 576

**ArrayBlockingQueue**

沉默王二's avatar
沉默王二 已提交
577
**数组**结构组成的**有界**阻塞队列。内部结构是数组,具有数组的特性。
沉默王二's avatar
沉默王二 已提交
578

沉默王二's avatar
沉默王二 已提交
579
```java
沉默王二's avatar
沉默王二 已提交
580
public ArrayBlockingQueue(int capacity, boolean fair){
沉默王二's avatar
沉默王二 已提交
581
 //..省略代码
沉默王二's avatar
沉默王二 已提交
582
}
沉默王二's avatar
沉默王二 已提交
583
```
沉默王二's avatar
沉默王二 已提交
584

沉默王二's avatar
沉默王二 已提交
585
可以初始化队列大小,一旦初始化将不能改变。构造方法中的 fair 表示控制对象的内部锁是否采用公平锁,默认是**非公平锁**
沉默王二's avatar
沉默王二 已提交
586 587 588

**LinkedBlockingQueue**

沉默王二's avatar
沉默王二 已提交
589
**链表**结构组成的**有界**阻塞队列。内部结构是链表,具有链表的特性。默认队列的大小是`Integer.MAX_VALUE`,也可以指定大小。此队列按照**先进先出**的原则对元素进行排序。
沉默王二's avatar
沉默王二 已提交
590 591 592

**DelayQueue**

沉默王二's avatar
沉默王二 已提交
593 594 595
该队列中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。注入其中的元素必须实现 `java.util.concurrent.Delayed` 接口。

DelayQueue 是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。
沉默王二's avatar
沉默王二 已提交
596

沉默王二's avatar
沉默王二 已提交
597
**PriorityBlockingQueue**
沉默王二's avatar
沉默王二 已提交
598

沉默王二's avatar
沉默王二 已提交
599
基于优先级的无界阻塞队列(优先级的判断通过构造函数传入的 Compator 对象来决定),内部控制线程同步的锁采用的是非公平锁。
沉默王二's avatar
沉默王二 已提交
600

沉默王二's avatar
沉默王二 已提交
601
> 网上大部分博客上**PriorityBlockingQueue**为公平锁,其实是不对的,查阅源码(感谢 github:**ambition0802**同学的指出):
沉默王二's avatar
沉默王二 已提交
602

沉默王二's avatar
沉默王二 已提交
603
```java
沉默王二's avatar
沉默王二 已提交
604
public PriorityBlockingQueue(int initialCapacity,
沉默王二's avatar
沉默王二 已提交
605 606 607 608
                                 Comparator<? super E> comparator) {
    this.lock = new ReentrantLock(); //默认构造方法-非公平锁
    ...//其余代码略
}
沉默王二's avatar
沉默王二 已提交
609 610 611 612
```

**SynchronousQueue**

沉默王二's avatar
沉默王二 已提交
613
这个队列比较特殊,**没有任何内部容量**,甚至连一个队列的容量都没有。并且每个 put 必须等待一个 take,反之亦然。
沉默王二's avatar
沉默王二 已提交
614

沉默王二's avatar
沉默王二 已提交
615
需要区别容量为 1 的 ArrayBlockingQueue、LinkedBlockingQueue。
沉默王二's avatar
沉默王二 已提交
616

沉默王二's avatar
沉默王二 已提交
617
以下方法的返回值,可以帮助理解这个队列:
沉默王二's avatar
沉默王二 已提交
618

沉默王二's avatar
沉默王二 已提交
619 620 621 622 623 624 625 626
- `iterator()` 永远返回空,因为里面没有东西
- `peek()` 永远返回 null
- `put()` 往 queue 放进去一个 element 以后就一直 wait 直到有其他 thread 进来把这个 element 取走。
- `offer()` 往 queue 里放一个 element 后立即返回,如果碰巧这个 element 被另一个 thread 取走了,offer 方法返回 true,认为 offer 成功;否则返回 false。
- `take()` 取出并且 remove 掉 queue 里的 element,取不到东西他会一直等。
- `poll()` 取出并且 remove 掉 queue 里的 element,只有到碰巧另外一个线程正在往 queue 里 offer 数据或者 put 数据的时候,该方法才会取到东西。否则立即返回 null。
- `isEmpty()` 永远返回 true
- `remove()&removeAll()` 永远返回 false
沉默王二's avatar
沉默王二 已提交
627 628 629

**注意**

沉默王二's avatar
沉默王二 已提交
630
**PriorityBlockingQueue**不会阻塞数据生产者(因为队列是无界的),而只会在没有可消费的数据时阻塞数据的消费者。因此使用的时候要特别注意,**生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。**对于使用默认大小的**LinkedBlockingQueue**也是一样的。
沉默王二's avatar
沉默王二 已提交
631

沉默王二's avatar
沉默王二 已提交
632
#### 阻塞队列的原理
沉默王二's avatar
沉默王二 已提交
633

沉默王二's avatar
沉默王二 已提交
634
阻塞队列的原理很简单,利用了 Lock 锁的多条件([Condition](https://javabetter.cn/thread/condition.html))阻塞控制。接下来我们分析 ArrayBlockingQueue JDK 1.8 的源码。
沉默王二's avatar
沉默王二 已提交
635

沉默王二's avatar
沉默王二 已提交
636
首先是构造方法,除了初始化队列的大小和是否是公平锁之外,还对同一个锁(lock)初始化了两个监视器,分别是 notEmpty 和 notFull。这两个监视器的作用目前可以简单理解为标记分组,当该线程是 put 操作时,给他加上监视器 notFull,标记这个线程是一个生产者;当线程是 take 操作时,给他加上监视器 notEmpty,标记这个线程是消费者。
沉默王二's avatar
沉默王二 已提交
637 638 639 640 641 642 643 644 645 646 647 648 649 650 651

```java
//数据元素数组
final Object[] items;
//下一个待取出元素索引
int takeIndex;
//下一个待添加元素索引
int putIndex;
//元素个数
int count;
//内部锁
final ReentrantLock lock;
//消费者监视器
private final Condition notEmpty;
//生产者监视器
沉默王二's avatar
沉默王二 已提交
652
private final Condition notFull;
沉默王二's avatar
沉默王二 已提交
653 654 655 656 657 658 659 660 661

public ArrayBlockingQueue(int capacity, boolean fair) {
    //..省略其他代码
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}
```

沉默王二's avatar
沉默王二 已提交
662
**put 操作的源码**
沉默王二's avatar
沉默王二 已提交
663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695

```java
public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    // 1.自旋拿锁
    lock.lockInterruptibly();
    try {
        // 2.判断队列是否满了
        while (count == items.length)
            // 2.1如果满了,阻塞该线程,并标记为notFull线程,
            // 等待notFull的唤醒,唤醒之后继续执行while循环。
            notFull.await();
        // 3.如果没有满,则进入队列
        enqueue(e);
    } finally {
        lock.unlock();
    }
}
private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    // 4 唤醒一个等待的线程
    notEmpty.signal();
}

```

沉默王二's avatar
沉默王二 已提交
696
总结 put 的流程:
沉默王二's avatar
沉默王二 已提交
697

沉默王二's avatar
沉默王二 已提交
698 699 700 701
1. 所有执行 put 操作的线程竞争 lock 锁,拿到了 lock 锁的线程进入下一步,没有拿到 lock 锁的线程自旋竞争锁。
2. 判断阻塞队列是否满了,如果满了,则调用 await 方法阻塞这个线程,并标记为 notFull(生产者)线程,同时释放 lock 锁,等待被消费者线程唤醒。
3. 如果没有满,则调用 enqueue 方法将元素 put 进阻塞队列。注意这一步的线程还有一种情况是第二步中阻塞的线程被唤醒且又拿到了 lock 锁的线程。
4. 唤醒一个标记为 notEmpty(消费者)的线程。
沉默王二's avatar
沉默王二 已提交
702

沉默王二's avatar
沉默王二 已提交
703
**take 操作的源码**
沉默王二's avatar
沉默王二 已提交
704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733

```java
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}
private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return x;
}
```

沉默王二's avatar
沉默王二 已提交
734
take 操作和 put 操作类似,总结一下 take 操作的流程:
沉默王二's avatar
沉默王二 已提交
735

沉默王二's avatar
沉默王二 已提交
736 737 738 739
1. 所有执行 take 操作的线程竞争 lock 锁,拿到了 lock 锁的线程进入下一步,没有拿到 lock 锁的线程自旋竞争锁。
2. 判断阻塞队列是否为空,如果是空,则调用 await 方法阻塞这个线程,并标记为 notEmpty(消费者)线程,同时释放 lock 锁,等待被生产者线程唤醒。
3. 如果没有空,则调用 dequeue 方法。注意这一步的线程还有一种情况是第二步中阻塞的线程被唤醒且又拿到了 lock 锁的线程。
4. 唤醒一个标记为 notFull(生产者)的线程。
沉默王二's avatar
沉默王二 已提交
740 741 742

**注意**

沉默王二's avatar
沉默王二 已提交
743 744 745
1. put 和 take 操作都需要**先获取锁**,没有获取到锁的线程会被挡在第一道大门之外自旋拿锁,直到获取到锁。
2. 就算拿到锁了之后,也**不一定**会顺利进行 put/take 操作,需要判断**队列是否可用**(是否满/空),如果不可用,则会被阻塞,**并释放锁**
3. 在第 2 点被阻塞的线程会被唤醒,但是在唤醒之后,**依然需要拿到锁**才能继续往下执行,否则,自旋拿锁,拿到锁了再 while 判断队列是否可用(这也是为什么不用 if 判断,而使用 while 判断的原因)。
沉默王二's avatar
沉默王二 已提交
746

沉默王二's avatar
沉默王二 已提交
747
#### 示例和使用场景
沉默王二's avatar
沉默王二 已提交
748

沉默王二's avatar
沉默王二 已提交
749
生产者-消费者模型:
沉默王二's avatar
沉默王二 已提交
750 751 752 753 754

```java
public class Test {
    private int queueSize = 10;
    private ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(queueSize);
沉默王二's avatar
沉默王二 已提交
755

沉默王二's avatar
沉默王二 已提交
756 757 758 759
    public static void main(String[] args)  {
        Test test = new Test();
        Producer producer = test.new Producer();
        Consumer consumer = test.new Consumer();
沉默王二's avatar
沉默王二 已提交
760

沉默王二's avatar
沉默王二 已提交
761 762 763
        producer.start();
        consumer.start();
    }
沉默王二's avatar
沉默王二 已提交
764

沉默王二's avatar
沉默王二 已提交
765
    class Consumer extends Thread{
沉默王二's avatar
沉默王二 已提交
766

沉默王二's avatar
沉默王二 已提交
767 768 769 770
        @Override
        public void run() {
            consume();
        }
沉默王二's avatar
沉默王二 已提交
771

沉默王二's avatar
沉默王二 已提交
772 773 774 775 776 777 778 779 780 781 782
        private void consume() {
            while(true){
                try {
                    queue.take();
                    System.out.println("从队列取走一个元素,队列剩余"+queue.size()+"个元素");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
沉默王二's avatar
沉默王二 已提交
783

沉默王二's avatar
沉默王二 已提交
784
    class Producer extends Thread{
沉默王二's avatar
沉默王二 已提交
785

沉默王二's avatar
沉默王二 已提交
786 787 788 789
        @Override
        public void run() {
            produce();
        }
沉默王二's avatar
沉默王二 已提交
790

沉默王二's avatar
沉默王二 已提交
791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825
        private void produce() {
            while(true){
                try {
                    queue.put(1);
                    System.out.println("向队列取中插入一个元素,队列剩余空间:"+(queueSize-queue.size()));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
```

下面是这个例子的输出片段:

```
从队列取走一个元素,队列剩余0个元素
从队列取走一个元素,队列剩余0个元素
向队列取中插入一个元素,队列剩余空间:9
向队列取中插入一个元素,队列剩余空间:9
向队列取中插入一个元素,队列剩余空间:9
向队列取中插入一个元素,队列剩余空间:8
向队列取中插入一个元素,队列剩余空间:7
向队列取中插入一个元素,队列剩余空间:6
向队列取中插入一个元素,队列剩余空间:5
向队列取中插入一个元素,队列剩余空间:4
向队列取中插入一个元素,队列剩余空间:3
向队列取中插入一个元素,队列剩余空间:2
向队列取中插入一个元素,队列剩余空间:1
向队列取中插入一个元素,队列剩余空间:0
从队列取走一个元素,队列剩余1个元素
从队列取走一个元素,队列剩余9个元素
```

沉默王二's avatar
沉默王二 已提交
826
注意,这个例子中的输出结果看起来可能有问题,比如有几行在插入一个元素之后,队列的剩余空间不变。这是由于**System.out.println 语句没有锁**。考虑到这样的情况:线程 1 在执行完 put/take 操作后立即失去 CPU 时间片,然后切换到线程 2 执行 put/take 操作,执行完毕后回到线程 1 的 System.out.println 语句并输出,发现这个时候阻塞队列的 size 已经被线程 2 改变了,所以这个时候输出的 size 并不是当时线程 1 执行完 put/take 操作之后阻塞队列的 size,但可以确保的是 size 不会超过 10 个。实际上使用阻塞队列是没有问题的。
沉默王二's avatar
沉默王二 已提交
827 828 829 830

#### 线程池中使用阻塞队列

```java
沉默王二's avatar
沉默王二 已提交
831
public ThreadPoolExecutor(int corePoolSize,
沉默王二's avatar
沉默王二 已提交
832 833 834 835
                           int maximumPoolSize,
                           long keepAliveTime,
                           TimeUnit unit,
                           BlockingQueue<Runnable> workQueue) {
沉默王二's avatar
沉默王二 已提交
836
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
沉默王二's avatar
沉默王二 已提交
837 838 839 840
             Executors.defaultThreadFactory(), defaultHandler);
}
```

沉默王二's avatar
沉默王二 已提交
841 842 843
Java 中的线程池就是使用阻塞队列实现的,我们在了解阻塞队列之后,无论是使用 Executors 类中已经提供的线程池,还是自己通过 ThreadPoolExecutor 实现线程池,都会更加得心应手。

> 注:上面提到的生产者-消费者模式,大家可以参考[生产者-消费者模型](https://javabetter.cn/thread/shengchanzhe-xiaofeizhe.html),可以更好的理解阻塞队列。
沉默王二's avatar
沉默王二 已提交
844

沉默王二's avatar
沉默王二 已提交
845
### CopyOnWrite 容器
沉默王二's avatar
沉默王二 已提交
846

沉默王二's avatar
沉默王二 已提交
847
在聊 CopyOnWrite 容器之前我们先来谈谈什么是 CopyOnWrite 机制,CopyOnWrite 是计算机设计领域的一种优化策略,也是一种在并发场景下常用的设计思想——写入时复制。
沉默王二's avatar
沉默王二 已提交
848

沉默王二's avatar
沉默王二 已提交
849
什么是写入时复制呢?
沉默王二's avatar
沉默王二 已提交
850

沉默王二's avatar
沉默王二 已提交
851
就是当有多个调用者同时去请求一个资源数据的时候,有一个调用者出于某些原因需要对当前的数据源进行修改,这个时候系统将会复制一个当前数据源的副本给调用者修改。
沉默王二's avatar
沉默王二 已提交
852

沉默王二's avatar
沉默王二 已提交
853
CopyOnWrite 容器即**写时复制的容器**,当我们往一个容器中添加元素的时候,不直接往容器中添加,而是将当前容器进行 copy,复制出来一个新的容器,然后向新容器中添加我们需要的元素,最后将原容器的引用指向新容器。
沉默王二's avatar
沉默王二 已提交
854

沉默王二's avatar
沉默王二 已提交
855 856 857
这样做的好处在于,我们可以在并发的场景下对容器进行"读操作"而不需要"加锁",从而达到读写分离的目的。从 JDK 1.5 开始 Java 并发包里提供了两个使用 CopyOnWrite 机制实现的并发容器,分别是 [CopyOnWriteArrayList](https://javabetter.cn/thread/CopyOnWriteArrayList.html) 和 CopyOnWriteArraySet。

#### CopyOnWriteArrayList
沉默王二's avatar
沉默王二 已提交
858 859

**优点**
沉默王二's avatar
沉默王二 已提交
860 861 862 863

CopyOnWriteArrayList 经常被用于“读多写少”的并发场景,是因为 CopyOnWriteArrayList 无需任何同步措施,大大增强了读的性能。在 Java 中遍历线程非安全的 List(如:[ArrayList 和 LinkedList](https://javabetter.cn/collection/list-war-2.html))的时候,若中途有别的线程对 List 容器进行了修改,那么会抛出 ConcurrentModificationException 异常。

CopyOnWriteArrayList 由于其"读写分离",遍历和修改操作分别作用在不同的 List 容器,所以在使用迭代器遍历的时候,不会抛出异常。
沉默王二's avatar
沉默王二 已提交
864 865 866

**缺点**

沉默王二's avatar
沉默王二 已提交
867
第一个缺点是 CopyOnWriteArrayList 每次执行写操作都会将原容器进行拷贝一份,数据量大的时候,内存会存在较大的压力,可能会引起频繁 Full GC。比如这些对象占用的内存 200M 左右,那么再写入 100M 数据进去,内存就会多占用 300M。
沉默王二's avatar
沉默王二 已提交
868

沉默王二's avatar
沉默王二 已提交
869
第二个缺点是 CopyOnWriteArrayList 由于实现的原因,写和读分别作用在不同新老容器上,在写操作执行过程中,读不会阻塞,但读取到的却是老容器的数据。
沉默王二's avatar
沉默王二 已提交
870

沉默王二's avatar
沉默王二 已提交
871 872 873
现在我们来看一下 CopyOnWriteArrayList 的 add 操作源码,它的逻辑很清晰,就是先把原容器进行 copy,然后在新的副本上进行“写操作”,最后再切换引用,在此过程中是加了锁的。

```java
沉默王二's avatar
沉默王二 已提交
874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892
public boolean add(E e) {
    // ReentrantLock加锁,保证线程安全
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        // 拷贝原容器,长度为原容器长度加一
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        // 在新副本上执行添加操作
        newElements[len] = e;
        // 将原容器引用指向新副本
        setArray(newElements);
        return true;
    } finally {
        // 解锁
        lock.unlock();
    }
}
沉默王二's avatar
沉默王二 已提交
893 894 895 896 897
```

我们再来看一下 remove 操作的源码,remove 的逻辑是将要 remove 元素之外的其他元素拷贝到新的副本中,然后再将原容器的引用指向新的副本中,因为 remove 操作也是“写操作”所以也是要加锁的。

```java
沉默王二's avatar
沉默王二 已提交
898
public E remove(int index) {
沉默王二's avatar
沉默王二 已提交
899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916
    // 加锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        E oldValue = get(elements, index);
        int numMoved = len - index - 1;
        if (numMoved == 0)
            // 如果要删除的是列表末端数据,拷贝前len-1个数据到新副本上,再切换引用
            setArray(Arrays.copyOf(elements, len - 1));
        else {
            // 否则,将要删除元素之外的其他元素拷贝到新副本中,并切换引用
            Object[] newElements = new Object[len - 1];
            System.arraycopy(elements, 0, newElements, 0, index);
            System.arraycopy(elements, index + 1, newElements, index,
                                numMoved);
            setArray(newElements);
沉默王二's avatar
沉默王二 已提交
917
        }
沉默王二's avatar
沉默王二 已提交
918 919 920 921
        return oldValue;
    } finally {
        // 解锁
        lock.unlock();
沉默王二's avatar
沉默王二 已提交
922
    }
沉默王二's avatar
沉默王二 已提交
923 924 925 926 927 928
}
```

我们再来看看 CopyOnWriteArrayList 效率最高的读操作的源码

```java
沉默王二's avatar
沉默王二 已提交
929 930 931
public E get(int index) {
    return get(getArray(), index);
}
沉默王二's avatar
沉默王二 已提交
932 933 934
```

```java
沉默王二's avatar
沉默王二 已提交
935 936 937
 private E get(Object[] a, int index) {
     return (E) a[index];
 }
沉默王二's avatar
沉默王二 已提交
938
```
沉默王二's avatar
沉默王二 已提交
939

沉默王二's avatar
沉默王二 已提交
940
由上可见“读操作”是没有加锁的,直接读取。
沉默王二's avatar
沉默王二 已提交
941

沉默王二's avatar
沉默王二 已提交
942
#### CopyOnWrite 使用
沉默王二's avatar
沉默王二 已提交
943

沉默王二's avatar
沉默王二 已提交
944
接下来,我们结合具体业务场景来实现一个 CopyOnWriteMap 的并发容器并且使用它。
沉默王二's avatar
沉默王二 已提交
945

沉默王二's avatar
沉默王二 已提交
946
```java
沉默王二's avatar
沉默王二 已提交
947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978
import java.util.Collection;
import java.util.Map;
import java.util.Set;

public class CopyOnWriteMap<K, V> implements Map<K, V>, Cloneable {
    private volatile Map<K, V> internalMap;

    public CopyOnWriteMap() {
        internalMap = new HashMap<K, V>();
    }

    public V put(K key, V value) {
        synchronized (this) {
            Map<K, V> newMap = new HashMap<K, V>(internalMap);
            V val = newMap.put(key, value);
            internalMap = newMap;
            return val;
        }
    }

    public V get(Object key) {
        return internalMap.get(key);
    }

    public void putAll(Map<? extends K, ? extends V> newData) {
        synchronized (this) {
            Map<K, V> newMap = new HashMap<K, V>(internalMap);
            newMap.putAll(newData);
            internalMap = newMap;
        }
    }
}
沉默王二's avatar
沉默王二 已提交
979 980 981
```

上面就是参考 CopyOnWriteArrayList 实现的 CopyOnWriteMap,我们可以用这个容器来做什么呢?结合我们之前说的 CopyOnWrite 的复制思想,它最适用于“读多写少”的并发场景。
沉默王二's avatar
沉默王二 已提交
982

沉默王二's avatar
沉默王二 已提交
983
**场景:** 假如我们有一个搜索的网站需要屏蔽一些“关键字”,“黑名单”每晚定时更新,每当用户搜索的时候,“黑名单”中的关键字不会出现在搜索结果当中,并且提示用户敏感字。
沉默王二's avatar
沉默王二 已提交
984

沉默王二's avatar
沉默王二 已提交
985
```java
沉默王二's avatar
沉默王二 已提交
986 987 988
// 黑名单服务
public class BlackListServiceImpl {
    // 减少扩容开销。根据实际需要,初始化CopyOnWriteMap的大小,避免写时CopyOnWriteMap扩容的开销。
沉默王二's avatar
沉默王二 已提交
989
    private static CopyOnWriteMap<String, Boolean> blackListMap =
沉默王二's avatar
沉默王二 已提交
990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010
        new CopyOnWriteMap<String, Boolean>(1000);

    public static boolean isBlackList(String id) {
        return blackListMap.get(id) == null ? false : true;
    }

    public static void addBlackList(String id) {
        blackListMap.put(id, Boolean.TRUE);
    }

    /**
     * 批量添加黑名单
     * (使用批量添加。因为每次添加,容器每次都会进行复制,所以减少添加次数,可以减少容器的复制次数。
     * 如使用上面代码里的addBlackList方法)
     * @param ids
     */
    public static void addBlackList(Map<String,Boolean> ids) {
        blackListMap.putAll(ids);
    }

}
沉默王二's avatar
沉默王二 已提交
1011
```
沉默王二's avatar
沉默王二 已提交
1012

沉默王二's avatar
沉默王二 已提交
1013
这里需要各位小伙伴特别特别注意一个问题,此处的场景是每晚凌晨“黑名单”定时更新,原因是 CopyOnWrite 容器有**数据一致性**的问题,它只能保证**最终数据一致性**
沉默王二's avatar
沉默王二 已提交
1014

沉默王二's avatar
沉默王二 已提交
1015
所以如果我们希望写入的数据马上能准确地读取,请不要使用 CopyOnWrite 容器。
沉默王二's avatar
沉默王二 已提交
1016

沉默王二's avatar
沉默王二 已提交
1017
### 总结
沉默王二's avatar
jvm  
沉默王二 已提交
1018

沉默王二's avatar
沉默王二 已提交
1019
本文主要介绍了并发包中的三个重要的容器类,Map、阻塞队列和 CopyOnWrite 容器,Map 用于存储键值对,阻塞队列用于生产者-消费者模型,而 CopyOnWrite 容器用于“读多写少”的并发场景。
沉默王二's avatar
沉默王二 已提交
1020

沉默王二's avatar
沉默王二 已提交
1021
> 编辑:沉默王二,部分内容来源于朋友小七萤火虫开源的这个仓库:[深入浅出 Java 多线程](http://concurrent.redspider.group/),部分内容来自于这篇[初念初恋-ConcurrentHashMap](https://juejin.cn/post/7064061605185028110)。
沉默王二's avatar
沉默王二 已提交
1022 1023

---
沉默王二's avatar
沉默王二 已提交
1024

沉默王二's avatar
沉默王二 已提交
1025
GitHub 上标星 9000+ 的开源知识库《[二哥的 Java 进阶之路](https://github.com/itwanger/toBeBetterJavaer)》第一版 PDF 终于来了!包括 Java 基础语法、数组&字符串、OOP、集合框架、Java IO、异常处理、Java 新特性、网络编程、NIO、并发编程、JVM 等等,共计 32 万余字,可以说是通俗易懂、风趣幽默……详情戳:[太赞了,GitHub 上标星 9000+ 的 Java 教程](https://javabetter.cn/overview/)
沉默王二's avatar
沉默王二 已提交
1026 1027

微信搜 **沉默王二** 或扫描下方二维码关注二哥的原创公众号沉默王二,回复 **222** 即可免费领取。
沉默王二's avatar
沉默王二 已提交
1028

沉默王二's avatar
沉默王二 已提交
1029
![](https://cdn.tobebetterjavaer.com/tobebetterjavaer/images/gongzhonghao.png)