liujijiang

Spark

2021.01.19

安装

spark官网

image.png

版本:2.2.0, hadoop:2.7

传到服务器解压

tar zxvf spark......

配置

配置spark/conf/spark-env 文件

1.把模板文件复制一份

cp spark-env.sh.template spark-env.sh

2.编辑

vim spark-env.sh

3.添加配置

export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.272.b10-1.el7_9.x86_64

export SPARK_MASTER_HOST=node01
export SPARK_MASTER_PORT=7077

image.png

配置slaves文件

  • 配置从节点

1.复制模板

cp slaves.template slaves

2.编辑

vim slaves

3.添加workr工作从节点host

image.png

配置HistoryServer

  • 日志文件

1.复制文件

cp spark-defaults.conf.template spark-defaults.conf

2.编辑文件

vim spark-defaults.conf

3.添加配置

spark.eventLog.enabled true
spark.eventLog.dir hdfs://node01:8020/spark_log
spark.eventLog.compress true

image.png

4.配置spark-env.sh,使得HistoryServer在启动的时候读入HDFS中的日志

编辑

vim spark-env.sh

添加配置

export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=4000 -Dspark.history.retainedApplications=3 -Dspark.history.fs.logDirectory=hdfs://node01:8020/spark_log"

image.png

5.为spark创建hdfs日志目录

hdfs dfs -mkdir -p /spark_log

分发:

scp -r spark host:$PWD

启动集群:

sbin/start-all.sh

启动 history-server.sh

sbin/start-history-server.sh

停止:

sbin/stop-all.sh

注意:阿里云的服务器如果你主节点host用的是公网IP会报错,无法连接到主机,将master上的配置文件改成127.0.0.1

高可用配置

使用zookeeper配置master主备切换

编辑 spark-env.sh 文件,

注释 SPARK_MASTER_HOST, 添加配置

export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=host1:2
181,host2:2181 -Dspark.deploy.zookeeper.dir=/spark"

配置分发给所有节点

scp -r spark-env.sh host:$PWD

启动集群

sbin/start-all.sh

在备用节点上自动备用master

sbin/start-master.sh

访问UI:host:8080

主节点显示为ALIVE,备用节点显示为STANDBY

Spark-Core

spark-shell

启动spark-shell

bin/spark-shell --master [host]

host:

spark本地程序

maven

  <dependencies>
        <!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.12</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.5</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>

            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

统计词频代码

本地运行:

val conf = new SparkConf().setAppName("word_count").setMaster("local[6]")
    new SparkContext(conf).textFile("./data/words.txt").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).foreach(println)

线上运行:

val conf = new SparkConf().setAppName("word_count")
    new SparkContext(conf).textFile("hdfs:///data/words.txt").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).foreach(println)

spark运行jar

bin/spark-submit --class cn.redarm.Main --master spark://host:7077 original-spark-test-1.0-SNAPSHOT.jar

RDD

Resilient Distributed Datasets:弹性分布式数据集

数据集,编程模型,相互之间有依赖关系,可以分区

SparkCore入口SparkContext:连接集群,创建RDD,广播等

// Spark 配置
  val conf = new SparkConf().setMaster("local[4]").setAppName("word_count")
  // 创建SparkContext
  val sparkContext = new SparkContext(conf)

  // 三种创建RDD方式
  def getRDD(): Unit ={
    // 从本地集合创建,4:分区数
    val rdd1 = sparkContext.parallelize(Seq(1,2,3),4)
    // 读取文件创建,HDFS:HDFS:// 本地:file://
    val rdd2 = sparkContext.textFile("hdfs:///data/words.txt")
    // 从RDD中衍生 
    val rdd3 = rdd1.map("num is " + _)
  }

属性:

  • PartitionList:分片列表
  • ComputeFunction:RDD之间进行转换所执行的函数
  • Dependencies:RDD之间的依赖关系
  • Partitioner:用来计算数据所在分区的函数
  • PreferredLocation:RDD最好放在哪个位置上执行计算

RDD的算子可以分为两大类:

  1. Transformation(转换):将一个RDD的数据转换到另一个RDD,这个步骤是Lazy(懒惰)的,只有当有Action调用的时候才会进行转换步骤
  2. Action(动作):执行分区的计算任务,将结果返回到Driver中

RDD存放数据的类型:

  1. 基础类型,如String
  2. Key-Value形式的byKey算子
  3. 数字类型

转换算子:

  1. Map:一对一转换
  2. flatMap:一对多转换
  3. reduceByKey:相同key规约
  4. mapPartitions:针对一个分区的数据一对一转换
  sparkContext.textFile("data/words.txt").mapPartitions(_.map("value is "+ _)).foreach(println)

5.mapPartitionsWithIndex:带分区位置的

  sparkContext.textFile("data/words.txt").mapPartitionsWithIndex((a,b) => b.map(s"index is $a, value is " + _)).foreach(println)	

6.filter:过滤,数据清洗

sparkContext.textFile("data/words.txt").filter(_.contains("hello")).foreach(println)

7:sample:随机采样,参数:是否放回,取样比例,随机数种子

sparkContext.textFile("data/words.txt").sample(false, 0.3,3).foreach(println)

8:mapValues:对map的value进行一对一转换

sparkContext.textFile("data/words.txt").flatMap(_.split(" ")).map((_,1)).mapValues(_*2).foreach(println)

9:intersection:交集

10:union:并集

11:subtract:差集

12:groupByKey:根据key分组

