DevilKing's blog

冷灯看剑,剑上几分功名?炉香无需计苍生,纵一穿烟逝,万丈云埋,孤阳还照古陵

0%

Akka Persistence

原文链接

persistence actor

1
2
3
4
5
def persistenceId = "example" //作为持久化Actor的唯一表示,用于持久化或者查询时使用

def receiveCommand: Receive = ??? //Actor正常运行时处理处理消息逻辑,可在这部分内容里持久化自己想要的消息

def receiveRecover: Receive = ??? //Actor重启恢复是执行的逻辑
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
case class LuckyEvent(  //抽奖成功事件
userId: Long,
luckyMoney: Int
)
case class FailureEvent( //抽奖失败事件
userId: Long,
reason: String
)
case class Lottery(
totalAmount: Int, //红包总金额
remainAmount: Int //剩余红包金额
) {
def update(luckyMoney: Int) = {
copy(
remainAmount = remainAmount - luckyMoney
)
}
}
class LotteryActor(initState: Lottery) extends PersistentActor with ActorLogging{
override def persistenceId: String = "lottery-actor-1"

var state = initState //初始化Actor的状态

override def receiveRecover: Receive = {
case event: LuckyEvent =>
updateState(event) //恢复Actor时根据持久化的事件恢复Actor状态
case SnapshotOffer(_, snapshot: Lottery) =>
log.info(s"Recover actor state from snapshot and the snapshot is ${snapshot}")
state = snapshot //利用快照恢复Actor的状态
case RecoveryCompleted => log.info("the actor recover completed")
}

def updateState(le: LuckyEvent) =
state = state.update(le.luckyMoney) //更新自身状态

override def receiveCommand: Receive = {
case lc: LotteryCmd =>
doLottery(lc) match { //进行抽奖,并得到抽奖结果,根据结果做出不同的处理
case le: LuckyEvent => //抽到随机红包
persist(le) { event =>
updateState(event)
increaseEvtCountAndSnapshot()
sender() ! event
}
case fe: FailureEvent => //红包已经抽完
sender() ! fe
}
case "saveSnapshot" => // 接收存储快照命令执行存储快照操作
saveSnapshot(state)
case SaveSnapshotSuccess(metadata) => ??? //你可以在快照存储成功后做一些操作,比如删除之前的快照等
}

private def increaseEvtCountAndSnapshot() = {
val snapShotInterval = 5
if (lastSequenceNr % snapShotInterval == 0 && lastSequenceNr != 0) { //当有持久化5个事件后我们便存储一次当前Actor状态的快照
self ! "saveSnapshot"
}
}

def doLottery(lc: LotteryCmd) = { //抽奖逻辑具体实现
if (state.remainAmount > 0) {
val luckyMoney = scala.util.Random.nextInt(state.remainAmount) + 1
LuckyEvent(lc.userId, luckyMoney)
}
else {
FailureEvent(lc.userId, "下次早点来,红包已被抽完咯!")
}
}
}

actor里面,线程安全,关于数字的加减问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
object PersistenceTest extends App {
val lottery = Lottery(10000,10000)
val system = ActorSystem("example-05")
val lotteryActor = system.actorOf(Props(new LotteryActor(lottery)), "LotteryActor-1") //创建抽奖Actor
val pool: ExecutorService = Executors.newFixedThreadPool(10)
val r = (1 to 100).map(i =>
new LotteryRun(lotteryActor, LotteryCmd(i.toLong,"godpan","xx@gmail.com")) //创建100个抽奖请求
)
r.map(pool.execute(_)) //使用线程池来发起抽奖请求,模拟同时多人参加
Thread.sleep(5000)
pool.shutdown()
system.terminate()
}

class LotteryRun(lotteryActor: ActorRef, lotteryCmd: LotteryCmd) extends Runnable { //抽奖请求
implicit val timeout = Timeout(3.seconds)
def run: Unit = {
for {
fut <- lotteryActor ? lotteryCmd
} yield fut match { //根据不同事件显示不同的抽奖结果
case le: LuckyEvent => println(s"恭喜用户${le.userId}抽到了${le.luckyMoney}元红包")
case fe: FailureEvent => println(fe.reason)
case _ => println("系统错误,请重新抽取")
}
}
}
1
2
3
4
5
6
persistAll(success.keys.toIndexedSeq) {  //批量持久化中奖用户事件
case event => println(event)
updateState(event)
increaseEvtCountAndSnapshot()
success(event) ! event
}

实现persitAll方法