0%

架构设计 线程设计模式

介绍了调度线程常用的设计模式。

Single Threaded Execution 模式

Single Threaded Execution 是指以单线程的形式访问共享资源,保证线程安全。典型例子是使用 mutex lock 使得在同一时间只有 1 个线程访问共享资源。

别名

  • Critical Section
  • Critical Region

问题及解决方案

当多线程访问共享资源时,如果各个线程可以随意变更实例状态,实例会失去安全性。Single Threaded Execution 模式严格定义了临界区,并确保临界区在同一时间只接受一个线程执行,这样就可以确保实例的安全性。

死锁发生的场景

当 Single Threaded Execution 模式满足以下条件,死锁就会发生。

  1. 线程在持有着某个 Shared Resource 角色的锁的同时,还想获取其他 Shared Resource 角色的锁。
  2. 获取 Shared Resource 角色的锁的顺序并不固定(Shared Resource 角色是对称的)

与其他并发设计模式联系

  1. 当实例状态不变时,推荐使用 Immutable 模式。
  2. 当实例读多写少时,推荐使用 Read-Write Lock 模式。
  3. 当实例锁状态与业务状态耦合时,推荐使用 Guarded Suspension 模式。
  4. 当实例只需暂存而不需要与其他线程共享读写时,推荐使用 Thread-Specific Storage 模式。

Immutable 模式

Immutable 模式是指将共享资源设定为不发生改变的类(Immutable 类),在访问实例时不需要执行耗时的互斥处理。使用巧妙可以提高程序性能。

问题及解决方案

使用 Single Threaded Execution 模式会导致吞吐量大幅度下降。但使用中发现,虽然多线程共享了实例,实例本身并不会发生变化,因此将实例定义为 Immutable 模式可以解决这个问题。

以“只读”的方式,解决 write-write conflict 和 read-write conflict。

Read-Write Lock 模式

在 Read-Write Lock 模式中,读取操作和写入操作是分开考虑的。当线程执行读取操作时,实例的状态不会发生变化,所以多个线程可以同时读取。但在读取时,不可以写入。当线程执行写入操作时,实例的状态就会发生变化。因此,当有一个线程正在写入时,其他线程不可以读取或写入。

一般来说,执行互斥处理会降低程序性能。但如果把针对写入的互斥处理和针对读取的互斥处理分开来考虑,在“适合读取操作繁重时”和“适合读取频率比写人频率高时”,则可以提高程序性能。

问题及解决方案

如果不进行线程的互斥处理将会失去安全性。但是,如果使用 Single Threaded Execution 模式,吞吐量又会下降。在当多个线程共享了实例,且存在读取实例状态的线程(Reader角色)和改变实例状态的线程(Writer角色)时,可以考虑使用 Read-Write Lock 模式改善吞吐量。

以“读写分离”的方式,保证 read-read safety。

Guarded Suspension 模式

Guarded Suspension 模式通过让线程等待来保证实例的安全性。它主要是用来解决线程协作的问题,其核心思想是某个线程执行特定的操作前需要满足一定条件,条件未满足则暂挂线程,处于 WAITING 状态,直到条件满足该线程继续执行。

别名

  • Spin Lock
  • Guarded Wait

问题及解决方案

如果各个线程都随意地访问实例,实例会失去安全性。使用 Guarded Suspension 模式时,可以通过守护条件来控制方法的执行,保证实例的安全性。但是,如果永远无法满足守护条件,那么线程会永远等待,所以可能会失去活性(Liveness)。

与其他并发设计模式联系

  1. 如果不希望线程等待守护条件成立,直接返回,则推荐使用 Balking 模式。
  2. 如果希望先做其他处理,等需要时再获取,推荐使用 Future 模式。

Latch 模式

Latch(门阀)模式指定了一个屏障,若干线程并发执行某个特定的任务,然后等到所有的子任务都执行结束之后再统一汇总,只有所有的条件都达到满足的时候,门阀才能打开。

详细设计思路参考《JUC ConcurrentUtility》中 CountDownLatch 实现

Balking 模式

