This article explains how to run a Spark application from a Java project. The example is a job that counts the requests made by each IP address in a web access log and looks up the country of each address.
The code is as follows:
package org.shirdrn.spark.job;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.regex.Pattern;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.shirdrn.spark.job.maxmind.Country;
import org.shirdrn.spark.job.maxmind.LookupService;

import scala.Serializable;
import scala.Tuple2;

public class IPAddressStats implements Serializable {

    private static final long serialVersionUID = 8533489548835413763L;
    private static final Log LOG = LogFactory.getLog(IPAddressStats.class);
    private static final Pattern SPACE = Pattern.compile(" ");
    private transient LookupService lookupService;
    private transient final String geoIPFile;

    public IPAddressStats(String geoIPFile) {
        this.geoIPFile = geoIPFile;
        try {
            // lookupService: get the country code from an IP address
            File file = new File(this.geoIPFile);
            LOG.info("GeoIP file: " + file.getAbsolutePath());
            // AdvancedLookupService is not imported in this listing; it is
            // presumably a class in the same package (org.shirdrn.spark.job).
            lookupService = new AdvancedLookupService(file, LookupService.GEOIP_MEMORY_CACHE);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @SuppressWarnings("serial")
    public void stat(String[] args) {
        JavaSparkContext ctx = new JavaSparkContext(args[0], "IPAddressStats",
                System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(IPAddressStats.class));
        JavaRDD<String> lines = ctx.textFile(args[1], 1);

        // split each line and extract the IP address field
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String s) {
                // 121.205.198.92 - - [21/Feb/2014:00:00:07 +0800] "GET /archives/417.html HTTP/1.1" 200 11465
                // "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
                // ip address
                return Arrays.asList(SPACE.split(s)[0]);
            }
        });

        // map: emit (ip, 1) for every request
        JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        // reduce: sum the counts per ip
        JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });

        List<Tuple2<String, Integer>> output = counts.collect();

        // sort the statistics by value, largest count first
        Collections.sort(output, new Comparator<Tuple2<String, Integer>>() {
            @Override
            public int compare(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) {
                if (t1._2 < t2._2) {
                    return 1;
                } else if (t1._2 > t2._2) {
                    return -1;
                }
                return 0;
            }
        });

        writeTo(args, output);
    }

    private void writeTo(String[] args, List<Tuple2<String, Integer>> output) {
        for (Tuple2<?, ?> tuple : output) {
            Country country = lookupService.getCountry((String) tuple._1);
            LOG.info("[" + country.getCode() + "] " + tuple._1 + "\t" + tuple._2);
        }
    }

    public static void main(String[] args) {
        // ./bin/run-my-java-example org.shirdrn.spark.job.IPAddressStats spark://m1:7077 hdfs://m1:9000/user/shirdrn/wwwlog20140222.log /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/GeoIP_DATABASE.dat
        if (args.length < 3) {
            System.err.println("Usage: IPAddressStats <master> <inFile> <GeoIPFile>");
            System.err.println("    Example: org.shirdrn.spark.job.IPAddressStats spark://m1:7077 hdfs://m1:9000/user/shirdrn/wwwlog20140222.log /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/GeoIP_DATABASE.dat");
            System.exit(1);
        }

        String geoIPFile = args[2];
        IPAddressStats stats = new IPAddressStats(geoIPFile);
        stats.stat(args);

        System.exit(0);
    }
}
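For the implementation logic, refer to the comments in the code. We use Maven to manage and build the Java program. First, here are the dependencies declared in my pom configuration: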
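<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>0.9.0-incubating</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.16</version>
    </dependency>
    <dependency>
        <groupId>dnsjava</groupId>
        <artifactId>dnsjava</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>commons-net</groupId>
        <artifactId>commons-net</artifactId>
        <version>3.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>1.2.1</version>
    </dependency>
</dependencies>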
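Before packaging for a cluster, the counting logic of stat() can be checked locally without any Spark dependency. The following is a minimal plain-Java sketch of the same three steps (extract the first space-separated field, count per IP, sort by count in descending order). It assumes Java 8 or later; the class name LocalIpCount, the method countByIp, and the second and third sample log lines are hypothetical and not part of the original program.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

public class LocalIpCount {

    private static final Pattern SPACE = Pattern.compile(" ");

    /** Returns (ip, count) entries sorted by count, highest first. */
    public static List<Map.Entry<String, Integer>> countByIp(List<String> lines) {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : lines) {
            // The IP address is the first space-separated field of a log line.
            String ip = SPACE.split(line)[0];
            // Equivalent of emitting (ip, 1) and summing per key.
            counts.merge(ip, 1, Integer::sum);
        }
        List<Map.Entry<String, Integer>> output = new ArrayList<>(counts.entrySet());
        // Descending order by count, like the Comparator in stat().
        output.sort((a, b) -> Integer.compare(b.getValue(), a.getValue()));
        return output;
    }

    public static void main(String[] args) {
        // Hypothetical sample input in the same access-log layout.
        List<String> lines = Arrays.asList(
                "121.205.198.92 - - [21/Feb/2014:00:00:07 +0800] \"GET /archives/417.html HTTP/1.1\" 200 11465",
                "58.246.49.218 - - [21/Feb/2014:00:00:08 +0800] \"GET / HTTP/1.1\" 200 512",
                "58.246.49.218 - - [21/Feb/2014:00:00:09 +0800] \"GET /about HTTP/1.1\" 200 1024");
        for (Map.Entry<String, Integer> e : countByIp(lines)) {
            System.out.println(e.getKey() + "\t" + e.getValue());
        }
    }
}
```

In the Spark version the per-key summing happens on the executors through reduceByKey, and only the final sort runs on the driver after collect().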
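Note that when the program runs on a Spark cluster, the Job we write must be serializable. The anonymous function classes in stat() hold an implicit reference to the enclosing IPAddressStats instance, so that instance is serialized together with each task. If a field does not need to be serialized, or cannot be, simply mark it transient. For example, the lookupService field above does not implement the serialization interface, so it is declared transient to exclude it from serialization. Otherwise, an error similar to the following may occur: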
14/03/10 22:34:06 INFO scheduler.DAGScheduler: Failed to run collect at IPAddressStats.java:76
Exception in thread "main" org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.shirdrn.spark.job.IPAddressStats
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:737)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:741)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:740)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:740)
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:569)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
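The effect of transient can be reproduced with plain Java serialization, independent of Spark. The sketch below is only an illustration under stated assumptions: the classes TransientDemo, Lookup, Job and BrokenJob are hypothetical stand-ins (Lookup plays the role of a non-serializable helper such as lookupService), and the round trip through a byte array stands in for shipping a task to another JVM.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientDemo {

    /** Hypothetical helper that does NOT implement Serializable. */
    static class Lookup {
    }

    /** Serializable job whose non-serializable helper is marked transient. */
    static class Job implements Serializable {
        private static final long serialVersionUID = 1L;
        final String name;
        transient Lookup lookup = new Lookup();

        Job(String name) {
            this.name = name;
        }
    }

    /** Same shape, but the helper is not transient, so serialization fails. */
    static class BrokenJob implements Serializable {
        private static final long serialVersionUID = 1L;
        Lookup lookup = new Lookup();
    }

    /** Serializes the object to bytes and reads it back. */
    static <T extends Serializable> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }

    public static void main(String[] args) throws Exception {
        Job copy = roundTrip(new Job("IPAddressStats"));
        System.out.println(copy.name);            // the ordinary field survives
        System.out.println(copy.lookup == null);  // the transient field is not restored

        try {
            roundTrip(new BrokenJob());
        } catch (NotSerializableException e) {
            System.out.println("not serializable: " + e.getMessage());
        }
    }
}
```

One consequence worth keeping in mind: a transient field is null in the deserialized copy, so it can only be used on the side that created it, or it has to be rebuilt after deserialization.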
Running the Java program on a Spark cluster
Here I use Maven to manage and build the Java program. After implementing the code above, I package it with Maven's maven-assembly-plugin, configured as follows:
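<plugin>
    <artifactId>maven-assembly-plugin</artifactId>
    <configuration>
        <archive>
            <manifest>
                <mainClass>org.shirdrn.spark.job.UserAgentStats</mainClass>
            </manifest>
        </archive>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
        <excludes>
            <exclude>*.properties</exclude>
            <exclude>*.xml</exclude>
        </excludes>
    </configuration>
    <executions>
        <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
</plugin>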
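This packages all dependency libraries into the program JAR. Finally, copy the JAR file to a Linux machine (it does not have to be the Master node of the Spark cluster); it is enough that the Spark environment variables on that node are configured correctly. After unpacking the Spark distribution you will find the script bin/run-example. We can modify this script directly so that its paths point to our own Java package (change the EXAMPLES_DIR variable and the parts related to where our JAR file is stored) and then use it to run the program. The command further below invokes the modified script as bin/run-my-java-example. The script content is as follows: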
cygwin=false
case "`uname`" in
    CYGWIN*) cygwin=true;;
esac

SCALA_VERSION=2.10

# Figure out where the Scala framework is installed
FWDIR="$(cd `dirname $0`/..; pwd)"

# Export this as SPARK_HOME
export SPARK_HOME="$FWDIR"

# Load environment variables from conf/spark-env.sh, if it exists
if [ -e "$FWDIR/conf/spark-env.sh" ] ; then
  . $FWDIR/conf/spark-env.sh
fi

if [ -z "$1" ]; then
  echo "Usage: run-example <example-class> [<args>]" >&2
  exit 1
fi

# Figure out the JAR file that our examples were packaged into. This includes a bit of a hack
# to avoid the -sources and -doc packages that are built by publish-local.
EXAMPLES_DIR="$FWDIR"/java-examples
SPARK_EXAMPLES_JAR=""
if [ -e "$EXAMPLES_DIR"/*.jar ]; then
  export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR"/*.jar`
fi
if [[ -z $SPARK_EXAMPLES_JAR ]]; then
  echo "Failed to find Spark examples assembly in $FWDIR/examples/target" >&2
  echo "You need to build Spark with sbt/sbt assembly before running this program" >&2
  exit 1
fi

# Since the examples JAR ideally shouldn't include spark-core (that dependency should be
# "provided"), also add our standard Spark classpath, built using compute-classpath.sh.
CLASSPATH=`$FWDIR/bin/compute-classpath.sh`
CLASSPATH="$SPARK_EXAMPLES_JAR:$CLASSPATH"

if $cygwin; then
  CLASSPATH=`cygpath -wp $CLASSPATH`
  export SPARK_EXAMPLES_JAR=`cygpath -w $SPARK_EXAMPLES_JAR`
fi

# Find java binary
if [ -n "${JAVA_HOME}" ]; then
  RUNNER="${JAVA_HOME}/bin/java"
else
  if [ `command -v java` ]; then
    RUNNER="java"
  else
    echo "JAVA_HOME is not set" >&2
    exit 1
  fi
fi

# Set JAVA_OPTS to be able to load native libraries and to set heap size
JAVA_OPTS="$SPARK_JAVA_OPTS"
JAVA_OPTS="$JAVA_OPTS -Djava.library.path=$SPARK_LIBRARY_PATH"
# Load extra JAVA_OPTS from conf/java-opts, if it exists
if [ -e "$FWDIR/conf/java-opts" ] ; then
  JAVA_OPTS="$JAVA_OPTS `cat $FWDIR/conf/java-opts`"
fi
export JAVA_OPTS

if [ "$SPARK_PRINT_LAUNCH_COMMAND" == "1" ]; then
  echo -n "Spark Command: "
  echo "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"
  echo "========================================"
  echo
fi

exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"
To run our Java program on Spark, execute the following commands:
cd /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1
./bin/run-my-java-example org.shirdrn.spark.job.IPAddressStats spark://m1:7077 hdfs://m1:9000/user/shirdrn/wwwlog20140222.log /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/GeoIP_DATABASE.dat
The class I implemented, org.shirdrn.spark.job.IPAddressStats, needs 3 arguments to run:
Spark cluster master URL: for example, mine is spark://m1:7077
Input file path: application-specific; here the file is read from HDFS at hdfs://m1:9000/user/shirdrn/wwwlog20140222.log
GeoIP database file: application-specific; an external file used to determine the country an IP address belongs to
If the program has no errors and runs normally, the console prints the run log. A sample is shown below:
14/03/10 22:17:24 INFO job.IPAddressStats: GeoIP file: /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/GeoIP_DATABASE.dat
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/spark-0.0.1-SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/assembly/target/scala-2.10/spark-assembly_2.10-0.9.0-incubating-hadoop1.0.4.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
14/03/10 22:17:25 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/03/10 22:17:25 INFO Remoting: Starting remoting
14/03/10 22:17:25 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@m1:57379]
14/03/10 22:17:25 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@m1:57379]
14/03/10 22:17:25 INFO spark.SparkEnv: Registering BlockManagerMaster
14/03/10 22:17:25 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-local-20140310221725-c1cb
14/03/10 22:17:25 INFO storage.MemoryStore: MemoryStore started with capacity 143.8 MB.
14/03/10 22:17:25 INFO network.ConnectionManager: Bound socket to port 45189 with id = ConnectionManagerId(m1,45189)
14/03/10 22:17:25 INFO storage.BlockManagerMaster: Trying to register BlockManager
14/03/10 22:17:25 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager m1:45189 with 143.8 MB RAM
14/03/10 22:17:25 INFO storage.BlockManagerMaster: Registered BlockManager
14/03/10 22:17:25 INFO spark.HttpServer: Starting HTTP Server
14/03/10 22:17:25 INFO server.Server: jetty-7.x.y-SNAPSHOT
14/03/10 22:17:25 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:49186
14/03/10 22:17:25 INFO broadcast.HttpBroadcast: Broadcast server started at http://10.95.3.56:49186
14/03/10 22:17:25 INFO spark.SparkEnv: Registering MapOutputTracker
14/03/10 22:17:25 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-56c3e30d-a01b-4752-83d1-af1609ab2370
14/03/10 22:17:25 INFO spark.HttpServer: Starting HTTP Server
14/03/10 22:17:25 INFO server.Server: jetty-7.x.y-SNAPSHOT
14/03/10 22:17:25 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:52073
14/03/10 22:17:26 INFO server.Server: jetty-7.x.y-SNAPSHOT
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage/rdd,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/stage,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/pool,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/environment,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/executors,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/metrics/json,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/static,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/,null}
14/03/10 22:17:26 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
14/03/10 22:17:26 INFO ui.SparkUI: Started Spark Web UI at http://m1:4040
14/03/10 22:17:26 INFO spark.SparkContext: Added JAR /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/spark-0.0.1-SNAPSHOT-jar-with-dependencies.jar at http://10.95.3.56:52073/jars/spark-0.0.1-SNAPSHOT-jar-with-dependencies.jar with timestamp 1394515046396
14/03/10 22:17:26 INFO client.AppClient$ClientActor: Connecting to master spark://m1:7077...
14/03/10 22:17:26 INFO storage.MemoryStore: ensureFreeSpace(60341) called with curMem=0, maxMem=150837657
14/03/10 22:17:26 INFO storage.MemoryStore: Block broadcast_0 stored as values to memory (estimated size 58.9 KB, free 143.8 MB)
14/03/10 22:17:26 INFO cluster.SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20140310221726-0000
14/03/10 22:17:27 INFO client.AppClient$ClientActor: Executor added: app-20140310221726-0000/0 on worker-20140310221648-s1-52544 (s1:52544) with 1 cores
14/03/10 22:17:27 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20140310221726-0000/0 on hostPort s1:52544 with 1 cores, 512.0 MB RAM
14/03/10 22:17:27 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/03/10 22:17:27 WARN snappy.LoadSnappy: Snappy native library not loaded
14/03/10 22:17:27 INFO client.AppClient$ClientActor: Executor updated: app-20140310221726-0000/0 is now RUNNING
14/03/10 22:17:27 INFO mapred.FileInputFormat: Total input paths to process : 1
14/03/10 22:17:27 INFO spark.SparkContext: Starting job: collect at IPAddressStats.java:77
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Registering RDD 4 (reduceByKey at IPAddressStats.java:70)
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Got job 0 (collect at IPAddressStats.java:77) with 1 output partitions (allowLocal=false)
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Final stage: Stage 0 (collect at IPAddressStats.java:77)
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 1)
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Missing parents: List(Stage 1)
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Submitting Stage 1 (MapPartitionsRDD[4] at reduceByKey at IPAddressStats.java:70), which has no missing parents
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 1 (MapPartitionsRDD[4] at reduceByKey at IPAddressStats.java:70)
14/03/10 22:17:27 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
14/03/10 22:17:28 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@s1:59233/user/Executor#-671170811] with ID 0
14/03/10 22:17:28 INFO scheduler.TaskSetManager: Starting task 1.0:0 as TID 0 on executor 0: s1 (PROCESS_LOCAL)
14/03/10 22:17:28 INFO scheduler.TaskSetManager: Serialized task 1.0:0 as 2396 bytes in 5 ms
14/03/10 22:17:29 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager s1:47282 with 297.0 MB RAM
14/03/10 22:17:32 INFO scheduler.TaskSetManager: Finished TID 0 in 3376 ms on s1 (progress: 0/1)
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(1, 0)
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Stage 1 (reduceByKey at IPAddressStats.java:70) finished in 4.420 s
14/03/10 22:17:32 INFO scheduler.DAGScheduler: looking for newly runnable stages
14/03/10 22:17:32 INFO scheduler.DAGScheduler: running: Set()
14/03/10 22:17:32 INFO scheduler.DAGScheduler: waiting: Set(Stage 0)
14/03/10 22:17:32 INFO scheduler.DAGScheduler: failed: Set()
14/03/10 22:17:32 INFO scheduler.TaskSchedulerImpl: Remove TaskSet 1.0 from pool
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Missing parents for Stage 0: List()
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Submitting Stage 0 (MapPartitionsRDD[6] at reduceByKey at IPAddressStats.java:70), which is now runnable
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 0 (MapPartitionsRDD[6] at reduceByKey at IPAddressStats.java:70)
14/03/10 22:17:32 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
14/03/10 22:17:32 INFO scheduler.TaskSetManager: Starting task 0.0:0 as TID 1 on executor 0: s1 (PROCESS_LOCAL)
14/03/10 22:17:32 INFO scheduler.TaskSetManager: Serialized task 0.0:0 as 2255 bytes in 1 ms
14/03/10 22:17:32 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 0 to spark@s1:33534
14/03/10 22:17:32 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 120 bytes
14/03/10 22:17:32 INFO scheduler.TaskSetManager: Finished TID 1 in 282 ms on s1 (progress: 0/1)
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Completed ResultTask(0, 0)
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Stage 0 (collect at IPAddressStats.java:77) finished in 0.314 s
14/03/10 22:17:32 INFO scheduler.TaskSchedulerImpl: Remove TaskSet 0.0 from pool
14/03/10 22:17:32 INFO spark.SparkContext: Job finished: collect at IPAddressStats.java:77, took 4.870958309 s
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 58.246.49.218 312
14/03/10 22:17:32 INFO job.IPAddressStats: [KR] 1.234.83.77 300
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 120.43.11.16 212
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 110.85.72.254 207
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 27.150.229.134 185
14/03/10 22:17:32 INFO job.IPAddressStats: [HK] 180.178.52.181 181
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 120.37.210.212 180
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 222.77.226.83 176
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 120.43.11.205 169
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 120.43.9.19 165
...
We can also check the status of the running application through the web console: port 8080 on the Master node (for example, http://m1:8080/) shows the state of the applications on the cluster.
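One more note: if you develop Spark applications in Java with Eclipse in a Unix environment, you can also connect to the Spark cluster directly from Eclipse and submit the application, which is then handed to the cluster for processing.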