• 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏吧

我工作三年了,该懂并发了!

开发技术 开发技术 3周前 (08-30) 28次浏览

本文的组织形式如下,主要会介绍到同步容器类,操作系统的并发工具,Java 开发工具包(只是简单介绍一下,后面会有源码分析)。同步工具类有哪些。

我工作三年了,该懂并发了!

下面我们就来介绍一下 Java 并发中都涉及哪些模块,这些并发模块都是 Java 并发类库所提供的。

同步容器类

同步容器主要包括两类,一种是本来就是线程安全实现的容器,这类容器有 Vector、Hashtable、Stack,这类容器的方法上都加了 synchronized 锁,是线程安全的实现。

Vector、Hashtable、Stack 这些容器我们现在几乎都不在使用,因为这些容器在多线程环境下的效率不高。

还有一类是由 Collections.synchronizedxxx 实现的非线程安全的容器,使用 Collections.synchronized 会把它们封装起来编程线程安全的容器,举出两个例子

  • Collections.synchronizedList
  • Collections.synchronizedMap

我们可以通过 Collections 源码可以看出这些线程安全的实现

我工作三年了,该懂并发了!

要不为啥要称 Collections 为集合工具类呢?Collections 会把这些容器类的状态封装起来,并对每个同步方法进行同步,使得每次只有一个线程能够访问容器的状态。

其中每个 synchronized xxx 都是相当于创建了一个静态内部类。

我工作三年了,该懂并发了!

虽然同步容器类都是线程安全的,但是在某些情况下需要额外的客户端加锁来保证一些复合操作的安全性,复合操作就是有两个及以上的方法组成的操作,比如最典型的就是 若没有则添加,用伪代码表示则是

if(a == null){
  a = get();
}

比如可以用来判断 Map 中是否有某个 key,如果没有则添加进 Map 中。这些复合操作在没有客户端加锁的情况下是线程安全的,但是当多个线程并发修改容器时,可能会表现出意料之外的行为。例如下面这段代码

public class TestVector implements Runnable{

    static Vector vector = new Vector();
    static void addVector(){
        for(int i = 0;i < 10000;i++){
            vector.add(i);
        }
    }

    static Object getVector(){
        int index = vector.size() - 1;
        return vector.get(index);
    }

    static void removeVector(){
        int index = vector.size() - 1;
        vector.remove(index);
    }

    @Override
    public void run() {
        getVector();
    }

    public static void main(String[] args) {
        TestVector testVector = new TestVector();
        testVector.addVector();
        Thread t1 = new Thread(() -> {
            for(int i = 0;i < vector.size();i++){
                getVector();
            }
        });

        Thread t2 = new Thread(() -> {
            for(int i = 0;i < vector.size();i++){
                removeVector();
            }
        });

        t1.start();
        t2.start();
    }

}

这些方法看似没有问题,因为 Vector 能够保证线程安全性,无论多少个线程访问 Vector 也不会造成 Vector 的内部产生破坏,但是从整个系统来说,是存在线程安全性的,事实上你运行一下,也会发现报错。

会出现

我工作三年了,该懂并发了!

如果线程 A 在包含这么多元素的基础上调用 getVector 方法,会得到一个数值,getVector 只是取得该元素,而并不是从 vector 中移除,removeVector 方法是得到一个元素进行移除,这段代码的不安全因素就是,因为线程的时间片是乱序的,而且 getVector 和 removeVector 并不会保证互斥,所以在 removeVector 方法把某个值比如 6666 移除后,vector 中就不存在这个 6666 的元素,此时 getVector 方法取得 6666 ,就会抛出数组越界异常。为什么是数组越界异常呢?可以看一下 vector 的源码

我工作三年了,该懂并发了!

如果用图表示的话,则会是下面这样。

我工作三年了,该懂并发了!

所以,从系统的层面来看,上面这段代码也要保证线程安全性才可以,也就是在客户端加锁 实现,只要我们让复合操作使用一把锁,那么这些操作就和其他单独的操作一样都是原子性的。如下面例子所示

static Object getVector(){
  synchronized (vector){
    int index = vector.size() - 1;
    return vector.get(index);
  }
}

static void removeVector(){
  synchronized (vector) {
    int index = vector.size() - 1;
    vector.remove(index);
  }
}

也可以通过锁住 .class 来保证原子性操作,也能达到同样的效果。

static Object getVector(){
  synchronized (TestVector.class){
    int index = vector.size() - 1;
    return vector.get(index);
  }
}

static void removeVector(){
  synchronized (TestVector.class) {
    int index = vector.size() - 1;
    vector.remove(index);
  }
}

在调用 size 和 get 之间,Vector 的长度可能会发生变化,这种变化在对 Vector 进行排序时出现,如下所示

for(int i = 0;i< vector.size();i++){
  doSomething(vector.get(i));
}

这种迭代的操作正确性取决于运气,即在调用 size 和 get 之间会修改 Vector,在单线程环境中,这种假设完全成立,但是再有其他线程并发修改 Vector 时,则可能会导致麻烦。

我们仍旧可以通过客户端加锁的方式来避免这种情况

synchronized(vector){
  for(int i = 0;i< vector.size();i++){
    doSomething(vector.get(i));
  }  
}

这种方式为客户端的可靠性提供了保证,但是牺牲了伸缩性,而且这种在遍历过程中进行加锁,也不是我们所希望看到的。

fail-fast

针对上面这种情况,很多集合类都提供了一种 fail-fast 机制,因为大部分集合内部都是使用 Iterator 进行遍历,在循环中使用同步锁的开销会很大,而 Iterator 的创建是轻量级的,所以在集合内部如果有并发修改的操作,集合会进行快速失败,也就是 fail-fast。当他们发现容器在迭代过程中被修改时,会抛出 ConcurrentModificationException 异常,这种快速失败不是一种完备的处理机制,而只是 善意的捕获并发错误。

