这里使用spark一个简单的单词统计,文本来源是spark预编译包里的README.md文档,虽然这是一个很小的文件,根本不需要使用spark。但是可以对rdd的操作有一个最基本的实践。
代码
新建一个jupyter spark的kernel时,会自动新建一个变量sc,这是一个sparkContext对象,它是整个spark程序的入口。
1 | sc |
org.apache.spark.SparkContext@1d533154
- 首先,我们读取文件,读取本地文件是可以使用sc.textFile(),将本地路径填写进去即可
1 | val filePath = "/home/lm/Softwares/spark-2.1.0-bin-hadoop2.6/README.md" |
- 我们先看一下文本的前20行内容,可以发现,读取文件时一行数据在rdd里就是一条数据,共有104行。
1 | data.take(20).foreach(println) |
# Apache Spark
Spark is a fast and general cluster computing system for Big Data. It provides
high-level APIs in Scala, Java, Python, and R, and an optimized engine that
supports general computation graphs for data analysis. It also supports a
rich set of higher-level tools including Spark SQL for SQL and DataFrames,
MLlib for machine learning, GraphX for graph processing,
and Spark Streaming for stream processing.
<http://spark.apache.org/>
## Online Documentation
You can find the latest Spark documentation, including a programming
guide, on the [project web page](http://spark.apache.org/documentation.html).
This README file only contains basic setup instructions.
## Building Spark
-----------------------------
数据总量: 104
- 我们注意到,数据中有一些空行,对此我们可以过滤掉长度为0的数据
1 | val rdd1 = data.filter(_.length > 0) |
- 接着我们对每一行的句子进行split,符号为” “,这样可以取出每个独立的单词,那么数据就由 RDD[String] => RDD[List[String]]
1 | val rdd2 = rdd1.map(x => x.split(" ")) |
- 接着我们使用flatMap操作将List[String]取出并且flat,这样数据又由 RDD[List[String]] => RDD[String]
1 | val rdd3 = rdd2.flatMap(x => x) |
- 我们还发现,split后,每个单词里面有包含了标点”,”和”.”等等等,我们这里只暂时替换”,””.”,对此我们进行一下处理,去掉这些符号;
- 另外,单词中还包含纯标点符号,我们也要去掉这些脏数据;
1 | val regex="^[A-z]*$".r |
1 | rdd4.take(20).foreach(println) |
Apache
Spark
Spark
is
a
fast
and
general
cluster
computing
system
for
Big
Data
It
provides
APIs
in
Scala
Java
单词总数为: 436
- 接下来,我们把每个单词中的字母都统一为小写,然后把每个单词构造为key-value结构,key就是单词本身,value为1。
1 | val rdd5 = rdd4.map(x => (x, 1)) |
- 然后进行reduceByKey,value值在reduceByKey时相加
1 | val rdd6 = rdd5.reduceByKey{case(a, b) => a + b} |
- 到这里,我们已经对单词进行了统计,我们看一下结果。
1 | rdd6.take(10).foreach(println) |
(package,2)
(this,1)
(Because,1)
(Python,4)
(its,1)
(guide,1)
([run,1)
(general,3)
(have,1)
(locally,3)
出现的单词数量为: 224
如package,这里统计出现了两次,我们看一下原始文件。
原始文件出现了三次package,其中有一次出现的情况是”package.)”,我们replace了”.”,而没有处理”)”,故没有参与统计,可以做优化处理。
- 如果我们想看下哪个单词出现频率最高,还需要对次数做一个排序。
1 | val rdd7 = rdd6.map(x => (x._2, x._1)).sortByKey(false) |
1 | rdd7.take(5).foreach(println) |
(24,the)
(17,Spark)
(17,to)
(12,for)
(9,and)
- 我们发现排名前五的单词如上所示。
- 以上我们共创建了7个rdd,实际运用时可以使用链式法则将所有的transform操作直接串起来,这个就看个人喜好了。