// The App trait can be used to quickly turn objects into executable programs. // Here, object Main inherits the main method of App. // args returns the current command line arguments as an array. objectStatusDemoextendsApp{ // Console.println("Hello World: " + (args mkString ", ")) val system = ActorSystem("demo") val statusSwap = system.actorOf(Props(classOf[StatusSwap]), "statusSwap")
/** [INFO] [09/14/2017 00:20:28.573] [demo-akka.actor.default-dispatcher-3] [akka://demo/user/supervisor/childA] unknow [WARN] [09/14/2017 00:20:28.581] [demo-akka.actor.default-dispatcher-4] [akka://demo/user/supervisor/childA] null [ERROR] [09/14/2017 00:20:28.584] [demo-akka.actor.default-dispatcher-3] [akka://demo/user/supervisor/childA] null java.lang.NullPointerException at io.binglau.scala.akka.demo.SupervisorChild$$anonfun$receive$1.applyOrElse(Supervisor.scala:12) at akka.actor.Actor.aroundReceive(Actor.scala:513) at akka.actor.Actor.aroundReceive$(Actor.scala:511) at io.binglau.scala.akka.demo.SupervisorChild.aroundReceive(Supervisor.scala:9) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:527) at akka.actor.ActorCell.invoke(ActorCell.scala:496) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) at akka.dispatch.Mailbox.run(Mailbox.scala:224) at akka.dispatch.Mailbox.exec(Mailbox.scala:234) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [INFO] [09/14/2017 00:20:29.572] [demo-akka.actor.default-dispatcher-4] [akka://demo/user/supervisor/childA] unknow [ERROR] [09/14/2017 00:20:29.573] [demo-akka.actor.default-dispatcher-3] [akka://demo/user/supervisor/childA] null java.lang.IllegalArgumentException at io.binglau.scala.akka.demo.SupervisorChild$$anonfun$receive$1.applyOrElse(Supervisor.scala:14) at akka.actor.Actor.aroundReceive(Actor.scala:513) at akka.actor.Actor.aroundReceive$(Actor.scala:511) at io.binglau.scala.akka.demo.SupervisorChild.aroundReceive(Supervisor.scala:9) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:527) at akka.actor.ActorCell.invoke(ActorCell.scala:496) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) at akka.dispatch.Mailbox.run(Mailbox.scala:224) 
at akka.dispatch.Mailbox.exec(Mailbox.scala:234) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [INFO] [09/14/2017 00:20:29.579] [demo-akka.actor.default-dispatcher-3] [akka://demo/user/supervisor/childA] Message [java.lang.String] from Actor[akka://demo/user/supervisor#347975802] to Actor[akka://demo/user/supervisor/childA#794249311] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. [INFO] [09/14/2017 00:20:53.607] [Thread-0] [CoordinatedShutdown(akka://demo)] Starting coordinated shutdown from JVM shutdown hook **/
层级结构
当 Actor 系统被创建时,有几个 Actor 对象会随着它一起被创建。其中包括:
root 守护对象
user 守护对象(应用程序创建的 Actor 对象上方)
system 守护对象
1 2 3 4 5 6
// Look up an actor by its absolute path.
context.actorSelection("/user/serviceA/aggregator")
// Look up a sibling beneath the same supervisor (relative path).
context.actorSelection("../joe")
// Look up an actor that lives in a remote actor system.
context.actorSelection("akka.tcp://app@otherhost:1234/user/serviceB")
订阅发布
三个分类器:
LookupClassification:通过匹配指定的事件类型,支持简单的查询操作。
1 2 3 4
// Must define a full order over the subscribers, expressed as expected from
// `java.lang.Comparable.compare`. Delegates to the subscriber's own `compareTo`.
override protected def compareSubscribers(a: Subscriber, b: Subscriber): Int =
  a.compareTo(b)
// Needed for determining matching classifiers and for storing them in an
// ordered collection. Returns -1, 0 or 1 when `a` is less than, equal to or
// greater than `b` respectively.
override protected def compareClassifiers(a: Classifier, b: Classifier): Int =
  if (a < b) -1
  else if (a == b) 0
  else 1
// Needed for storing subscribers in an ordered collection.
// Delegates to the subscriber's own `compareTo`.
override protected def compareSubscribers(a: Subscriber, b: Subscriber): Int =
  a.compareTo(b)