如果查看过 ConcurrentModificationException 的注解,你会发现,ConcurrentModificationException 抛出的原则由两种,如下

我工作三年了,该懂并发了!

造成这种异常的原因是由于多个线程在遍历集合的同时对集合类内部进行了修改,这也就是 fail-fast 机制。

该注解还声明了另外一种方式

我工作三年了,该懂并发了!

这个问题也是很经典的一个问题,我们使用 ArrayList 来举例子。如下代码所示

public static void main(String[] args) {
  List<String> list = new ArrayList<>();
  for (int i = 0 ; i < 10 ; i++ ) {
    list.add(i + "");
  }
  Iterator<String> iterator = list.iterator();
  int i = 0 ;
  while(iterator.hasNext()) {
    if (i == 3) {
      list.remove(3);
    }
    System.out.println(iterator.next());
    i ++;
  }
}

该段代码会发生异常,因为在 ArrayList 内部,有两个属性,一个是 modCount ,一个是 expectedModCount ,ArrayList 在 remove 等对集合结构的元素造成数量上的操作会有 checkForComodification 的判断,如下所示,这也是这段代码的错误原因。

我工作三年了,该懂并发了!

fail-safe

fail-safe 是 Java 中的一种 安全失败 机制,它表示的是在遍历时不是直接在原集合上进行访问,而是先复制原有集合内容,在拷贝的集合上进行遍历。 由于迭代时是对原集合的拷贝进行遍历,所以在遍历过程中对原集合所作的修改并不能被迭代器检测到,所以不会触发 ConcurrentModificationException。java.util.concurrent 包下的容器都是安全失败的,可以在多线程条件下使用,并发修改。

比如 CopyOnWriteArrayList, 它就是一种 fail-safe 机制的集合,它就不会出现异常,例如如下操作

List<Integer> integers = new CopyOnWriteArrayList<>();
integers.add(1);
integers.add(2);
integers.add(3);
Iterator<Integer> itr = integers.iterator();
while (itr.hasNext()) {
    Integer a = itr.next();
    integers.remove(a);
}

CopyOnWriteArrayList 就是 ArrayList 的一种线程安全的变体,CopyOnWriteArrayList 中的所有可变操作比如 add 和 set 等等都是通过对数组进行全新复制来实现的。

操作系统中的并发工具

讲到并发容器,就不得不提操作系统级别实现了哪些进程/线程间的并发容器,说白了其实就是数据结构的设计。下面我们就来一起看一下操作系统级别的并发工具

信号量

信号量是 E.W.Dijkstra 在 1965 年提出的一种方法,它使用一个整形变量来累计唤醒次数,以供之后使用。在他的观点中,有一个新的变量类型称作 信号量(semaphore)。一个信号量的取值可以是 0 ,或任意正数。0 表示的是不需要任何唤醒,任意的正数表示的就是唤醒次数。

Dijkstra 提出了信号量有两个操作,现在通常使用 downup(分别可以用 sleep 和 wakeup 来表示)。down 这个指令的操作会检查值是否大于 0 。如果大于 0 ,则将其值减 1 ;若该值为 0 ,则进程将睡眠,而且此时 down 操作将会继续执行。检查数值、修改变量值以及可能发生的睡眠操作均为一个单一的、不可分割的 原子操作(atomic action) 完成。

互斥量

如果不需要信号量的计数能力时,可以使用信号量的一个简单版本,称为 mutex(互斥量)。互斥量的优势就在于在一些共享资源和一段代码中保持互斥。由于互斥的实现既简单又有效,这使得互斥量在实现用户空间线程包时非常有用。

互斥量是一个处于两种状态之一的共享变量:解锁(unlocked)加锁(locked)。这样,只需要一个二进制位来表示它,不过一般情况下,通常会用一个 整型(integer) 来表示。0 表示解锁,其他所有的值表示加锁,比 1 大的值表示加锁的次数。

mutex 使用两个过程,当一个线程(或者进程)需要访问关键区域时,会调用 mutex_lock 进行加锁。如果互斥锁当前处于解锁状态(表示关键区域可用),则调用成功,并且调用线程可以自由进入关键区域。

另一方面,如果 mutex 互斥量已经锁定的话,调用线程会阻塞直到关键区域内的线程执行完毕并且调用了 mutex_unlock 。如果多个线程在 mutex 互斥量上阻塞,将随机选择一个线程并允许它获得锁。

我工作三年了,该懂并发了!

Futexes

随着并行的增加,有效的同步(synchronization)锁定(locking) 对于性能来说是非常重要的。如果进程等待时间很短,那么自旋锁(Spin lock) 是非常有效;但是如果等待时间比较长,那么这会浪费 CPU 周期。如果进程很多,那么阻塞此进程,并仅当锁被释放的时候让内核解除阻塞是更有效的方式。不幸的是,这种方式也会导致另外的问题:它可以在进程竞争频繁的时候运行良好,但是在竞争不是很激烈的情况下内核切换的消耗会非常大,而且更困难的是,预测锁的竞争数量更不容易。

有一种有趣的解决方案是把两者的优点结合起来,提出一种新的思想,称为 futex,或者是 快速用户空间互斥(fast user space mutex),是不是听起来很有意思?

我工作三年了,该懂并发了!

futex 是 Linux 中的特性实现了基本的锁定(很像是互斥锁)而且避免了陷入内核中,因为内核的切换的开销非常大,这样做可以大大提高性能。futex 由两部分组成:内核服务和用户库。内核服务提供了了一个 等待队列(wait queue) 允许多个进程在锁上排队等待。除非内核明确的对他们解除阻塞,否则它们不会运行。

