并发的姿势

并行还是并发?

并发:同一时间应对多件事情的能力

并行:同一时间动手做多件事情的能力

并发与并行

并发:两队列人享有一台咖啡机,每列都能获得咖啡

并行:一队列人享有独有的咖啡机,互不干扰

而获取咖啡从局部来看又是串行,每个人都需要等前面人使用完成后才能使用(并发中也可能需要不定等别队列的人来获取)

Java 的线程与锁模型

线程:Java 并发中的基本单元,可以将一个线程看做一个控制流,线程之间通过共享内存进行通信。

使用锁来达到在使用共享内存时,线程之间的使用形成互斥的目的。

简单的哲学家问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package io.github.binglau.concurrency;

import io.github.binglau.bean.Chopstick;

import java.util.Random;

/**
* 文件描述: 哲学家问题
*/

public class Philosopher implements Runnable {
private Chopstick left, right;
private Random random;

public Philosopher(Chopstick left, Chopstick right) {
this.left = left;
this.right = right;
random = new Random();
}

@Override
public void run() {
// 死锁:当所有人都持有一只筷子并等待另一个放开一只筷子
try {
while (true) {
Thread.sleep(random.nextInt(100) * 10); // 思考时间
// 获取对象锁
synchronized (left) { // 拿起筷子 1
synchronized (right) { // 拿起筷子 2
System.out.println(Thread.currentThread().getName() + "-ok");
Thread.sleep(random.nextInt(100) * 10); // 进餐时间
}
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public static void main(String[] args) {
Chopstick c1 = new Chopstick();
Chopstick c2 = new Chopstick();
Chopstick c3 = new Chopstick();
Chopstick c4 = new Chopstick();
new Thread(new Philosopher(c1, c2)).start();
new Thread(new Philosopher(c2, c3)).start();
new Thread(new Philosopher(c3, c4)).start();
new Thread(new Philosopher(c4, c1)).start();
}
}

线程与所模型带来的三个主要危害:

  • 静态条件
  • 死锁
  • 内存可见性(Java 内存模型定义了何时一个线程对内存的修改对另一个线程可见,这样两个线程都需要进行同步的情况下,线程获得的一个值可能已经是一个失效的值)

优势:

  • 普遍,更接近本质,能解决几乎所有粒度的所有问题

缺点:

  • 难以维护
  • 难以测试(共享内存决定的不确定性)

『天然』的并发,函数式编程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
;用于延时执行某个函数
(defn make-heavy [f]
(fn [& args]
(Thread/sleep 1000)
(apply f args)))

(time (doall (map (make-heavy inc) [1 2 3 4 5])))
(time (doall (pmap (make-heavy inc) [1 2 3 4 5])))

;"Elapsed time: 5012.47372 msecs"
;"Elapsed time: 1012.144992 msecs"

;pmap并行地将make-heavy包装后的inc作用在集合的5个元素上,总耗时就接近于于单个调用的耗时,也就是一秒。
;pmap的并行,从实现上来说,是集合有多少个元素就使用多少个线程

容易推理,便于测试,消除并发与串行的区别

通信顺序进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package concurrency

import "fmt"

/**
* 文件描述:
*/

// goroutine 使用时候的一些问题
// 1. 主 goroutine 结束后 子 goroutine 也会结束
// 2. 容易发生死锁,就是即使所有工作已经完成了但
// 主 goroutine 无法获得工作 goroutine 的 完成状态。
// 死锁的另一个情况就是,当两个不同的 goroutine (或者线程)
// 都锁定了受保护的资源而且同时尝试去获取对方资源的时候
//
// 解决办法:
// 1. 下面所示主 goroutine 在一个 done 通道上等待
// 2. 使用 sync.WaitGroup 来让每个工作 goroutine 报告自己
// 的完成状态。但是,使用 sync.WaitGroup 本身也会产生死锁,
// 特别是当所有工作 goroutine 都处于锁定状态的时候(等待接收
// 通道的数据)调用 sync.WaitGroup.Wait()

// 通道为并发运行的 goroutine 之间提供了一种无锁的通信方式(尽管
// 内部实现可能使用了锁,但我们无需关系)。当一个通道发生通信时,发
// 送通道和接收通道(包括它们对应的 goroutine)都处于同步状态

// Go 语言并不保证在通道里发送指针或者引用类型(如切片或映射)的安全性,
// 因为指针指向的内容或者所引用的值可能在对方接收到时已被发送方修改。
// 所有,当涉及到指针和引用时,我们必须保证这些值在任何时候只能被一
// 个 goroutine 访问得到,也就是说,对这些值的访问必须是串行进行的。
// 除非文档特别声明传递这个指针是安全的。

type Job struct {
Id int
Name string
}

// 使用并发最简单的方式就是用一个 goroutine 来准备工作
// 然后让另一个 goroutine 来执行处理,让主 goroutine 和
// 一些通道来安排一切事情
func Demo() {
jobList := make([]Job, 10) // 任务列表
jobs := make(chan Job)
done := make(chan bool, len(jobList))

go func() { // 创建 goroutine
for _, job := range jobList { // 遍历 jobList 然后将每个工作发送到 jobs 通道
jobs <- job // 通道没有缓冲,所以马上阻塞,等待别的 goroutine 接收
}
close(jobs) // 发完任务之后关闭通道
}()

go func() {
for job := range jobs { // 接收上面的 goroutine 传来的 job (从 jobs 通道)
job.Id = 1
job.Name = "Name"
fmt.Println(job)
done <- true // 发送完成表示等待主 goroutine 接收
}
}()

for i := 0; i < len(jobList); i++ {
<- done // 阻塞,等待接收
}

// 对于通道的使用,我们有两个经验。
// 一. 我们只有在后面要检查通道是否关闭(例如在一个 for ... range 循环
// 里,或者 select, 或者使用 <- 操作符来检查是否可以接收等)的时候才需要
// 显式地关闭通道;
// 二. 应该由发送端的 goroutine 关闭通道,而不是由接收端的 goroutine 来完成
}

Actor

Actor 是一种计算实体,它会对收到的消息做出回应,并且可以做下列事情:

  • 向其他 Actor 对象发送一定数量的消息
  • 创建一定数量的新 Actor 对象
  • 设定对下一条消息做出的回应方式
Celery

以前的我,使用过一种类似于 Actor 发送消息的并发模式,使用消息队列来做并发,最典型的莫过于使用 rabbitmqcelery

一般是这样的,写一个 worker 在一个进程跑着,通过 celery 来调用 worker

Celery is used in production systems to process millions of tasks a day.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# __init__.py
from celery_demo.worker import demo_worker

# celery_app.py
# celery -A celery_demo.worker.celery_app worker -l info
from celery import Celery

RABBITMQ_DSN = 'amqp://guest@127.0.0.1'

app = Celery('worker',
broker=RABBITMQ_DSN,
include=['celery_demo.worker.demo_worker'])

# demo_worker.py
import time
import log

from celery_demo.worker.celery_app import app


@app.task
def add(x, y):
time.sleep(3)
result = x + y
log.app.info(f'result: {result}')
return result

# demo.py

from celery_demo.worker import demo_worker
import log

if __name__ == '__main__':
demo_worker.add.delay(1, 2)
log.app.info('end')

# log
[INFO app 2017-09-09 11:53:28,992] end
[INFO app 2017-09-09 11:53:31,997] result: 3
Akka

Akka is a toolkit for building highly concurrent, distributed, and resilient message-driven applications for Java and Scala

  • Simpler Concurrent & Distributed Systems
  • Resilient by Design
  • High Performance
  • Elastic & Decentralized
  • Reactive Streaming Data
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package io.binglau.scala.akka.demo

import akka.actor.{Actor, ActorSystem, Props}
import akka.event.Logging

class ActorPrint extends Actor {
val log = Logging(context.system, this)

override def receive: Receive = {
case msg : String => log.info("receive {}", msg)
case _ => log.info("receive unknow message")
}
}

object ActorDemo {
def main(args: Array[String]): Unit = {
val system = ActorSystem("demo")
val printActor = system.actorOf(Props[ActorPrint], "print")

printActor ! "Test"
printActor ! 123

system.terminate()
}
}
Actor 系统和 Actor 对象具有的特点:
  • 直接通过异步消息传递方式进行通信

    同一个 Actor 对象发送保证顺序,但是不同的 Actor 对象之间的发送不能保证顺序

  • 状态机

    当 Actor 对象转换为某个预设状态时,就能够改变对未来接收到的消息的处理模式。通过变为另一种消息处理器,Actor对象就成了一种有限状态机。

  • 无共享

    一个 Actor 对象不会与其他 Actor 对象或相关组件共享可变状态

  • 无锁的并发处理方式

    因为 Actor 对象不会共享它们的可变状态,而且它们在同一时刻仅会接收一条消息,所以在对消息做出回应前,Actor 对象永远都不需要尝试锁定它们的状态。

  • 并行性

    当等级较高的 Actor 对象能够将多个任务分派给多个下级 Actor 对象,或者任务重含有复杂的处理层级时,就适合通过 Actor 模型使用并行处理方式。

  • Actor 对象的系统性

    Actor 对象的量级都非常轻,因此在单个系统中添加许多 Actor 对象是受推荐的处理方式。任何问题都可以通过添加 Actor 对象来解决。

参考书籍

Clojure并发
Akka 官方文档
《七周七并发模式》
《响应式架构——消息模式 Actor 实现与 Scala、Akka 应用集成》
《Go 语言程序设计》