# Set everything to be logged to the console
log4j.rootCategory=INFO, console, FILE
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.appender.FILE=org.apache.log4j.DailyRollingFileAppender
log4j.appender.FILE.Threshold=DEBUG
log4j.appender.FILE.file=/home/logs/spark.log
log4j.appender.FILE.DatePattern='.'yyyy-MM-dd
log4j.appender.FILE.layout=org.apache.log4j.PatternLayout
log4j.appender.FILE.layout.ConversionPattern=[%-5p] [%d{yyyy-MM-dd HH:mm:ss}] [%C{1}:%M:%L] %m%n
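Once this properties file is in place (for example saved as conf/log4j.properties under the Spark installation, where Spark picks it up by default), driver-side code logs through the same appenders. A minimal sketch using the Log4j 1.x API bundled with these Spark releases; the logger name "myApp" is only an illustration:
import org.apache.log4j.Logger

val log = Logger.getLogger("myApp")  // logger name chosen for illustration
log.info("INFO and above go to System.err and to /home/logs/spark.log, rolled daily")
log.debug("DEBUG is dropped here, because the root level configured above is INFO")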
Configure spark-env.sh
export SPARK_MASTER_IP=SparkIp1 # set according to the actual machine
export SPARK_MASTER_PORT=7077
export SPARK_WORKER_CORES=1
export SPARK_WORKER_INSTANCES=1
export SPARK_WORKER_MEMORY=1g
Configure the slaves file
SparkIp1
SparkIp2
SparkIp3
Start the master node
sbin/start-master.sh
Start the slave nodes
sbin/start-slave.sh spark://SparkIp1:7077 # every slave node must be pointed at the master URL
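With the master and workers running, a quick sanity check is to open a spark-shell against the master URL configured above (spark://SparkIp1:7077 here) and run a trivial job:
// inside a spark-shell launched with --master spark://SparkIp1:7077
sc.parallelize(1 to 1000).sum()  // returns 500500.0 once a worker has executed the tasks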
3. Spark Monitoring
// Read the application log from HDFS and count the lines containing "ERROR"
val file = sc.textFile("hdfs://127.0.0.1:9000/log/stdout.log")
val errors = file.filter(line => line.contains("ERROR"))
errors.count()
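If the filtered lines are going to be queried more than once, caching avoids re-reading the file from HDFS on every action; an optional extension of the snippet above:
errors.cache()                   // keep the matching lines in memory for repeated queries
errors.take(5).foreach(println)  // inspect a few of the matching lines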
/**
 * @param poi_id POI id
 * @param poi_title title
 * @param poi_address address
 * @param poi_lon longitude
 * @param poi_lat latitude
 * @param poi_city city code
 * @param poi_category_name POI category code
 * @param poi_checkin_num number of check-ins at the POI
 * @param poi_photo_num number of check-in photos at the POI
 */
case class Poi(poi_id: String, poi_title: String, poi_address: String, poi_lon: Double, poi_lat: Double,
               poi_city: String, poi_category_name: String, poi_checkin_num: Long, poi_photo_num: Long)

def main(args: Array[String]): Unit = {
  System.setProperty("hadoop.home.dir", "E:\\tools\\hadoop\\hadoop-2.7.2")
  val warehouseLocation = "file:///"
  val spark = SparkSession.builder()
    .master("local")
    .appName("Spark SQL")
    .config("spark.sql.warehouse.dir", warehouseLocation)
    .getOrCreate()
  val provinceVec = Vector("anhui.csv", "beijing.csv")
  provinceVec.foreach((province: String) => runInferSchema(spark, province))
}
private def runInferSchema(spark: SparkSession, file: String): Unit = {
  println("====================================" + file + "====================================")
  import spark.implicits._
  val poiDF = spark.sparkContext
    .textFile("hdfs://127.0.0.1:9000/weibo_data/" + file)
    .map(_.split(",")).filter(line => line.length == 9)
    .map(attr => Poi(attr(0).trim(), attr(1).trim(), attr(2).trim(), attr(3).trim().toDouble, attr(4).trim().toDouble,
      attr(5).trim(), attr(6).trim(), attr(7).trim().toLong, attr(8).trim().toLong))
    .toDF()
  poiDF.createOrReplaceTempView("poi")
  // Totals: number of POIs, check-ins, and photos
  val sql_sum_result = spark.sql("select count(*) as sum_poi_count, sum(poi_checkin_num) as sum_poi_checkin_num, sum(poi_photo_num) as sum_poi_photo_num from poi")
  //sql_sum_result.show(1)
  // Check-in totals per city
  val sql_group_by_checkin_num = spark.sql("select sum(poi_checkin_num) as sum_poi_checkin_num, poi_city from poi group by poi_city")
  //sql_group_by_checkin_num.show()
  // Popular check-in addresses
  val sql_group_by_poi_address_num = spark.sql("select count(*) as count, poi_address from poi group by poi_address")
  //sql_group_by_poi_address_num.show(5)
  // Check-in counts per POI category
  val sql_group_by_poi_category_name_num = spark.sql("select count(*) as count, poi_category_name from poi group by poi_category_name")
  //sql_group_by_poi_category_name_num.show(5)
  // Check-in counts per coordinate pair
  val sql_group_by_location_num = spark.sql("select count(*) as count, poi_lon, poi_lat from poi group by poi_lon, poi_lat")
  sql_group_by_location_num.show(5)
}
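The group-by-address query above only counts rows per address; to surface the most popular check-in locations, the result also needs to be sorted by that count. A sketch that could sit alongside the other queries, reusing the same poi temporary view (the limit of 5 is an arbitrary choice):
val topAddresses = spark.sql(
  "select poi_address, count(*) as cnt from poi group by poi_address order by cnt desc limit 5")
topAddresses.show()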
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName("SparkStreamReadFile").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(20))  // 20-second micro-batches
val lines = ssc.textFileStream("D:\\log\\")             // watch the directory for newly created files
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
When we manually add files to this directory, the resulting word counts appear in the Spark log output.
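textFileStream only picks up files created in the directory after the streaming context has started, so the easiest way to exercise the job is to write a fresh file into D:\log\ while it is running. A minimal sketch; the file name and contents here are made up purely for illustration:
import java.nio.file.{Files, Paths}

// Drop a new file into the watched directory; the next 20-second batch will count its words.
val target = Paths.get("D:\\log\\sample-" + System.currentTimeMillis + ".log")
Files.write(target, "ERROR disk full\nINFO job started".getBytes("UTF-8"))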
import org.apache.spark.{SparkConf, SparkContext}

System.setProperty("hadoop.home.dir", "E:\\tools\\hadoop\\hadoop-2.7.2")
val conf = new SparkConf().setAppName("SparkFile").setMaster("local")
val sc = new SparkContext(conf)
val NUM_SAMPLES = 10000000
// Monte Carlo estimate: count the random points in the unit square that fall inside the quarter circle
val count = sc.parallelize(1 to NUM_SAMPLES).filter { _ =>
  val x = math.random
  val y = math.random
  x * x + y * y <= 1
}.count()
println(s"Pi is roughly ${4.0 * count / NUM_SAMPLES}")
sc.stop()
Running the simulation repeatedly shows the estimate converging on the true value of π: the fraction of random points landing inside the quarter circle x*x + y*y <= 1 approaches π/4, so multiplying by 4 recovers π.
This article comes from the NetEase Practitioners Community (网易实践者社区) and is published with the authorization of its author, 田躲躲.