【大数据】改进Azkaban提交Spark作业参数传递方法

社区编辑2018-05-17 15:22

一. Azkaban简介

Azkaban来自LinkedIn公司,用于管理他们的Hadoop批处理工作流。Hadoop任务在Azkaban上由两部分组成,一个是配置该任务的Job文件,该文件是一个Property格式的文件。另一个是Hadoop的Java类,该Java类需要实现一个带Properties参数的构造函数和一个run方法。

 

public AzkabanJob(String name, Properties props)

public void run()

 

Azkaban将Job文件中的参数封装成Properties,通过构造函数传递给Hadoop的Java类,用户通过构造函数中的Properties拿到配置在Job文件中的参数。

 

Spark任务在Azkaban上传递参数的方式有所不同。Azkaban会去调用Spark任务的main方法来调用任务。任务参数只能从main方法的参数中通过数组的形式传入。在Job文件的配置中,需要配置如下参数:

 

params=firstParam secParam

 

Azkaban会将params参数按照空格切分成数组传入main方法的参数中。显然,这种传参方式十分不方便。这里提出一种解决办法,将Spark任务的传参方式改成和Hadoop任务传参方式一致。实现将Job配置文件中的参数传递给Spark程序的功能。

 

二. Spark任务执行原理

在azkaban-plugins的包中,有个jobtype模块,在azkaban.jobtype包中找到HadoopSparkJob和HadoopSecureSparkWrapper类。当用户在Job文件中指定type=spark后,Azkaban调度框架会使用HadoopSparkJob类去启动JVM调用由HadoopSecureSparkWrapper类封装后的任务类的main方法。通过观察源码,我们可知,其实HadoopSecureSparkWrapper类也是通过Spark自带的org.apache.spark.deploy.SparkSubmit类提交Spark任务的。

 

org.apache.spark.deploy.SparkSubmit$.MODULE$.main(newArgs);

 

简单来说,Azkaban实际上就是将参数拼接后传给spark的submit进行任务提交的。

通过跟踪Azkaban的代码,可以发现,传递给HadoopSecureSparkWrapper的main方法的数组参数长度为1,内容就是所有的spark任务启动参数,通过一些用户代理的操作后,main方法又将参数传递给runSpark方法,runSpark方法将参数组织成SparkSubmit的参数。

另外一个很重要的是,HadoopSparkJob类启动JVM的时候,使用了-cp参数指定class path。根据这个原理,我们可以通过在Job文件中配置classpath参数,将自己修改过的HadoopSecureSparkWrapper类替换掉线上默认部署的类。这为我们改进Spark作业提交方法提供了前提。

 

三. 实现方法

有了上一节介绍的前提,我们的改进思路分为以下几步:

  1. 修改HadoopSecureSparkWrapper类,将Job文件中配置的参数序列化后,拼接在main方法的参数后,目前我们选用Json的方式。
  2. 编写SparkJob的抽象类,所有任务类继承该抽象类,在这个类的main方法中,将传递来的Json串解析成为Properties,然后向用户暴露一个抽象run方法即可。注意一点的是,这里要求main方法必须能被继承,因此该改进方法只适合Scala代码。关于Scala代码的main方法问题,这里不做详述。
  3. 在Spark公共的Job文件中spark_common.properties中配置好classpath参数,指向你修改后的azkaban-plugin包。

有了以上思路,代码就十分简单了,首先,我们在HadoopSecureSparkWrapper的main方法中加入如下代码,将jobProps转成Json拼接在args里:

 

Properties jobProps = HadoopSecureWrapperUtils.loadAzkabanProps();

String propsJson = convertToJson(jobProps);

args[0] = args[0] + propsJson;

 

    private static String convertToJson(Properties properties) {

        JsonObject json = new JsonObject();

        for (Object key : properties.keySet()) {

 

            String baseKey = key.toString();

            String value = properties.getProperty(baseKey);

            String[] splitedValue = value.split(“\\,”);

 

            if (splitedValue.length == 1) {

                json.addProperty(baseKey, value);

            } else {

                JsonArray elementArray = new JsonArray();

                for (String val : splitedValue) {

                    elementArray.add(new JsonPrimitive(val));

                }

                json.add(baseKey, elementArray);

            }

 

        }

        return new Gson().toJson(json);

    }

编写SparkJob的超类,在main方法中解析Json字符串:

 

  def run(props: Properties)

 

  def convertToProperties(json: String) = {

    val prop = new Properties()

    val jsonObj = JSON.parseFull(json).getOrElse(Map()).asInstanceOf[Map[String, String]]

    jsonObj.foreach(kv => prop.put(kv._1, kv._2))

    prop

  }

 

  def main(args: Array[String]): Unit = {

    properties = convertToProperties(args(args.length – 1))

    run(properties)

  }

最后,将classpath配置成你修改后的azkaban-plugin的包即可。

现在,我们就可以像如下方式编写SparkJob啦:

 

object Test extends SparkJob{

  override def run(props: Properties): Unit = {

    val day = props.get(“job.etl_date”)

  }

}

 

四. 局限性

这种方法是可行的,但为什么Azkaban的源码不采用这种方法呢?这种方法的最大局限在于,Spark程序是从main方法启动的,如果想把解析参数这层逻辑封装在抽象层,需要main方法可以被继承。在Scala语言中,这是允许的,但是注意,继承main方法是有风险的。在Java语言中,main方法是不可被继承的。所以以上方案不可行,但是可以手动注入解析参数的逻辑,这样代码会显得不优雅。再提醒一句,这种方法相当于用户用同名的class代替了系统的class,用户的class里面做了什么,系统是不知道的,解决这个问题,用户可以修改系统的代码,也可以把这个classpath配到自己的代码上。

网易云原创文章,未经许可,禁止转载。