CDH版本:5.10.0
IDE环境:win7 64位 MyEclipse2015
spark模式:yarn
提交模式:yarn-client
之前在同样的IDE环境下,向Standalone模式的Spark提交任务一直很顺利。今天测试了一下Spark on YARN模式(从IDE提交只能使用yarn-client模式),其它配置基本不变,只是换了模式,结果出现了下面的错误:
java.io.IOException: Cannot run program "/etc/hadoop/conf.cloudera.yarn/topology.py" (in directory "D:\workspace2015\5_10_0cdh"): CreateProcess error=2, 系统找不到指定的文件。
折腾了五个小时,总算是解决了,方法如下:
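修改工程中(src/main/resources目录下)的core-site.xml,找到配置项net.topology.script.file.name,将其value注释掉,如图:(原因:从集群下载的客户端配置中,该项指向集群节点上的机架感知脚本/etc/hadoop/conf.cloudera.yarn/topology.py,而Windows本地并不存在这个文件,客户端尝试执行它时就会报上面的错误。)修改后类似:<property><name>net.topology.script.file.name</name><!-- <value>/etc/hadoop/conf.cloudera.yarn/topology.py</value> --></property>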
再次运行,得到正确结果。
代码如下:
package com.byit.test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

import com.byit.getinfo.IGetConf;
/** * * @author 耿廑 * yarn client模式下将src/main/resources目录下的core-site.xml中的 * net.topology.script.file.name的value值注释掉,切记,切记!!! */ public class SparkYarnClientTest implements IGetConf{ static String sendString = " "; public static void main(String[] args) throws Exception { // TODO Auto-generated method stub SparkYarnClientTest.run(); } public static void run() throws Exception { // TODO Auto-generated method stub System.setProperty("HADOOP_USER_NAME",sparkUser); SparkConf conf = new SparkConf() .setAppName("SparkYarnClientTest") .setMaster("yarn-client") .set("spark.yarn.jar",hdfsURL + "/test/spark/libs/spark-assembly-1.6.0-cdh5.10.0-hadoop2.6.0-cdh5.10.0.jar"); JavaSparkContext sc = new JavaSparkContext(conf); String jar = jarsPath + "/SparkYarnClientTest.jar"; sc.addJar(jar); String inPath = hdfsURL + "/test/spark/input"; String outPath = hdfsURL + "/test/spark/output"; JavaRDD<String> word = sc.textFile(inPath).flatMap(new toWord()); JavaPairRDD<String,Integer> wordPair = word.mapToPair(new myMapper()); JavaPairRDD<String,Integer> count = wordPair.reduceByKey(new myReducer()); JavaRDD<String> result = count.map(new toString()); result.saveAsTextFile(outPath); sc.close(); } @SuppressWarnings("serial") private static class toWord implements FlatMapFunction<String,String> { public Iterable<String> call(String words) throws Exception { // TODO Auto-generated method stub return Arrays.asList(words.split(sendString)); } } @SuppressWarnings("serial") private static class myMapper implements PairFunction<String,String,Integer> {
public Tuple2<String, Integer> call(String word) throws Exception { // TODO Auto-generated method stub Tuple2<String, Integer> myWord = new Tuple2<String, Integer>(word,1); return myWord; } } @SuppressWarnings("serial") private static class myReducer implements Function2<Integer,Integer,Integer> { public Integer call(Integer value1, Integer value2) throws Exception { // TODO Auto-generated method stub Integer value = value1 + value2; return value; } } @SuppressWarnings("serial") private static class toString implements Function<Tuple2<String, Integer>,String> {
public String call(Tuple2<String, Integer> result) throws Exception { // TODO Auto-generated method stub String myResult = result._1 + "\t" + result._2; return myResult; } } }
|