初识Spark

达芬奇密码2018-06-28 18:41
        Spark使用Scala语言进行实现,它是一种面向对象、函数式编程语言,能够像操作本地集合对象一样轻松地操作分布式数据集(Scala 提供一个称为 Actor 的并行模型,其中Actor通过它的收件箱来发送和接收非同步信息而不是共享数据,该方式被称为:Shared Nothing 模型)。在Spark官网上介绍,它具有运行速度快、易用性好、通用性强和随处运行等特点。

Spark集群搭建
         Spark集群搭建模式支持多种方式。比如Standalone Cluster、YARN Cluster、Mesos Cluster等方式,本文主要介绍使用Standalone Cluster模式搭建Spark集群。
一、Scala配置
         从官网下载Scala-2.11.8安装包。下载解压后,配置SCALA_HOME为Scala的解压目录。如下所示,则说明Scala配置成功。
二、Spark配置
        从官网下载Spark-2.0.1安装包。 下载解压后,配置SPARK_HOME为Spark的解压目录。
配置log4j.properties文件
# Set everything to be logged to the console
log4j.rootCategory=INFO, console,FILE
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO

log4j.appender.FILE=org.apache.log4j.DailyRollingFileAppender
log4j.appender.FILE.Threshold=DEBUG
log4j.appender.FILE.file=/home/logs/spark.log
log4j.appender.FILE.DatePattern='.'yyyy-MM-dd
log4j.appender.FILE.layout=org.apache.log4j.PatternLayout
log4j.appender.FILE.layout.ConversionPattern=[%-5p] [%d{yyyy-MM-dd HH:mm:ss}] [%C{1}:%M:%L] %m%n

配置spark-env.sh

export SPARK_MASTER_IP=SparkIp1  #实际按照机器配置
export SPARK_MASTER_PORT=7077
export SPARK_WORKER_CORES=1
export SPARK_WORKER_INSTANCES=1
export SPARK_WORKER_MEMORY=1g
配置slaves文件 
 SparkIp1
SparkIp2
SparkIp3
启动master机器
sbin/start-master.sh
启动slave机器
sbin/start-slave.sh SparkIp1:7077 #这里需要每台slave机器都指定master
三、Spark监控
       当所有机器都配置启动后,可以访问Spark的UI界面查看具体Worker和执行任务。
          可以看到机器的master节点,具体worker节点以及节点各个参数配置

Spark总体架构
一、Spark生态架构    
    Spark提供了多种高级工具:SparkSQL、SparkStream、SparkMLlib、SparkGraphx等
    Spark可以基于自带的Standalone集群管理器独立运行,也可以部署在Apache Mesos和Hadoop YARN等集群管理器上运行
    Spark可以访问存储在HDFS、HBase、Cassandra、S3、本地文件系统等等上的数据,Spark支持文本文件,序列文件,以及任何Hadoop的InputFormat
二、Spark运行模型            
Driver Program:Spark中的Driver即运行上述Application的main()函数并且创建SparkContext,SparkContext负责和ClusterManager通信,进行资源的申请、任务的分配和监控等;当Executor部分运行完毕后,Driver负责将SparkContext关闭。通常用SparkContext代表Driver.
Executor:Application运行在Worker节点上的一个进程,该进程负责运行Task,并且负责将数据存在内存或者磁盘上,每个Application都有各自独立的一批Executor.
Worker:集群中任何可以运行Application代码的节点,类似于YARN中的NodeManager节点。在Standalone模式中指的就是通过Slave文件配置的Worker节点,在Spark on Yarn模式中指的就是NodeManager节点.
Job:包含多个Task组成的并行计算,往往由Spark Action催生,一个Job包含多个RDD及作用于相应RDD上的各种Operation.
Stage:每个Job会被拆分很多组Task,每组任务被称为Stage,也可称TaskSet,一个作业分为多个阶段.
Task:被送到某个Executor上的工作任务
三、Spark数据基础
       RDD是Spark的基石,它是弹性分布式数据集,是只读的分区记录集合。
var file = sc.textFile("hdfs://127.0.0.1:9000/log/stdout.log")
var errors = file.filter(line=>line.contains("ERROR"))
errors.count()
改程序是从HDFS文件里面读取日志,统计错误日志行数。可以通过下图来看下具体RDD转化关系。
         
    每一次对RDD的操作都造成了RDD的变换。其中RDD的每个逻辑分区Partition都对应于Block Manager(物理存储管理器)中的物理数据块Block(保存在内存或硬盘上)。RDD是应用程序中核心的元数据结构,其中保存了逻辑分区与物理数据块之间的映射关系。另外,还保存了父辈RDD的依赖转换关系。

Spark核心模块
一、SparkSQL
1、框架
       SparkSQL是Spark的一个模块,专门用于处理结构化数据。支持我们通过SQL或者Hive查询语言来查询数据。其大致执行过程如下图所示:                         
       Spark从不同数据源获取数据,通过定义的Sql Scheme信息组装成Dataframe流数据。Catalyst翻译Dataframe为最终的Spark程序。执行输出结果。
2、SparkSQL编码
      SparkSQL可以从JSON,HDFS,HIVE等数据源读取数据,转化成RDD。下面是SparkSQL读取HDFS文件数据执行SQL案例
