spark学习笔记

specai

2020/5/4

[TOC]

第一章 Scala编程

1. Scala运行方式

scala 1.java -classpath .:/usr/local/scala/lib/scala-lirary-jar HelloWorld 2.scala -classpath . HelloWorld 3.Scala命令行执行::load test.scala

2. 数据类型

  • Byte

  • Char

  • Short

  • Int

  • Long

  • Float

  • Double

  • Boolean

    富包装类

3. 变量

  • val不可变变量--函数式编程

  • var可变变量--面向对象编程

  • 类型推断机制:字面量可以自动进行类型推断。

4. 输入输出

  • 插值字符串表达式:s,f,raw(对字面值中的字符不做编码外,raw 插值器与 s 插值器在功能上是相同的)

    | 输入函数 | 输入函数 | 输入函数 | | :-------: | :-------: | :---------: | | readInt | readShort | readChar | | readDoule | readFloat | readBoolean | | readByte | readLong | readLine |

    ```scala import scala.io.StdIn import scala.io.Source val i=1 println(s"$i\n") println(raw"$i\n") println(f"$i%.o1f")

    var f=readInt

    //文件读取 val file=Source.fromFile("E:\scalaIO.txt") for(line <- file.getLines) { println(line) } file.close ```

5. 控制结构

  • 判断结构

    scala if(){ 语句块; } else if{ 语句块; } else { 语句块; } val x=6 val i =if(x>0)1 else 0(if结果可以当成赋值给变量)

  • 循环结构

    ```scala while(表达式){ 循环体 }

    do{ 循环体 }while(表达式)

    (生成器)

    for(变量<-表达式){ 循环体 } for (i<-1 t0 5 by 2)printf(i)

    守卫(guard)表达式

    for (变量<-表达式 if 条件表达式)语句块

    for推导式 for (变量<-表达式)yield {语句块} breakable{表达式 break}(util.control.Breaks) ```

  • 异常处理

    • 受检异常

    • 不受检异常(Scala)

    scala try{} catch{} finally{}

6. 数据结构

  • 数据Array:元素具有相同类型

    scala val myStrArr=new Array[String](3) myStrArr(0)="BigData" for (i<-0 to 2)printin(myStrArr(i))

  • 元组Tuple:对多个不同类型对象的一种简单封装

    scala val tuple=("BigData",2015,45.0) println(tuple._1)

  • 容器Collection:

    scala scala.collection 可变容器和不可变容器 scala.collection.mutable scala.collection.immutable

    • 列表(List):不可变的相同类型对象

    scala var strList=List("BigData","Hadoop","Spark") val other="Apache"::strList(生成新副本,右结合)

    • 向量(Vector)

    val vecl=Vector(1,2) val Vec2=3+:4+:vec1 val Vec3=vec1:+5 vec3(3)

    • 容器操作

    • 遍历

      scala //列表 def foreach[U](f:Elem=>U):Unit val list=List(1,2,3) val f=(i:Int)=>println(i) list.foreach(f) //映射 val University = Map("XMU"->"Xiamen University","THU"->"Tsinghua University","PKU"->"Peking University") University foreach{kv=>println(kv._1+kv._2)}

    • 映射

      • 一对一映射Map

      scala val Book=List("Haddop","Hive","Spark") Book.map(k=>k.toUpperCase) Book.map(k=>k.length)

      • 一对多映射flatMap

      scala val Book=List("Haddop","Hive","Spark") Book.flatMap(k=>k.toList)

    • 过滤filter

      scala val I=List(1,2,3,4,5) filter {_%2==0} val xmus = university filter {kv=>kv._2 contains "Xiamen"}

    • 归约

      • reduce

      scala reduce(f):从容器中依次取出新元素做操作生成新的元素,然后作为结果与下一次取出的元素做操作,直到全部元素取出结束。 val list=List(1,2,3,4,5) list.reduce(_+_) list.reduceLeft(_+_) list.reduceRight(_+_)

      • fold

        scala fold(初始值)(f):fold可以传入初始值进去 reduceleft和reduceright val list=List(1,2,3,4,5) list.fold(1)(_+_) list.foldLeft(1)(_+_) list.foldRight(1)(_+_)

  • 序列Sequence

    • 区间(Range):带索引的不可变数字等差序列

    scala 起点<-终点 by 步长 val r=new Range(1,5,1) 起点 until 终点 (不包含终点)

  • 集合Set:不可变集合

    var mySet=Set("Hadoop","Spark") mySet+="Scala"

  • 映射Map:一系列键值对的容器(可变)

    scala val university=Map("XMU"->"dafd") val xmu=if(university.contains("XMU"))university("XMU") else 0 university("TJU")="dafd" //更新或者添加 university2+=("TJU"->"dafd","SJU"->"wuhan")

  • 迭代器Iterator