我工作三年了,该懂并发了!

Pthreads 中的互斥量

Pthreads 提供了一些功能用来同步线程。最基本的机制是使用互斥量变量,可以锁定和解锁,用来保护每个关键区域。希望进入关键区域的线程首先要尝试获取 mutex。如果 mutex 没有加锁,线程能够马上进入并且互斥量能够自动锁定,从而阻止其他线程进入。如果 mutex 已经加锁,调用线程会阻塞,直到 mutex 解锁。如果多个线程在相同的互斥量上等待,当互斥量解锁时,只有一个线程能够进入并且重新加锁。这些锁并不是必须的,程序员需要正确使用它们。

下面是与互斥量有关的函数调用

我工作三年了,该懂并发了!

和我们想象中的一样,mutex 能够被创建和销毁,扮演这两个角色的分别是 Phread_mutex_initPthread_mutex_destroy。mutex 也可以通过 Pthread_mutex_lock 来进行加锁,如果互斥量已经加锁,则会阻塞调用者。还有一个调用Pthread_mutex_trylock 用来尝试对线程加锁,当 mutex 已经被加锁时,会返回一个错误代码而不是阻塞调用者。这个调用允许线程有效的进行忙等。最后,Pthread_mutex_unlock 会对 mutex 解锁并且释放一个正在等待的线程。

除了互斥量以外,Pthreads 还提供了第二种同步机制: 条件变量(condition variables) 。mutex 可以很好的允许或阻止对关键区域的访问。条件变量允许线程由于未满足某些条件而阻塞。绝大多数情况下这两种方法是一起使用的。下面我们进一步来研究线程、互斥量、条件变量之间的关联。

下面再来重新认识一下生产者和消费者问题:一个线程将东西放在一个缓冲区内,由另一个线程将它们取出。如果生产者发现缓冲区没有空槽可以使用了,生产者线程会阻塞起来直到有一个线程可以使用。生产者使用 mutex 来进行原子性检查从而不受其他线程干扰。但是当发现缓冲区已经满了以后,生产者需要一种方法来阻塞自己并在以后被唤醒。这便是条件变量做的工作。

下面是一些与条件变量有关的最重要的 pthread 调用

我工作三年了,该懂并发了!

上表中给出了一些调用用来创建和销毁条件变量。条件变量上的主要属性是 Pthread_cond_waitPthread_cond_signal。前者阻塞调用线程,直到其他线程发出信号为止(使用后者调用)。阻塞的线程通常需要等待唤醒的信号以此来释放资源或者执行某些其他活动。只有这样阻塞的线程才能继续工作。条件变量允许等待与阻塞原子性的进程。Pthread_cond_broadcast 用来唤醒多个阻塞的、需要等待信号唤醒的线程。

需要注意的是,条件变量(不像是信号量)不会存在于内存中。如果将一个信号量传递给一个没有线程等待的条件变量,那么这个信号就会丢失,这个需要注意

管程

为了能够编写更加准确无误的程序,Brinch Hansen 和 Hoare 提出了一个更高级的同步原语叫做 管程(monitor)。管程有一个很重要的特性,即在任何时候管程中只能有一个活跃的进程,这一特性使管程能够很方便的实现互斥操作。管程是编程语言的特性,所以编译器知道它们的特殊性,因此可以采用与其他过程调用不同的方法来处理对管程的调用。通常情况下,当进程调用管程中的程序时,该程序的前几条指令会检查管程中是否有其他活跃的进程。如果有的话,调用进程将被挂起,直到另一个进程离开管程才将其唤醒。如果没有活跃进程在使用管程,那么该调用进程才可以进入。

进入管程中的互斥由编译器负责,但是一种通用做法是使用 互斥量(mutex)二进制信号量(binary semaphore)。由于编译器而不是程序员在操作,因此出错的几率会大大降低。在任何时候,编写管程的程序员都无需关心编译器是如何处理的。他只需要知道将所有的临界区转换成为管程过程即可。绝不会有两个进程同时执行临界区中的代码。

即使管程提供了一种简单的方式来实现互斥,但在我们看来,这还不够。因为我们还需要一种在进程无法执行被阻塞。在生产者-消费者问题中,很容易将针对缓冲区满和缓冲区空的测试放在管程程序中,但是生产者在发现缓冲区满的时候该如何阻塞呢?

解决的办法是引入条件变量(condition variables) 以及相关的两个操作 waitsignal。当一个管程程序发现它不能运行时(例如,生产者发现缓冲区已满),它会在某个条件变量(如 full)上执行 wait 操作。这个操作造成调用进程阻塞,并且还将另一个以前等在管程之外的进程调入管程。在前面的 pthread 中我们已经探讨过条件变量的实现细节了。另一个进程,比如消费者可以通过执行 signal 来唤醒阻塞的调用进程。

通过临界区自动的互斥,管程比信号量更容易保证并行编程的正确性。但是管程也有缺点,我们前面说到过管程是一个编程语言的概念,编译器必须要识别管程并用某种方式对其互斥作出保证。C、Pascal 以及大多数其他编程语言都没有管程,所以不能依靠编译器来遵守互斥规则。

与管程和信号量有关的另一个问题是,这些机制都是设计用来解决访问共享内存的一个或多个 CPU 上的互斥问题的。通过将信号量放在共享内存中并用 TSLXCHG 指令来保护它们,可以避免竞争。但是如果是在分布式系统中,可能同时具有多个 CPU 的情况,并且每个 CPU 都有自己的私有内存呢,它们通过网络相连,那么这些原语将会失效。因为信号量太低级了,而管程在少数几种编程语言之外无法使用,所以还需要其他方法。

