Demo

1
2
3
4
5
6
ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer();
buf.writeBytes("test".getBytes());

byte[] readBytes = new byte[buf.readableBytes()];
buf.readBytes(readBytes);
System.out.println("read content: " + new String(readBytes));

直接看 PooledByteBufAllocator.newHeapBuffer(int initialCapacity, int maxCapacity)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
PoolThreadCache cache = threadCache.get();
PoolArena<byte[]> heapArena = cache.heapArena;

final ByteBuf buf;
if (heapArena != null) {
buf = heapArena.allocate(cache, initialCapacity, maxCapacity);
} else {
buf = PlatformDependent.hasUnsafe() ?
new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}

return toLeakAwareBuffer(buf);
}
Read more »

要点

  1. UnpooledByteBufAllocator 的 Heap 与 Direct 的实现
  2. Heap 内部实现使用了 byte[]
  3. Direct 内部依托于 PlatformDependent0 的各种 native 方法
  4. toLeakAwareBuffer 部分主要讨论了 Netty 是如何应对内存泄露,以及检测内存泄露跟踪的四个级别是如何实现的
Read more »

要点:

  1. JDK 原生支持,主要用于 NIO
  2. 包含两个实现
    1. HeapByteBuffer: 基于 Java 堆实现
    2. DirectByteBuffer: 使用 unsafe 的 API 进行堆外操作
  3. 核心方法为 put(byte)get()。分别是往ByteBuffer里写一个字节,和读一个字节。
  4. 读写模式分离,正常的应用场景是:往ByteBuffer里写一些数据,然后 flip(),然后再读出来。
  5. 在 JDK11 中 MappedByteBuffer 的创建实际是 DirectByteBuffer
  6. DirectByteBuffer 的垃圾回收利用了幻象引用进行回收,详见下面的 Cleaner
Read more »

大多数东西都是根据之前学习中的印象而来,这篇文章更多的是喃喃自语吧。

协程

在 Python 3.5 之前,协程在我眼中应该就是 yield 这个语法的同义,通过 sendthrow 等方法来作为其交互,用于多进程中以提升效率,然而就 Python 的使用环境来说,其实接触到的机会并不是太多。

在 Python 3.5 之后,给了一种新的,现在看起来还是比较友好的选择

1
2
3
4
5
6
7
async def hello():
print("Hello world!")
# asyncio.sleep 也是一个 coroutine
# 异步调用asyncio.sleep(1):
# 线程也可以从这儿拿到返回值(asyncio.sleep(1) 返回值为 None)
r = await asyncio.sleep(1)
print("Hello again!")

I/O 密集的地方,填入 await ,在 def 前面加上 async,不需要去适应至今让我还是十分不适的 yield / yield from

Read more »

紧接 Kafka生产者分析——KafkaProducer

前文介绍过,KafkaProducer 可以有同步和异步两种方式发送消息,其实两者的底层实现相同,都是通过异步方式实现的。主线程调用 KafkaProducer#send() 方法发送消息的时候,先将消息放到 RecordAccumulator 中暂存,然后主线程就可以从 send() 方法中返回了,此时消息并没有真正地发送给 Kafka,而是缓存在了 RecordAccumulator 中。之后,业务线程通过 KafkaProducer#send() 方法不断向 RecordAccumulator 追加消息,当达到一定的条件,会唤醒 Sender 线程发送 RecordAccumulator 中的消息。

下面我们就来介绍 RecordAccumulator 的结构。首先需要注意的是,**RecordAccumulator 至少有一个业务线程和一个 Sender 线程并发操作,所以必须是线程安全的**。

Read more »

动态 AOP 使用示例

  1. 创建用于拦截的 bean

    1
    2
    3
    4
    5
    6
    7
    8
    @Data
    public class TestBean {
    private String testStr = "test";

    public void test() {
    System.out.println("test");
    }
    }
  2. 创建 Advisor

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    @Aspect
    public class AspectJTest {
    @Pointcut("execution(* *.test(..))")
    public void test() {

    }

    @Before("test()")
    public void beforeTest() {
    System.out.println("beforeTest");
    }

    @After("test()")
    public void afterTest() {
    System.out.println("afterTest");
    }

    @Around("test()")
    public Object aroundTest(ProceedingJoinPoint p) {
    System.out.println("brfore around");
    Object o = null;
    try {
    o = p.proceed();
    } catch (Throwable e) {
    e.printStackTrace();
    }
    System.out.println("after around");
    return o;
    }
    }
  3. 创建配置文件

    1
    2
    3
    <aop:aspectj-autoproxy />
    <bean id="testBean" class="io.github.binglau.bean.TestBean" />
    <bean class="io.github.binglau.AspectJTest" />
  4. 测试

    1
    2
    3
    4
    5
    6
    @Test
    public void testAop() {
    ApplicationContext ctx = new ClassPathXmlApplicationContext("beanFactory.xml");
    TestBean bean = (TestBean) ctx.getBean("testBean");
    bean.test();
    }
  5. 不出意外结果

    1
    2
    3
    4
    5
    brfore around
    beforeTest
    test
    after around
    afterTest