// Determines whether a given classifier shall match a given event; it is
// invoked for each subscription for all received events, hence the name of the
// classifier. An event matches when its length does not exceed the classifier.
override protected def matches(classifier: Classifier, event: Event): Boolean =
  event.length <= classifier
objectSubchannelDemo{ defmain(args: Array[String]): Unit = { val system = ActorSystem("Test") val print = system.actorOf(Props(classOf[Print]), "print")
/** [INFO] [09/14/2017 23:42:47.268] [Test-akka.actor.default-dispatcher-5] [akka://Test/user/print] receive c [INFO] [09/14/2017 23:42:47.269] [Test-akka.actor.default-dispatcher-5] [akka://Test/user/print] receive d **/
Dispatchers
添加配置:在 /src/main/resources 中添加一份 application.conf
An Akka MessageDispatcher is what makes Akka Actors “tick”, it is the engine of the machine so to speak. All MessageDispatcher implementations are also an ExecutionContext, which means that they can be used to execute arbitrary code, for instance Futures.
通过下面代码可以得到一个配置的 Dispatcher
1 2
// For use with Futures, the Scheduler, etc.: looks up the dispatcher that is
// configured under the id "my-dispatcher" and exposes it implicitly.
implicit val executionContext = system.dispatchers.lookup("my-dispatcher")
设置 Dispatcher
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
my-dispatcher {
  # Dispatcher is the name of the event-based dispatcher
  type = Dispatcher
  # What kind of ExecutionService to use
  executor = "fork-join-executor"
  # Configuration for the fork join pool
  fork-join-executor {
    # Min number of threads to cap factor-based parallelism number to
    parallelism-min = 2
    # Parallelism (threads) ... ceil(available processors * factor)
    parallelism-factor = 2.0
    # Max number of threads to cap factor-based parallelism number to
    parallelism-max = 10
  }
  # Throughput defines the maximum number of messages to be
  # processed per actor before the thread jumps to the next actor.
  # Set to 1 for as fair as possible.
  throughput = 100
}
配置 Actor 使用一个特定的 dispatcher:
1 2 3 4 5 6 7 8 9
import akka.actor.Props

// Creates a child actor named "myactor" from default Props.
// NOTE(review): no dispatcher is named here; presumably it is assigned through
// deployment configuration that is not shown in this snippet - confirm.
val myActor = context.actorOf(Props[MyActor], "myactor")
import akka.actor.Props

// Creates a child actor named "myactor1" and selects its dispatcher in code by
// naming the "my-dispatcher" configuration entry on the Props.
val myActor =
  context.actorOf(Props[MyActor].withDispatcher("my-dispatcher"), "myactor1")
type
Dispatcher
This is an event-based dispatcher that binds a set of Actors to a thread pool. It is the default dispatcher used if one is not specified.
可共享性: 无限制
邮箱: 任何一种类型,为每一个Actor创建一个
使用场景: 默认派发器,Bulkheading
底层使用: java.util.concurrent.ExecutorService。可以通过 “executor” 指定使用 “fork-join-executor”、“thread-pool-executor”,或者某个 akka.dispatch.ExecutorServiceConfigurator 实现的 FQCN(类名的全称)
PinnedDispatcher
This dispatcher dedicates a unique thread for each actor using it; i.e. each actor will have its own thread pool with only one thread in the pool.
CallingThreadDispatcher: This dispatcher runs invocations on the current thread only. This dispatcher does not create any new threads, but it can be used from different threads concurrently for the same actor. See CallingThreadDispatcher for details and restrictions.
可共享性: 无限制
邮箱: 任何一种类型,每Actor每线程创建一个(需要时)
使用场景: 测试
底层使用: 调用的线程 (duh)
确保送达机制
at-least-once-delivery
To send messages with at-least-once delivery semantics to destinations you can mix-in AtLeastOnceDelivery trait to your PersistentActor on the sending side. It takes care of re-sending messages when they have not been confirmed within a configurable timeout.
结合持久化保证一定时间被确认(发送端)
The state of the sending actor, including which messages have been sent that have not been confirmed by the recipient must be persistent so that it can survive a crash of the sending actor or JVM. The AtLeastOnceDelivery trait does not persist anything by itself. It is your responsibility to persist the intent that a message is sent and that a confirmation has been received.
// In every case, a persistent-view actor instance that receives messages from a
// topic/queue persistent actor uses that actor's persistenceId as the
// identifier of the corresponding topic/queue persistent actor.
override def persistenceId: String = "persistence-id"
// New messages are accepted through receiveCommand.
override def receiveCommand: Receive = {
  // To send messages to the destination path, use the **deliver** method after
  // you have persisted the intent to send the message.
  // NOTE(review): `updateState` is defined outside this snippet; presumably it
  // is where `deliver` is invoked for a persisted MsgSent - confirm.
  case s: String =>
    persist(MsgSent(s))(updateState)

  // The destination actor must send back a confirmation message.
  // When the sending actor receives this confirmation message you should
  // persist the fact that the message was delivered successfully
  // and then call the confirmDelivery method.
  case Confirm(deliveryId) =>
    persist(MsgConfirmed(deliveryId))(updateState)
}
// Two ways of giving the client a reference to the server:
//   1. carry the server reference inside a message (used here via StartWith)
//   2. pass the server reference in when the client is constructed

/** Client side of the request-reply exchange.
  *
  * On `StartWith(server)` it sends `Request("REQ-1")` to that server; on
  * `Reply(what)` it prints the response. Anything else is reported as
  * unexpected.
  */
class Client extends Actor {
  def receive: Receive = {
    case StartWith(server) =>
      println("Client: is starting...")
      server ! Request("REQ-1")

    case Reply(what) =>
      println("Client: received response: " + what)

    case _ =>
      println("Client: received unexpected message")
  }
}
/** Server side of the request-reply exchange: answers every `Request` with a
  * `Reply` addressed to the sender of that request.
  *
  * An actor can obtain the address of another actor in the following ways:
  *   1. it created the other actor itself;
  *   2. it received a message that contains the other actor's address;
  *   3. sometimes it can look the actor up by name (selection), although doing
  *      so introduces unwelcome coupling to definition and implementation
  *      details.
  */
class Server extends Actor {
  def receive: Receive = {
    case Request(what) =>
      println("Server: received request value: " + what)
      // Reply to whoever sent the request (way 2 above).
      sender() ! Reply("RESP-1 for " + what)

    case _ =>
      println("Server: received unexpected message")
  }
}
/** Entry point of the request-reply demo: starts a client and a server actor
  * and hands the server reference to the client through `StartWith`.
  */
object RequestReplyDemo extends App {
  val system = ActorSystem("demo")
  val client = system.actorOf(Props[Client], "client")
  val server = system.actorOf(Props[Server], "server")
  client ! StartWith(server)

  // NOTE(review): terminate() is requested right after the asynchronous send;
  // nothing visible here waits for the request/reply exchange to finish.
  println("RequestReply: is completed.")
  system.terminate()
}
/** Pipeline filter that strips the "(certificate)" marker from an incoming
  * order and forwards the remaining text to the next filter.
  *
  * @param nextFilter actor that receives the processed order
  */
class Authenticator(nextFilter: ActorRef) extends Actor {
  def receive: Receive = {
    case message: ProcessIncomingOrder =>
      val text = new String(message.orderInfo)
      println(s"Authenticator: processing $text")
      val orderText = text.replace("(certificate)", "")
      // Re-encode the stripped text as bytes for the next stage.
      nextFilter ! ProcessIncomingOrder(orderText.toCharArray.map(_.toByte))
  }
}
/** Pipeline filter that strips the "(encryption)" marker from an incoming
  * order and forwards the remaining text to the next filter.
  *
  * @param nextFilter actor that receives the processed order
  */
class Decrypter(nextFilter: ActorRef) extends Actor {
  def receive: Receive = {
    case message: ProcessIncomingOrder =>
      val text = new String(message.orderInfo)
      println(s"Decrypter: processing $text")
      val orderText = text.replace("(encryption)", "")
      // Re-encode the stripped text as bytes for the next stage.
      nextFilter ! ProcessIncomingOrder(orderText.toCharArray.map(_.toByte))
  }
}
/** Pipeline filter that forwards an order only the first time its id is seen
  * and reports later orders with the same id as duplicates.
  *
  * @param nextFilter actor that receives each first-seen order
  */
class Deduplicator(nextFilter: ActorRef) extends Actor {
  // Ids of all orders this actor has already forwarded.
  val processedOrderIds = scala.collection.mutable.Set[String]()

  /** Extracts the text between `id='` and the next `'`.
    *
    * NOTE(review): assumes `orderText` contains a well-formed `id='...'`
    * attribute; input without one is not handled here.
    */
  def orderIdFrom(orderText: String): String = {
    val orderIdIndex = orderText.indexOf("id='") + 4
    val orderIdLastIndex = orderText.indexOf("'", orderIdIndex)
    orderText.substring(orderIdIndex, orderIdLastIndex)
  }

  def receive: Receive = {
    case message: ProcessIncomingOrder =>
      val text = new String(message.orderInfo)
      println(s"Deduplicator: processing $text")
      val orderId = orderIdFrom(text)
      // `add` returns true only when the id was not in the set before.
      if (processedOrderIds.add(orderId)) {
        nextFilter ! message
      } else {
        println(s"Deduplicator: found duplicate order $orderId")
      }
  }
}
/** Entry stage of the pipeline: accepts a raw order as bytes, logs it and
  * wraps it in a `ProcessIncomingOrder` for the next filter.
  *
  * @param nextFilter actor that receives the wrapped order
  */
class OrderAcceptanceEndpoint(nextFilter: ActorRef) extends Actor {
  def receive: Receive = {
    case rawOrder: Array[Byte] =>
      val text = new String(rawOrder)
      println(s"OrderAcceptanceEndpoint: processing $text")
      nextFilter ! ProcessIncomingOrder(rawOrder)
  }
}
/** Final stage of the pipeline: prints each order that reaches it. */
class OrderManagementSystem extends Actor {
  def receive: Receive = {
    case message: ProcessIncomingOrder =>
      val text = new String(message.orderInfo)
      println(s"OrderManagementSystem: processing unique order: $text")
  }
}
objectPipesAndFilterDemoextendsApp{ val system = ActorSystem("demo") val orderText = "(encryption)(certificate)<order id='123'>...</order>" val rawOrderBytes = orderText.toCharArray.map(_.toByte)
// 链式过滤器 val filter5 = system.actorOf(Props[OrderManagementSystem], "orderManagementSystem") val filter4 = system.actorOf(Props(classOf[Deduplicator], filter5), "deduplicator") val filter3 = system.actorOf(Props(classOf[Authenticator], filter4), "authenticator") val filter2 = system.actorOf(Props(classOf[Decrypter], filter3), "decrypter") val filter1 = system.actorOf(Props(classOf[OrderAcceptanceEndpoint], filter2), "orderAcceptanceEndpoint")
/** Simulates message-delivery delays that can arise for a variety of reasons:
  * every received message is forwarded to `purchaseAgent` after a random delay
  * of 1 to 100 milliseconds.
  *
  * @param purchaseAgent actor that eventually receives each message
  */
class PurchaseRouter(purchaseAgent: ActorRef) extends Actor {
  // Seeded from the wall-clock time at construction.
  val random = new Random((new Date()).getTime)

  def receive: Receive = {
    case message: Any =>
      val millis = random.nextInt(100) + 1
      println(s"PurchaseRouter: delaying delivery of $message for $millis milliseconds")
      val duration = Duration.create(millis, TimeUnit.MILLISECONDS)
      // NOTE(review): scheduleOnce needs an implicit ExecutionContext, which
      // must come from outside this snippet - confirm it is in scope.
      context.system.scheduler.scheduleOnce(duration, purchaseAgent, message)
  }
}
/** Places orders that are still valid and routes expired ones to the actor
  * system's dead letters.
  */
class PurchaseAgent extends Actor {
  def receive: Receive = {
    case placeOrder: PlaceOrder =>
      if (placeOrder.isExpired) {
        // Expired orders are not placed; they are handed to dead letters.
        context.system.deadLetters ! placeOrder
        println(s"PurchaseAgent: delivered expired $placeOrder to dead letters")
      } else {
        println(s"PurchaseAgent: placing order for $placeOrder")
      }

    case message: Any =>
      println(s"PurchaseAgent: received unexpected: $message")
  }
}
objectMessageExpirationDemoextendsApp{ val system = ActorSystem("demo") val purchaseAgent = system.actorOf(Props[PurchaseAgent], "purchaseAgent") val purchaseRouter = system.actorOf(Props(classOf[PurchaseRouter], purchaseAgent), "purchaseRouter")