這篇文章將為大家詳細(xì)講解有關(guān)spark應(yīng)用程序如何在Java項目中運行,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關(guān)知識有一定的了解。
成都創(chuàng)新互聯(lián)是網(wǎng)站建設(shè)技術(shù)企業(yè),為成都企業(yè)提供專業(yè)的成都網(wǎng)站設(shè)計、成都做網(wǎng)站,網(wǎng)站設(shè)計,網(wǎng)站制作,網(wǎng)站改版等技術(shù)服務(wù)。擁有10年豐富建站經(jīng)驗和眾多成功案例,為您定制適合企業(yè)的網(wǎng)站。10年品質(zhì),值得信賴!如下所示:
package org.shirdrn.spark.job; import java.io.File; import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.regex.Pattern; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.shirdrn.spark.job.maxmind.Country; import org.shirdrn.spark.job.maxmind.LookupService; import scala.Serializable; import scala.Tuple2; public class IPAddressStats implements Serializable { private static final long serialVersionUID = 8533489548835413763L; private static final Log LOG = LogFactory.getLog(IPAddressStats.class); private static final Pattern SPACE = Pattern.compile(" "); private transient LookupService lookupService; private transient final String geoIPFile; public IPAddressStats(String geoIPFile) { this.geoIPFile = geoIPFile; try { // lookupService: get country code from a IP address File file = new File(this.geoIPFile); LOG.info("GeoIP file: " + file.getAbsolutePath()); lookupService = new AdvancedLookupService(file, LookupService.GEOIP_MEMORY_CACHE); } catch (IOException e) { throw new RuntimeException(e); } } @SuppressWarnings("serial") public void stat(String[] args) { JavaSparkContext ctx = new JavaSparkContext(args[0], "IPAddressStats", System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(IPAddressStats.class)); JavaRDDlines = ctx.textFile(args[1], 1); // splits and extracts ip address filed JavaRDD words = lines.flatMap(new FlatMapFunction () { @Override public Iterable call(String s) { // 121.205.198.92 - - [21/Feb/2014:00:00:07 +0800] "GET /archives/417.html HTTP/1.1" 200 11465 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0" // ip address return Arrays.asList(SPACE.split(s)[0]); } }); // map JavaPairRDD ones = words.map(new PairFunction () { @Override public Tuple2 call(String s) { return new Tuple2 (s, 1); } }); // reduce JavaPairRDD counts = ones.reduceByKey(new Function2 () { @Override public Integer call(Integer i1, Integer i2) { return i1 + i2; } }); List > output = counts.collect(); // sort statistics result by value Collections.sort(output, new Comparator >() { @Override public int compare(Tuple2 t1, Tuple2 t2) { if(t1._2 < t2._2) { return 1; } else if(t1._2 > t2._2) { return -1; } return 0; } }); writeTo(args, output); } private void writeTo(String[] args, List > output) { for (Tuple2<?, ?> tuple : output) { Country country = lookupService.getCountry((String) tuple._1); LOG.info("[" + country.getCode() + "] " + tuple._1 + "\t" + tuple._2); } } public static void main(String[] args) { // ./bin/run-my-java-example org.shirdrn.spark.job.IPAddressStats spark://m1:7077 hdfs://m1:9000/user/shirdrn/wwwlog20140222.log /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/GeoIP_DATABASE.dat if (args.length < 3) { System.err.println("Usage: IPAddressStats "); System.err.println(" Example: org.shirdrn.spark.job.IPAddressStats spark://m1:7077 hdfs://m1:9000/user/shirdrn/wwwlog20140222.log /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/GeoIP_DATABASE.dat"); System.exit(1); } String geoIPFile = args[2]; IPAddressStats stats = new IPAddressStats(geoIPFile); stats.stat(args); System.exit(0); } }