可知 <aop:aspectj-autoproxy /> 是开启 aop 的关键,我们不妨由此入手。

Read more »

文章参考自《Apache Kafka 源码剖析》

github 地址:https://github.com/BingLau7/kafka,里面有代码的相关注释

具体部署方式不进行指导了,网上资料比较多

KafkaProducer Demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
public class ProducerDemo {
public static void main(String[] args) {
boolean isAsync = args.length == 0 ||
/* 消息的发送方式:异步发送还是同步发送 */
!args[0].trim().equalsIgnoreCase("sync");

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
/* 客户端的 ID */
props.put("client.id", "DemoProducer");
/*
* 消息的 key 和 value 都是字节数组,为了将 Java 对象转化为字节数组,可以配置
* "key.serializer" 和 "value.serializer" 两个序列化器,完成转化
*/
props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");

/* StringSerializer 用来将 String 对象序列化成字节数组 */
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

/* 生产者的核心类 */
KafkaProducer producer = new KafkaProducer<>(props);

/* 向指定的 test 这个 topic 发送消息 */
String topic = "test";

/* 消息的 key */
int messageNo = 1;

while (true) {
String messageStr = "Message_" + messageNo;
long startTime = System.currentTimeMillis();

if (isAsync) { /* 异步发送消息 */
/*
* 第一个参数是 ProducerRecord 类型的对象,封装了目标 Topic,消息的 kv
* 第二个参数是一个 CallBack 对象,当生产者接收到 Kafka 发来的 ACK 确认消息的时候,
* 会调用此 CallBack 对象的 onCompletion() 方法,实现回调功能
*/
producer.send(new ProducerRecord<>(topic, messageNo, messageStr),
new DemoCallBack(startTime, messageNo, messageStr));
} else { /* 同步发送消息 */
try {
/*
* KafkaProducer.send() 方法的返回值类型是 Future<RecordMetadata>
* 这里通过 Future.get 方法,阻塞当前线程,等待 Kafka 服务端的 ACK 响应
*/
producer.send(new ProducerRecord<>(topic, messageNo, messageStr)).get();
System.out.printf("Send message: (%d, %s)\n", messageNo, messageStr);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
/* 递增消息的 key */
++messageNo;
}
}
}

class DemoCallBack implements Callback {
/* 开始发送消息的时间戳 */
private final long startTime;
private final int key;
private final String message;

public DemoCallBack(long startTime, int key, String message) {
this.startTime = startTime;
this.key = key;
this.message = message;
}

/**
* 生产者成功发送消息,收到 Kafka 服务端发来的 ACK 确认消息后,会调用此回调函数
* @param metadata 生产者发送的消息的元数据,如果发送过程中出现异常,此参数为 null
* @param exception 发送过程中出现的异常,如果发送成功为 null
*/
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
long elapsedTime = System.currentTimeMillis() - startTime;
if (metadata != null) {
System.out.printf("message: (%d, %s) send to partition %d, offset: %d, in %d\n",
key, message, metadata.partition(), metadata.offset(), elapsedTime);
} else {
exception.printStackTrace();
}
}
}
Read more »

索引

映射

官方文档

简而言之,映射即为结构(虽然说 ElasticSearch 是一个无模式的搜索引擎)。

定义方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
curl -XPUT 'localhost:9200/my_index?pretty' -H 'Content-Type: application/json' -d'
{
"mappings": {
"doc": {
"properties": { // 字段定义
"title": { "type": "text" },
"name": { "type": "text" },
"age": { "type": "integer" },
"created": {
"type": "date", // 类型定义
"format": "strict_date_optional_time||epoch_millis"
}
}
}
}
}
'
Read more »

预热

所谓并发的时代,其实不仅仅提现在并发友好的库、框架、语言在兴起,而是这种并发的思想,早已融入到计算机最底层中了。

『让计算机并发执行若干个运算任务』与『更充分地利用计算机处理器的效能』之间的因果关系,看起来顺理成章,实际上它们之间的关系并没有想象中的那么简单,其中一个重要的复杂性来源是绝大多数的运算任务都不可能只靠处理器『计算』就能完成,处理器至少要与内存交互,如读取运算数据、存储运算结果等,这个I/O操作是很难消除的(无法仅靠寄存器来完成所有运算任务)。

由于计算机的存储设备与处理器的运算速度有几个数量级的差距,所以现代计算机系统都不得不加入一层读写速度尽可能接近处理器运算速度的高速缓存(Cache)来作为内存与处理器之间的缓冲:将运算需要使用到的数据复制到缓存中,让运算能快速进行,当运算结束后再从缓存同步回内存之中,这样处理器就无须等待缓慢的内存读写了。

嫌上面文字太长了,简单说来就是:计算机计算太快了,等不及慢的主存了,加了 L3、L2、L1 以及寄存器4个高速缓存就是为了让 IO 能快点。

由此产生出了一个问题

Read more »
0%