智能日志管理平台

  • Kafka 数据同步

    最近更新时间:2018-09-12 10:59:14

    本文主要针对数据已经汇聚在 kafka 中的场景,为您介绍如何使用 logkit-pro 及智能日志平台,同步 kafka 中的数据,并给出了性能调优方案。

    安装 logkit-pro

    1.登录 logkit-pro,进入机器管理页面,点击添加机器。

    2.手动安装:根据您机器的操作系统版本选择对应的命令,复制到命令行工具即可,如图所示:

    详细的安装文档可以阅读 logkit-pro 安装

    1. 安装完毕后,您可以在机器列表页面看到您安装的机器:


    添加收集器

    安装好 logkit-pro agent 以后,您就可以使用 logkit-pro 服务端界面来收集日志。

    进入数据收集页面,点击 添加收集器 -> 日志收集

    选择 kafka 数据源读取

    根据您的 kafka 版本,选择对应的 kafka 数据源读取,我们提供了两个 kafka 读取,一个针对 0.8 及以前的,另一个针对 0.9 及以后的。

    更多读取的参数配置项,可以查看 Kafka 数据读取

    填写完毕后,可以选择您安装的 agent,尝试获取实际的数据,方便后面做数据解析。

    根据数据的实际格式选择解析方式

    读取到实际的数据后,可以根据数据实际的格式选择解析方式,如图所示,我们读取到的是 json 格式的数据,可以直接使用 json 解析方式对数据解析。

    具体的不同解析方式选择和详细介绍,可以参考解析器文档进一步查看。

    【可选功能】转换数据

    解析完毕后,会进入到数据转换界面,可以根据您的需要进行数据转换。常见的一些转换需求如下:

    1. 针对 IP 进行扩展,有助于在服务端进行日志分析时绘制中国地图及世界地图。

    选择 IP 转换工具,将数据中的 IP 字段扩展出相应的区域、地理、运营商等信息。支持本地( IP 库)、服务端(七牛 IP 库)两种方式解析。如图所示,我们将 myip 字段的IP值解析出来了 myip_country 等字段。

    2. 过滤掉不关心的数据,有助于在客户端做边缘计算,去除大量无用的字段,节省传输带宽。

    选择 discard 转换工具,勾选需要去掉的字段,当然也可以选择 pick 工具,勾选需要保留的字段,其他都删除。如图所示,我们勾选了 haah 字段,在转换后的数据中已经没有这个字段。

    3. 重命名,有助于解决大数据平台数据字段类型冲突问题

    选择 rename 转换工具,将需要改名的字段,命名为新的名称。新的名称可以在随后定义为新的类型,不会与老的数据命名冲突。如图所示,我们将 myip 字段改名为了 client_ip。如果要从本地解析 IP 转为使用服务端解析 IP,记得用重命名功能改个名字,因为服务端解析 IP 会将字段定义为 IP 类型,而本地解析只是字符串类型。

    4. 时间类型转换,确定时间字段有助于后续按照日志的事件时间进行分析

    选择 date 转换工具后,大多数情况下能自动识别出时间格式并转换,若无法识别,可以按照智能推荐填写,也可以根据date 转换文档中的说明填写自定义格式。如图所示,选择 time 字段后会自动识别并转化为 RFC3339 格式,转化后的数据字段没有改变只是因为我没有点击添加,您配置转换工具的时候也不要忘了添加哦。

    5. 字段类型转换,有助于按特定类型进行分析,如数值类型求平均值等

    选择 convert 转换工具,可以勾选字段,并转换为对应类型。如图所示,我们将 mynumber 字段改为了 long 类型,在样例日志中还是字符串类型,转换后就变成了整数,在这个例子中不想丢失精度也可以转为 float 类型。

    还有非常多转换工具,不再一一介绍,可以参考 转换器相关文档了解更多内容。

    发送数据

    1. 发送到七牛智能日志管理平台,进行丰富的日志分析

    七牛智能日志管理平台提供丰富而详尽的日志查询分析、报表展示、监控报警、数据智能等能力,解决您大规模数据量下的数据分析难题。可以查看智能日志管理平台搜索分析报表展示等文档,获得更多详细的功能说明。

    如图所示,填写一个实时仓库名称和工作流名称即可,若仓库不存在会自动创建。默认情况下就会导出到日志仓库。发送到七牛智能日志管理平台的数据,可以同时存一份到七牛云存储,只需要开启自动导出到七牛云存储功能即可。

    2. 发送到七牛云存储,进行数据备份和分析

    若您的数据不需要实时查询分析,也可以将数据单独发送到七牛云存储进行备份,后续也可以通过七牛的Spark 服务对数据进行批量分析。七牛云存储与七牛Spark服务相结合,使得您可以以极低的成本对海量数据进行分析。

    3. 发送到 Kafka,进行机房间数据同步

    当然,您完全可以选择其他发送服务,如发送到其他机房的 Kafka,进行数据同步功能。

    更多发送源,可以查看发送源进行了解。

    添加并分发到机器

    接下来,点击下一步,提交并分发到机器即可。

    查看运行状态

    分发完成后,您可以在【数据收集】页面,也可以在【机器管理】页面,进入查看您的收集任务运行状态。

    性能调优

    如果读取的延迟较高,就需要进行性能调优,提升读取速率。

    从本质上讲,性能调优是使得程序可以充分利用系统资源的过程。让我们按瓶颈出现的可能性逐一讲解功能配置方法。

    查看程序对 CPU 用量的常见命令如: tophtopps aux
    查看程序内存用量的常见命令如: tophtopps aux
    查看磁盘 IO 的常用命令如: iostat -x 10iotop
    查看网络带宽的常用命令如: iftopnetstat

    1. 打开整体的 CPU 控制开关

    首先要调高 logkit-pro 的主配置文件 logkit.conf 中针对 CPU 使用核数的整体开关,该配置为 logkit 占用系统 CPU 资源的上限。

    主配置文件中有 runtime -> max_procs 一项,该项配置控制整体使用的 CPU 数量,默认为1,该参数理论上限为机器的逻辑核数,可以根据机器的配置调整,若不确定机器具体核数,写大了也没有关系。

    {
      "version": 1,
      "service": {
        "bind_host": ":3000",
        "static_dir": "public",
        "upload_dir": "data/upload"
      },
      "service_url": {
        "smart_elf": "https://logkit-pro.qiniu.com/"
      },
      "runtime": {
        "max_procs": 1
      },
      "security": {
        "disable_auths": false,
        "auths_file": "auths.conf"
      }
    }
    

    注意:主配置文件调整后,要重启 logkit 才能生效。

    2. 配置多个收集任务多线程读取

    但是 Kafka 读取默认只使用单核,可以通过配置多实例来进行并发消费, 即多个收集任务,每个收集任务配置同一个 consumergroup 就可以保证数据不重复收集。

    一方面,可以在单个机器上配置多个收集任务,除 Runner名称外,其他配置保持一致即可;
    另一方面,也可以在多台机器上配置同样的收集任务,只要保证 consumergroup 是一样的即可。

    理论上,收集任务的最大并发数量为 Kafka 中收集的 TopicPartition 数量。

    3. 配置多线程发送

    调整了 logkit 整体可以使用的核数以后,就要想办法让程序利用上多核,在主配置的多核开启后,解析和转换会自动利用上多核能力,但是读取和发送是默认不开启多线程发送的。

    发送端的多线程发送功能,可以充分利用 CPU 多核的能力,提升发送效率。这一点我们设计三种多线程发送方式【正常使用第一种即可】。

    1. 【推荐】纯并发发送,不经过任何队列,纯开启多线程并发发送。对于发送的 CPU 是性能瓶颈的情况下尤为有效。设置策略(ft_stategy) 为 并发发送(concurrent),配置并发数量(ft_procs),设置为 "2",就是开 2 个并发发送,速度就能大大提升,这里最大值可以设置为逻辑核数的数量。
    2. 磁盘队列并发,先保存到磁盘队列,再并发发送,使用磁盘队列主要便于读取和发送异步化,有利于发送需要大批量发,读取快的情况,发送可以攒一批数据发送。该功能适合接受数据的服务端能承受的单次请求数据量较大,而单次请求响应较慢的情况。设置策略(ft_stategy) 为 磁盘队列缓存发送(always_save),则所有数据会先发送到本地队列, 同样需要配置并发数量(ft_procs), 设置为 "2",就是开 2 个线程在磁盘队列后并发发送,最大值可以设置为逻辑核数的数量。
    3. 内存队列,原理类似磁盘队列,但是在程序被强行中断时存在数据丢失风险,性能上比磁盘队列更优。配置上与磁盘队列相同,额外配置 ft_memory_channeltrue 即可。

    上述针对 CPU 的配置调整完成后需要重新分发到对应的 agent 执行,若观察一段时间后,发现使用的 CPU 已经占满,并且发送速率依然较慢,极有可能是配置的解析和转换较多或者性能较低,此时可以从解析方式或者转换方式上考虑优化,如能用 csv 的转换就不用 grok 等等。当然,数据量大的情况下发送时使用压缩功能发送也极有可能消耗较多 CPU,但是一般情况下不建议关掉压缩功能,因为带宽资源通常比 CPU 资源更珍贵。

    4. 查看带宽,考虑压缩发送

    如果带宽使用率达到了您带宽大小的上线,那么就需要调整带宽了,或者看看是否采用压缩发送功能,注意开启。

    logkit-pro 在几乎所有发送都设置了发送的压缩方式,如七牛智能日志管理平台的 压缩发送(pandora_gzip) 功能, http发送的 是否启用gzip(http_sender_gzip) 功能等。

    至此,Kafka 数据同步的使用场景就介绍完了,祝您使用愉快!

    以上内容是否对您有帮助?
  • Icon free helper
    Close