DevilKing's blog

冷灯看剑,剑上几分功名?炉香无需计苍生,纵一穿烟逝,万丈云埋,孤阳还照古陵

0%

Akka in action

原文链接

针对买票的一个并发问题

akka的模型:

  1. 在票务代理和打印室之间,事件和票只能以不变的消息的方式进行传递。
  2. 票务代理和打印室的请求已异步消息入队的方式进行,这样就不需要线程等待方法的执行和甚至接受。
  3. 票务代理和打印室可以持有彼此的引用,这个引用包含了彼此之间可以发送消息的地址。这些消息会被暂时存在一个信箱里,然后根据到底的顺序一个个执行。(别担心,等会我们会好好解释这个工具集是怎么实现这个的)

流式处理中,重要的是传递的不变性

像 Spring 之类的框架在容器/bean 层级处理,Akka 把可替换性引入运行时 (这就是 Actor“让它崩溃”的概念):我们预计到会出现单个处理者不能执行任务的情况, 我们准备着重新分配工作以避免灾难性的失败。

在运行时可替换

Actors 运行在称为**分派者(dispatcher)**的抽象层上。 分派者操心使用哪个线程模型,并处理信箱。 类似线程池的工作方式,只是基于极端重要的消息传递方案之上,线程池处理任务调度,Akka 的分派者/信箱会处理线程模型(TODO)和消息。

出现父节点的概念不只是为了整齐:Akka 中最重要的概念之一,监控,就是层级的功能:每个 actor 都自动成为子节点的监控者。 这意味着,当子 actor 崩溃,父节点来决定采取何种策略来纠正问题。 这使得问题可以顺着层级上升,上层的监控者可以处理这种更为全局的问题,这这是它关注的

actor的核心操作:

  • 创建
  • 发送
  • 变化
  • 监控

akka的tdd?

测试 Actors 比测试一般对象要难,因为:

  • 时机 - 消息发送是异步的,因此单元测试中难于知道何时断言期望值,
  • 异步性 - Actors 用来在多个线程中并行运行。 多线程测试比单线程测试困难,需要并发元语来同步各种 actor 的结果,如 lock, latch, 和 barriers 等。我们极力想避免的正是这些东西。一个 barrier 用不对就 可能中止了整个测试套件的执行。
  • 无状态 - actor 掩盖了内部状态,不允许外部访问。访问只能通过 ActorRef。 对 actor 调用方法,检查其状态,都被特意阻止了,这些都是你想在单元测试时做的。
  • 合作/集成 - 如果你想集成测试很多 actors, 你可能想窃听 actors 之间的消息来 判断是否是预计值。怎样做并不能立即想到。

akka的容错

既然我们不能阻止所有的故障的发生,我们就要采取一种策略,铭记以下:

  1. Things break.系统要容错,可恢复的故障不能导致灾难性的失败。
  2. 在一些情况下,尽量长的保持系统的主功能可用是可以接受的。同时失败的部分被停止并清理出系统,确保不会重启系统或产生不可预知的结果。
  3. 其他一些情况下,有些组件很重要,就需要有备份(active backups),当主组件出问题时,可以快速的替代主组件。
  4. 在系统的某些部分发生故障时,不应该使整个系统崩溃,所以我们需要一种方法来隔离特定的故障,让我们可以稍后处理。

我们已经看了一些actor的功能,这能够帮我们简化问题。 actor可被props对象建立和重建,这是actor系统的一部分,而且actor之间的通信是通过actor的引用而不是直接通信。

akka没有用一个流程来处理正常代码和故障恢复代码,而是用两个不同的流程; 一个用于正常逻辑,一个用于故障恢复逻辑。 正常流程用于处理正常的消息,故障恢复流程包含一个actor用来检测正常流程的actor。 这个检测actor被称作supervisor

akka让父actor作为supervisor,只要这个actor创建了其他actor,它就自动变为supervisor。 supervisor不捕获异常,而是监督失败的原因并给出一些策略。 supervisor也不去尝试恢复actor或者actor的状态。 它提供如何恢复的判断,然后触发相应的策略。