Balking 模式与 Guarded Suspension 模式一样,也存在守护条件。在 Balking 模式中,如果守护条件不成立,则立即中断处理。这与 Guarded Suspension 模式有所不同,因为 Guarded Suspension 模式是一直等待至可以运行。

问题及解决方案

如果各个线程都随意地访问实例,实例会失去安全性。但是,如果要等待安全的时机,活性又会下降。当实例状态不正确时就中断处理。首先,用“守护条件”表示实例的“正确状态”。接着,在执行可能会导致实例失去安全性的处理之前,检查是否满足守护条件。只有满足守护条件时才让程序继续执行。如果不满足守护条件就中断执行,立即返回。

与其他并发设计模式联系

当在多线程环境下使用 Observer 模式时,有时会用到 Balking 模式。当 Subject 角色通知 Observer 角色状态发生变化时,如果 Observer 角色的状态不适合处理该通知,则会 balk 该通知处理。

Balking 模式和 Guarded Suspension 模式之间

介于“直接balk并返回”和“等待到守护条件成立为止”这两种极端的处理方法之间,还有一种处理方法,那就是“在守护条件成立之前等待一段时间”。在守护条件成立之前等待一段时间,如果到时条件还未成立,则直接 balk。我们将这种处理称为 guarded timed 或 timeout。

Future 模式

Future 的意思是未来、期货(经济学用语)。假设有一个方法需要花费很长时间才能获取运行结果。那么,与其一直等待结果,不如先拿一张“提货单”。获取提货单并不耗费时间。这里的“提货单”我们就称为 Future 角色。

获取 Future 角色的线程会在稍后使用 Future 角色来获取运行结果。这与凭着提货单去取蛋糕非常相似。如果运行结果已经出来了,那么直接领取即可;如果运行结果还没有出来,那么需要等待结果出来。

问题及解决方案

当一个线程(Client 角色)向其他线程委托了处理,而 Client 角色也想要获取处理结果时,如果在委托处理时等待执行结果,响应性会下降。

使用 Future 模式时,在处理开始时返回 Future 角色,稍后再将处理结果设置到 Future 角色中。这样,Client 角色就可以通过 Future 角色在自己觉得合适的时机获取(等待)处理结果。

与其他并发设计模式联系

  1. 如想在 Thread-Per-Message 模式中获取处理结果时推荐使用 Future 模式。
  2. 如想在 Worker Thread 模式中获取处理结果时推荐使用 Future 模式。
  3. Future 模式可以与 Guarded Suspension 模式和 Balking 模式结合来做到守护获取执行结果以及防止重复执行等。

与通用设计模式联系

  1. Builder 模式

    Builder 模式会晚一些才去获取组装完成的对象。Future 模式是在开始组装时就创建了 Future 角色的对象,然后会使用 Future 角色去访问实际的处理结果对象。

  2. Proxy 模式

    在 Proxy 模式中,由于创建 RealSubject 角色会花费很长时间,所以作为代理人,Proxy 角色会尽量完成更多的操作。而且,RealSubject 角色只在其必须出面时才会被创建(在 Virtual Proxy 模式下)。在 Future 模式中,由于创建 RealData 角色会花费很长时间,所以会使用 Future 角色作为提货单。接着由其他线程创建 RealData 角色。如果在使用 Future 角色时必须用到 RealData 角色,则线程会一直等待 RealData 角色创建完毕。

Producer-Consumer 模式

Producer-Consumer 模式,即生产者安全地将数据交给消费者。一般来说,在该模式中,生产者和消费者都有多个,当然生产者和消费者有时也会只有一个。当两者都只有一个时,我们称之为 Pipe 模式。

问题及解决方案

想从某个线程(Producer角色)向其他线程(Consumer角色)传递数据时。如果 Producer 角色和 Consumer 角色的处理速度不一致,那么处理速度快的角色会被处理速度慢的角色拖后腿,从而导致吞吐量下降。另外,如果在 Producer 角色写数据的同时,Consumer 角色去读取数据,又会失去安全性。

