借助 Netty 官方 Echo 实例: Echo Demo

问题

  1. 如何与底层交互
  2. Bootstrap 与 ServerBootstreap 有什么异同
  3. NioSocketChannel 与 NioServerSocketChannel 有什么异同
  4. 为什么说 netty 是事件驱动,其事件是如何传播的
  5. 为什么 ServerBootstrap 需要两个 EventLoopGroup,分别有什么用
Read more »

Demo

1
2
3
4
5
6
ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer();
buf.writeBytes("test".getBytes());

byte[] readBytes = new byte[buf.readableBytes()];
buf.readBytes(readBytes);
System.out.println("read content: " + new String(readBytes));

直接看 PooledByteBufAllocator.newHeapBuffer(int initialCapacity, int maxCapacity)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
PoolThreadCache cache = threadCache.get();
PoolArena<byte[]> heapArena = cache.heapArena;

final ByteBuf buf;
if (heapArena != null) {
buf = heapArena.allocate(cache, initialCapacity, maxCapacity);
} else {
buf = PlatformDependent.hasUnsafe() ?
new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}

return toLeakAwareBuffer(buf);
}
Read more »

要点

  1. UnpooledByteBufAllocator 的 Heap 与 Direct 的实现
  2. Heap 内部实现使用了 byte[]
  3. Direct 内部依托于 PlatformDependent0 的各种 native 方法
  4. toLeakAwareBuffer 部分主要讨论了 Netty 是如何应对内存泄露,以及检测内存泄露跟踪的四个级别是如何实现的
Read more »

要点:

  1. JDK 原生支持,主要用于 NIO
  2. 包含两个实现
    1. HeapByteBuffer: 基于 Java 堆实现
    2. DirectByteBuffer: 使用 unsafe 的 API 进行堆外操作
  3. 核心方法为 put(byte)get()。分别是往ByteBuffer里写一个字节,和读一个字节。
  4. 读写模式分离,正常的应用场景是:往ByteBuffer里写一些数据,然后 flip(),然后再读出来。
  5. 在 JDK11 中 MappedByteBuffer 的创建实际是 DirectByteBuffer
  6. DirectByteBuffer 的垃圾回收利用了幻象引用进行回收,详见下面的 Cleaner
Read more »

大多数东西都是根据之前学习中的印象而来,这篇文章更多的是喃喃自语吧。

协程

在 Python 3.5 之前,协程在我眼中应该就是 yield 这个语法的同义,通过 sendthrow 等方法来作为其交互,用于多进程中以提升效率,然而就 Python 的使用环境来说,其实接触到的机会并不是太多。

在 Python 3.5 之后,给了一种新的,现在看起来还是比较友好的选择

1
2
3
4
5
6
7
async def hello():
print("Hello world!")
# asyncio.sleep 也是一个 coroutine
# 异步调用asyncio.sleep(1):
# 线程也可以从这儿拿到返回值(asyncio.sleep(1) 返回值为 None)
r = await asyncio.sleep(1)
print("Hello again!")

I/O 密集的地方,填入 await ,在 def 前面加上 async,不需要去适应至今让我还是十分不适的 yield / yield from

Read more »

紧接 Kafka生产者分析——KafkaProducer

前文介绍过,KafkaProducer 可以有同步和异步两种方式发送消息,其实两者的底层实现相同,都是通过异步方式实现的。主线程调用 KafkaProducer#send() 方法发送消息的时候,先将消息放到 RecordAccumulator 中暂存,然后主线程就可以从 send() 方法中返回了,此时消息并没有真正地发送给 Kafka,而是缓存在了 RecordAccumulator 中。之后,业务线程通过 KafkaProducer#send() 方法不断向 RecordAccumulator 追加消息,当达到一定的条件,会唤醒 Sender 线程发送 RecordAccumulator 中的消息。

下面我们就来介绍 RecordAccumulator 的结构。首先需要注意的是,**RecordAccumulator 至少有一个业务线程和一个 Sender 线程并发操作,所以必须是线程安全的**。

Read more »