消息传递

上面提到的其他方法就是 消息传递(messaage passing)。这种进程间通信的方法使用两个原语 sendreceive ,它们像信号量而不像管程,是系统调用而不是语言级别。示例如下

send(destination, &message);

receive(source, &message);

send 方法用于向一个给定的目标发送一条消息,receive 从一个给定的源接收一条消息。如果没有消息,接受者可能被阻塞,直到接收一条消息或者带着错误码返回。

消息传递系统现在面临着许多信号量和管程所未涉及的问题和设计难点,尤其对那些在网络中不同机器上的通信状况。例如,消息有可能被网络丢失。为了防止消息丢失,发送方和接收方可以达成一致:一旦接受到消息后,接收方马上回送一条特殊的 确认(acknowledgement) 消息。如果发送方在一段时间间隔内未收到确认,则重发消息。

现在考虑消息本身被正确接收,而返回给发送着的确认消息丢失的情况。发送者将重发消息,这样接受者将收到两次相同的消息。

我工作三年了,该懂并发了!

对于接收者来说,如何区分新的消息和一条重发的老消息是非常重要的。通常采用在每条原始消息中嵌入一个连续的序号来解决此问题。如果接受者收到一条消息,它具有与前面某一条消息一样的序号,就知道这条消息是重复的,可以忽略。

消息系统还必须处理如何命名进程的问题,以便在发送或接收调用中清晰的指明进程。身份验证(authentication) 也是一个问题,比如客户端怎么知道它是在与一个真正的文件服务器通信,从发送方到接收方的信息有可能被中间人所篡改。

屏障

最后一个同步机制是准备用于进程组而不是进程间的生产者-消费者情况的。在某些应用中划分了若干阶段,并且规定,除非所有的进程都就绪准备着手下一个阶段,否则任何进程都不能进入下一个阶段,可以通过在每个阶段的结尾安装一个 屏障(barrier) 来实现这种行为。当一个进程到达屏障时,它会被屏障所拦截,直到所有的屏障都到达为止。屏障可用于一组进程同步,如下图所示

我工作三年了,该懂并发了!

在上图中我们可以看到,有四个进程接近屏障,这意味着每个进程都在进行运算,但是还没有到达每个阶段的结尾。过了一段时间后,A、B、D 三个进程都到达了屏障,各自的进程被挂起,但此时还不能进入下一个阶段呢,因为进程 B 还没有执行完毕。结果,当最后一个 C 到达屏障后,这个进程组才能够进入下一个阶段。

避免锁:读-复制-更新

最快的锁是根本没有锁。问题在于没有锁的情况下,我们是否允许对共享数据结构的并发读写进行访问。答案当然是不可以。假设进程 A 正在对一个数字数组进行排序,而进程 B 正在计算其平均值,而此时你进行 A 的移动,会导致 B 会多次读到重复值,而某些值根本没有遇到过。

然而,在某些情况下,我们可以允许写操作来更新数据结构,即便还有其他的进程正在使用。窍门在于确保每个读操作要么读取旧的版本,要么读取新的版本,例如下面的树

我工作三年了,该懂并发了!

上面的树中,读操作从根部到叶子遍历整个树。加入一个新节点 X 后,为了实现这一操作,我们要让这个节点在树中可见之前使它"恰好正确":我们对节点 X 中的所有值进行初始化,包括它的子节点指针。然后通过原子写操作,使 X 称为 A 的子节点。所有的读操作都不会读到前后不一致的版本

我工作三年了,该懂并发了!

在上面的图中,我们接着移除 B 和 D。首先,将 A 的左子节点指针指向 C 。所有原本在 A 中的读操作将会后续读到节点 C ,而永远不会读到 B 和 D。也就是说,它们将只会读取到新版数据。同样,所有当前在 B 和 D 中的读操作将继续按照原始的数据结构指针并且读取旧版数据。所有操作均能正确运行,我们不需要锁住任何东西。而不需要锁住数据就能够移除 B 和 D 的主要原因就是 读-复制-更新(Ready-Copy-Update,RCU),将更新过程中的移除和再分配过程分离开。

Java 并发工具包

JDK 1.5 提供了许多种并发容器来改进同步容器的性能,同步容器将所有对容器状态的访问都串行化,以实现他们之间的线程安全性。这种方法的代价是严重降低了并发性能,当多个线程争抢容器锁的同时,严重降低吞吐量。

下面我们就来一起认识一下 Java 中都用了哪些并发工具

Java 并发工具综述

在 Java 5.0 中新增加了 ConcurrentHashMap 用来替代基于散列的 Map 容器;新增加了 CopyOnWriteArrayListCopyOnWriteArraySet 来分别替代 ArrayList 和 Set 接口实现类;还新增加了两种容器类型,分别是 QueueBlockingQueue, Queue 是队列的意思,它有一些实现分别是传统的先进先出队列 ConcurrentLinkedQueue以及并发优先级队列 PriorityQueue。Queue 是一个先入先出的队列,它的操作不会阻塞,如果队列为空那么获取元素的操作会返回空值。PriorityQueue 扩展了 Queue,增加了可阻塞的插入和获取等操作。如果队列为空,那么获取元素的操作将一直阻塞,直到队列中出现一个可用的元素为止。如果队列已满,那么插入操作则一直阻塞,直到队列中有可用的空间为止。

Java 6.0 还引入了 ConcurrentSkipListMapConcurrentSkipListSet 分别作为同步的 SortedMap 和 SortedSet 的并发替代品。下面我们就展开探讨了,设计不到底层源码,因为本篇文章主要目的就是为了描述一下有哪些东西以及用了哪些东西。

