7. 多线程

我们先来看一个熟悉的例子。

在下图中,包含了 6 个 Action :

  1. 首先, US 会执行异步操作 Action1 ,以等待一个请求消息;

  2. 随后, Action2Action3 是两个并发进行的异步操作;

  3. Action2Action3 完成之后, Action4Action5 同样是两个异步操作, 会依次顺序执行;

  4. 最后,执行同步操作 Action6 ,以回复一个响应消息。

_images/mt-1.png

这个过程简单而清晰,我们可以轻易的使用 Transaction DSL 对其进行描述。

__transaction
( __req(Action1)
, __concurrent(__asyn(Action2), __asyn(Action3)) , __asyn(Action4)
, __asyn(Action5)
, __rsp(Action6)));

但事情并非总是这么简单。如下图所示,有一天,需求迫使你不得不在原有流程上加入一个新的异步处理: 在 Action2 收到其应答消息之后,需要马上发出一条新消息,但却无须停下来等待其应答,整个流程仍然按照原有的方式往前走。 在这期间,应答消息随时可能到来。一旦其收到这条应答,系统应该马上对其进行处理。

而这个新增的请求-应答异步操作,就是下图中的 Action7 :

_images/mt-2.png

这样的过程,用 序列图 可能不能准确的表达其过程,而 活动图 会让其更加直观:

_images/mt-3.png

7.1. __fork

上图已经揭示了问题的本质:新加入的 Action7 已经脱离了本来的控制序列,独自形成了另外一条控制序列。如果我们将一条控制序列看作一个线程, 那么,我们已经在面对多线程的问题。

Transaction DSL 中,我们可以使用 __fork 来创建一条新线程。例如:

const ActionThreadId ACTION7_THREAD = 1;

__fork(ACTION7_THREAD, __asyn(Action7))

正如例子所揭示的, __fork 有两个参数:

第一个参数是 Thread ID ,随后我们会谈到。

第二个参数则是在这条线程上执行的操作。在一条线程的执行的操作,在本例中非常简单; 但事实上,它可以是任意复杂的操作,比如:

