背景

  • 使用 Postgresql Jdbc Driver 读取大量数据

    1
    2
    3
    4
    5
    <dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>42.6.0</version>
    </dependency>
  • 读取的数据中包含 timestamp with time zone 与 time with time zone

  • Pg Server 是 +08:00 时区

Read more »

主要概念解释

  • Reader:数据采集模块,负责数据读取,将数据发送给 Channel。
  • Writer:数据写入模块,负责不断读取 Channel 的数据,并将 Channel 的数据写入到目的端。
  • Channel:通过 pushXpullX 接口提供 Plugins 数据通道能力,同时统一统计、限速能力。
  • RecordSender:基于 Channel 封装的接口,用于 Reader 将 Record 传递到框架
  • RecordReceiver:基于 Channel 封装的接口,用于 Writer 从框架获取 Recrod
  • Job:单个数据同步的作业,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job 模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子 Task)、TaskGroup 管理等功能。
  • Task:Task 是 DataX 作业的最小单元,每一个 Task 都会负责一部分数据的同步工作。Task 由 TaskGroup 进行管理。Task 会固定启动 Reader -> Channel -> Writer 的线程来进行同步工作。
  • TaskGroup:管理一组 Task 的运行。
Read more »

结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
├── agent							[ClassLoader/代理启动]
├── arthas-agent-attach [Agent Attach?]
├── arthas-spring-boot-starter [针对 spring boot-2 提供的自动集成技能]
├── arthas-vmtool [JVM 工具模块]
├── async-profiler [profiler 的 so 文件]
├── bin [shell 脚本]
├── boot [启动模块]
├── client [client 模块]
├── common [通用模块]
├── core [核心功能模块]
├── demo [demo 展示]
├── lib [arthas jni]
├── math-game [demo]
├── memorycompiler [动态编译模块]
├── packaging [打包专用]
├── site [arthas 官网信息]
├── spy [定义 SPI(方法执行前后插入点)]
├── testcase [测试 case]
├── tunnel-client [tunnel-client/server 均是为了提供管理多个 arthas 的能力]
├── tunnel-common
├── tunnel-server
└── tutorials
Read more »

介绍

本文的起因是在工作中到 Kafka 的使用最广泛、性能要求最高、问题最多的组件。而其中性能问题的调优思路往往与 Kafka 的攒批息息相关。所以这篇文章中会一起学习 Kafka 攒批原理及如何使用 Kafka 的攒批。

Read more »

文章简介

  • [正常]叙述 Kafka 从 Producer 发送消息到 Server 日志落盘全过程
  • [提升]详解 Kafka 日志格式,并通过 Java / Rust 两类解析方式
  • [深入]Rust-Kafka-Client Producer 到 Kafka Server
  • 需要说明的是这篇文章的局限性
    • 仅介绍两个 topic 各有两个 partition 为基础进行介绍,会调用 flush 强行将消息刷新进入 topic 内。发送消息每个 partition 各 5 条
    • 不涉及鉴权部分
    • 不涉及事务消息及有序消息部分
    • Kafka 版本 3.1.0,如无必要不会涉及老版本的历史包袱说明
    • 流程图中只记录关键路径,关键信息,部分细节信息可能需要细看代码才行,但是不妨碍原理理解
    • 这其中会涉及到 ReplicaNetwork 的部分知识,但是在这篇文章中只会涉及到比较浅显的部分,我们默认网络层和主副本是对我们透明的,对其中细节及设计部分我们会在另外的文章中讲解。
Read more »

借助 Netty 官方 Echo 实例: Echo Demo

问题

  1. 如何与底层交互
  2. Bootstrap 与 ServerBootstreap 有什么异同
  3. NioSocketChannel 与 NioServerSocketChannel 有什么异同
  4. 为什么说 netty 是事件驱动,其事件是如何传播的
  5. 为什么 ServerBootstrap 需要两个 EventLoopGroup,分别有什么用
Read more »

Demo

1
2
3
4
5
6
ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer();
buf.writeBytes("test".getBytes());

byte[] readBytes = new byte[buf.readableBytes()];
buf.readBytes(readBytes);
System.out.println("read content: " + new String(readBytes));

直接看 PooledByteBufAllocator.newHeapBuffer(int initialCapacity, int maxCapacity)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
PoolThreadCache cache = threadCache.get();
PoolArena<byte[]> heapArena = cache.heapArena;

final ByteBuf buf;
if (heapArena != null) {
buf = heapArena.allocate(cache, initialCapacity, maxCapacity);
} else {
buf = PlatformDependent.hasUnsafe() ?
new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}

return toLeakAwareBuffer(buf);
}
Read more »

要点

  1. UnpooledByteBufAllocator 的 Heap 与 Direct 的实现
  2. Heap 内部实现使用了 byte[]
  3. Direct 内部依托于 PlatformDependent0 的各种 native 方法
  4. toLeakAwareBuffer 部分主要讨论了 Netty 是如何应对内存泄露,以及检测内存泄露跟踪的四个级别是如何实现的
Read more »
0%