ConcurrentHashMap

我们先来看一下 ConcurrentHashMap 在并发集合中的位置

我工作三年了,该懂并发了!

可以看到,ConcurrentHashMap 继承了 AbstractMap 接口并实现了 ConcurrentMap 和 Serializable 接口,AbstractMap 和 ConcurrentMap 都是 Map 的实现类,只不过 AbstractMap 是抽象实现。

ConcurrentHashMapHashtable 的构造非常相似,只不过 Hashtable 容器在激烈竞争的场景中会表现出效率低下的现象,这是因为所有访问 Hashtable 的线程都想获取同一把锁,如果容器里面有多把锁,并且每一把锁都只用来锁定一段数据,那么当多个线程访问不同的数据段时,就不存在竞争关系。这就是 ConcurreentHashMap 采用的 分段锁 实现。在这种锁实现中,任意数量的读取线程可以并发的访问 Map,执行读取操作的线程和执行写入的线程可以并发的访问 Map,并且在读取的同时也可以并发修改 Map。

我工作三年了,该懂并发了!

ConcurrentHashMap 分段锁实现带来的结果是,在并发环境下可以实现更高的吞吐量,在单线程环境下只损失非常小的性能。

你知道 HashMap 是具有 fail-fast 机制的,也就是说它是一种强一致性的集合,在数据不一致的情况下会抛出 ConcurrentModificationException 异常,而 ConcurrentHashMap 是一种 一致性 的集合,在并发修改其内部结构时,它不会抛出 ConcurrentModificationException 异常,弱一致性能够容忍并发修改。

在 HashMap 中,我们一般使用的 size、empty、containsKey 等方法都是标准方法,其返回的结果是一定的,包含就是包含,不包含就是不包含,可以作为判断条件;而 ConcurrentHashMap 中的这些方法只是参考方法,它不是一个 精确值,像是 size、empty 这些方法在并发场景下用处很小,因为他们的返回值总是在不断变化,所以这些操作的需求就被弱化了。

我工作三年了,该懂并发了!

在 ConcurrentHashMap 中没有实现对 Map 加锁从而实现独占访问。在线程安全的 Map 实现 HashtableCollections.synchronizedMap 中都实现了独占访问,因此只能单个线程修改 Map 。ConcurrentHashMap 与这些 Map 容器相比,具有更多的优势和更少的劣势,只有当需要独占访问的需求时才会使用 Hashtable 或者是 Collections.synchronizedMap ,否则其他并发场景下,应该使用 ConcurrentHashMap。

ConcurrentMap

ConcurrentMap 是一个接口,它继承了 Map 接口并提供了 Map 接口中四个新的方法,这四个方法都是 原子性 方法,进一步扩展了 Map 的功能。

public interface ConcurrentMap<K, V> extends Map<K, V> {
 
  // 仅当 key 没有相应的映射值时才插入
  V putIfAbsent(K key, V value);
  
  // 仅当 key 被映射到 value 时才移除
  boolean remove(Object key, Object value);
  
  // 仅当 key 被映射到 value 时才移除
  V replace(K key, V value);
  
  // 仅当 key 被映射到 oldValue 时才替换为 newValue
  boolean replace(K key, V oldValue, V newValue);
  
}

ConcurrentNavigableMap

java.util.concurrent.ConcurrentNavigableMap 类是 java.util.NavigableMap 的子类,它支持并发访问,并且允许其视图的并发访问。

我工作三年了,该懂并发了!

什么是视图呢?视图就是集合中的一段数据序列,ConcurrentNavigableMap 中支持使用 headMapsubMaptailMap 返回的视图。与其重新解释一下 NavigableMap 中找到的所有方法,不如看一下 ConcurrentNavigableMap 中添加的方法

  • headMap 方法:headMap 方法返回一个严格小于给定键的视图
  • tailMap 方法:tailMap 方法返回包含大于或等于给定键的视图。
  • subMap 方法:subMap 方法返回给定两个参数的视图

ConcurrentNavigableMap 接口包含一些可能有用的其他方法

  • descendingKeySet()
  • descendingMap()
  • navigableKeySet()

更多关于方法的描述这里就不再赘述了,读者朋友们可自行查阅 javadoc

ConcurrentSkipListMap

ConcurrentSkipListMap 是线程安全的有序的哈希表,适用于高并发的场景。

我工作三年了,该懂并发了!

ConcurrentSkipListMap 的底层数据结构是基于跳表实现的。ConcurrentSkipListMap 可以提供 Comparable 内部排序或者是 Comparator 外部排序,具体取决于使用哪个构造函数。

ConcurrentSkipListSet

ConcurrentSkipListSet 是线程安全的有序的集合,适用于高并发的场景。ConcurrentSkipListSet 底层是通过 ConcurrentNavigableMap 来实现的,它是一个有序的线程安全的集合。

ConcurrentSkipListSet有序的,基于元素的自然排序或者通过比较器确定的顺序;

ConcurrentSkipListSet是线程安全的;

CopyOnWriteArrayList

CopyOnWriteArrayList 是 ArrayList 的变体,在 CopyOnWriteArrayList 中,所有可变操作比如 add、set 其实都是重新创建了一个副本,通过对数组进行复制而实现的。

我工作三年了,该懂并发了!

CopyOnWriteArrayList 其内部有一个指向数组的引用,数组是不会被修改的,每次并发修改 CopyOnWriteArrayList 都相当于重新创建副本,CopyOnWriteArrayList 是一种 fail-safe 机制的,它不会抛出 ConcurrentModificationException 异常,并且返回元素与迭代器创建时的元素相同。

