一. Azkaban简介
Azkaban来自LinkedIn公司,用于管理他们的Hadoop批处理工作流。Hadoop任务在Azkaban上由两部分组成,一个是配置该任务的Job文件,该文件是一个Property格式的文件。另一个是Hadoop的Java类,该Java类需要实现一个带Properties参数的构造函数和一个run方法。
public AzkabanJob(String name, Properties props)
public void run()
Azkaban将Job文件中的参数封装成Properties,通过构造函数传递给Hadoop的Java类,用户通过构造函数中的Properties拿到配置在Job文件中的参数。
Spark任务在Azkaban上传递参数的方式有所不同。Azkaban会去调用Spark任务的main方法来调用任务。任务参数只能从main方法的参数中通过数组的形式传入。在Job文件的配置中,需要配置如下参数:
params=firstParam secParam
Azkaban会将params参数按照空格切分成数组传入main方法的参数中。显然,这种传参方式十分不方便。这里提出一种解决办法,将Spark任务的传参方式改成和Hadoop任务传参方式一致。实现将Job配置文件中的参数传递给Spark程序的功能。
二. Spark任务执行原理
在azkaban-plugins的包中,有个jobtype模块,在azkaban.jobtype包中找到HadoopSparkJob和HadoopSecureSparkWrapper类。当用户在Job文件中指定type=spark后,Azkaban调度框架会使用HadoopSparkJob类去启动JVM调用由HadoopSecureSparkWrapper类封装后的任务类的main方法。通过观察源码,我们可知,其实HadoopSecureSparkWrapper类也是通过Spark自带的org.apache.spark.deploy.SparkSubmit类提交Spark任务的。
org.apache.spark.deploy.SparkSubmit$.MODULE$.main(newArgs);
简单来说,Azkaban实际上就是将参数拼接后传给spark的submit进行任务提交的。
通过跟踪Azkaban的代码,可以发现,传递给HadoopSecureSparkWrapper的main方法的数组参数长度为1,内容就是所有的spark任务启动参数,通过一些用户代理的操作后,main方法又将参数传递给runSpark方法,runSpark方法将参数组织成SparkSubmit的参数。
另外一个很重要的是,HadoopSparkJob类启动JVM的时候,使用了-cp参数指定class path。根据这个原理,我们可以通过在Job文件中配置classpath参数,将自己修改过的HadoopSecureSparkWrapper类替换掉线上默认部署的类。这为我们改进Spark作业提交方法提供了前提。
三. 实现方法
有了上一节介绍的前提,我们的改进思路分为以下几步:
有了以上思路,代码就十分简单了,首先,我们在HadoopSecureSparkWrapper的main方法中加入如下代码,将jobProps转成Json拼接在args里:
Properties jobProps = HadoopSecureWrapperUtils.loadAzkabanProps();
String propsJson = convertToJson(jobProps);
args[0] = args[0] + propsJson;
private static String convertToJson(Properties properties) {
JsonObject json = new JsonObject();
for (Object key : properties.keySet()) {
String baseKey = key.toString();
String value = properties.getProperty(baseKey);
String[] splitedValue = value.split(“\\,”);
if (splitedValue.length == 1) {
json.addProperty(baseKey, value);
} else {
JsonArray elementArray = new JsonArray();
for (String val : splitedValue) {
elementArray.add(new JsonPrimitive(val));
}
json.add(baseKey, elementArray);
}
}
return new Gson().toJson(json);
}
编写SparkJob的超类,在main方法中解析Json字符串:
def run(props: Properties)
def convertToProperties(json: String) = {
val prop = new Properties()
val jsonObj = JSON.parseFull(json).getOrElse(Map()).asInstanceOf[Map[String, String]]
jsonObj.foreach(kv => prop.put(kv._1, kv._2))
prop
}
def main(args: Array[String]): Unit = {
properties = convertToProperties(args(args.length – 1))
run(properties)
}
最后,将classpath配置成你修改后的azkaban-plugin的包即可。
现在,我们就可以像如下方式编写SparkJob啦:
object Test extends SparkJob{
override def run(props: Properties): Unit = {
val day = props.get(“job.etl_date”)
}
}
四. 局限性
这种方法是可行的,但为什么Azkaban的源码不采用这种方法呢?这种方法的最大局限在于,Spark程序是从main方法启动的,如果想把解析参数这层逻辑封装在抽象层,需要main方法可以被继承。在Scala语言中,这是允许的,但是注意,继承main方法是有风险的。在Java语言中,main方法是不可被继承的。所以以上方案不可行,但是可以手动注入解析参数的逻辑,这样代码会显得不优雅。再提醒一句,这种方法相当于用户用同名的class代替了系统的class,用户的class里面做了什么,系统是不知道的,解决这个问题,用户可以修改系统的代码,也可以把这个classpath配到自己的代码上。
网易云原创文章,未经许可,禁止转载。