因此,在 Producer 角色和 Consumer 角色之间准备一个中转站 —— Channel 角色。接着,让 Channel 角色持有多个数据。这样,就可以缓解 Producer 角色与 Consumer 角色之间的处理速度差异。另外,如果在 Channel 角色中进行线程互斥,就不会失去数据的安全性。这样就可以既不降低吞吐量,又可以在多个线程之间安全地传递数据。

与其他并发设计模式联系

  1. 在 Worker Thread 模式中传递请求的时候可以使用 Producer-Consumer 模式。

与通用设计模式联系

  1. Command 模式

    通过 Command 模式连接 Producer 和 Consumer。

  2. Strategy 模式

    在 Producer-Consumer 模式中,Producer 角色会将 Data 角色传递给 Consumer 角色。确定 Data 角色传递顺序的部分可以使用 Strategy 模式。

Thread-Per-Message 模式

Thread-Per-Message 模式中,Per 就是“每”的意思。因此,直译过来就是“每个消息一个线程”的意思。该模式会为每个命令或请求新分配一个线程,委托这个线程来执行后面的处理任务。

问题及解决方案

当线程(Client 角色)要调用实例(Host 角色)的方法时。在方法的处理结束前,程序的控制权无法从 Host 角色中返回。如果方法的处理需要花费很长时间,响应性会下降。

因此在 Host 角色中启动一个新线程。接着,将方法需要执行的实际处理交给这个新启动的线程负责。这样,Client 角色的线程就可以继续向前处理。这样修改后,可以在不改变 Client 角色的前提下提高响应性。

与其他并发设计模式联系

  1. 在 Worker Thread 模式中可以循环利用已经创建好的线程,避免了不断创建线程的开销。

Worker Thread 模式

在 Worker Thread 模式中,RM(Resource Manager)会保存工人线程(Worker Thread),工人线程(Worker Thread)会逐个取回工作并进行处理。当所有工作全部完成后,工人线程会等待新的工作到来。另外,如果从“保存多个工人线程的场所”这一点来看,我们也可以称这种模式为 Thread Pool(线程池)模式。

别名

  • Thread Pool
  • Background Thread

问题及解决方案

当线程(Client 角色)要调用实例(Host 角色)的方法时。如果方法的处理需要花费很长时间,响应性会下降。如果为了提高响应性而启动了一个新的线程并让它负责方法的处理,那么吞吐量会随线程的启动时间相应下降。另外,当要发出许多请求时,许多线程会启动,容量会因此下降。

解决方法是使用 Worker Thread 模式,先启动执行处理的线程(Worker Thread)。接着,将代表请求的实例传递给工人线程。这样,就无需每次都启动新线程了。

相关问题

  1. 创建了 Worker Thread 模式是否真正的提高了吞吐量?
  2. 创建的 Workers 是否数量合适?
  3. 是否需要通过 Producer-Consumer 模式来权衡容量和资源?

Two-Phase Termination 模式

Two-Phase Termination 模式的名字直译为中文是“分两阶段终止”的意思。它是一种先执行完终止处理再终止线程的模式。

问题及解决方案

当想要终止正在运行的线程时,如果因为外部的原因紧急终止了线程,就会失去安全性。通过实现 Two-Phase Termination 模式,让即将被终止的线程自己去判断开始终止处理的时间点,待将资源处理完毕后,再终止线程。

我们称线程在进行正常处理时的状态为“操作中”。在要停止该线程时,我们会发出“终止请求”。这样,线程就不会突然终止,而是会先开始进行“打扫工作”。我们称这种状态为“终止处理中”。从“操作中”变为“终止处理中”是线程终止的第一阶段。

在“终止处理中”状态下,线程不会再进行正常操作了。它虽然仍然在运行,但是只会进行终止处理。终止处理完成后,就会真正地终止线程。“终止处理中”状态结束是线程终止的第二阶段。

先从“操作中”状态变为“终止处理中”状态,然后再真正地终止线程。这就是 Two-Phase Termination 模式。该模式的要点如下:

  1. 安全地终止线程(安全性)
  2. 必定会进行终止处理(生存性)
  3. 发出终止请求后尽快进行终止处理(响应性)

