Skip to content
gqlxj1987's Blog
Go back

Akka in action

Edit page

原文链接

针对买票的一个并发问题

akka的模型:

  1. 在票务代理和打印室之间,事件和票只能以不变的消息的方式进行传递。
  2. 票务代理和打印室的请求已异步消息入队的方式进行,这样就不需要线程等待方法的执行和甚至接受。
  3. 票务代理和打印室可以持有彼此的引用,这个引用包含了彼此之间可以发送消息的地址。这些消息会被暂时存在一个信箱里,然后根据到底的顺序一个个执行。(别担心,等会我们会好好解释这个工具集是怎么实现这个的)

流式处理中,重要的是传递的不变性

像 Spring 之类的框架在容器/bean 层级处理,Akka 把可替换性引入运行时 (这就是 Actor“让它崩溃”的概念):我们预计到会出现单个处理者不能执行任务的情况, 我们准备着重新分配工作以避免灾难性的失败。

在运行时可替换

Actors 运行在称为**分派者(dispatcher)**的抽象层上。 分派者操心使用哪个线程模型,并处理信箱。 类似线程池的工作方式,只是基于极端重要的消息传递方案之上,线程池处理任务调度,Akka 的分派者/信箱会处理线程模型(TODO)和消息。

出现父节点的概念不只是为了整齐:Akka 中最重要的概念之一,监控,就是层级的功能:每个 actor 都自动成为子节点的监控者。 这意味着,当子 actor 崩溃,父节点来决定采取何种策略来纠正问题。 这使得问题可以顺着层级上升,上层的监控者可以处理这种更为全局的问题,这这是它关注的

actor的核心操作:

akka的tdd?

测试 Actors 比测试一般对象要难,因为:

akka的容错

既然我们不能阻止所有的故障的发生,我们就要采取一种策略,铭记以下:

  1. Things break.系统要容错,可恢复的故障不能导致灾难性的失败。
  2. 在一些情况下,尽量长的保持系统的主功能可用是可以接受的。同时失败的部分被停止并清理出系统,确保不会重启系统或产生不可预知的结果。
  3. 其他一些情况下,有些组件很重要,就需要有备份(active backups),当主组件出问题时,可以快速的替代主组件。
  4. 在系统的某些部分发生故障时,不应该使整个系统崩溃,所以我们需要一种方法来隔离特定的故障,让我们可以稍后处理。

我们已经看了一些actor的功能,这能够帮我们简化问题。 actor可被props对象建立和重建,这是actor系统的一部分,而且actor之间的通信是通过actor的引用而不是直接通信。

akka没有用一个流程来处理正常代码和故障恢复代码,而是用两个不同的流程; 一个用于正常逻辑,一个用于故障恢复逻辑。 正常流程用于处理正常的消息,故障恢复流程包含一个actor用来检测正常流程的actor。 这个检测actor被称作supervisor

akka让父actor作为supervisor,只要这个actor创建了其他actor,它就自动变为supervisor。 supervisor不捕获异常,而是监督失败的原因并给出一些策略。 supervisor也不去尝试恢复actor或者actor的状态。 它提供如何恢复的判断,然后触发相应的策略。

db writer actor

每个actor都经过这个生命周期。 启动,可能重启几次,直到停止或者终止。 preStart 、preRestart 、postRestart 和 postStop 用来初始化、清理状态或者崩溃后控制、恢复其状态。

ActorContext提供了watch方法来监控actor和unwatch方法来取消监控。 一旦一个actor调用watch方法来监控另外一个actor的引用,它就成为被监控actor的monitor。 当被监控的actor终止时,会给monitor发送终止消息。 终止消息只包含被终止actor的引用。

monitor的好处

monitor不一定非要是supervisor,可以是任意一个actor。 只要这个actor有被监控actor的引用,就可以通过context.watch(actorRef)实现对actorRef的监控。 当被监控actor终止时,monitor就可以收到终止消息。

在这个系统中,actor之间通过actorRef连接。 每个actor知道它要发送消息的actor的actorRef。

以actor为核心

“让它崩溃去吧”的理念不是忽略可能发生的故障,或者是提供个组件解决所有错误,实际上是: 程序员需要预先知道那些故障是可以恢复的,提供工具只是为了让这些处理起来简单,不用老开一些处理错误的会议或者编写大量代码。

Futures

futures 是最好的工具。一般来说,那些用例有一个或多个如下的特点:

关于route部分?

futures部分,提供onComplete方法来处理success和failure

scala> :paste
// Entering paste mode (ctrl-D to finish)
import scala.util._
import scala.util.control.NonFatal
import scala.concurrent._
import ExecutionContext.Implicits.global
val futureFail = Future { throw new Exception("error!")}
futureFail.onComplete {
  case Success(value) => println(value)
  case Failure(NonFatal(e)) => println(e)
}
// Exiting paste mode, now interpreting.
java.lang.Exception: error!

futures的组合部分

def getWeather(ticketInfo:TicketInfo):Future[TicketInfo] = {
  val futureWeatherX = callWeatherXService(ticketInfo)
                        .recover(withNone)
  val futureWeatherY = callWeatherYService(ticketInfo)
                        .recover(withNone)
  val futures = Seq(futureWeatherX, futureWeatherY)
  val fastestResponse = Future.firstCompletedOf(futures)
  fastestResponse.map{ weatherResponse =>
    ticketInfo.copy(weather = weatherResponse)
  }
}

Akka构建本地以及分布式系统

分布式系统遇到的问题:

scala> :paste
// Entering paste mode (ctrl-D to finish)

val path = "akka.tcp://backend@0.0.0.0:2551/user/simple"
val simple = frontend.actorSelection(path)
// Exiting paste mode, now interpreting.

path: String = akka.tcp://backend@0.0.0.0:2551/user/simple
simple: akka.actor.ActorSelection = ActorSelection[Actor[akka.tcp://backend@0.0.0.0:2551/]/user/simple]

远端的actor路径


Edit page
Share this post on:

Previous Post
阿里中台架构实践
Next Post
Json Style