真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

spark應(yīng)用程序如何在Java項目中運行-創(chuàng)新互聯(lián)

這篇文章將為大家詳細(xì)講解有關(guān)spark應(yīng)用程序如何在Java項目中運行,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關(guān)知識有一定的了解。

成都創(chuàng)新互聯(lián)是網(wǎng)站建設(shè)技術(shù)企業(yè),為成都企業(yè)提供專業(yè)的成都網(wǎng)站設(shè)計、成都做網(wǎng)站,網(wǎng)站設(shè)計,網(wǎng)站制作,網(wǎng)站改版等技術(shù)服務(wù)。擁有10年豐富建站經(jīng)驗和眾多成功案例,為您定制適合企業(yè)的網(wǎng)站。10年品質(zhì),值得信賴!

如下所示:

package org.shirdrn.spark.job;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.shirdrn.spark.job.maxmind.Country;
import org.shirdrn.spark.job.maxmind.LookupService;
import scala.Serializable;
import scala.Tuple2;
public class IPAddressStats implements Serializable {
  private static final long serialVersionUID = 8533489548835413763L;
  private static final Log LOG = LogFactory.getLog(IPAddressStats.class);
  private static final Pattern SPACE = Pattern.compile(" ");
  private transient LookupService lookupService;
  private transient final String geoIPFile;
  public IPAddressStats(String geoIPFile) {
   this.geoIPFile = geoIPFile;
   try {
    // lookupService: get country code from a IP address
    File file = new File(this.geoIPFile);
    LOG.info("GeoIP file: " + file.getAbsolutePath());
    lookupService = new AdvancedLookupService(file, LookupService.GEOIP_MEMORY_CACHE);
   } catch (IOException e) {
    throw new RuntimeException(e);
   }
  }
  @SuppressWarnings("serial")
  public void stat(String[] args) {
   JavaSparkContext ctx = new JavaSparkContext(args[0], "IPAddressStats",
     System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(IPAddressStats.class));
   JavaRDD lines = ctx.textFile(args[1], 1);
   // splits and extracts ip address filed
   JavaRDD words = lines.flatMap(new FlatMapFunction() {
    @Override
    public Iterable call(String s) {
     // 121.205.198.92 - - [21/Feb/2014:00:00:07 +0800] "GET /archives/417.html HTTP/1.1" 200 11465 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
     // ip address
     return Arrays.asList(SPACE.split(s)[0]);
    }
   });
   // map
   JavaPairRDD ones = words.map(new PairFunction() {
    @Override
    public Tuple2 call(String s) {
     return new Tuple2(s, 1);
    }
   });
   // reduce
   JavaPairRDD counts = ones.reduceByKey(new Function2() {
    @Override
    public Integer call(Integer i1, Integer i2) {
     return i1 + i2;
    }
   });
   List> output = counts.collect();
   // sort statistics result by value
   Collections.sort(output, new Comparator>() {
    @Override
    public int compare(Tuple2 t1, Tuple2 t2) {
     if(t1._2 < t2._2) {
       return 1;
     } else if(t1._2 > t2._2) {
       return -1;
     }
     return 0;
    }
   });
   writeTo(args, output);
  }
  private void writeTo(String[] args, List> output) {
   for (Tuple2<?, ?> tuple : output) {
    Country country = lookupService.getCountry((String) tuple._1);
    LOG.info("[" + country.getCode() + "] " + tuple._1 + "\t" + tuple._2);
   }
  }
  public static void main(String[] args) {
   // ./bin/run-my-java-example org.shirdrn.spark.job.IPAddressStats spark://m1:7077 hdfs://m1:9000/user/shirdrn/wwwlog20140222.log /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/GeoIP_DATABASE.dat
   if (args.length < 3) {
    System.err.println("Usage: IPAddressStats   ");
    System.err.println(" Example: org.shirdrn.spark.job.IPAddressStats spark://m1:7077 hdfs://m1:9000/user/shirdrn/wwwlog20140222.log /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/GeoIP_DATABASE.dat");
    System.exit(1);
   }
   String geoIPFile = args[2];
   IPAddressStats stats = new IPAddressStats(geoIPFile);
   stats.stat(args);
   System.exit(0);
  }
}

網(wǎng)站欄目:spark應(yīng)用程序如何在Java項目中運行-創(chuàng)新互聯(lián)
分享鏈接:http://weahome.cn/article/ijdeo.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部