实现方案

我们需要通过对三个状态(isInterruptedisShutdownisTerminated)的检查,来实现 Two-Phase Termination 模式,也就是优雅停机模式。这里需要用到 interrupt() 方法,我们需要知道它做了什么?

在一个线程内部存在 interrupt flag 的标识,如果一个线程调用了 interrput,那么它的 flag 将被设置,如果当前的线程正在执行可中断方法被阻塞时,调用 interrupt 方法将其中断,反而会导致 flag 被清除并抛出 InterruptedException 异常。

仅仅检查 isTerminated/isShutdown 终止状态是不够的

因为当想要终止线程时,该线程可能正在 sleep。而当线程正在 sleep 时,即使将 isTerminated/isShutdown 标志设置为 true,线程也不会开始终止处理。等到 sleep 时间过后,线程可能会在某个时间点开始终止处理,但是这样程序的响应性就下降了。如果使用 interrupt 方法的话,就可以中断 sleep。

另外,线程当时也可能正在 wait。而当线程正在 wait 时,即使将 isTerminated/isShutdown 标志设为 true,线程也不会从等待队列中出来,所以我们必须使用 interrupt 方法对线程下达“中断wait”的指示。

仅仅检查 isInterrupted 中断状态是不够的

有人可能会问:“调用 interrupt 方法后,如果线程正在 sleep 或是 wait,那么会抛出 InterruptedException 异常,而如果不抛出异常,线程就会变为中断状态。也就是说,没有必要特意准备一个新的 isTerminated/isShutdown 标志。只要捕获 InterruptedException,使用 isInterrupted 方法来检查线程是否处于中断状态不就可以了吗?”

这样的疑问很有道理。如果是像示例程序那样,开发人员可以看到线程的所有相关程序,那么就无需使用 isTerminated/isShutdown 标志。只要捕获 InterruptedException,并使用 isInterrupted 方法就能够正确地开始终止处理。

try {
    Thread.sleep(1000);
} catch (InterruptedException e) {
    //ignore
}

但是,只要线程正在执行的方法中如上述代码一样忽略 InterruptedException。这样,即使在可中断阻塞状态中被中断时抛出了 InterruptedException,线程也不会变为中断状态。

补充:当 interrupt 方法被调用后,线程就可以被中断了。中断线程这个行为会带来以下结果之一。

  1. 线程变为“中断状态”:反应为“状态”
  2. 抛出“InterruptedException 异常”:反映为“控制”

    我们还需要注意中断状态的读取:

  • Thread.currentThread().isInterrupted() 不清除中断状态位

    • Thread.interrupted() 清除中断状态位

    由于在可中断阻塞状态中被中断时抛出了 InterruptedException,线程不会变为中断状态。所以有时需要防止该中断状态遗失,再次调用中断。

    try {
         Thread.sleep(100000000);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
       }

也就是说,如果程序中没有 isTerminated/isShutdown 标志,而且有上面这样的代码,那么即使使用 terminate() 方法发出了终止请求,该请求也不会被处理。isTerminated 是用于记录是否已经发出终止请求的标志。

参考 ThreadPoolExecutor Two-Phase Termination 实现

通常,线程都会在 ExecutorService 接口背后运行。为了优雅地终止运行中的线程,ExecutorService 接口为我们准备了 shutdown 方法。

ExecutorService 接口还为我们提供了用于确认终止处理已执行到哪个阶段的方法。isShutdown 方法是用于确认 shutdown 方法是否已经被调用的方法。如果 shutdown 方法已经被调用,那么 isShutdown 会返回 true。但是,即使 isShutdown 为 true,也并不表示线程已经实际停止了。isTerminated 方法是用于确认线程是否已经实际停止了的方法。如果线程已经停止了,isTerminated 会返回 true。

Thread-Specific Storage 模式

Thread-Specific Storage 模式为每个线程准备一个特有的存储空间,让存储空间与线程一一对应并进行管理。也就是 java.lang.ThreadLocal

别名

  • Per-Thread Attribute
  • Thread-Specific Data
  • Thread-Specific Field
  • Thread-Local Storage