13:foldByKey:可以指定初始值,作用于每一个元素

sparkContext.textFile("data/words.txt").flatMap(_.split(" ")).map((_,1)).foldByKey(10)(_+_).foreach(println)

14:join:想用key的做笛卡尔积

    var value1 = sparkContext.parallelize(Seq(("hello",1), ("hello",2), ("hello",3)))
    var value2 = sparkContext.parallelize(Seq(("hello",1), ("hello2",2), ("hello3",4)))
    value1.join(value2).foreach(println)

15:sortBy:排序,参数:指定按照哪个值进行排序

sparkContext.textFile("data/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2).foreach(println)

16:sortByKey:按照Key进行排序,ascending = false:降序排序

17:repartition:重新指定分区数

18:coalesce:减少几个分区

Auction算子:

1:reduce:针对于所有类型的规约操作,结果为一条数据,两个参数:每个数据,当前数据结果

println(sparkContext.parallelize(Seq(1, 2, 3, 4, 5)).reduce(_ + _))

2:collect:以数组的形式返回所有元素

3:count:返回元素个数

4:first:返回第一个元素

5:take:返回前n个元素

6:saveAsTextFile:存入指定路径的文件中

7:countByKey:返回key以及key的次数

8:foreach:遍历

数字类型的支持:

  1. count:个数

  2. mean:均值

  3. sum:求和

  4. max:最大值

  5. min:最小值

  6. variance:方差

  7. sampleVariance:采样中计算方差

  8. stdev:标准差

  9. sampleStdev:采样的标准差

闭包:

函数返回值是函数,返回的函数中用到了当前函数的变量

def main(args: Array[String]): Unit = {
    println(fun()(3))
  }

  def fun() ={
    val pi = 3.14
    (r:Double) => 2 * pi * r
  }

spark的闭包分发:

spark的操作函数会分发到每个Executor执行,如果函数是闭包,则函数用到的变量所在的类也要分发到每个Executor,类需要支持序列化

def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[4]").setAppName("word_count")
    val sparkContext = new SparkContext(conf)
    // 执行map的时候,函数分发到每个Executor,函数中用到了pi,所有pi也会分发到每个Executor
    sparkContext.parallelize(Seq(1,2,3)).map(fun()(_)).foreach(println)
  }

  def fun() ={
    val pi = 3.14
    (r:Double) => 2 * pi * r
  }

全局累加器:

  1. 默认的数值型累加器

        // 默认的数值类型累加器
        var count = sparkContext.longAccumulator("counter")
        sparkContext.parallelize(Seq(1,2,3)).foreach(count.add(_))
        println(count.value)
    
  2. 自定义累加器

// 自定义的累加器,累加字符串
class MyAccumulator extends AccumulatorV2[String, List[String]] {
  private val data:ListBuffer[String] = ListBuffer[String]()
  override def isZero: Boolean = {
    data.isEmpty
  }

  override def copy(): AccumulatorV2[String, List[String]] = {
    val newAccumulator = new MyAccumulator()
    data.synchronized {
      newAccumulator.data ++= this.data
    }
    newAccumulator
  }

  override def reset(): Unit = {
    data.clear()
  }

  override def add(v: String): Unit = {
    data += v
  }

  override def merge(other: AccumulatorV2[String, List[String]]): Unit = {
    data ++= other.value
  }

  override def value: List[String] = {
    data.toList
  }
}
// 自定义的累加器
    val myAccumulator = new MyAccumulator()
    sparkContext.register(myAccumulator, "myAcc")
    sparkContext.parallelize(Seq("a","b","c")).foreach(myAccumulator.add(_))
    println(myAccumulator.value)

广播变量:

提前向Executor中分发数据,防止当Task比Executor多的时候数据对Executor的重复分发

val map = Map[String, String]("name" -> "liu", "age" -> "13", "height" -> "180")
    val conf: SparkConf = new SparkConf().setMaster("local[4]").setAppName("word_count")
    val sparkContext = new SparkContext(conf)
    // 创建广播
    val bc = sparkContext.broadcast(map)
    sparkContext.parallelize(Seq("name", "age", "height")).map(bc.value.get(_)).foreach(println(_))

Spark-SQL

数据集:

  1. 结构化数据集:MySQL的一张表
  2. 半结构化数据集:JSON,没有约束
  3. 非结构化数据集:图片,音频,视频

RDD主要用于处理非结构化数据和半结构化数据

Spark-SQL主要用于处理结构化数据

maven:

				<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>

通过DataSet操作:命令式API,可以通过列名操作

object Main {
  def main(args: Array[String]): Unit = {
    val spark = new SparkSession.Builder()
      .appName("sql")
      .master("local[3]")
      .getOrCreate()
    import spark.implicits._
    val value = spark.sparkContext.parallelize(Seq(Person("liu", 12), Person("ji", 15), Person("jiang", 14)))
      .toDS()
      .where('age > 14)
      .select('name)
      .orderBy('name)
      .as[String]
    value.show()
  }
}
case class Person(name:String, age:Int)

通过DataFrame:声明式API

val value = spark.sparkContext.parallelize(Seq(Person("liu", 12), Person("ji", 15), Person("jiang", 14)))
      .toDF()
      .createOrReplaceTempView("person")
    // 执行SQL语句
      spark.sql("select name, age from person where age > 10 and age < 14").show()

因为Catalyst优化器的存在,使得Spark-SQL执行效率比RDD高

Spark-Streaming

Spark-MLlib