/**
    * @param poi_id poi序号
    * @param poi_title 标题
    * @param poi_address 地点
    * @param poi_lon 经度
    * @param poi_lat 维度
    * @param poi_city 城市代码号
    * @param poi_category_name poi类型代号
    * @param poi_checkin_num poi签到次数
    * @param poi_photo_num poi签到照片数量
    */
  case class Poi(poi_id: String, poi_title: String, poi_address: String, poi_lon: Double, poi_lat: Double,
                 poi_city: String, poi_category_name: String, poi_checkin_num: Long, poi_photo_num: Long)

  def main(args: Array[String]): Unit = {

    System.setProperty("hadoop.home.dir", "E:\\tools\\hadoop\\hadoop-2.7.2")
    val warehouseLocation = "file:///"

    val spark = SparkSession.builder().master("local").appName("Spark SQL").config("spark.sql.warehouse.dir", warehouseLocation).getOrCreate()

    val provinceVec = Vector("anhui.csv","beijing.csv")

    provinceVec.foreach((province:String) => runInferSchema(spark, province))

  }

  private def runInferSchema(spark: SparkSession, file: String): Unit = {

    println("====================================" + file + "====================================")

    import spark.implicits._

    val poiDF = spark.sparkContext
      .textFile("hdfs://127.0.0.1:9000/weibo_data/" + file)
      .map(_.split(",")).filter(line => line.length==9)
      .map(attr => Poi(attr(0).trim(), attr(1).trim(), attr(2).trim(), attr(3).trim().toDouble, attr(4).trim().toDouble, attr(5).trim(),
          attr(6).trim(), attr(7).trim().toLong, attr(8).trim().toLong))
      .toDF()

    poiDF.createOrReplaceTempView("poi")

    //poi数量,签到数量,照片数量统计
    val sql_sum_result = spark.sql("select count(*) as sum_poi_count, sum(poi_checkin_num) as sum_poi_checkin_num, sum(poi_photo_num) as sum_poi_photo_num from poi")
    //sql_sum_result.show(1)

    //各个地区签到数量统计
    val sql_group_by_checkin_num = spark.sql("select sum(poi_checkin_num) as sum_poi_checkin_num, poi_city from poi group by poi_city")
    //sql_group_by_checkin_num.show()

    //热门签到地点统计
    val sql_group_by_poi_address_num = spark.sql("select count(*) as count, poi_address from poi group by poi_address")
    //sql_group_by_poi_address_num.show(5)

    //签到类型统计
    val sql_group_by_poi_category_name_num = spark.sql("select count(*) as count, poi_category_name from poi group by poi_category_name")
    //sql_group_by_poi_category_name_num.show(5)

    //签到位置统计
    val sql_group_by_location_num = spark.sql("select count(*) as count, poi_lon, poi_lat from poi group by poi_lon, poi_lat")
    sql_group_by_location_num.show(5)

  }
二、SparkStream
1、基本流程
        SparkStream是建立在Spark上的实时计算框架,通过它提供的丰富的API、基于内存的高速执行引擎,用户可以结合流式操作进行批处理和交互式查询应用。其基本原理是将流数据分成小的时间片段,以类似批处理方式来处理小数据。其大致路程图如下所示                                       
         SparkStream从Kafka、Flume、HDFS等数据源读取数据。SparkStream将Input Stream数据分片成很多小的数据流进行批量数据处理,最终得到数据集。
2、SparkStream编码
   该程序是每20秒从D:\\log目录读取文件,进行wordCount计算。
      var sparkConf = new SparkConf().setAppName("SparkStreamReadFile").setMaster("local[2]")
      var sc = new StreamingContext(sparkConf, Seconds(20))

      var lines = sc.textFileStream("D:\\log\\")
      var words = lines.flatMap(_.split(" "))
      var wordConts = words.map(x => (x,1)).reduceByKey(_ + _)

      wordConts.print()
      sc.start()
      sc.awaitTermination()
    当我们手动向该目录增加文件时,我们可以从Spark日志中获取一下数据。  
        -------------------------------------------
               Time: 1498705520000 ms
        -------------------------------------------
               (a,4)    //增加a文件,存储4个a字母
        -------------------------------------------
               Time: 1498705540000 ms
        -------------------------------------------
               (b,4)    //增加b文件,存储4个b字母
        -------------------------------------------
               Time: 1498705560000 ms
        -------------------------------------------
               (c,4)    //增加c文件,存储4个c字母
 SparkStream也有其他运行场景,比如:有无状态,Window操作等。
三、SparkMLlib
1、基本架构
        SparkMLlib是Spark对常用的机器学习算法的实现库,同时包括相关的测试和数据生成器。MLlib目前包括四种常见的机器学习问题:二元分类,回归,聚类以及协同过滤,同时也包括了一个底层的梯度下降优化基础算法。
                                        
                                               底层基础:包括Spark的运行库、矩阵库和向量库;
                                               算法库:包含广义线性模型、推荐系统、聚类、决策树和评估的算法;
                                               实用程序:包括测试数据的生成、外部数据的读入等功能。
2、SparkMLlib编码
   该程序是用投掷法来模拟π的数值,代码如下:
    System.setProperty("hadoop.home.dir", "E:\\tools\\hadoop\\hadoop-2.7.2")
    val conf = new SparkConf().setAppName("SparkFile").setMaster("local")
    var sc = new SparkContext(conf)

    var NUM_SAMPLES = 10000000;

    val count = sc.parallelize(1 to NUM_SAMPLES).filter { _ =>
      val x = math.random
      val y = math.random
      x*x + y*y <= 1
    }.count()

    println(s"Pi is roughly ${4.0 * count / NUM_SAMPLES}")

    sc.stop()

   通过多次模拟可知,该数值几乎趋于真实计算数值。


Spark学习资源

          Spark技术社区
          spark.apache.org

            SparkLearnDoc

本文来自网易实践者社区,经作者田躲躲授权发布。