db writer actor

每个actor都经过这个生命周期。 启动,可能重启几次,直到停止或者终止。 preStart 、preRestart 、postRestart 和 postStop 用来初始化、清理状态或者崩溃后控制、恢复其状态。

ActorContext提供了watch方法来监控actor和unwatch方法来取消监控。 一旦一个actor调用watch方法来监控另外一个actor的引用,它就成为被监控actor的monitor。 当被监控的actor终止时,会给monitor发送终止消息。 终止消息只包含被终止actor的引用。

monitor的好处

monitor不一定非要是supervisor,可以是任意一个actor。 只要这个actor有被监控actor的引用,就可以通过context.watch(actorRef)实现对actorRef的监控。 当被监控actor终止时,monitor就可以收到终止消息。

在这个系统中,actor之间通过actorRef连接。 每个actor知道它要发送消息的actor的actorRef。

以actor为核心

“让它崩溃去吧”的理念不是忽略可能发生的故障,或者是提供个组件解决所有错误,实际上是: 程序员需要预先知道那些故障是可以恢复的,提供工具只是为了让这些处理起来简单,不用老开一些处理错误的会议或者编写大量代码。

Futures

futures 是最好的工具。一般来说,那些用例有一个或多个如下的特点:

  • 你不想阻塞(等在当前线程上)来处理函数结果。
  • 一次性地调用函数,在之后的某个时间点处理结果。
  • 结合多个一次性函数,并结合结果。
  • 调用多个竞争的函数,只使用部分结果,如只用最快的响应。
  • 调用函数,当函数抛出异常,返回默认的结果,让流可以继续。
  • 流水线化这类函数,其中一个函数依赖于一个或多个其他函数的结果。

关于route部分?

futures部分,提供onComplete方法来处理success和failure

1
2
3
4
5
6
7
8
9
10
11
12
13
scala> :paste
// Entering paste mode (ctrl-D to finish)
import scala.util._
import scala.util.control.NonFatal
import scala.concurrent._
import ExecutionContext.Implicits.global
val futureFail = Future { throw new Exception("error!")}
futureFail.onComplete {
case Success(value) => println(value)
case Failure(NonFatal(e)) => println(e)
}
// Exiting paste mode, now interpreting.
java.lang.Exception: error!

futures的组合部分

1
2
3
4
5
6
7
8
9
10
11
def getWeather(ticketInfo:TicketInfo):Future[TicketInfo] = {
val futureWeatherX = callWeatherXService(ticketInfo)
.recover(withNone)
val futureWeatherY = callWeatherYService(ticketInfo)
.recover(withNone)
val futures = Seq(futureWeatherX, futureWeatherY)
val fastestResponse = Future.firstCompletedOf(futures)
fastestResponse.map{ weatherResponse =>
ticketInfo.copy(weather = weatherResponse)
}
}

Akka构建本地以及分布式系统

分布式系统遇到的问题:

  • 延迟:在合作者之间有网络意味着a,每个消息有延时,和b(TODO:不知所云)。由于拥塞导致的延时,重发的包,断断续续的连接,等等。
  • 部分失败:当系统中的某一部分总是出现、消失、重现时,知道分布式系统中所有部分是够工作是非常难于解决的问题。
  • 内存访问:本地系统中获取一个内存中对象的引用不会时不时失败,但是在分布式系统中的确会发生。
  • 并发:没有什么拥有一切东西,上述因素意味着交错操作的计划可能会出错。
1
2
3
4
5
6
7
8
9
scala> :paste
// Entering paste mode (ctrl-D to finish)

val path = "akka.tcp://backend@0.0.0.0:2551/user/simple"
val simple = frontend.actorSelection(path)
// Exiting paste mode, now interpreting.

path: String = akka.tcp://backend@0.0.0.0:2551/user/simple
simple: akka.actor.ActorSelection = ActorSelection[Actor[akka.tcp://backend@0.0.0.0:2551/]/user/simple]

远端的actor路径