上下文(Context)的作用

与强调吞吐量相比,Thread-Specific Storage 模式更看重如下所示的可复用性。

  1. 不改变结构即可实现程序。
  2. 没有显式地执行互斥处理,所以并发编程时犯错的可能性较小。

上下文使程序结构变得简单。但是反过来,它也让开发人员难以清楚地掌握处理中到底使用了哪些信息。上下文的危险性与全局变量的危险性非常相似。

但如果使用 Thread-Specific Storage 模式后,上下文会被引入程序中,这会导致难以透彻地理解整体代码。找 Bug 的原因时,我们会关注信息的流向。为了调查传递给方法的参数信息,我们会加入调试语句打印出参数的值。不过,一旦使用了上下文,要想解决问题就会非常困难。这是因为,可能程序以前的行为会导致上下文的异常,从而引发当前的 Bug。

Active Object 模式

Active Object 模式是一种异步编程模式。Active Object 是主动对象的意思,所谓主动对象是指拥有自己独立的线程、接收异步消息并返回处理结果的实例。它通过解耦(Decoupling)方法的调用(Method Invocation)与方法的执行(Method Execution)来提高并发性。换而言之,方法所定义的功能的调用方法和执行方法是运行在不同的线程中的,从而提高了并发性。

别名

  • Actor
  • Concurrent Object

角色及调用关系

Client(委托者)

Client 角色调用 ActiveObject 角色的方法来委托 Proxy 处理请求。并且 Client 角色在获取处理结果时,会调用 VirtualResult 角色的 getResultValue 方法。这里使用了 Future 模式。

Proxy(代理人)

Proxy 角色负责将方法调用转换为 MethodRequest 角色的对象。转换后的 MethodRequest 角色会被传递给 Scheduler 角色。Proxy 角色实现了 ActiveObject 角色提供的接口(API)。

调用 Proxy 角色的方法的是 Client 角色。将方法调用转换为 MethodRequest 角色,并传递给 Scheduler 角色的操作都是使用 Client 角色的线程进行的。

Servant(仆人)

Servant 角色负责实际地处理请求。

调用 Servant 角色的是 Scheduler 角色的线程。Scheduler 角色会从 ActivationQueue 角色取出一个 MethodRequest 角色(实际上是 ConcreteMethodRequest 角色)并执行它。此时,Scheduler 角色调用的就是 Servant 角色的方法。

Servant 角色实现了 ActiveObject 角色定义的接口(API)。

Scheduler(调度员)

Scheduler 角色负责将 Proxy 角色传递来的 MethodRequest 角色传递给 ActivationQueue 角色,以及从 ActivationQueue 角色取出并执行 MethodRequest 角色这两项工作。

Proxy 角色会将请求转换为 MethodRequest 角色,而 Servant 角色则会实际地执行该请求。Scheduler 角色介于 Proxy 角色和 Servant 角色之间,负责管理按照什么顺序执行请求,也就是请求的调度。

MethodRequest

MethodRequest 角色是与来自 Client 角色的请求对应的角色。MethodRequest 定义了负责执行处理的 Servant 角色,以及负责设置返回值的 Future 角色和负责执行请求的方法(execute )。

ConcreteMethodRequest

ConcreteMethodRequest 角色是使 MethodRequest 角色与具体的方法相对应的角色。对于 ActiveObject 角色中定义的每个方法,会有各个类与之对应,比如MethodAlphaRequest、MethodBetaRequest…。为了便于大家看出它们与 methodAlpha、methodBeta 等方法名的对应关系,上图中并没有使用 ConcreteMethodRequest 这个名字,而是使用 methodAlphaRequest、methodBetaRequest 等名字来表示。各个 ConcreteMethodRequest 角色中的字段分别与方法的参数相对应。

Event Bus 模式

Event Bus 模式可以看作多线程实现的观察者模式,用户端通过订阅数据源从而得到服务端推送的数据。

