Spark是一个快速、分布式、可扩展(随时可以进行节点的扩充)、容错(节点宕机了。那么它可以重新构建恢复这个数据)的集群计算框架。低延迟的复杂分析,因为Spark的低延迟,延迟低是因为Spark是在内存里面计算的。(Spark已经成为Apache软件基金会旗下的顶级开源项目)
MapReudce不适合迭代和交互式任务,Spark主要为交互式查询和迭代算法设计,支持内存存储和高效的容错恢复。Spark拥有MapReduce具有的优点,但不同于MapReduce,Spark中间输出结果可以保存在内存中,减少读写HDFS的次数。
Scala是一门多范式的编程语言,设计初衷是要集成面向对象编程和函数式编程的各种特性。因此Scala是一种纯面向对象的语言,每个值都是对象。同时Scala也是一种函数式编程语言,其函数也能当成值来使用。由于Scala整合了面向对象语言和函数式编程的特性,Scala相对于Java、C#、C++等其他语言更加简洁。Scala源代码被编译成Java字节码,所以它可以运行于JVM之上,并可以调用现有的Java类库。Scala一开始就打算基于Java的生态系统发展自身,而这令Scala受益匪浅。
数据类型 | 描述 |
---|---|
Byte | 8位有符号补码整数。数值区间为 -128 到 127 |
Short | 16位有符号补码整数。数值区间为 -32768 到 32767 |
Int | 32位有符号补码整数。数值区间为 - 到 。 |
Long | 64位有符号补码整数。数值区间为 - 到 |
Float | 32 位, IEEE 754 标准的单精度浮点数。浮点数后面有f或者F后缀时,表示这是一个Float类型 |
Double | 64 位 IEEE 754 标准的双精度浮点数。浮点数后没有F或f的,是Double类型 |
Char | 16位无符号Unicode字符, 区间值为 U+0000 到 U+FFFF |
String | 字符序列,即字符串,其用法是用双引号包含一系列字符。 |
Boolean | 只能保存两个特殊的值,ture和flase。 |
Unit | 在Java中创建一个方法时经常用void表示该方法无返回值,而Scala中没有void关键字,Scala中用Unit表示无值,等同于Java中的void。 |
Null | null 或空引用 |
Nothing | Nothing类型在Scala的类层级的最底端;它是任何其他类型的子类型。 |
Any | Any是所有其他类的超类 |
AnyRef | AnyRef类是Scala里所有引用类(reference class)的基类 |
Range | 是一种数据范围或序列, 支持Range的类型包括Int、Long、Float、Double、Char、BigInt和BigDecimal。 |
var变量:可以在它的声明周期中被多次赋值
val常量:一旦初始化了,val就不能再被赋值
注意:val 对并发或分布式编程很有好处
if (表达式) {
//代码块 } else if(表达式){
//代码块 }else{
//代码块 }
(1)定长数组
定义定长数组的两种方式:
操作定长数组:
(2)变长数组(即长度可变的数组)
定义变长数组之前需要先导入包:
import scala.collection.mutable.ArrayBuffer
定义变长数组:
val bufArr = new ArrayBuffer[Int]()
操作变长数组:定长数组的操作均可以使用
(3)数组其他常用的方法:
需要导入包:import Array._
var arr1 = Array(1,2) var arr2 = Array(3,4) var arr3 = concat(arr1,arr2) // 结果(1,2,3,4)
val arr:Array[Int]=fill(3)(2)
val arr1:Array[Int]=ofDim(3) val arr2:Array[Array[Int]]=ofDim(3,3) //二维数组
val arr3:Array[Int]=range(1,10,2) val arr4:Array[Int]=range(1,10)
连接数组:连接两个数组既可以使用操作符++
,也可以使用concat方法。但是使用concat方法需要先使用import Array._
引入包。
创建多维数组:
var arrInt = Array(Array(2,3),Array(5,6))
方法和函数:二者在语义上的区别很小。Scala 方法是类的一部分,而函数是一个对象可以赋值给一个变量。换句话来说在类中定义的函数即是方法。
使用 val 语句可以定义函数,def 语句定义方法。
(1)方法
方法的定义方式:
object Test {
//方法 def addInt( a:Int, b:Int) :Int = {
var sum:Int =0 sum = a + b return sum } } //调用方法形式1:带类名调用 Test.addInt(1,2)
//既不带参数也不返回结果 def printMe() : Unit = {
println("Hello, Scala!") } //调用方法形式2:直接调用 printMe()
方法的参数:
def printStrings( args:String*) = {
var i :Int = 0; for (arg <- args){
println("arg value[" + i + "] = " + arg); i += 1; } } printStrings("abc", "cd")
def addInt ( a:Int=5, b:Int=7) :Int = {
var sum:Int = 0 sum = a+b return sum }
(2)函数
函数:函数是一个对象可以赋值给一个变量,使用val进行定义。函数可作为一个参数传入到方法中,而方法不行。
函数的定义:
val f = (x:Int,y:Int) => {
x + y x + y + 10 }
(1)while循环
var a = 10; while( a < 20 ){
println( "Value of a: " + a ); a = a + 1; }
注意:++i和i++在Scala里不起作用,要在得到同样效果,必须要么写成i=i+1,要么写成i+=1
(2)do while循环
var a = 10; do{
println( "Value of a: " + a ); a = a + 1; }while( a < 20 )
(3)for循环
var a = 0; for( a <- 1 to 10){
println( "Value of a: " + a ); } //----等同于 for( a <- 1 until 10){
println( "Value of a: " + a ); }
注意:左箭头<-
用于为变量 a 赋值。
for循环和数组:
var a = 0; val numList = Array(1,2,3,4,5,6); for( a <- numList ){
println( "Value of a: " + a ); }
for循环和过滤
语法:
for( var x <- List if condition1; if condition2... ){
statement(s); }
示例:
for( var x <- List if condition1; if condition2... ){
statement(s); }
for循环和yield
语法:将 for 循环的返回值作为一个变量存储
for( var x <- List if condition1; if condition2... ){
statement(s); }
示例:
for( var x <- List if condition1; if condition2... ){
statement(s); }
(4)foreach方法
for( var x <- List if condition1; if condition2... ){
statement(s); }
定义列表:
方式1:
val list:List[String] = List("baidu", "google")
方式2:Nil 代表一个空,最后一个必须为Nil ,否则会报错
val list:List[Int] = 1::2::3::Nil
操作列表:
::
:用于连接两个列表:::
:用于连接两个列表concat
:用于连接两个列表val num:List[Int] = List(1,2,3,4,5) val list = num.filter(x => x%2==0) //List(2, 4)
val num:List[Int] = List(1,2,3,4,5) val list = num.map(x => x*2) //List(2, 4, 6, 8, 10)
val num:List[Int] = List(1,2,3,4,5) val list = num.mkString(",") //1,2,3,4,5
val num:List[Int] = List(1,2,3,4,5) val list = num.foreach(x=>println(x)) //函数 //---代码块--- val list = num.foreach(x=>{
println(x) })
使用可变列表首先要导入:
import scala.collection.mutable.ListBuffer
定义可变列表:
val list = ListBuffer[Int](1, 2, 3)
操作可变列表:
+=
:list += 4
append(data)
:list.append(4)
remove(index)
:list.remove(0)
删除下标为0的元素默认情况下,Scala 使用的是不可变集合,如果你想使用可变集合,需要引用 scala.collection.mutable.Set
包。Set是没有重复的对象集合,所有的元素都是唯一的。
定义集合:
val set = Set(1,2,3,3) // 元素唯一性 val set = Set(1,2,3) //二者结果相同
操作集合:
val set = Set(1,2,3,3) set.foreach(x => {
println(x)} )
add(data)
+=
remove(data)
-=
++
set1.++(set2)
val set1 = Set(1,2,3,4) val set2 = Set(3,4,5,6) val set3 = set1 ++ set2 val set4 = set1.++(set2)
set1.&(set2)
set.intersect(set2)
val set1 = Set(1,2,3,4) val set2 = Set(3,4,5,6) val set5 = set1.&(set2) val set6 = set1.intersect(set2)
set.max
set.min
Map(映射)是一种可迭代的键值对(key/value)结构。所有的值都可以通过键来获取。
同样的 Map 有两种类型,可变与不可变,区别在于可变对象可以修改它,而不可变对象不可以。
默认情况下 Scala 使用不可变 Map。如果你需要使用可变集合,你需要显式的引入。
import scala.collection.mutable.Map
定义 Map 映射:
val map1 = Map("a" -> 1, "b" ->2, "c" ->3)
操作 Map 映射:
val map = Map("a" -> 1, "b" ->2, "c" ->3) println(map.keys) //Set(a, b, c) println(map.values) //MapLike.DefaultValuesIterable(1, 2, 3) println(map.isEmpty) //false map.foreach(x=>println(x))
+=
val map1 = Map("a" -> 1, "b" ->2, "c" ->3) map1 += ("d" -> 4) var map = Map[String,Int]("a" -> 1,"b" -> 2) //引用可变,支持读写操作; map += ("c" -> 3) //新增 println(map) val map2 = Map[String,Int]("a" -> 1,"b" -> 2) //引用不可变,只能第一次写入值,之后只能读取; map2 += ("c" -> 3) //此时不能加,直接报错; val map3 = scala.collection.mutable.Map[String,String]() //引用不可变,支持读写操作; Map3 += (“c” -> 3)
++
map.++()
val map1 = Map("a" -> 1, "b" ->2, "c" ->3) val map2 = Map("c" -> 4, "d" ->5) val map3 = map1 ++ map2 val map4 = map1.++(map2)
与列表一样,元组也是不可变的,但与列表不同的是元组可以包含不同类型的元素。
Scala 支持的元组最大长度为 22。
定义 Tuple 元组:元组的值是通过将单个的值包含在圆括号中构成的。
val t = (1, 3.14, "Fred") val t = new Tuple3(1,2,"string")
使用t._1
访问第一个元素, t._2
访问第二个元素:
val t = (1, 3.14, "Fred") println(t._1) //1 println(t._2) //3.14 println(t._3) //"Fred"
补充:Scala Option(选项)类型用来表示一个值是可选的(有值或无值)。
Scala Iterator(迭代器)不是一个集合,它是一种用于访问集合的方法。迭代器的两个基本操作是 next
和 hasNext
。
val map = Map("a" -> 1, "b" ->2, "c" ->3) val iterator = map.iterator while (iterator.hasNext){
val ele = iterator.next() println("key:" + ele._1 + ";value:" + ele._2) }
map:通过一个函数重新计算列表中所有元素,并且返回一个相同数目元素的新列表。
foreach:foreach没有返回值,foreach只是为了对参数进行作用。
filter:过滤移除使得传入的函数的返回值为false的元素。
val num:List[Int] = List(1,2,3,4,5) num.map(x => x*2) num.foreach(x => println(x*x + "\t")) num.filter(x => x%2 ==0)
val list = List(List(1,2,3), List(3,4,5)) var newlist = list.flatten //List(1, 2, 3, 3, 4, 5)
val list:List[List[Int]] = List(List(1,2,3), List(3,4,5)) var newlist = list.flatMap(x => x.map(_*2)) //List(2, 4, 6, 6, 8, 10)
val intList:List[Int] = List(1,2,3,4,5,6) var newlist = intList.groupBy(x=>x%2==0) //结果: Map(false -> List(1, 3, 5), true -> List(2, 4, 6))
没有指定访问修饰符,默认情况下,访问级别都是 public
class Outer{
class Inner{
private def f(){
println("f")} class InnerMost{
f() } } (new Inner).f() //error }
class Outer {
class Inner {
def f() {
println("f") } class InnerMost {
f() // correct } } (new Inner).f() // correct }
Scala是一种纯粹的面向对象语言,两个重要的概念:类和对象。类是对象的抽象,也可以把类理解为模板,对象才是真正的实体。Scala中的类不声明为public。Scala 的类定义可以有参数,称为类参数;类参数在整个类中都可以访问。
类的使用:
class Point(xc: Int, yc: Int) {
var x: Int = xc var y: Int = yc def move(dx: Int, dy: Int) {
x = x + dx y = y + dy println ("x : " + x); println ("y : " + y); } }
一个Scala源文件中可以有多个类。可以使用 new 来实例化对象,并访问类中的方法和变量
//scala文件 import java.io._ class Point(xc: Int, yc: Int) {
var x: Int = xc var y: Int = yc def move(dx: Int, dy: Int) {
x = x + dx y = y + dy println ("x 的坐标点: " + x); println ("y 的坐标点: " + y); } } object Test {
def main(args: Array[String]) {
val pt = new Point(10, 20); // 移到一个新的位置 pt.move(10, 10); } }
类的继承:Scala 使用 extends 关键字来继承一个类,子类会继承父类的所有属性和方法,Scala 只允许继承一个父类。
Scala继承一个基类跟Java很相似, 但我们需要注意以下几点:
1、重写一个非抽象方法必须使用override修饰符。
2、只有主构造函数才可以往基类的构造函数里写参数。
3、在子类中重写超类的抽象方法时,你不需要使用override关键字
class Location(val xc: Int, val yc: Int, val zc :Int) extends Point(xc, yc){
var z: Int = zc override def move(dx: Int, dy: Int) {
x = x + dx + 100 y = y + dy + 100 println ("x location : " + x); println ("y location : " + y); } def move(dx: Int, dy: Int, dz: Int) {
x = x + dx y = y + dy z = z + dz println ("x : " + x); println ("y : " + y); println ("z : " + z); } } object Test {
def main(args: Array[String]) {
val loc = new Location(10, 20, 15); loc.move(10, 10); } }
子类继承父类中已经实现的方法需要使用关键字override
,子类继承父类中未实现的方法可以不用override
关键字。
//Cat Animal 父子类方法的覆盖 //使用eat方法,使用{}包容代码,在方法里使用println abstract class Animal {
def showName(str:String)={
println("animal")} def eat(food:String) } class Cat extends Animal {
override def showName(str:String)={
println("cat")} //子类继承父类中已经实现的方法需要使用关键字`override` def eat(food:String) = {
println("fish")}//子类继承父类中未实现的方法可以不用`override`关键字。 }
单例对象:在整个程序中只有这么一个实例。Scala中没有static关键字,因此Scala的类中不存在静态成员。 Scala中使用单例模式时需要使用object定义一个单例对象。object 对象与类的区别在于object 对象不能带参数。包含 main 方法的 object 对象可以作为程序的入口点。
伴生对象:是一种特殊的单例对象,是一种相对概念。需要两个条件:
类和伴生对象之间可以相互访问私有的方法和属性。
构造器:
构造器分为两类:主构造器(只有一个),辅助构造器(有多个)
主构造器直接在类名后面定义,每个类都有主构造器,主构造器的参数直接放在类名后,与类交织在一起
如果没有定义构造器,类会有一个默认的空参构造器
class Point(xc: Int, yc: Int) {
//主构造器 var x: Int = xc var y: Int = yc def move(dx: Int, dy: Int) {
x = x + dx y = y + dy println ("x 的坐标点: " + x); println ("y 的坐标点: " + y); } }
辅助构造器定义,使用def this,必须调用主构造器,或者其他构造器。
class Person {
//空参构造器 var name:String = "Tom" var sex:String = "man" val age:Int =18 println("main constructor") def this(name:String){
//辅助构造器 this() println("name constructor") } def this(name:String, sex:String) {
this(name) println("name sex construtor") } }
注意:
任何Spark程序都是以SparkContext对象开始的,因为SparkContext是Spark应用程序的上下文和入口,都是通过SparkContext对象的实例来创建RDD。因此在实际Spark应用程序的开发中,在main方法中需要创建SparkContext对象,作为Spark应用程序的入口,并在Spark程序结束时关闭SparkContext对象。
通过SparkConf对象设置集群配置的各种参数。
val conf = new SparkConf().setMaster("local").setAppName("appName") var sc = new SparkContext(conf)
在另外一台机器上启动一个管理所有节点以及存储在上面数据块的服务。
hadoop fs –ls /user/hadoop-twq
hadoop fs -copyFromLocal word.txt /users/hadoop-twq
hadoop fs -get /users/hadoop-twq/word.txt
hadoop fs -setrep 2 /users/hadoop-twq/word.txt
hadoop fs -rm /users/hadoop-twq/word.txt
简介:在每一个block所在的机器针对block数据进行计算,将结果汇总到计算master
原则: 移动计算而尽可能少的移动数据
含义: 其实就是将单台机器上的计算扩展到多台机器上进行并行计算
特点:就近计算,计算分摊到每个节点,基于性质相同的每个数据块,同时使用相同方法来计算。
基本结构:计算主节点和资源主节点
文件的每一个block就是一个分区,当然我们也可以设置成2个block一个分区,对于key-value类型的数据的分区。我们可以根据key按照某种规则来进行分区,比如按照key的hash值来分区。
可能会发生,但是需要尽量避免,我们需要遵循移动计算而不移动数据的原则。每一个数据块都包含了它所在的机器的信息,我们需要根据这个数据块所在的机器,然后将计算任务发送到对应的机器上来执行,这个就是计算任务的本地性。
不是,数据的存储只有一份,就是一开始的数据存储,在 shuffle 的时候会有中间临时数据的存储。
MapReduce是基于磁盘的。
不完全基于内存。spark的shuffle中间结果也是需要写文件的,只是对内存的利用比较充分而已。
复用场景:一种是迭代式计算应用;一种是交互型数据挖掘应用
存储方式:hadoop(磁盘)Spark(内存)
将中间结果放到内存中,充分发挥内存的读写效率,避免重复性的工作一遍遍浪费CPU。
(1)RDD定义: RDD叫做弹性分布式数据集 ,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。
(2)使用RDD的驱动力:
注意:RDD一旦创建就不可更改。重建不是从最开始的点来重建的,可以是上一步开始重建
(3)RDD特点:
(4)RDD两种算子:
(5)RDD的宽依赖与窄依赖的概念以及各自的方法?
(5)RDD的创建方式?
RDD的转换和行动操作都需要创建 SparkContext 执行上下文,下面示例中的 sc 来自这里:
val conf = new SparkConf().setMaster("local").setAppName("appName") var sc = new SparkContext(conf)
val rdd_arr = sc.parallelize(Array("b", "a", "c")) val rdd_map = rdd_arr.map(x => (x,1)) //rdd.collect(): 将RDD类型的数据转化为数组 println(rdd_arr.collect().mkString(", ")) // b, a, c println(rdd_map.collect().mkString(", ")) //(b,1), (a,1), (c,1)
//使用flatMap对多个集合中的每个元素进行操作再扁平化 val data = sc.parallelize(List("I am learning Spark", "I like Spark")) val res = data.flatMap(x => x.split(" ")) println(res.collect.mkString(", ")) //I, am, learning, Spark, I, like, Spark //对一个集合中的元素进行扩充 val arr = sc.parallelize(Array(1,2,3)) val newArr = arr.flatMap(n => Array(n, n*100, 42)) println(arr.collect().mkString(", ")) //1, 2, 3 println(newArr.collect().mkString(", ")) //1, 100, 42, 2, 200, 42, 3, 300, 42
val rdd_filter = sc.parallelize(Array(1,2,3)).filter(n => n>2) println(rdd_filter.collect().mkString(", ")) //3
val arr = sc.parallelize(Array(1,2,3,3,4)) val newArr = arr.distinct() println(newArr.collect().mkString(", ")) //4, 1, 3, 2
val arr = sc.parallelize(Array("John", "Fred", "Anna", "James")) val newArr = arr.groupBy(w => w.charAt(0)) println(newArr.collect().mkString(", ")) //(A,CompactBuffer(Anna)), (J,CompactBuffer(John, James)), (F,CompactBuffer(Fred))
val words = Array("one", "two", "two", "three", "three", "three") val wordRDD = sc.parallelize(words).map(word => (word, 1)) val wordCountsWithReduce = wordRDD .reduceByKey(_ + _) .collect() val wordCountsWithGroup = wordRDD .groupByKey() .map(t => (t._1, t._2.sum)) .collect() println(wordCountsWithReduce.mkString(", ")) //(two,2), (one,1), (three,3) println(wordCountsWithGroup.mkString(", ")) //(two,2), (one,1), (three,3)
reduceByKey和groupByKey区别?
reduceByKey用于对每个key对应的多个value进行merge操作,最重要的是它能够在本地先进行merge操作,并且merge操作可以通过函数自定义,当采用reduceByKey时,Spark可以在每个分区移动数据之前将待输出数据与一个共用的key结合
groupByKey也是对每个key进行操作,但只生成一个sequence,groupByKey本身不能自定义函数,需要先用groupByKey生成RDD,然后才能对此RDD通过map进行自定义函数操作,当采用groupByKey时,由于它不接收函数,Spark只能先将所有的键值对(key-value pair)都移动,这样的后果是集群节点之间的开销很大,导致传输延时。
val arr = sc.parallelize(Array(('J',"James"),('F',"Fred"),('A',"Anna"),('J',"John"))) val keyCount = arr.countByKey() println(keyCount) //Map(A -> 1, J -> 2, F -> 1)
val arr = sc.parallelize(Array(1,2,3,4)) val res = arr.reduce((a,b) => a+b) println(arr.collect.mkString(", ")) //1, 2, 3, 4 println(res) //10
rdd.saveAsTextFile("file:///f:/output-test.txt")
(1)RDD分区定义:RDD是弹性分布式数据集,通常RDD很大,会被分成很多个分区,分别保存在不同的节点上。
(2)使用RDD分区的优点:
(3)能从spark分区中获取的操作有:cogroup()
、groupWith()
、join()
、leftOuterJoin()
、rightOuterJoin()
、groupByKey()
、reduceByKey()
、combineByKey()
以及lookup()
。
(4)RDD分区原则:使得分区的个数尽量等于集群中的CPU核心(core)数目
(5)如何手动设置分区:
(6)RDD惰性机制:执行“动作”类型操作时,才是真正的计算。
(7)RDD的持久化:为了避免这种重复计算的开销,可以使用persist()方法对一个RDD标记为持久化。
(1)概念:键值对RDD由一组组的键值对组成,这些RDD被称为PairRDD。PairRDD提供了并行操作各个键或跨节点重新进行数据分组的操作接口,虽然大部分Spark的RDD操作都支持所有种类的单值RDD,但是有少部分特殊的操作只能作用于键值对类型的RDD。
(2)创建方式:从文件中加载,通过并行集合(数组)创建RDD
(3)键值对常用转换操作的方法及其作用:
内容详见我的另一篇博客,传送门。
内容详见我的另一篇博客,传送门。