动态 AOP 使用示例

  1. 创建用于拦截的 bean

    1
    2
    3
    4
    5
    6
    7
    8
    @Data
    public class TestBean {
    private String testStr = "test";

    public void test() {
    System.out.println("test");
    }
    }
  2. 创建 Advisor

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    @Aspect
    public class AspectJTest {
    @Pointcut("execution(* *.test(..))")
    public void test() {

    }

    @Before("test()")
    public void beforeTest() {
    System.out.println("beforeTest");
    }

    @After("test()")
    public void afterTest() {
    System.out.println("afterTest");
    }

    @Around("test()")
    public Object aroundTest(ProceedingJoinPoint p) {
    System.out.println("brfore around");
    Object o = null;
    try {
    o = p.proceed();
    } catch (Throwable e) {
    e.printStackTrace();
    }
    System.out.println("after around");
    return o;
    }
    }
  3. 创建配置文件

    1
    2
    3
    <aop:aspectj-autoproxy />
    <bean id="testBean" class="io.github.binglau.bean.TestBean" />
    <bean class="io.github.binglau.AspectJTest" />
  4. 测试

    1
    2
    3
    4
    5
    6
    @Test
    public void testAop() {
    ApplicationContext ctx = new ClassPathXmlApplicationContext("beanFactory.xml");
    TestBean bean = (TestBean) ctx.getBean("testBean");
    bean.test();
    }
  5. 不出意外结果

    1
    2
    3
    4
    5
    brfore around
    beforeTest
    test
    after around
    afterTest

可知 <aop:aspectj-autoproxy /> 是开启 aop 的关键,我们不妨由此入手。

Read more »

文章参考自《Apache Kafka 源码剖析》

github 地址:https://github.com/BingLau7/kafka,里面有代码的相关注释

具体部署方式不进行指导了,网上资料比较多

KafkaProducer Demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
public class ProducerDemo {
public static void main(String[] args) {
boolean isAsync = args.length == 0 ||
/* 消息的发送方式:异步发送还是同步发送 */
!args[0].trim().equalsIgnoreCase("sync");

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
/* 客户端的 ID */
props.put("client.id", "DemoProducer");
/*
* 消息的 key 和 value 都是字节数组,为了将 Java 对象转化为字节数组,可以配置
* "key.serializer" 和 "value.serializer" 两个序列化器,完成转化
*/
props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");

/* StringSerializer 用来将 String 对象序列化成字节数组 */
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

/* 生产者的核心类 */
KafkaProducer producer = new KafkaProducer<>(props);

/* 向指定的 test 这个 topic 发送消息 */
String topic = "test";

/* 消息的 key */
int messageNo = 1;

while (true) {
String messageStr = "Message_" + messageNo;
long startTime = System.currentTimeMillis();

if (isAsync) { /* 异步发送消息 */
/*
* 第一个参数是 ProducerRecord 类型的对象,封装了目标 Topic,消息的 kv
* 第二个参数是一个 CallBack 对象,当生产者接收到 Kafka 发来的 ACK 确认消息的时候,
* 会调用此 CallBack 对象的 onCompletion() 方法,实现回调功能
*/
producer.send(new ProducerRecord<>(topic, messageNo, messageStr),
new DemoCallBack(startTime, messageNo, messageStr));
} else { /* 同步发送消息 */
try {
/*
* KafkaProducer.send() 方法的返回值类型是 Future<RecordMetadata>
* 这里通过 Future.get 方法,阻塞当前线程,等待 Kafka 服务端的 ACK 响应
*/
producer.send(new ProducerRecord<>(topic, messageNo, messageStr)).get();
System.out.printf("Send message: (%d, %s)\n", messageNo, messageStr);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
/* 递增消息的 key */
++messageNo;
}
}
}

class DemoCallBack implements Callback {
/* 开始发送消息的时间戳 */
private final long startTime;
private final int key;
private final String message;

public DemoCallBack(long startTime, int key, String message) {
this.startTime = startTime;
this.key = key;
this.message = message;
}

/**
* 生产者成功发送消息,收到 Kafka 服务端发来的 ACK 确认消息后,会调用此回调函数
* @param metadata 生产者发送的消息的元数据,如果发送过程中出现异常,此参数为 null
* @param exception 发送过程中出现的异常,如果发送成功为 null
*/
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
long elapsedTime = System.currentTimeMillis() - startTime;
if (metadata != null) {
System.out.printf("message: (%d, %s) send to partition %d, offset: %d, in %d\n",
key, message, metadata.partition(), metadata.offset(), elapsedTime);
} else {
exception.printStackTrace();
}
}
}
Read more »
0%