智能日志管理平台

  • Spark 服务

    最近更新时间:2018-09-07 15:17:00

    XSpark概念

    XSpark 是七牛提供的大数据云计算应用, 它提供了 Java, Scala, Python 和 R 的高级 API,以及一个支持通用的执行图计算的优化过的引擎。 它还支持一组丰富的高级工具,包括使用SQL处理结构化数据处理的Spark SQL,用于机器学习的MLlib,用于图计算的GraphX,以及 Spark Streaming。通过 XSpark 您可以做一站式数据分析,数据挖掘并实现数据可视化。

    XSpark基本概念

    1. 推荐将七牛云存储服务作为 XSpark 的数据来源。

      • XSpark 除了支持七牛云存储,如kodo://fusion://

      • XSpark 支持关系型数据库,mysql,mongodb等等。

    2. 使用 XSpark之前,先了解以下几个概念:

      • 资源容器(container):CPU(规格 & 数量)和内存(大小)。

      • 调度节点(Master):调度节点是用来管理和分发分布式计算任务。负责整个Spark计算过程的资源分配,任务调度等等工作

      • 计算节点(worker): XSpark 集群的执行节点,用来执行分布式计算任务,一般来说,计算节点越多,计算效率越高;它使用的资源也包含在资源容器当中。计算节点规格与数量的大小和执行效率成正比,请酌情选择。

      • Zeppelin:Zeppelin 是一个 Web 笔记形式的交互式数据查询分析工具,可以在线用 scala 和 SQL 对数据进行查询分析并生成报表。在 XSpark 里, Zeppelin 是我们的代码编写、运行、结果可视化核心的用户接口。

      • 监控:XSpark为每一个容器组件都提供了CPU,内存,网络,磁盘等使用率监控图表,用户在执行任务的过程中可通过观察各个节点的资源使用量来合理规划调整XSpark配置。

    创建应用

    1. 登录 https://c.qiniu.com, 选择区域为华东一区,在左侧边栏中选择 【应用市场】。

    1. 找到 XSpark 应用 ,点击申请开通。

    1. 应用开通之后点击【部署应用】即可部署一个新的 XSpark 应用。

      部署 XSpark 我们需要填写几个信息。

      • 应用名称: XSpark 实例的名称,用来标识该应用的唯一性。由 2~63 位的小写字母、数字或"-",只能字母开头,字母或数字结尾。

      • Spark 版本:Spark选您将要使用的Spark 版本。默认的Spark版本为推荐使用版本。

      • Worker 容器规格:选择计算节点需要的 CPU(规格 & 数量)和内存(大小)。

      • Worker 个数:选择计算节点的数量(Spark worker个数)。

      • AK/SK:用户的七牛AK/SK,提供一键读取功能:点击读取即可自动一键获取当前账户的 AK/SK。注:如果您想要使用七牛云存储上的数据请务必选择

    2. 填写完成之后点击【创建应用】。

    服务入口

    下面是XSpark 的主界面,在这里我们可以开始Spark的开发,和服务状态监控,集群管理相关工作。

    点击开始使用进入 Zeppelin 工作页面,在这里编写代码对数据进行计算并将结果可视化。

    点击 SparkUI 进入服务监控页面,查看当前任务的运行状态,包括调度节点、计算节点、正在运行中的程序以及运行完成的程序的相关信息。

    服务状态

    在服务状态区块,我们可以修改集群配额,关注节点监控信息。

    在服务状态下面可以看到调度节点(Master)、计算节点(worker)、Zeppelin 服务的 CPU、内存、磁盘规格、运行状态等信息。可以根据实际情况更改服务占用资源。点击修改即可。

    服务监控

    服务状态下面点击具体的服务名称进入服务的监控页面,对服务占用的 CPU、内存、网络流量进行实时监控。并且可以在右上角切换监控图的图表类型。

    集群管理

    该部分可以对集群进行管理。

    1. 重置集群

    当您的应用处于未知状态,完全不可用时,您可以通过这个按钮重置您的集群。(请先备份您的代码)

    2. 磁盘清理

    根据上面的监控信息,如果我们发现磁盘占用量比较大时,我们可以使用该按钮来清理磁盘。

    3. 代码备份/恢复

    如果您需要重新新建另外一个 XSpark 实例时,而又不想丢弃之前在 Zeppelin 写的代码,这时候您可以尝试 XSpark 提供的代码备份/恢复功能。当然除此之外,常常备份自己的代码是个好习惯。

    备份代码:点击备份,它会自动备份您当前实例的所有代码。

    恢复代码:恢复时第一步需要选择要恢复的代码来自于哪一个您曾经备份过的 XSpark 实例, 第二步选择需要恢复的代码。

    通过集群管理,您可以灵活的使用一个 XSpark 实例完成多次数据计算,节省资源。

    开始使用

    点击开始使用进入 Zeppelin 工作页面,通过 XSpark Tutorial 进来学习 Zepplin 的使用。

    当然,如果您要编写代码分析您自己的数据,点击 create new note,在您自己的 notebook 编写代码即可。

    Spark Scala 示例

    XSpark 提供的示例代码如下:

    在代码编辑窗口中,操作演示所使用的示例是读取七牛对象存储的某个 Bucket 中的 README.md 文件,然后输出这个文件的所有字段信息。这是一个简单的分析文本文件的例子,请自行替换上面的文件路径。

    操作演示:

    Spark SQL 示例

    我们可以在代码编辑窗口中编写 Spark SQL 来对数据进行分析。首先我们需要将读取的文件注册成一张表,然后即可编写 SQL 对数据进行分析,这里我们使用两段代码完成这个操作。

    操作示例:

    代码说明如下

        // 首先从对象存储中读取 data.json 文件
        val dataPath = "qiniu://test-xspark/data.json"
        // 然后使用json的方式来读取文件
        val table = sqlContext.read.json(dataPath)
        table.printSchema()
        // 将读取的文件注册为一张表,表名是 data
        table.registerTempTable("data")
    

    将文件注册成表之后,编写 SQL 对数据进行分析:

        %sql select * from data limit 10
    

    注意: 编写 SQL 时, 头部必须包含 %sql

    基于 XSpark 的 Python 语言支持

    基于 XSpark 的 R 语言支持

    基于 XSpark 的机器学习

    定时任务

    操作流程:

    XSpark 支持对数据计算任务设置定时执行频率。

    快捷选择项中,我们支持 1 分钟、5 分钟、1 小时、3 小时、6 小时、12 小时、1 天;

    也可以手动输入cron表达式自定义定时周期。

    建议勾选:auto-restart interpreter on cron execution使得每次运行都是独立的运行环境,但如果您在同一个应用里存在多个 notebook 时,请慎重勾选该选项,由于单个 App 里执行引擎是共享的,自动重启会阻断其他正在执行的任务。

    操作演示:

    XSpark 邮件告警功能

    邮件告警经常配合定时任务来使用,当任务失败时,会有邮件发送具体的失败信息到指定的邮箱内。

    Spark Interpreter

    当我们使用 Spark时,需要用到 Spark Interpreter,这里我们将主要介绍 Spark Interpreter 的概念,及使用方式

    XSpark Interpreter 解释器

    通过界面右上角的 Interpreter 标签进入 XSpark 解释器详情页

    搜索 spark 查看 Spark 解释器:

    这里我们能够观察到 Spark 的执行引擎的基本信息。

    Spark 解释器组由 5 个解释器组成:

    1. %spark:创建一个 SparkContext 并提供 Scala 环境
    2. %spark.pyspark:提供 Python 环境。
    3. %spark.r:提供具有 SparkR 支持的 R 环境。
    4. %spark.sql:提供 SQL 环境。
    5. %spark.dep:可以动态的添加下载依赖。

    您可以对 Interpreter 做重启和编辑:

    1. 重启 Interpreter:

      重启 Interpreter 会结束掉当前正在运行的 spark 任务,释放资源。

    2. 编辑 Interpreter:手动添加依赖。

    添加第三方依赖包

    当我们编写 spark 代码时难免会需要依赖第三方的 jar 包依赖,此时我们可以通过两种方式来实现你的需求。

    1. 通过 Interpreter 配置界面

    点击编辑 Spark Interpreter,找到低下的“Dependencies”模块,依次添加我们需要的依赖包。
    格式为:groupId:artifiactId:version


    点击保存后会提示自动重启Interpreter

    1. 以代码的形式添加依赖

    点击代码编辑窗口之间的缝隙,可以增加一个新的代码编辑窗口。

    在新的代码编辑窗口输入:

    %spark.dep
    

    换行后即可添加第三方依赖:

    z.load("第三方依赖地址")
    

    我们也支持添加第三方镜像仓库来加速依赖下载,示例如下:

    z.addRepo("3rdRepo").url("http://maven.aliyun.com/nexus/content/groups/public/")
    

    注意:如果遇到如下提示,需要停止 XSpark 已经在运行的任务,才能使用加载第三方依赖功能:

    Must be used before SparkInterpreter (%spark) initialized
    Hint: put this paragraph before any Spark code and restart Zeppelin/Interpreter
    

    场景分析

    XSpark 提供了一个以 Spark 计算引擎以及 Zeppelin 为主要的用户接口,我们可以在 Zeppelin 编写任意的代码。下面我们使用 XSpark 来演示一些经典场景。

    注:下面演示在 zeppelin 的 notebook 完成,因此需要先创建好 xspark 应用,然后新建一个空白的 notebook。

    Ngnix 日志分析

    1. 加载 Nginx 日志

    val dataPath = "qiniu://<bucketName/nginxlog/date=2018-08-21/"
    val df = sqlContext.read.parquet(dataPath)
    df.registerTempTable("nginxlog")
    df.cache()
    

    仅仅需要上面几行代码,我们就可以读取 Nginx 日志并将其注册成 table 格式(将其注册成 table 的目的是后面使用 SQL 语法查询表格数据)。

    注意:此处使用的是七牛 pandora 里面标准的 parquet 存储的 Nginx 日志,如果您需要加载其他格式的日志,比如 json,text,请先使用 XSpark Tutorial 里面的 json,text 格式示例,并做必要的 ETL。

    2. 查询表格数据

    3. 查询城市分布情况并用饼图展示

    4. 查询响应时间趋势并用柱状图展示

    5. 查询慢请求情况并用区域图展示

    以上内容是否对您有帮助?
  • Icon free helper
    Close