Kafka 数据同步
本文主要针对数据已经汇聚在 kafka 中的场景,为您介绍如何使用 logkit-pro 及智能日志平台,同步 kafka 中的数据,并给出了性能调优方案。
安装 logkit-pro
1.登录 logkit-pro,进入机器管理页面,点击添加机器。
2.手动安装:根据您机器的操作系统版本选择对应的命令,复制到命令行工具即可,如图所示:
详细的安装文档可以阅读 logkit-pro 安装 。
- 安装完毕后,您可以在机器列表页面看到您安装的机器:
添加收集器
安装好 logkit-pro agent 以后,您就可以使用 logkit-pro 服务端界面来收集日志。
进入数据收集页面,点击 添加收集器 -> 日志收集。
选择 kafka 数据源读取
根据您的 kafka 版本,选择对应的 kafka 数据源读取,我们提供了两个 kafka 读取,一个针对 0.8 及以前的,另一个针对 0.9 及以后的。
更多读取的参数配置项,可以查看 Kafka 数据读取
填写完毕后,可以选择您安装的 agent,尝试获取实际的数据,方便后面做数据解析。
根据数据的实际格式选择解析方式
读取到实际的数据后,可以根据数据实际的格式选择解析方式,如图所示,我们读取到的是 json
格式的数据,可以直接使用 json
解析方式对数据解析。
具体的不同解析方式选择和详细介绍,可以参考解析器文档进一步查看。
【可选功能】转换数据
解析完毕后,会进入到数据转换界面,可以根据您的需要进行数据转换。常见的一些转换需求如下:
1. 针对 IP 进行扩展,有助于在服务端进行日志分析时绘制中国地图及世界地图。
选择 IP
转换工具,将数据中的 IP 字段扩展出相应的区域、地理、运营商等信息。支持本地( IP 库)、服务端(七牛 IP 库)两种方式解析。如图所示,我们将 myip
字段的IP值解析出来了 myip_country
等字段。
2. 过滤掉不关心的数据,有助于在客户端做边缘计算,去除大量无用的字段,节省传输带宽。
选择 discard
转换工具,勾选需要去掉的字段,当然也可以选择 pick
工具,勾选需要保留的字段,其他都删除。如图所示,我们勾选了 haah
字段,在转换后的数据中已经没有这个字段。
3. 重命名,有助于解决大数据平台数据字段类型冲突问题
选择 rename
转换工具,将需要改名的字段,命名为新的名称。新的名称可以在随后定义为新的类型,不会与老的数据命名冲突。如图所示,我们将 myip
字段改名为了 client_ip
。如果要从本地解析 IP 转为使用服务端解析 IP,记得用重命名功能改个名字,因为服务端解析 IP 会将字段定义为 IP 类型,而本地解析只是字符串类型。
4. 时间类型转换,确定时间字段有助于后续按照日志的事件时间进行分析
选择 date
转换工具后,大多数情况下能自动识别出时间格式并转换,若无法识别,可以按照智能推荐填写,也可以根据date 转换文档中的说明填写自定义格式。如图所示,选择 time
字段后会自动识别并转化为 RFC3339
格式,转化后的数据字段没有改变只是因为我没有点击添加,您配置转换工具的时候也不要忘了添加哦。
5. 字段类型转换,有助于按特定类型进行分析,如数值类型求平均值等
选择 convert
转换工具,可以勾选字段,并转换为对应类型。如图所示,我们将 mynumber
字段改为了 long
类型,在样例日志中还是字符串类型,转换后就变成了整数,在这个例子中不想丢失精度也可以转为 float
类型。
还有非常多转换工具,不再一一介绍,可以参考 转换器相关文档了解更多内容。
发送数据
1. 发送到七牛智能日志管理平台,进行丰富的日志分析
七牛智能日志管理平台提供丰富而详尽的日志查询分析、报表展示、监控报警、数据智能等能力,解决您大规模数据量下的数据分析难题。可以查看智能日志管理平台、搜索分析、报表展示等文档,获得更多详细的功能说明。
如图所示,填写一个实时仓库名称和工作流名称即可,若仓库不存在会自动创建。默认情况下就会导出到日志仓库。发送到七牛智能日志管理平台的数据,可以同时存一份到七牛云存储,只需要开启自动导出到七牛云存储
功能即可。
2. 发送到七牛云存储,进行数据备份和分析
若您的数据不需要实时查询分析,也可以将数据单独发送到七牛云存储进行备份,后续也可以通过七牛的Spark 服务对数据进行批量分析。七牛云存储与七牛Spark服务相结合,使得您可以以极低的成本对海量数据进行分析。
3. 发送到 Kafka,进行机房间数据同步
当然,您完全可以选择其他发送服务,如发送到其他机房的 Kafka,进行数据同步功能。
更多发送源,可以查看发送源进行了解。
添加并分发到机器
接下来,点击下一步,提交并分发到机器即可。
查看运行状态
分发完成后,您可以在【数据收集】页面,也可以在【机器管理】页面,进入查看您的收集任务运行状态。
性能调优
如果读取的延迟较高,就需要进行性能调优,提升读取速率。
从本质上讲,性能调优是使得程序可以充分利用系统资源的过程。让我们按瓶颈出现的可能性逐一讲解功能配置方法。
查看程序对 CPU 用量的常见命令如: top
、 htop
、ps aux
查看程序内存用量的常见命令如: top
、htop
、ps aux
查看磁盘 IO 的常用命令如: iostat -x 10
、iotop
查看网络带宽的常用命令如: iftop
、netstat
1. 打开整体的 CPU 控制开关
首先要调高 logkit-pro 的主配置文件 logkit.conf
中针对 CPU 使用核数的整体开关,该配置为 logkit 占用系统 CPU 资源的上限。
主配置文件中有 runtime
-> max_procs
一项,该项配置控制整体使用的 CPU 数量,默认为1
,该参数理论上限为机器的逻辑核数,可以根据机器的配置调整,若不确定机器具体核数,写大了也没有关系。
{
"version": 1,
"service": {
"bind_host": ":3000",
"static_dir": "public",
"upload_dir": "data/upload"
},
"service_url": {
"smart_elf": "https://logkit-pro.qiniu.com/"
},
"runtime": {
"max_procs": 1
},
"security": {
"disable_auths": false,
"auths_file": "auths.conf"
}
}
注意:主配置文件调整后,要重启 logkit 才能生效。
2. 配置多个收集任务多线程读取
但是 Kafka 读取默认只使用单核,可以通过配置多实例来进行并发消费, 即多个收集任务,每个收集任务配置同一个 consumergroup
就可以保证数据不重复收集。
一方面,可以在单个机器上配置多个收集任务,除 Runner名称外,其他配置保持一致即可;
另一方面,也可以在多台机器上配置同样的收集任务,只要保证 consumergroup
是一样的即可。
理论上,收集任务的最大并发数量为 Kafka 中收集的 Topic
的 Partition
数量。
3. 配置多线程发送
调整了 logkit 整体可以使用的核数以后,就要想办法让程序利用上多核,在主配置的多核开启后,解析和转换会自动利用上多核能力,但是读取和发送是默认不开启多线程发送的。
发送端的多线程发送功能,可以充分利用 CPU 多核的能力,提升发送效率。这一点我们设计三种多线程发送方式【正常使用第一种即可】。
- 【推荐】纯并发发送,不经过任何队列,纯开启多线程并发发送。对于发送的 CPU 是性能瓶颈的情况下尤为有效。设置策略(
ft_stategy
) 为 并发发送(concurrent
),配置并发数量(ft_procs
),设置为 "2",就是开 2 个并发发送,速度就能大大提升,这里最大值可以设置为逻辑核数的数量。 - 磁盘队列并发,先保存到磁盘队列,再并发发送,使用磁盘队列主要便于读取和发送异步化,有利于发送需要大批量发,读取快的情况,发送可以攒一批数据发送。该功能适合接受数据的服务端能承受的
单次请求数据量较大,而单次请求响应较慢
的情况。设置策略(ft_stategy
) 为 磁盘队列缓存发送(always_save
),则所有数据会先发送到本地队列, 同样需要配置并发数量(ft_procs
), 设置为 "2",就是开 2 个线程在磁盘队列后并发发送,最大值可以设置为逻辑核数的数量。 - 内存队列,原理类似磁盘队列,但是在程序被强行中断时存在数据丢失风险,性能上比磁盘队列更优。配置上与磁盘队列相同,额外配置
ft_memory_channel
为true
即可。
上述针对 CPU
的配置调整完成后需要重新分发到对应的 agent 执行,若观察一段时间后,发现使用的 CPU 已经占满,并且发送速率依然较慢,极有可能是配置的解析和转换较多或者性能较低,此时可以从解析方式或者转换方式上考虑优化,如能用 csv 的转换就不用 grok 等等。当然,数据量大的情况下发送时使用压缩功能发送也极有可能消耗较多 CPU,但是一般情况下不建议关掉压缩功能,因为带宽资源通常比 CPU 资源更珍贵。
4. 查看带宽,考虑压缩发送
如果带宽使用率达到了您带宽大小的上线,那么就需要调整带宽了,或者看看是否采用压缩发送功能,注意开启。
logkit-pro 在几乎所有发送都设置了发送的压缩方式,如七牛智能日志管理平台的 压缩发送(pandora_gzip
) 功能, http发送的 是否启用gzip(http_sender_gzip
) 功能等。
至此,Kafka 数据同步的使用场景就介绍完了,祝您使用愉快!