以下是 Event Bus 架构类图

  • Bus 接口对外提供了几种主要的使用方式,比如 post 方法用来发送 Event,register 方法用来注册 Event 接收者(Subscriber)接受响应事件,EventBus 采用同步的方式推送 Event,AsyncEventBus 采用异步的方式(Thread-Per-Message)推送 Event。
  • Registry 注册表,主要用来记录对应的 Subscriber 以及受理消息的回调方法。
  • Dispatcher 主要用来将 Event 广播给注册表中监听了 Topic 的 Subscriber。

Event Driven 模式

EDA(Event-Driven Architecture)是一种实现组件之间松耦合、易扩展的架构方式。一个最简单的 EDA 设计需要包含如下几个组件:

  • Events:需要被处理的数据。
  • Event Handlers:处理 Events 的方式方法。
  • Event Loop:维护 Events 和 Event Handlers 之间的交互流程。

如下图所示,EventA 将被 HandlerA 处理,而 EventB 将被 HandlerB 处理,这一切的分配都是由 Event Loop 所控制的。

因此,我们需要在 Event Loop 中将事件类型与 Handler(也叫 Channel)绑定,如下:

EventLoop el = EventLoop.build();
el.registerChannel(UserEvent.class, UserEventHandler.instance());

Event vs Message vs Command

我们通常会对 Event Driven 模式、Event Bus 模式、Request Driven 模式混淆不清。我们可以将 Event Driven 的 DTO 称作 Event,Event Bus 的 DTO 称作 Message,Request Driven 模式的 DTO 称作 Command。

我们可以通过以下定义区分:

  • Command 是一个未发生、待处理的命令(流程未进行)。
  • Message 是一个已发生、待后续系统接收到进一步处理的命令(流程已进行一部分)。
  • Event 是一个已发生、待后续系统接收到执行附加处理的命令(流程已进行完毕,附加处理待进行)。

EDA 架构类图

  • Message 是 Event 更高级别的抽象,用来标识一个事件消息。
  • Events 需要包含两个属性:类型和数据,Event 的类型决定了它会被哪个 Handler 处理,数据是在 Handler 中代加工的材料。
  • Event Handlers 主要用于处理 Event,比如一些 filtering 或者 transforming 数据的操作等。
  • EventLoop 处理接收到的所有 Event,并且将它们分配给合适的 Handler 去处理。
  • Channel 主要用来接收来自 Event Loop 分配的事件,每一个 Channel 负责一个类型。
  • Dynamic Router 主要将 Event 注册到对应的 Channel 并且还负责事件的分发。

在设计 Event 事件时,我们最好使用 Immutable 模式用来阻止处理中产生的并发问题。

Strategized Locking 模式

Read-Write Lock 模式依据 Reader 角色和 Writer 角色的特点执行互斥处理,提高程序性能。 而 Strategized Locking 模式则是将用于同步的结构参数化,使互斥处理的执行变得更加灵活。

Mediator 模式

在 Mediator 模式,多个 ConcreteColleague 角色之间并不直接通信,而是通过中间角色 ConcreteMediator 来调解、控制处理。

Producer-Consumer 模式中的 Channel 角色也负责调解、控制 Producer 角色和 Consumer 角色的 处理。但 Producer-Consumer 模式中的调解和控制并不是通过调用 Producer 角色和 Consumer 角色的方法来进行的,而是通过对各线程执行互斥处理来进行的。

Mutiphase Cancellation 模式

使用 Multiphase Cancellation 模式终止线程时,如果在一定时间内线程没有终止,那么程序会逐渐发出更加强硬的终止请求。

Multi-Phase Startup 模式

使用 Two-Phase Termination 模式时,在接收到终止请求后,程序并不立即终止线程,而是先进入“终止处理中”阶段,然后安全地终止线程。

而使用 Multi-Phase Startup 模式时,如果存在多个子系统,则程序会经过多个阶段启动全部系统。在该模式下,系统会定义一个整数值的运行级别,用来表示当前哪个运行级别正处于启动中状态。

Java 的 Applet 也使用了该模式,不过它将 Multi-Phase Startup 模式缩减至了三步(即创建实例 → 调用 init 方法 → 调用 start 方法)。