7. 类

  • 变量定义:val 不可变变量;val 可变变量
  • 方法定义:def 方法名(参数列表):返回结果类型={方法体}

scala class Counter{ var value=0 def increment(step:Int):Unit={value+=step} def current():Int={value} } val myCounter=new Counter

类的可见性

  • public

  • protect

  • private

    • getter:value

    • setter:value_

    • 方法调用:中缀调用法,点表示法,括号表示法

  • 构造器

    scala 类名称(参数列表) 主构造器参数加val、var修饰会自动变成类内部成员 class Counter(var name:String) var mycounter=new Counter("runner") mycounter.name="Timer" 辅助构造器:约定,一个辅助构造器要先调用前一个辅助构造器或者主构造器 def this(name:String){//第一个辅助构造器 this() //调用主构造器 this.name=name } def this(name:String,step:lnt){//第二个辅助构造器 this(name) //调用第一个辅助构造器 this.step=step }

8. 对象

  • 单例对象:不需要实例化就能调用成员字段和成员方法(静态对象)

  • 伴生对象和独立对象。(有无伴生类存在,可以互相访问对方的成员变量和成员方法)

    scala object Person{ private var lastId=0 //一个人的身份编号 def newPersonId()={ lastId+=1 lastId }

  • apply方法:调用伴生对象的apply方法,不断生成实例对象。(函数式编程与面向对象编程风格融合,括号调用与点调用)

    ```scala class Car(name:String){ def info(){ println("Car name is "+name) } } object Car{ def apply(name:String)=new Car(name)//调用伴生类Car的构造方法 } object MyTestApply{ def main(args:Array[String]){ val mycar=Car("BMW")//调用伴生对象中的apply方法,区别与new生成实例 mycar.info()//输出结构为“Car name is BMW” } }

    def add=(x:Int,y:Int)=>x+y add(4,5)//采用数学界的括号调用样式 add.apply(4,5)//lambda表达式自动生成apply方法,add也是对象,采用点号形式调用apply方法 case class Book(val name:String,val price:Double)//case语法自动生成apply方法 ```

  • update方法

    scala val myStrArr=new Array[String](3)//声明一个长度为3的字符串数组,每个数组元素初始化为NULL myStrArr(0)="BigDATA"//实际上,调用了伴生类Array中的update方法,执行myStrArr.update(0,"BigDATA")

  • unapply方法

    ```scala 与apply相反操作,apply是输入参数生成对象,unapply是输入对象生成参数 object TestUnapply{

    def main(args:Array[String]){ var Car(carbrand,carprice)=Car("BMW",10000) println("brand:"+carbrand+"and carprice:"+carprice) } ```

9. 类的继承

  • 抽象类

    scala abstract class Car{ val carBrand:String//抽象字段,没有初始化,必须给出类型定义 def info()//抽象方法,没有实现 def greeting(){println("Welcome to my car!")} }

  • 扩展类(extends表示继承,override表示覆盖,如果父类是抽象的可以不用写,如果字段是var类型可以不用写)

    scala class BMWCar extends Car{ override val carBrand='BMW' def info(){ println("This is a %s car.It is expensive.\n",carBrand) } override def greeting(){ println("Welcome to my BYD car!") } }

  • 类型

    • 值类型:继承自AnyVal,无法实例化,保存在寄存器中。
    • 引用类型:继承自AnyRef,需要new实例化,保存在堆中。Null是所有AnyRef的子类型。
    • Nothing是所有类型的子类型,用来处理异常和错误。
    • Option类:抽象类,处理不确定是否有返回值的问题。
    • Some子类(有返回值)
    • None子类(没有返回值)
  • 特质:类似接口,可以定义抽象方法和具体方法,一个类可以混入多个特质。

    ```scala trait flyale{ var maxFlyHeight:Int//抽象字段 def fly()//抽象方法 def breathe(){//具体的方法 println("i can breathe") } } class Bird(flyHeight:Int) extends Flyable{ var maxFlyHeight:Int=flyHeight //重载特质的抽象字段 def fly(){ printf("I can fly at the height of %d.",maxFlyHeight) }//重载特质的抽象方法 }

    trait flyale{ var maxFlyHeight:Int//抽象字段 def fly()//抽象方法 def breathe(){//具体的方法 println("i can breathe") } } trait HasLegs{ val legs:Int//抽象字段 def move(){//具体的方法 println("I can walk with %d",legs) } } class Animal(val category:String) { def info(){ printf("This is a "+category) } } class Bird(flyHeight:Int) extends Animal("Bird") with Flyable with HasLegs{ var maxFlyHeight:Int=flyHeight //重载特质的抽象字段 var legs=2 //重载特质的抽象字段 def fly(){ printf("I can fly at the height of %d.",maxFlyHeight) }//重载特质的抽象方法 } ```

10. 模式匹配

  • match:可以添加守卫(guard)

    import scala.io.stdIn._ println("please ") country match{ case "China"=>println("中国") case "Japan"=>println("日本") case _=>println("我不认识") } //匹配变量 for (elem<-List(6,9,9.122,"Hadoop","Hello")){ val str = elem match { case _ if(elem%2==0)+>println(elem+"is even.") case i:Int => i + "is an int value."//匹配整型的值,并赋值给i case d:Double => d + "is an Double value."//匹配浮点型的值,并赋值给d case "Spark" => s + "Spark is found."//匹配特定的字符串 case _+>"unexpected value: "+elem//不匹配 } }

  • case类:自动添加许多重载的方法,自动生成伴生对象(apply方法和unapply方法)

    scala case class Person(name:String) val person=Person("dsa") //自动生成apply方法 val Person(name)=Person("ad")//自动生成unapply方法 println(name)

11. 包

12. 函数

  • 函数类型:参数类型=>返回值类型

  • 函数值参数值=>返回值

  • 定义:val counter : Int => Int = {(value) =>value+=1}

  • 匿名函数:lambda表达式

    scala (参数)=>表达式 //如果参数只有一个,参数的圆括号可以省略 val myNumFunc:Int=>Int=(num:Int)=>num*2 println(myNumFunc(3))

  • 高阶函数:函数作为函数的参数,运用了递归思想。

    ```scala def powerOfTwo(x:Int):Int={ if(x==0)0 else 2*powerOfTwo(x-1) }

    def sumSquares(a:Int,b:Int):Int={ if(a>b)0 else a*a+sumSquares(a+1,b) }

    def sumPowerOfTwo(a:Int,b:Int):Int={ if(a>)0 else powerOfTwo(a)+sumPowerOftwo(a+1,b) }

    def sum(f:Int=>Int,a:Int,b:Int):Int={ if(a>b)0 else f(a)+sum(f,a+1,b) } //高阶函数 sum(x=>x,1,5) sum(x=>x*x,1,5) ```

13. 函数式编程实例WordCount

scala import java.io.File import scala.io.Source import collection.mutable.Map object WordCount{ def main(args:Array[String]){ val dirfile=new File("test") val files=dirfile.listFiles val results=Map.empty[String,Int] for(file<-files){ val data=Source.fromFile(file) val strs=data.getLines.flatMap{k=>k.split(" ")} strs foreach{ word=>if(results.contains(word)) results(word)+=1 else results(word)=1 } } results foreach{case (k,v)=>println(s"$k:$v")} } }

14. 错误提示

  • unclosed string literal:引号引起的异常,检查是否存在单双引号不匹配,引号是否不全。

第二章 spark编程

1. 基本概念

  • RDD(弹性分布式数据集):spark基本数据抽象
  • DGA(有向无环图):由多个RDD组成的图依赖关系
  • HDFS,Amazon S3,Hbase:分布式文件系统
  • YARN,Mesos:调度资源管理器
  • Spark Core:包含Spark的基本功能;尤其是定义RDD的API、操作以及这两者上的动作。
  • Spark SQL:提供通过Apache Hive的SQL变体Hive查询语言(HiveQL)与Spark进行交互的API。每个数据库表被当做一个RDD,Spark SQL查询被转换为Spark操作。
  • Spark Streaming:对实时数据流进行处理和控制。
  • MLlib:一个常用机器学习算法库,算法被实现为对RDD的Spark操作。
  • Graphx:控制图、并行图操作和计算的一组算法和工具的集合。GraphX扩展了RDD API,包含控制图、创建子图、访问路径上所有顶点的操作

2. 运行流程

  • Driver: 运行Application 的main()函数
  • Cluster Manager:在standalone模式中即为Master主节点,控制整个集群,监控worker。在YARN模式中为资源管理器
  • Worker节点:从节点,负责控制计算节点,启动Executor或者Driver
  • Executor:执行器,是为某个Application运行在worker node上的一个进程
  • 应用=》作业=》阶段=》任务
  • DGA依赖关系
    • 宽依赖:经历shuffle(洗牌),有多个父依赖
    • 窄依赖:只有一个父依赖
    • 递归算法
  • img
  • img
  • img

3. RDD操作

  • 转换操作:不会计算操作,只会记录要执行的操作(惰性机制)

    • filter:筛选出满足func的元素,并返回一个新的数据集。

    scala val lines=sc.textFile(file:///word.txt) val linesWithSpark=lines.filter(line=>line.contains("Spark"))

    • map:将每个元素传递到函数func中,并将结果返回为一个新的数据集。

    scala val data=Array(1,2,3,4,5) val rdd1=sc.parallelize(data) val rdd2=rdd1.map(x=>x+10)

    • flatmap:与map()相似,但每个输入元素都可以映射到0或多个输出结果。

    scala val lines=sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt") val words=lines.flatMap(line=>line.split(" "))

    • groupByKey:应用于(K,V)键值对的数据集时,返回一个新的(K,Iterable)形式的数据集。

    scala val lines=sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt") val words=lines.flatMap(line=>line.split(" ")) words.groupByKey()

    • reduceByKey:应用于(K,V)键值对的数据集时,返回一个新的(K,V)形式的数据集。其中每个值是将每个Key传递到func聚合后的结果。(key相同的value足会构成value-list)

    scala val lines=sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt") val words=lines.flatMap(line=>line.split(" ")) words.reduceByKey((a,b)=>a+b)

  • 行动操作:真正执行计算环节

    | 操作 | 含义 | | ------------- | -------------------------------------------------------- | | count() | 返回数据集中的元素个数 | | collect() | 以数组的形式返回数据集中的所有元素 | | first() | 返回数据集中的第一个元素 | | take(n) | 以数组的形式返回数据集中的前n个元素 | | reduce(func) | 通过函数func(输入连个按时并返回一个值)聚合数据集中的元素 | | foreach(func) | 将数据集中的每个元素传递到函数func中运行 |

    scala val rdd=sc.parallelize(Array(1,2,3,4,5)) rdd.count() rdd.first() rdd.take() Array[Int]=Array(1,2,3) rdd.reduce((a,b)=>a+b) rdd.collect() Array[Int]=Array(1,2,3,4,5) rdd.foreach(elem=>println(elem))

4. 持久化

  • 每一次遇到行动操作会生成一个job,从头到尾计算一次,如果需要重复操作,需要持久化缓存。

  • 命令:persist(),对一个RDD标记为持久化。

    • .persist(MEMORY_ONLY)方法:只会保存在内存中。(.cache())
    • .persist(MEMORY_AND_DISK)方法:内存和磁盘,内存不足会保存到磁盘。
    • .unpersist()方法:清空缓存。

    scala val list=List("Hadoop"."Spark","Hive") val rdd=sc.parallelize(list) rdd.cache() println(rdd.count()) println(rdd.collect().mkString(","))

5. 分区

  • 增加并行度

  • 减少通信开销

  • 设置分区

    ```scala val array=Array(1,2,3,4,5) val rdd=sc.parallelize(array,2) //设置两个分区

    val data=sc.textFile("file///usr/local/spark/mycode/rdd/word.txt",2) data.partitions.size //显示data这个RDD的分区数量,结果为2 val rdd=data.repartition(1) //对这个dataRDD进行重新分区 data.partitions.size //显示data这个RDD的分区数量 结果为1 ```

  • 自定义分区:只支持键值对

    scala import org.apache.spark.{Partitioner,SparkContext,SparkConf} //自定义分区类 class Mypartitioner (numParts:Int) extends Partitioner{ 覆盖分区数 override def numPartitions:Int = numParts //覆盖获取分区数 override def getPartition(key:Any):Int = key.toString.toInt%10 } object TestPartitioner{ def main(args:Array[String]){ val conf=new SparkConf() val sc=new SparkContext(conf) //模拟5个分区的数据 val data=sc.parallelize(1 to 10,5) //跟进尾号转换成10个分区,分别写到10个文件 data.map(_,1)。partitionBy(new Mypartitioner(10).map_._1).saveAsTextFile("file///usr/local/spark/mycode/rdd/partitioner")//只把key取出来 } }

6. 综合实例

scala val lines=sc.textFile("file///wordCount.txt") val wordCount=lines.flatMap(line=>line.split(" ")).map(word=>(word,1)).reduceByKey((a,b)=>a+b) wordCount.collect() wordCount.foreach(word=>println(word))