spark简单应用-单词统计

  这里使用spark一个简单的单词统计,文本来源是spark预编译包里的README.md文档,虽然这是一个很小的文件,根本不需要使用spark。但是可以对rdd的操作有一个最基本的实践。

代码

  新建一个jupyter spark的kernel时,会自动新建一个变量sc,这是一个sparkContext对象,它是整个spark程序的入口。

1
sc
org.apache.spark.SparkContext@1d533154
  • 首先,我们读取文件,读取本地文件是可以使用sc.textFile(),将本地路径填写进去即可
1
2
val filePath = "/home/lm/Softwares/spark-2.1.0-bin-hadoop2.6/README.md"
val data = sc.textFile(filePath)
  • 我们先看一下文本的前20行内容,可以发现,读取文件时一行数据在rdd里就是一条数据,共有104行。
1
2
3
data.take(20).foreach(println)
println("-----------------------------")
println("数据总量: " + data.count())
# Apache Spark

Spark is a fast and general cluster computing system for Big Data. It provides
high-level APIs in Scala, Java, Python, and R, and an optimized engine that
supports general computation graphs for data analysis. It also supports a
rich set of higher-level tools including Spark SQL for SQL and DataFrames,
MLlib for machine learning, GraphX for graph processing,
and Spark Streaming for stream processing.

<http://spark.apache.org/>


## Online Documentation

You can find the latest Spark documentation, including a programming
guide, on the [project web page](http://spark.apache.org/documentation.html).
This README file only contains basic setup instructions.

## Building Spark

-----------------------------
数据总量: 104
  • 我们注意到,数据中有一些空行,对此我们可以过滤掉长度为0的数据
1
val rdd1 = data.filter(_.length > 0)
  • 接着我们对每一行的句子进行split,符号为” “,这样可以取出每个独立的单词,那么数据就由 RDD[String] => RDD[List[String]]
1
val rdd2 = rdd1.map(x => x.split(" "))
  • 接着我们使用flatMap操作将List[String]取出并且flat,这样数据又由 RDD[List[String]] => RDD[String]
1
val rdd3 = rdd2.flatMap(x => x)
  • 我们还发现,split后,每个单词里面有包含了标点”,”和”.”等等等,我们这里只暂时替换”,””.”,对此我们进行一下处理,去掉这些符号;
  • 另外,单词中还包含纯标点符号,我们也要去掉这些脏数据;
1
2
val regex="^[A-z]*$".r 
val rdd4 = rdd3.map(_.replace(",", "").replace(".", "")).filter(regex.findFirstMatchIn(_) != None).filter(_.length > 0)
1
2
rdd4.take(20).foreach(println)
println("单词总数为: " + rdd4.count())
Apache
Spark
Spark
is
a
fast
and
general
cluster
computing
system
for
Big
Data
It
provides
APIs
in
Scala
Java
单词总数为: 436
  • 接下来,我们把每个单词中的字母都统一为小写,然后把每个单词构造为key-value结构,key就是单词本身,value为1。
1
val rdd5 = rdd4.map(x => (x, 1))
  • 然后进行reduceByKey,value值在reduceByKey时相加
1
val rdd6 = rdd5.reduceByKey{case(a, b) => a + b}
  • 到这里,我们已经对单词进行了统计,我们看一下结果。
1
2
rdd6.take(10).foreach(println)
println("出现的单词数量为: " + rdd6.count())
(package,2)
(this,1)
(Because,1)
(Python,4)
(its,1)
(guide,1)
([run,1)
(general,3)
(have,1)
(locally,3)
出现的单词数量为: 224
  • 如package,这里统计出现了两次,我们看一下原始文件。

  • 原始文件出现了三次package,其中有一次出现的情况是”package.)”,我们replace了”.”,而没有处理”)”,故没有参与统计,可以做优化处理。

  • 如果我们想看下哪个单词出现频率最高,还需要对次数做一个排序。
1
val rdd7 = rdd6.map(x => (x._2, x._1)).sortByKey(false)
1
rdd7.take(5).foreach(println)
(24,the)
(17,Spark)
(17,to)
(12,for)
(9,and)
  • 我们发现排名前五的单词如上所示。
  • 以上我们共创建了7个rdd,实际运用时可以使用链式法则将所有的transform操作直接串起来,这个就看个人喜好了。

enjoy it!

0%