ConcurrentHashMap源码分析(一)【添加元素】

Scroll Down

锁的简介

  1. synchronized
  • java中的关键字,内部实现为监视器锁,主要是通过对象监视器在对象头中的字段来表明的。
  • synchronized从旧版本到现在已经做了很多优化了,在运行时会有三种存在方式:偏向锁,轻量级锁,重量级锁。
  • 偏向锁,是指一段同步代码一直被一个线程访问,那么这个线程会自动获取锁,降低获取锁的代价。
  • 轻量级锁,是指当锁是偏向锁时,被另一个线程所访问,偏向锁会升级为轻量级锁,这个线程会通过自旋的方式尝试获取锁,不会阻塞,提高性能。
  • 重量级锁,是指当锁是轻量级锁时,当自旋的线程自旋了一定的次数后,还没有获取到锁,就会进入阻塞状态,该锁升级为重量级锁,重量级锁会使其他线程阻塞,性能降低。
  1. CAS
  • CAS,Compare And Swap,它是一种乐观锁,认为对于同一个数据的并发操作不一定会发生修改,在更新数据的时候,尝试去更新数据,如果失败就不断尝试。
  1. volatile(非锁)
  • java中的关键字,当多个线程访问同一个变量时,一个线程修改了这个变量的值,其他线程能够立即看得到修改的值。
  • volatile只保证可见性,不保证原子性,比如 volatile修改的变量 i,针对i+ +操作,不保证每次结果都正确,因为i+ +操作是两步操作,相当于 i = i +1,先读取,再加1,这种情况volatile是无法保证的。
  1. 自旋锁
  • 自旋锁,是指尝试获取锁的线程不会阻塞,而是循环的方式不断尝试,这样的好处是减少线程的上下文切换带来的开锁,提高性能,缺点是循环会消耗CPU。
  1. 分段锁
  • 分段锁,是一种锁的设计思路,它细化了锁的粒度,主要运用在ConcurrentHashMap中,实现高效的并发操作,当操作不需要更新整个数组时,就只锁数组中的一项就可以了。
  1. ReentrantLock
  • 可重入锁,是指一个线程获取锁之后再尝试获取锁时会自动获取锁,可重入锁的优点是避免死锁。

ConcurrentHashMap的继承体系

image

构造方法

public ConcurrentHashMap() {
}

public ConcurrentHashMap(int initialCapacity) {
    if (initialCapacity < 0)
        throw new IllegalArgumentException();
    int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
               MAXIMUM_CAPACITY :
               tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
    this.sizeCtl = cap;
}

public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
    this.sizeCtl = DEFAULT_CAPACITY;
    putAll(m);
}

public ConcurrentHashMap(int initialCapacity, float loadFactor) {
    this(initialCapacity, loadFactor, 1);
}

public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (initialCapacity < concurrencyLevel)   // Use at least as many bins
        initialCapacity = concurrencyLevel;   // as estimated threads
    long size = (long)(1.0 + (long)initialCapacity / loadFactor);
    int cap = (size >= (long)MAXIMUM_CAPACITY) ?
        MAXIMUM_CAPACITY : tableSizeFor((int)size);
    this.sizeCtl = cap;
}

添加元素

public V put(K key, V value) {
    return putVal(key, value, false);
}

final V putVal(K key, V value, boolean onlyIfAbsent) {
    // 校验key和value是否为空
    if (key == null || value == null) throw new NullPointerException();
    // 获取key的hash值
    int hash = spread(key.hashCode());
    // 
    int binCount = 0;
    // 死循环,结合CAS使用(如果CAS失败,则会重新取整个桶进行下面的流程)
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        // 判断桶是否为空,为空进行初始化
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        // 判断元素所在的桶是否为空
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // 如果要插入的元素所在的桶没有元素,则把这个元素直接放到桶中
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        // 判断是否在迁移元素
        else if ((fh = f.hash) == MOVED)
            // 帮助迁移元素
            tab = helpTransfer(tab, f);
        // 桶不为空,也不是正在迁移
        else {
            V oldVal = null;
            // 锁整个桶
            synchronized (f) {
                // 检测第一个元素是否有变化
                if (tabAt(tab, i) == f) {
                    // 如果第一个元素的hash值大于等于0,说明不是在迁移,也不是树化。
                    if (fh >= 0) {
                        // 桶中元素个数赋值为1
                        binCount = 1;
                        // 遍历整个桶,每次结束桶中元素+1
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            // 元素的hash和key相同说明找到这个元素了
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                // 判断是否替换值
                                if (!onlyIfAbsent)
                                    // 重新赋值
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                // 如果到链表尾部还没有找到元素,就把它插入到链表结尾并退出循环
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    // 如果第一个元素是树节点
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        // 桶中元素个数赋值为2
                        binCount = 2;
                        // 调用红黑树的插入方法插入元素,如果成功插入则返回null,否则返回寻找到的节点
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            // 判断是否要替换元素
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            
            // 如果binCount不为0,说明成功插入了元素或者寻找到了元素
            if (binCount != 0) {
                // 如果链表元素个数达到了8,则尝试树化
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                // 如果要插入的元素已经存在,则返回旧值
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    // 成功插入元素,元素个数加1(是否要扩容在这个里面)
    addCount(1L, binCount);
    // 成功插入元素返回null
    return null;
}
  1. 如果桶数组未初始化,则初始化;
  2. 如果待插入的元素所在的桶为空,则尝试把此元素直接插入到桶的第一个位置;
  3. 如果正在扩容,则当前线程一起加入到扩容的过程中;
  4. 如果待插入的元素所在的桶不为空且不在迁移元素,则锁住这个桶(分段锁);
  5. 如果当前桶中元素以链表方式存储,则在链表中寻找该元素或者插入元素;
  6. 如果当前桶中元素以红黑树方式存储,则在红黑树中寻找该元素或者插入元素;
  7. 如果元素存在,则返回旧值;
  8. 如果元素不存在,整个Map的元素个数加1,并检查是否需要扩容;

添加元素用到了那些锁?

自旋锁 + CAS + synchronized + 分段锁

为什么使用synchronized而不是ReentrantLock?

因为synchronized已经得到了极大地优化,在特定情况下并不比ReentrantLock差。