概述
Spark 可以认为是改进版的 MapReduce,改进了 MapReduce 存在的以下问题
- 调度慢,启动耗时,由于 MapReduce 使用进程级的调度,相比 Spark 线程级调度,启动较慢;
- 计算慢,每一步都要保存中间结果到磁盘,相比 Spark 使用内存缓存中间结果较为耗时;
- 使用复杂,只提供 Map/Reduce 两种形式,相比 Spark 提供 flatMap、join、group 等难以使用;
- 缺乏作业流管理,多步任务需要多次 MapReduce,相比 Spark 提供 DAG 图管理作业流较为复杂。
下载安装
在 Spark 官网 下载安装包,这里选择 2.3.0 (Feb 28 2018)
和 Pre-build with user-provided Apache Hadoop
,也就是 spark-2.3.0-bin-without-hadoop.tgz
。使用之前安装的 Hadoop。
解压
|
|
配置
使用已经安装的 Hadoop 需要编辑 conf/spark-env.sh
文件
|
|
因为我将 hadoop
命令添加到了 PATH
环境变量,这里直接添加下面的内容到 spark-env.sh
最后即可
|
|
运行样例程序
|
|
这个程序会计算 PI 的近似值,执行之后可以在一团日志中找到
|
|
使用 spark-shell
spark-shell 提供了非常方便的基于 Scala 语言的命令行方式来是哟个 Spark,是学习 Spark 框架的很好方式。
这里以计算 README.md
中的 WordCount 为例,首先进入命令行
|
|
可以看到,Spark 的 Web UI 可以从本机的 4040 端口访问。Spark context
与 Spark session
已经被初始化为 sc
与 spark
可以直接在命令行中使用。
当前 Spark 的版本是 2.3.0,Scala 版本是 2.11.8。
|
|
以上两条命令读取了 README.md 并计算了它的行数。
|
|
这里执行了一系列的函数。首先将行通过 flatMap
转换为单词,然后通过 filter
过滤掉了大于 20
的单词和空的单词,再将单词通过 map
转换为 key-value, 然后以相同的 key 进行 reduceByKey
,将 value 相加,最后以相加后的值倒序排序。
最后,通过 take
取前十个,也就是出现最多的前十个单词,可以通过结果看到,单词 the 出现了 24 次。
至此,完成了对于 README.md 的单词数目统计,可以看到,依托于 spark 提供的丰富的函数,可以很方便的对数据进行转换。
在输出中可以看到,变量 a
的类型是 org.apache.spark.rdd.RDD[String]
,变量 w
的类型是 org.apache.spark.rdd.RDD[(String, Int)]
。
RDD 也就是 Resilient Distributed Datasets, 在 Spark 2.0 之前,RDD 是主要的 Spark 编程接口,也就是说对于数据的转换操作是围绕着 RDD 来的。
在 Spark 2.0 之后,使用 Dataset 替换了 RDD, Dataset 拥有与 RDD 类似的强类型属性,但在对比 RDD 更加优化。 RDD 依然是支持的,但 Spark 非常推荐开发者转换到 Dataset,因为它有着更好的性能。
使用 Dataset 来编写上面的单词统计程序
|
|
在运算过程中,可以使用 cache 来缓存结果
|
|
使用 spark-sql 统计 Nginx 日志
数据依旧是使用几行 Nginx 日志
|
|
这里目标是统计各个 IP 的访问次数,简单的使用空格分隔 IP 与其他数据
|
|
这里首先定义了一个类 Log
用来表达一条日志,然后将 RDD 通过 toDF
转换为 DataFrame,然后 df.createOrReplaceTempView
创建了一个 View
类似于关系型数据库中的视图概念。
最后通过执行一条 SQL 语句,很方便的查询出访问最多的几个 IP 地址。
DataFrame 是组织到命名列中的 Dataset, 概念上等同于关系型数据库中的表。
DateFrame 可以从各种来源构建,比如结构化的数据文件、Hive Table、外部数据库,现有的 RDD,在 Scale API 中 DataFrame 只是 Dataset[Row] 的别名,在 Java API 中,开发者需要使用 Dataset
Dataset 编程接口从 Spark 1.6 添加进来。