__fork(THREAD1, __sequential
                  ( __concurrent(__asyn(Action1), __asyn(Action2)) , __asyn(Action2)
                  , __time_guard(TIMER2, __asyn(Action3)))

而对于我们这个例子,其完整的描述如下:

const ActionThreadId ACTION7_THREAD = 1;

__transaction
( __req(Action1)
, __concurrent
    ( __sequential
        ( __asyn(Action2)
        , __fork(ACTION7_THREAD, __asyn(Action7)))
    , __asyn(Action3))
, __asyn(Action4)
, __asyn(Action5)
, __rsp(Action6));

其中,我们将 Action2ACTION7_THREAD 放在了一个 __sequential 操作里。 这是因为,我们必须确保,ACTION7_THREAD 的创建发生在 Action2 的成功执行结束之后。 同时,我们也必须确保,这个序列,和 Action3 是并行发生的。

7.1.1. 此线程非彼线程

尽管 Transaction DSL 引入了线程的概念,但这里的线程与操作系统或任何平 台所提供的线程之间没有任何关系。 它是一个由事务框架创建,管理和调度的实体,操作系统或平台完全意识不到它们的存在。

其执行和调度粒度是以 Action 为单位的,不像操作系统级别的线程是以硬件指令为单位的。 在一个抢占式的系统中,操作系统级别的线程,可以在允许的任何指令处暂停或中止一个线程的运行, 但 Transaction DSL 的线程则完全做不到这一点, 当然,它也没有任何必要去做到这一点。

所以,在 Transaction DSL 里,其准确的名字是操作线程 (Action Thread)。

7.2. __join

当创建了一条线程时,它就和其它的线程——包括主线程——各自独立的并发执行,互不干扰,老死不相往来。

但是,在现实的系统中,往往又存在这这样的需求,即,虽然同时存在多个线程,但其中某个线程的执行过程中,某个步骤的后续执行, 是以另外一条线程的执行结束为前提的。

比如,我们可以给之前的例子增加一个约束: Action5 开始执行之前,要求 Action7 必须执行完毕。如图所示:

_images/mt-4.png

对于这样的约束,你可以使用 __join 来描述。它的参数,就是某个线程要等待的 Thread ID 。例如:

// ...
__fork(THREAD1, __asyn(Action1))
// ...
__join(THREAD1)
// ...

将其应用于我们的例子,其完整的描述如下:

const ActionThreadId ACTION7_THREAD = 1;

__transaction
( __req(Action1)
, __concurrent
    ( __sequential
        ( __asyn(Action2)
        , __fork(ACTION7_THREAD, __asyn(Action7)))
    , __asyn(Action3))
, __asyn(Action4)
, __join(ACTION7_THREAD)
, __asyn(Action5)
, __rsp(Action6));

__join 时,如果被 join 的线程已经执行完毕,则 __join 马上完成。否则, __join 所在线程 将在 __join 处一直等待,直到目标线程运行结束。

如果一个线程 __join 它自己,会马上成功完成。

7.2.1. Thread ID

Thread ID ,标示了一个线程的身份,所以,在一个事务中,每个线程的 Thread ID 必须唯一。 在目前的实现中,其取值范围为 07 ,但 0 是主线程的 ID ,用户不能使用。所以,在一个事务中,用户最多允许创建 7 个线程。

由于 __join 机制的存在,在 __join 时,用户必须有一种手段,来指明具体的线程。所以,每个线程必须有一个唯一的身份标识。

从实现手段上,这个标识可以是一个字符串,从而避免让用户需要亲自来分配和管理 Thread ID 。 但是,从语言的约束和实现的复杂度上,用整数作为标识,是最为简单的。虽然这略微增加了用户的负担, 但却避免了框架实现的复杂度。毕竟,用户最多只能在一个事务中创建 7 个线程,这仍然在人类可轻松管理的范围内。

7.2.2. 同时等待多个线程

有些时候,一个线程的继续执行,是以多个线程的执行结束为条件的。这种情况下,你仍然使用 __join

比如,在下图中所描述的事务中,Action2Action3 在执行结束后,分别启动了一个线程, 并发的运行 Action7Action8 ,随后,在执行 Action5 之前,要求这两个线程都必须执行结束。

_images/mt-5.png

可描述为:

const ActionThreadId ACTION7_THREAD = 1;
const ActionThreadId ACTION8_THREAD = 2;

__transaction
( __req(Action1)
, __concurrent
    ( __sequential
        ( __asyn(Action2)
        , __fork(ACTION7_THREAD, __asyn(Action7)))
    , __sequential
        ( __asyn(Action3)
        , __fork(ACTION8_THREAD, __asyn(Action8))))
, __asyn(Action4)
, __join(ACTION7_THREAD, ACTION8_THREAD) , __asyn(Action5)
, __rsp(Action6));

__join 是一个变参操作,最多可以等待 7 个线程。因为每个事务的最大线程数量是 8 个。所以,每个线程都可以等待所有其它线程。

或许你会敏锐的发现,对于下图所描述的事务,和上图中所描述的事务是等价的。

_images/mt-6.png

所以,你会希望将代码写成这种形式,从而减少对于线程的操作,也让代码看来更加的简洁。

__transaction
( __req(Action1)
, __concurrent(__asyn(Action2), __asyn(Action3))
, __concurrent(__asyn(Action7), __asyn(Action4), __asyn(Action8))
, __asyn(Action5)
, __rsp(Action6));

不幸的是,尽管它们看起来很相似,但它们的实时性和性能却并不相同(想像一下, Action2Action8 是慢速操作, 而 Action3Action7 是快速操作,对比一下两者的性能)。而对于实时性和性能的追求,正是我们使用并发模型的原因, 不是吗?

7.2.3. 匿名线程

你应该早就已经意识到,在 __concurrent 里的多个操作,和通过 __fork 创建线程执行的操作都是并发操作。 所以, __concurrent 里的每个 Action 也都是线程。 不同的是,它们没有自己明确的身份: Thread ID 。所以,直接被放在 __concurrent 里的线程被称为匿名线程。 之所以它们不需要 Thread ID ,是因为 __concurrent 本身已经保证了这些线程 会被自动的 __join , 比如:

__concurrent(__asyn(Action1), __asyn(Action2))

从控制过程看,就近似的等价于:

__fork(TID1, __asyn(Action1)),
__fork(TID2, __asyn(Action2)),
__join(TID1, TID2)

但很明显,前一种写法更加简单明确。另外,匿名线程的一个重要优势是:没有数量上的约束。在一个事务内部,你可以根据需要创建任意多个匿名线程。 其实,匿名线程和有名线程之间的差别还有很多,我们会在其它相关的部分进行讨论。

7.3. 调度策略

对于一个事务而言,即便存在多条线程,但只要主线程执行结束,整个事务就执行结束。此时,其它线程执行到什么阶段, 都不会影响一个事务的 exechandleEvent 函数的返回值(你应该还记得,其返回值为 CONTINUE 表示一个事务仍在工作, 而 SUCCESS 则表示其已经成功结束)。

当主线程结束时,所有其它正在工作的有名线程将会被强行中止。 所以,一个用户创建的有名线程 __join 主线程是没有意义的。

7.3.1. join all

如果你期望所有的线程都结束之后,整个事务才能结束,那么你应该在主线程使用 __join ,但不指定任何具体 Thread ID 来等待 所有 其它线程结束。

__transaction
( __fork(THREAD1, __asyn(Action1))
, __fork(THREAD2, __asyn(Action2))
, __asyn(Action3)
, __join());

如果主线程是一个 __procedure ,那么就应该在 __finally__join,比如:

__transaction
( __fork(THREAD1, __asyn(Action1))
, __fork(THREAD2, __asyn(Action2))
, __asyn(Action3))
, __finally
    ( __asyn(Action4)
    , __join()));

注意,__join 并不关心它所等待的线程是以成功还是失败,而只关心它们是否已经结束。

7.3.2. 线程错误

