本篇文章為大家展示了Spark RDD的collect action 不適用于單個element size過大的示例分析,內(nèi)容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。
創(chuàng)新互聯(lián)企業(yè)建站,十年網(wǎng)站建設(shè)經(jīng)驗,專注于網(wǎng)站建設(shè)技術(shù),精于網(wǎng)頁設(shè)計,有多年建站和網(wǎng)站代運營經(jīng)驗,設(shè)計師為客戶打造網(wǎng)絡(luò)企業(yè)風格,提供周到的建站售前咨詢和貼心的售后服務(wù)。對于做網(wǎng)站、成都網(wǎng)站建設(shè)中不同領(lǐng)域進行深入了解和探索,創(chuàng)新互聯(lián)在網(wǎng)站建設(shè)中充分了解客戶行業(yè)的需求,以靈動的思維在網(wǎng)頁中充分展現(xiàn),通過對客戶行業(yè)精準市場調(diào)研,為客戶提供的解決方案。
collect是Spark RDD一個非常易用的action,通過collect可以輕易獲得一個RDD當中所有的elements。當這些elements是String類型的時候,可以輕易將整個RDD轉(zhuǎn)化成一個List
不過等一等,這么好用的action有一個弱點,它不適合size比較的element。舉個例子來說吧。請看下面這段代碼:
... ...
JavaPairInputDStream
jssc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet
);
JavaDStream
@Override
public String call(Tuple2
return tuple2._2();
}
});
lines.foreachRDD(new Function
@Override
public Void call(JavaRDD
List
List
for (String message: messages) {
if (message== null)
continue;
String logStr = "message size is " + message.length();
strs.add(logStr);
}
saveToLog(outputLogPath, strs);
return null;
}
});
... ...
上述這段代碼當Kafka中單個message(也就是)的size很?。ū热?00Bytes)的時候,運行得很好??墒钱攩蝹€message size變大到一定程度(例如10MB),就會拋出以下異常:
sparkDriver-akka.actor.default-dispatcher-18 2015-10-15 21:52:28,606 ERROR JobSc
heduler - Error running job streaming job 1444971120000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 238.0 failed 4 times, most recent failure: Lost task 0.3 in stage 238.0 (TID421, 127.0.0.1): ExecutorLostFailure (executor 123 lost)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1215)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1204)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1203)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1404)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1365)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
原因很簡單,collect()無法handle“大數(shù)據(jù)”。對于10MB size這樣的單條message。我們可以用下面這段代碼替代上面最后一部分:
lines.foreachRDD(new Function
@Override
public Void call(JavaRDD
JavaRDD
@Override
public String call(String message) throws Exception {
if (message == null)
return null;
String logStr = "Message size is " + message.length();
return logStr;
}
});
List
saveToLog(outputLogPat, sizeStrs);
return null;
}
});
上述內(nèi)容就是Spark RDD的collect action 不適用于單個element size過大的示例分析,你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。