每次并发写操作都会创建新的副本,这个过程存在一定的开销,所以,一般在规模很大时,读操作要远远多于写操作时,为了保证线程安全性,会使用 CopyOnWriteArrayList。

类似的,CopyOnWriteArraySet 的作用也相当于替代了 Set 接口。

我工作三年了,该懂并发了!

BlockingQueue

BlockingQueue 译为 阻塞队列,它是 JDK 1.5 添加的新的工具类,它继承于 Queue 队列,并扩展了 Queue 的功能。

我工作三年了,该懂并发了!

BlockingQueue 在检索元素时会等待队列变成非空,并在存储元素时会等待队列变为可用。BlockingQueue 的方法有四种实现形式,以不同的方式来处理。

  • 第一种是抛出异常
  • 特殊值:第二种是根据情况会返回 null 或者 false
  • 阻塞:第三种是无限期的阻塞当前线程直到操作变为可用后
  • 超时:第四种是给定一个最大的超时时间,超过后才会放弃

BlockingQueue 不允许添加 null 元素,在其实现类的方法 add、put 或者 offer 后时添加 null 会抛出空指针异常。BlockingQueue 会有容量限制。在任意时间内,它都会有一个 remainCapacity,超过该值之前,任意 put 元素都会阻塞。

BlockingQueue 一般用于实现生产者 - 消费者 队列,如下图所示

我工作三年了,该懂并发了!

BlockingQueue 有多种实现,下面我们一起来认识一下这些容器。

其中 LinkedBlockingQueueArrayBlockingQueue 是 FIFO 先入先出队列,二者分别和 LinkedListArrayList 对应,比同步 List 具有更好的并发性能。 PriorityBlockingQueue 是一个优先级排序的阻塞队列,如果你希望按照某种顺序而不是 FIFO 处理元素时这个队列将非常有用。正如其他有序的容器一样,PriorityBlockingQueue 既可以按照自然顺序来比较元素,也可以使用 Comparator 比较器进行外部元素比较。SynchronousQueue 它维护的是一组线程而不是一组队列,实际上它不是一个队列,它的每个 insert 操作必须等待其他相关线程的 remove 方法后才能执行,反之亦然。

LinkedBlockingQueue

LinkedBlockingQueue 是一种 BlockingQueue 的实现。

我工作三年了,该懂并发了!

它是一种基于链表的构造、先入先出的有界阻塞队列。队列的 head 也就是头元素是在队列中等待时间最长的元素;队列的 tail也就是队尾元素是队列中等待时间最短的元素。新的元素会被插入到队尾中,检索操作将获取队列中的头部元素。链表队列通常比基于数组的队列具有更高的吞吐量,但是在大多数并发应用程序中,可预测的性能较差。

ArrayBlockingQueue

ArrayBlockingQueue 是一个用数组实现的有界队列,此队列顺序按照先入先出的原则对元素进行排序。

默认情况下不保证线程公平的访问队列,所谓公平访问队列指的是阻塞的线程,可以按照阻塞的先后顺序访问,即先阻塞线程先访问队列。非公平性是对先等待的线程是非公平的。有可能先阻塞的线程最后才访问队列。

PriorityBlockingQueue

PriorityBlockingQueue 是一个支持优先级的阻塞队列,默认情况下的元素采取自然顺序生序或者降序,也可以自己定义 Comparator 进行外部排序。但需要注意的是不能保证同优先级元素的顺序。

DelayQueue

DelayQueue 是一个支持延时获取元素的无阻塞队列,其中的元素只能在延迟到期后才能使用,DelayQueue 中的队列头是延迟最长时间的元素,如果没有延迟,则没有 head 头元素,poll 方法会返回 null。判断的依据就是 getDelay(TimeUnit.NANOSECONDS) 方法返回一个值小于或者等于 0 就会发生过期。

TransferQueue

TransferQueue 继承于 BlockingQueue,它是一个接口,一个 BlockingQueue 是一个生产者可能等待消费者接受元素,TransferQueue 则更进一步,生产者会一直阻塞直到所添加到队列的元素被某一个消费者所消费,新添加的transfer 方法用来实现这种约束。

TransferQueue 有下面这些方法:两个 tryTransfer 方法,一个是非阻塞的,另一个是带有 timeout 参数设置超时时间的。还有两个辅助方法 hasWaitingConsumergetWaitingConcusmerCount

LinkedTransferQueue

一个无界的基于链表的 TransferQueue。这个队列对任何给定的生产者进行 FIFO 排序,head 是队列中存在时间最长的元素。tail 是队列中存在时间最短的元素。

BlockingDeque

与 BlockingQueue 相对的还有 BlockingDeque 和 Deque,它们是 JDK1.6 被提出的,分别对 Queue 和 BlockingQueue 做了扩展。

我工作三年了,该懂并发了!

Deque 是一个双端队列,分别实现了在队列头和队列尾的插入。Deque 的实现有 ArrayDequeConcurrentLinkedDeque,BlockingDeque 的实现有 LinkedBlockingDeque

阻塞模式一般用于生产者 – 消费者队列,而双端队列适用于工作密取。在工作密取的设计中,每个消费者都有各自的双端队列,如果一个消费者完成了自己双端队列的任务,就会去其他双端队列的末尾进行消费。密取方式要比传统的生产者 – 消费者队列具有更高的可伸缩性,这是因为每个工作密取的工作者都有自己的双端队列,不存在竞争的情况。

ArrayDeque

ArrayDeque 是 Deque 的可动态调整大小的数组实现,其内部没有容量限制,他们会根据需要进行增长。ArrayDeque 不是线程安全的,如果没有外部加锁的情况下,不支持多线程访问。ArrayDeque 禁止空元素,这个类作为栈使用时要比 Stack 快,作为 queue 使用时要比 LinkedList 快。

