并行还是并发?
并发:同一时间应对多件事情的能力
并行:同一时间动手做多件事情的能力
并发:两队列人享有一台咖啡机,每列都能获得咖啡
并行:一队列人享有独有的咖啡机,互不干扰
而获取咖啡从局部来看又是串行,每个人都需要等前面人使用完成后才能使用(并发中也可能需要不定等别队列的人来获取)
Java 的线程与锁模型
线程:Java 并发中的基本单元,可以将一个线程看做一个控制流,线程之间通过共享内存进行通信。
使用锁来达到在使用共享内存时,线程之间的使用形成互斥的目的。
简单的哲学家问题:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| package io.github.binglau.concurrency;
import io.github.binglau.bean.Chopstick;
import java.util.Random;
public class Philosopher implements Runnable { private Chopstick left, right; private Random random;
public Philosopher(Chopstick left, Chopstick right) { this.left = left; this.right = right; random = new Random(); }
@Override public void run() { try { while (true) { Thread.sleep(random.nextInt(100) * 10); synchronized (left) { synchronized (right) { System.out.println(Thread.currentThread().getName() + "-ok"); Thread.sleep(random.nextInt(100) * 10); } } } } catch (InterruptedException e) { e.printStackTrace(); } }
public static void main(String[] args) { Chopstick c1 = new Chopstick(); Chopstick c2 = new Chopstick(); Chopstick c3 = new Chopstick(); Chopstick c4 = new Chopstick(); new Thread(new Philosopher(c1, c2)).start(); new Thread(new Philosopher(c2, c3)).start(); new Thread(new Philosopher(c3, c4)).start(); new Thread(new Philosopher(c4, c1)).start(); } }
|
线程与所模型带来的三个主要危害:
- 静态条件
- 死锁
- 内存可见性(Java 内存模型定义了何时一个线程对内存的修改对另一个线程可见,这样两个线程都需要进行同步的情况下,线程获得的一个值可能已经是一个失效的值)
优势:
缺点:
『天然』的并发,函数式编程
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| (defn make-heavy [f] (fn [& args] (Thread/sleep 1000) (apply f args)))
(time (doall (map (make-heavy inc) [1 2 3 4 5]))) (time (doall (pmap (make-heavy inc) [1 2 3 4 5])))
|
容易推理,便于测试,消除并发与串行的区别
通信顺序进程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
| package concurrency
import "fmt"
type Job struct { Id int Name string }
func Demo() { jobList := make([]Job, 10) jobs := make(chan Job) done := make(chan bool, len(jobList))
go func() { for _, job := range jobList { jobs <- job } close(jobs) }()
go func() { for job := range jobs { job.Id = 1 job.Name = "Name" fmt.Println(job) done <- true } }()
for i := 0; i < len(jobList); i++ { <- done }
}
|
Actor
Actor 是一种计算实体,它会对收到的消息做出回应,并且可以做下列事情:
- 向其他 Actor 对象发送一定数量的消息
- 创建一定数量的新 Actor 对象
- 设定对下一条消息做出的回应方式
Celery
以前的我,使用过一种类似于 Actor 发送消息的并发模式,使用消息队列来做并发,最典型的莫过于使用 rabbitmq
的 celery
一般是这样的,写一个 worker
在一个进程跑着,通过 celery
来调用 worker
Celery is used in production systems to process millions of tasks a day.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| from celery_demo.worker import demo_worker
from celery import Celery
RABBITMQ_DSN = 'amqp://guest@127.0.0.1'
app = Celery('worker', broker=RABBITMQ_DSN, include=['celery_demo.worker.demo_worker'])
import time import log
from celery_demo.worker.celery_app import app
@app.task def add(x, y): time.sleep(3) result = x + y log.app.info(f'result: {result}') return result
from celery_demo.worker import demo_worker import log
if __name__ == '__main__': demo_worker.add.delay(1, 2) log.app.info('end')
[INFO app 2017-09-09 11:53:28,992] end [INFO app 2017-09-09 11:53:31,997] result: 3
|
Akka
Akka is a toolkit for building highly concurrent, distributed, and resilient message-driven applications for Java and Scala
- Simpler Concurrent & Distributed Systems
- Resilient by Design
- High Performance
- Elastic & Decentralized
- Reactive Streaming Data
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| package io.binglau.scala.akka.demo
import akka.actor.{Actor, ActorSystem, Props} import akka.event.Logging
class ActorPrint extends Actor { val log = Logging(context.system, this)
override def receive: Receive = { case msg : String => log.info("receive {}", msg) case _ => log.info("receive unknow message") } }
object ActorDemo { def main(args: Array[String]): Unit = { val system = ActorSystem("demo") val printActor = system.actorOf(Props[ActorPrint], "print")
printActor ! "Test" printActor ! 123
system.terminate() } }
|
Actor 系统和 Actor 对象具有的特点:
直接通过异步消息传递方式进行通信
同一个 Actor 对象发送保证顺序,但是不同的 Actor 对象之间的发送不能保证顺序
状态机
当 Actor 对象转换为某个预设状态时,就能够改变对未来接收到的消息的处理模式。通过变为另一种消息处理器,Actor对象就成了一种有限状态机。
无共享
一个 Actor 对象不会与其他 Actor 对象或相关组件共享可变状态
无锁的并发处理方式
因为 Actor 对象不会共享它们的可变状态,而且它们在同一时刻仅会接收一条消息,所以在对消息做出回应前,Actor 对象永远都不需要尝试锁定它们的状态。
并行性
当等级较高的 Actor 对象能够将多个任务分派给多个下级 Actor 对象,或者任务重含有复杂的处理层级时,就适合通过 Actor 模型使用并行处理方式。
Actor 对象的系统性
Actor 对象的量级都非常轻,因此在单个系统中添加许多 Actor 对象是受推荐的处理方式。任何问题都可以通过添加 Actor 对象来解决。
参考书籍
Clojure并发
Akka 官方文档
《七周七并发模式》
《响应式架构——消息模式 Actor 实现与 Scala、Akka 应用集成》
《Go 语言程序设计》