Transaction DSL 对于错误的应对哲学是:尽早失败 ( Fail Fast )。 因为,一旦一个事务中的任何一点发生了不可修复的错误,那么就应该让整个事务的所有线程都进入失败处理。 否则,将会导致其它线程的不必要的行为浪费。

7.3.2.1. 有名线程的失败

如果线程启动时发生错误, __fork 以失败结束。比如在下面的过程里:

__transaction
( __fork(THREAD1, __asyn(Action1))
, __asyn(Action2))
, __finally(__on_fail(__asyn(Action3))));

如果 Action1exec 调用结果为某种错误,则 __fork 失败,从而 Action2 会被调过,直接进入 __finally ; 而由于 __fork 失败,因而 __on_fail 谓词判断结果为 true ,所以, Action3 会得到执行。

一旦线程的 exec 执行结果是 SUCCESS ,则 __fork 成功。

如果被创建线程 exec 的结果是 CONTINUE__fork 同样成功结束。而被创建线程开始独立运行,从此与创建者线程之间再无任何关系。

如果随后任何线程发生了错误,(无论是通过上下文汇报的错误,还是通过执行结果返回的错误),都将导致对所有其它线程的 stop 调用。比如:

__transaction
  ( __fork(THREAD1, __asyn(Action1))
  , __fork(THREAD2, __asyn(Action2))
  , __asyn(Action3)
  , __asyn(Action4))
  , __recover
      ( __on_fail(__asyn(Action5))
      , __on_succ(__asyn(Action6))));

THREAD1THREAD2 被成功 __fork 后,如果 Action2 在随后的处理过程中发生了错误,则主线程和``THREAD1`` 都会 被 stop ,从而导致 THREAD1 的直接终止,而主线程的 Action3 也同样被终止, 并跳过 Action4 , 直接进入 __recover 。而由于这个错误,Action5 得到执行。由于 __recover ,整个过程最终失败还是成功,取决于 Action5 的执行结果。

但是,如果在 THREAD1THREAD2 发生任何错误之前,主线程已成功进入 __recover ,由于 __recover 让主线程 处于 免疫模式 ,所以,其它线程发生的任何错误,都将无法再影响到主线程。从而也无法影响到整个事务的运行结果。

因而,如果你的确想感知其它线程的失败,则务必在进入 __finally__recover 之前,进行 __join

7.3.2.2. 匿名线程的失败

而匿名线程则不然,它的错误将会被创建线程捕捉到。如果发生错误的匿名线程处于其创建线程的 某个 __procedure 内,则这个错误将可能被 __procedure__recover 捕捉并修复。

比如,在下面的事务中,如果 Action1 发生失败,它将会中止 Action2 的执行, 然后转向执行 Action4 ,如果 Action4 成功执行,则整个事务则成功结束。

__transaction
( __concurrent
    ( __asyn(Action1)
    , __asyn(Action2))
    , __asyn(Action3)
, __recover(__asyn(Action4)))

另外,当一个匿名线程失败后,其宿主有名线程必须等待匿名线程所处的整个 __concurrent 执行结束之后,才能进入 __finally 操作。 比如在下面的事务中,如果 Action1 失败,它所在的匿名线程将会马上以失败结束。

由于其所处的 __concurrent 里,还存在另外一条匿名线程,所以,另外一条匿名线程也会进入失败处理, 从而跳转执行 Action3 ;由于 Action3 是一个异步操作, 需要等待进一步的消息。所以,到目前为止, 整个 __concurrent 并没有执行结束。

Action3 等到期待的消息并处理之后, __concurrent 里的两个匿名线程都结束了, 从而导致整个 __concurrent 以错误的状态结束。

然后,其所处的 有名线程 —— 在这里是 主线程 —— 将会跳进 __finally ,去执行 Action5 ; 等 Action5 执行结束后, 整个事务将以失败结束。

__transaction
( __concurrent
    ( __asyn(Action1)
    , __procedure(__asyn(Action2), __finally(__asyn(Action3))))
, __asyn(Action4)
, __finally(__asyn(Action5)))

尽管如此,当一个匿名线程失败时,仍然会及时的通知给整个事务,从而让事 务内的其它线程可以尽早进入失败处理。

比如,在下面的事务里,如果匿名线程的 Action2 发生了失败, THREAD1 将会马上意识到这个错误并结束执行。 而主线程的错误处理顺序则和上一个例子所描述的过程一样。

__transaction
( __fork(THREAD1, __asyn(Action1))
, __concurrent
    ( __asyn(Action2)
    , __procedure(__asyn(Action3), __finally(__asyn(Action4))))
, __asyn(Action5)
, __recover(__asyn(Action6)))

7.3.3. __multi_thread 约束

  1. 创建线程时, Thread ID 一定要从 1 开始,并且连续(如果创建多个其它线程时);

  2. 主线程一旦结束,则整个 __multi_thread 都将结束;而运行结果则是主线程的执行结果;

  3. 其它线程不能 __join 主线程;

  4. 其它线程不能 __join() ,即 join all

  5. 其它线程进行 __join 时,不能得到保证;