除了 remove、removeFirstOccurrence、removeLastOccurrence、contains、interator.remove 外,大部分的 ArrayDeque 都以恒定的开销运行。

注意:ArrayDeque 是 fail-fast 的,如果创建了迭代器之后,却使用了迭代器外部的 remove 等修改方法,那么这个类将会抛出 ConcurrentModificationException 异常。

ConcurrentLinkedDeque

ConcurrentLinkedDeque 是 JDK1.7 引入的双向链表的无界并发队列。它与 ConcurrentLinkedQueue 的区别是 ConcurrentLinkedDeque 同时支持 FIFO 和 FILO 两种操作方式,即可以从队列的头和尾同时操作(插入/删除)。ConcurrentLinkedDeque 也支持 happen-before 原则。ConcurrentLinkedDeque 不允许空元素。

LinkedBlockingDeque

LinkedBlockingDeque 是一个由链表结构组成的双向阻塞队列,即可以从队列的两端插入和移除元素。双向队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。LinkedBlockingDeque 把初始容量和构造函数绑定,这样能够有效过度拓展。初始容量如果没有指定,就取的是 Integer.MAX_VALUE,这也是 LinkedBlockingDeque 的默认构造函数。

同步工具类

同步工具类可以是任何一个对象,只要它根据自身状态来协调线程的控制流。阻塞队列可以作为同步控制类,其他类型的同步工具类还包括 信号量(Semaphore)栅栏(Barrier)闭锁(Latch)。下面我们就来一起认识一下这些工具类

Semaphore

Semaphore 翻译过来就是 信号量,信号量是什么?它其实就是一种信号,在操作系统中,也有信号量的这个概念,在进程间通信的时候,我们就会谈到信号量进行通信。还有在 Linux 操作系统采取中断时,也会向进程发出中断信号,根据进程的种类和信号的类型判断是否应该结束进程。

在 Java 中,Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源

Semaphore 管理着一组许可(permit),许可的初始数量由构造函数来指定。在获取某个资源之前,应该先从信号量获取许可(permit),以确保资源是否可用。当线程完成对资源的操作后,会把它放在池中并向信号量返回一个许可,从而允许其他线程访问资源,这叫做释放许可。如果没有许可的话,那么 acquire 将会阻塞直到有许可(中断或者操作超时)为止。release 方法将返回一个许可信号量。

Semaphore 可以用来实现流量控制,例如常用的数据库连接池,线程请求资源时,如果数据库连接池为空则阻塞线程,直接返回失败,如果连接池不为空时解除阻塞。

CountDownLatch

闭锁(Latch) 是一种同步工具类,它可以延迟线程的进度以直到其到达终止状态。闭锁的作用相当于是一扇门,在闭锁达到结束状态前,门是一直关着的,没有任何线程能够通过。当闭锁到达结束状态后,这扇门会打开并且允许任何线程通过,然后就一直保持打开状态。

CountDownLatch 就是闭锁的一种实现。它可以使一个或者多个线程等待一组事件的发生。闭锁有一个计数器,闭锁需要对计数器进行初始化,表示需要等待的次数,闭锁在调用 await 处进行等待,其他线程在调用 countDown 把闭锁 count 次数进行递减,直到递减为 0 ,唤醒 await。如下代码所示

public class TCountDownLatch {

    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(5);
        Increment increment = new Increment(latch);
        Decrement decrement = new Decrement(latch);

        new Thread(increment).start();
        new Thread(decrement).start();

        try {
            Thread.sleep(6000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
class Decrement implements Runnable {

    CountDownLatch countDownLatch;

    public Decrement(CountDownLatch countDownLatch){
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        try {

            for(long i = countDownLatch.getCount();i > 0;i--){
                Thread.sleep(1000);
                System.out.println("countdown");
                this.countDownLatch.countDown();
            }

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
class Increment implements Runnable {

    CountDownLatch countDownLatch;

    public Increment(CountDownLatch countDownLatch){
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        try {
            System.out.println("await");
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Waiter Released");
    }
}

Future

我们常见的创建多线程的方式有两种,一种是继承 Thread 类,一种是实现 Runnable 接口。这两种方式都没有返回值。相对的,创建多线程还有其他三种方式,那就是使用 Callable接口、 Future 接口和 FutureTask 类。Callable 我们之前聊过,这里就不再描述了,我们主要来描述一下 Future 和 FutureTask 接口。

我工作三年了,该懂并发了!

Future 就是对具体的 Runnable 或者 Callable 任务的执行结果进行一系列的操作,必要时可通过 get 方法获取执行结果,这个方法会阻塞直到执行结束。Future 中的主要方法有

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
  • cancel(boolean mayInterruptIfRunning) : 尝试取消任务的执行。如果任务已经完成、已经被取消或者由于某些原因而无法取消,那么这个尝试会失败。如果取消成功,或者在调用 cancel 时此任务尚未开始,那么此任务永远不会执行。如果任务已经开始,那么 mayInterruptIfRunning 参数会确定是否中断执行任务以便于尝试停止该任务。这个方法返回后,会对 isDone 的后续调用也返回 true,如果 cancel 返回 true,那么后续的调用 isCancelled 也会返回 true。
  • boolean isCancelled():如果此任务在正常完成之前被取消,则返回 true。
  • boolean isDone():如果任务完成,返回 true。
  • V get() throws InterruptedException, ExecutionException:等待必要的计算完成,然后检索其结果
  • V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException : 必要时最多等待给定时间以完成计算,然后检索其结果。

因为Future只是一个接口,所以是无法直接用来创建对象使用的,因此就有了下面的FutureTask。

FutureTask

FutureTask 实现了 RunnableFuture 接口,RunnableFuture 接口是什么呢?

RunnableFuture 接口又继承了 Runnable 接口和 Future 接口。纳尼?在 Java 中不是只允许单继承么,是的,单继承更多的是说的类与类之间的继承关系,子类继承父类,扩展父类的接口,这个过程是单向的,就是为了解决多继承引起的过渡引用问题。而接口之间的继承是接口的扩展,在 Java 编程思想中也印证了这一点

我工作三年了,该懂并发了!

对 RunnableFuture 接口的解释是:成功执行的 run 方法会使 Future 接口的完成并允许访问其结果。所以它既可以作为 Runnable 被线程执行,又可以作为 Future 得到 Callable 的返回值。

FutureTask 也可以用作闭锁,它可以处于以下三种状态

  • 等待运行
  • 正在运行
  • 运行完成

FutureTask 在 Executor 框架中表示异步任务,此外还可以表示一些时间较长的计算,这些计算可以在使用计算结果之前启动。

FutureTask 具体的源码我后面会单独出文章进行描述。

Barrier

我们上面聊到了通过闭锁来启动一组相关的操作,使用闭锁来等待一组事件的执行。闭锁是一种一次性对象,一旦进入终止状态后,就不能被 重置

Barrier 的特点和闭锁也很类似,它也是阻塞一组线程直到某个事件发生。栅栏与闭锁的区别在于,所有线程必须同时到达栅栏的位置,才能继续执行,就像我们上面操作系统给出的这幅图一样。

我工作三年了,该懂并发了!

ABCD 四条线程,必须同时到达 Barrier,然后 手牵手一起走过幸福的殿堂。

当线程到达 Barrier 的位置时会调用 await 方法,这个方法会阻塞直到所有线程都到达 Barrier 的位置,如果所有线程都到达 Barrier 的位置,那么 Barrier 将会打开使所有线程都被释放,而 Barrier 将被重置以等待下次使用。如果调用 await 方法导致超时,或者 await 阻塞的线程被中断,那么 Barrier 就被认为被打破,所有阻塞的 await 都会抛出 BrokenBarrierException 。如果成功通过栅栏后,await 方法返回一个唯一索引号,可以利用这些索引选举一个新的 leader,来处理一下其他工作。

public class TCyclicBarrier {

    public static void main(String[] args) {

        Runnable runnable = () -> System.out.println("Barrier 1 开始...");

        Runnable runnable2 = () -> System.out.println("Barrier 2 开始...");

        CyclicBarrier barrier1 = new CyclicBarrier(2,runnable);
        CyclicBarrier barrier2 = new CyclicBarrier(2,runnable2);

        CyclicBarrierRunnable b1 = new CyclicBarrierRunnable(barrier1,barrier2);
        CyclicBarrierRunnable b2 = new CyclicBarrierRunnable(barrier1,barrier2);

        new Thread(b1).start();
        new Thread(b2).start();
    }

}

class CyclicBarrierRunnable implements Runnable {

    CyclicBarrier barrier1;
    CyclicBarrier barrier2;

    public CyclicBarrierRunnable(CyclicBarrier barrier1,CyclicBarrier barrier2){
        this.barrier1 = barrier1;
        this.barrier2 = barrier2;
    }

    @Override
    public void run() {

        try {
            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName() + "等待 barrier1" );
            barrier1.await();

            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName() + "等待 barrier2" );
            barrier2.await();

        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }

        System.out.println(Thread.currentThread().getName() +
                " 做完了!");

    }
}

Exchanger

与 Barrier 相关联的还有一个工具类就是 Exchanger, Exchanger 是一个用于线程间协作的工具类。Exchanger用于进行线程间的数据交换。

我工作三年了,该懂并发了!

它提供一个同步点,在这个同步点两个线程可以交换彼此的数据。这两个线程通过exchange 方法交换数据, 如果第一个线程先执行 exchange方法,它会一直等待第二个线程也执行 exchange,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。因此使用Exchanger 的重点是成对的线程使用 exchange() 方法,当有一对线程达到了同步点,就会进行交换数据。因此该工具类的线程对象是成对的。

下面通过一段例子代码来讲解一下

public class TExchanger {

    public static void main(String[] args) {

        Exchanger exchanger = new Exchanger();

        ExchangerRunnable exchangerRunnable = new ExchangerRunnable(exchanger,"A");
        ExchangerRunnable exchangerRunnable2 = new ExchangerRunnable(exchanger,"B");

        new Thread(exchangerRunnable).start();
        new Thread(exchangerRunnable2).start();
    }
}


class ExchangerRunnable implements Runnable {

    Exchanger exchanger;
    Object object;

    public ExchangerRunnable(Exchanger exchanger,Object object){
        this.exchanger = exchanger;
        this.object = object;
    }


    @Override
    public void run() {

        Object previous = object;

        try {
            object = this.exchanger.exchange(object);
            System.out.println(
                    Thread.currentThread().getName() + "改变前是" + previous + "改变后是" + object);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}

总结

本篇文章我们从同步容器类入手,主要讲了 fail-fastfail-safe 机制,这两个机制在并发编程中非常重要。然后我们从操作系统的角度,聊了聊操作系统层面实现安全性的几种方式,然后从操作系统 -> 并发我们聊了聊 Java 中的并发工具包有哪些,以及构建并发的几种工具类。

你好,我是 cxuan,我自己手写了四本 PDF,分别是 Java基础总结、HTTP 核心总结、计算机基础知识,操作系统核心总结,我已经整理成为 PDF,可以关注公众号 Java建设者 回复 PDF 领取优质资料。

我工作三年了,该懂并发了!


喜欢 (0)