A brief introduction to PageRank:
A node's PageRank value is determined by the values of the nodes that point to it. A concrete example follows:
Part 1:
The computation performed in each MapReduce pass:
The mapper computes the partial PageRank value that each node sends to the nodes it points to; the reducer collects all contributions that share the same key and computes the new value with the formula.
The delta denotes the difference between two consecutive iterations; once it falls below a chosen threshold the computation is finished and every node's PageRank value has been obtained.
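For reference, the update applied in every pass is the damped PageRank formula (the code below uses a damping factor d = 0.85 and, for this four-node graph, N = 4):

PR(v) = (1 - d) / N + d * \sum_{u \to v} PR(u) / out(u)

where the sum runs over all nodes u that point to v, and out(u) is the number of outgoing links of u.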
My own implementation is given below.
The input data is split into two files:
input1.txt
A,B,D
B,C
C,A,B
D,B,C
Each line lists the nodes that the first node on the line points to. This file is read in the reducer's setup() to build a HashMap for lookups.
input2.txt
A,0.25,B,D
B,0.25,C
C,0.25,A,B
D,0.25,B,C
The extra number in the second column is each node's current PageRank value. This file is optional, since it can be generated from the first file: with four nodes, each initial value is simply 1/4 = 0.25.
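As a quick sanity check of the formula on these inputs: node B is pointed to by A, C and D, each of which currently has PageRank 0.25 and two outgoing links, so B receives 0.125 + 0.125 + 0.125 = 0.375, and its new value is (1 - 0.85)/4 + 0.85 * 0.375 = 0.35625.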
The implementation:
package bbdt.steiss.pageRank;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class PageRank {

    public static class PageMapper extends Mapper<LongWritable, Text, Text, Text> {

        private Text averageValue = new Text();
        private Text node = new Text();

        // For every input line, emit the partial PageRank that the current node
        // contributes to each node it points to: current value / number of outgoing links.
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String string = value.toString();
            String[] ss = string.split(",");
            int length = ss.length;
            double pageValue = Double.parseDouble(ss[1]);
            double average = pageValue / (length - 2);
            averageValue.set(String.valueOf(average));
            int i = 2;
            while (i <= length - 1) {
                node.set(ss[i]);
                context.write(node, averageValue);
                i++;
            }
        }
    }

    public static class PageReducer extends Reducer<Text, Text, Text, Text> {

        private HashMap<String, String> content;
        private Text res = new Text();

        // Values with the same key arrive grouped together; iterate over them to
        // collect every partial PageRank sent to this node and apply the formula.
        // Because the output is reused as the next iteration's input, the whole
        // record (node, new value, adjacency list) is packed into the value.
        @Override
        protected void reduce(Text text, Iterable<Text> intIterable, Context context)
                throws IOException, InterruptedException {
            double sum = 0.0;
            double v = 0.0;
            for (Text t : intIterable) {
                v = Double.parseDouble(t.toString());
                sum = sum + v;
            }
            double a = 0.85;
            // 4 is the number of nodes, hard-coded for this example
            double result = (1 - a) / 4 + a * sum;
            String sRes = String.valueOf(result);
            String back = content.get(text.toString());
            String front = text.toString();
            String comp = front + "," + sRes + back;
            res.set(comp);
            context.write(null, res);
        }

        // When the reducer initializes, load the adjacency file (distributed cache)
        // into a HashMap so every reduce() call can look up a node's outgoing links;
        // it acts as a small in-memory lookup table.
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            URI[] uri = context.getCacheArchives();
            content = new HashMap<String, String>();
            for (URI u : uri) {
                FileSystem fileSystem = FileSystem.get(URI.create("hdfs://hadoop1:9000"),
                        context.getConfiguration());
                FSDataInputStream in = fileSystem.open(new Path(u.getPath()));
                BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(in));
                String line;
                while ((line = bufferedReader.readLine()) != null) {
                    int index = line.indexOf(",");
                    String first = line.substring(0, index);
                    String last = line.substring(index, line.length());
                    content.put(first, last);
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // Paths passed in on the command line
        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);
        Path cachePath = new Path(args[2]);
        double result = 100;
        int flag = 0;
        // Keep iterating while the summed difference is above the threshold
        while (result > 0.1) {
            if (flag == 1) {
                // Skipped on the first iteration: copy the previous job's output back
                // over the input file so it becomes this iteration's input.
                // (The body of copyFile() was not included in the original post;
                // a sketch is given after this listing.)
                copyFile();
                flag = 0;
            }
            Configuration configuration = new Configuration();
            Job job = Job.getInstance(configuration);
            job.setJarByClass(PageRank.class);
            job.setMapperClass(PageMapper.class);
            job.setReducerClass(PageReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            FileInputFormat.addInputPath(job, inputPath);
            FileOutputFormat.setOutputPath(job, outputPath);
            job.addCacheArchive(cachePath.toUri());
            outputPath.getFileSystem(configuration).delete(outputPath, true);
            job.waitForCompletion(true);
            String outpathString = outputPath.toString() + "/part-r-00000";
            // Compute the per-node PageRank difference between the old and new files
            result = fileDo(inputPath, new Path(outpathString));
            flag = 1;
        }
        System.exit(0);
    }

    // Compute and return the summed PageRank difference between the two files
    public static double fileDo(Path inputPath, Path outPath) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop1:9000");
        FileSystem fs = FileSystem.get(conf);
        FSDataInputStream in1 = fs.open(inputPath);
        FSDataInputStream in2 = fs.open(outPath);
        BufferedReader br1 = new BufferedReader(new InputStreamReader(in1));
        BufferedReader br2 = new BufferedReader(new InputStreamReader(in2));
        String s1 = null;
        String s2 = null;
        ArrayList<Double> arrayList1 = new ArrayList<Double>();
        ArrayList<Double> arrayList2 = new ArrayList<Double>();
        while ((s1 = br1.readLine()) != null) {
            String[] ss = s1.split(",");
            arrayList1.add(Double.parseDouble(ss[1]));
        }
        br1.close();
        while ((s2 = br2.readLine()) != null) {
            String[] ss = s2.split(",");
            arrayList2.add(Double.parseDouble(ss[1]));
        }
        br2.close();
        double res = 0;
        for (int i = 0; i < arrayList1.size(); i++) {
            // accumulate the absolute per-node difference between the two iterations
            res += Math.abs(arrayList1.get(i) - arrayList2.get(i));
        }
        return res;
    }
}
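The body of copyFile() was not shown in the original post; main() calls it between iterations to copy the reducer output back over the input file. A minimal sketch, assuming the same HDFS address as above and placeholder paths for the input file and the job output:

    // Sketch only, not part of the original post: copy the previous job's output
    // over the input file so it becomes the next iteration's input.
    // The HDFS address matches main()/fileDo(); the concrete paths are assumptions.
    public static void copyFile() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop1:9000");
        FileSystem fs = FileSystem.get(conf);
        Path jobOutput = new Path("/output/part-r-00000"); // hypothetical job output
        Path input = new Path("/input/input2.txt");        // hypothetical input file
        fs.delete(input, true);                             // needs write access to /input
        // FileUtil.copy(srcFS, src, dstFS, dst, deleteSource, conf)
        org.apache.hadoop.fs.FileUtil.copy(fs, jobOutput, fs, input, false, conf);
    }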
Note:
When operating on HDFS from the local machine (deleting and creating files), the file permissions on HDFS must allow it. The deletion here requires write access to the /input directory, which is very important: run "hdfs dfs -chmod 777 /input" to open up the permissions on /input so the files under it can be deleted and replaced.
Part 2
The above is my own implementation of the PageRank algorithm; below is a walk-through of someone else's code.
Robby's implementation:
1. First define a Node class for the graph nodes. It stores the node's current PageRank value and the nodes it points to, kept in an array.
package org.robby.mr.pagerank;

import org.apache.commons.lang.StringUtils;

import java.io.IOException;
import java.util.Arrays;

// Node class: holds the node's current PageRank value and the nodes it points to
public class Node {

    private double pageRank = 0.25;
    private String[] adjacentNodeNames;

    // Field separator
    public static final char fieldSeparator = '\t';

    public double getPageRank() {
        return pageRank;
    }

    public Node setPageRank(double pageRank) {
        this.pageRank = pageRank;
        return this;
    }

    public String[] getAdjacentNodeNames() {
        return adjacentNodeNames;
    }

    // Takes an array and stores it as the list of adjacent (pointed-to) nodes
    public Node setAdjacentNodeNames(String[] adjacentNodeNames) {
        this.adjacentNodeNames = adjacentNodeNames;
        return this;
    }

    public boolean containsAdjacentNodes() {
        return adjacentNodeNames != null;
    }

    // Serializes the node as the PageRank value followed by the adjacent node names
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append(pageRank);
        if (getAdjacentNodeNames() != null) {
            sb.append(fieldSeparator)
              .append(StringUtils.join(getAdjacentNodeNames(), fieldSeparator));
        }
        return sb.toString();
    }

    // Builds a Node from its string form
    public static Node fromMR(String value) throws IOException {
        String[] parts = StringUtils.splitPreserveAllTokens(value, fieldSeparator);
        if (parts.length < 1) {
            throw new IOException("Expected 1 or more parts but received " + parts.length);
        }
        Node node = new Node().setPageRank(Double.valueOf(parts[0]));
        if (parts.length > 1) {
            node.setAdjacentNodeNames(Arrays.copyOfRange(parts, 1, parts.length));
        }
        return node;
    }
}
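As a quick illustration of the serialization format this class reads and writes (a hypothetical demo, not part of Robby's code):

    public class NodeDemo {
        public static void main(String[] args) throws Exception {
            // Round trip: parse a tab-separated record and write it back out
            Node n = Node.fromMR("0.25\tB\tD");   // pageRank = 0.25, adjacent = [B, D]
            System.out.println(n.getPageRank());  // prints 0.25
            System.out.println(n);                // prints 0.25<TAB>B<TAB>D
        }
    }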
2. The mapper implementation:

package org.robby.mr.pagerank;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

// The map input key and value are both Text, so the job's input format
// must be set to KeyValueTextInputFormat in the main function.
public class Map extends Mapper<Text, Text, Text, Text> {

    private Text outKey = new Text();
    private Text outValue = new Text();

    @Override
    protected void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
        // Re-emit the original record first so the reducer can recover the adjacency list
        context.write(key, value);

        // The key is the node name; the tab-separated value is parsed into a Node
        Node node = Node.fromMR(value.toString());

        if (node.getAdjacentNodeNames() != null && node.getAdjacentNodeNames().length > 0) {
            double outboundPageRank = node.getPageRank() /
                    (double) node.getAdjacentNodeNames().length;
            for (int i = 0; i < node.getAdjacentNodeNames().length; i++) {
                String neighbor = node.getAdjacentNodeNames()[i];
                outKey.set(neighbor);
                Node adjacentNode = new Node().setPageRank(outboundPageRank);
                outValue.set(adjacentNode.toString());
                System.out.println("  output -> K[" + outKey + "],V[" + outValue + "]");
                // Emit the partial PageRank contribution for this neighbor
                context.write(outKey, outValue);
            }
        }
    }
}

Example output: for the input record

A	0.25	B	D

the mapper re-emits the record itself and then emits

B	0.125
D	0.125

Note:
KeyValueTextInputFormat has the input format (Text, Text): each line is split at the first tab character, the part before it becomes the key and the rest becomes the value. For example, the line A<TAB>0.25<TAB>B<TAB>D arrives in the mapper as key "A" and value "0.25\tB\tD", which is exactly what Node.fromMR() expects.
TextInputFormat has the format (LongWritable, Text): the byte offset of the line is the key and the line content is the value.
3. The reducer implementation:
package org.robby.mr.pagerank;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class Reduce extends Reducer<Text, Text, Text, Text> {

    public static final double CONVERGENCE_SCALING_FACTOR = 1000.0;
    public static final double DAMPING_FACTOR = 0.85;
    public static String CONF_NUM_NODES_GRAPH = "pagerank.numnodes";
    private int numberOfNodesInGraph;

    public static enum Counter {
        CONV_DELTAS
    }

    // Runs when the reducer initializes: read the total number of nodes from the conf object
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        numberOfNodesInGraph = context.getConfiguration().getInt(CONF_NUM_NODES_GRAPH, 0);
    }

    private Text outValue = new Text();

    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        System.out.println("input -> K[" + key + "]");

        double summedPageRanks = 0;
        Node originalNode = new Node();

        for (Text textValue : values) {
            System.out.println("  input -> V[" + textValue + "]");
            Node node = Node.fromMR(textValue.toString());
            if (node.containsAdjacentNodes()) {
                // this is the original record re-emitted by the mapper
                originalNode = node;
            } else {
                // sum the partial PageRank contributions sent to this node
                summedPageRanks += node.getPageRank();
            }
        }

        double dampingFactor = ((1.0 - DAMPING_FACTOR) / (double) numberOfNodesInGraph);
        double newPageRank = dampingFactor + (DAMPING_FACTOR * summedPageRanks);

        // Difference between the old and new PageRank values
        double delta = originalNode.getPageRank() - newPageRank;

        // Store the new PageRank on the original node and emit the updated record
        originalNode.setPageRank(newPageRank);
        outValue.set(originalNode.toString());
        System.out.println("  output -> K[" + key + "],V[" + outValue + "]");
        context.write(key, outValue);

        int scaledDelta = Math.abs((int) (delta * CONVERGENCE_SCALING_FACTOR));
        System.out.println("Delta = " + scaledDelta);
        // MapReduce offers many counters; a custom one is created and looked up
        // through an enum. increment() adds the given amount to its value.
        context.getCounter(Counter.CONV_DELTAS).increment(scaledDelta);
    }
}

4. The main function:
package org.robby.mr.pagerank;

import org.apache.commons.io.*;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.*;
import java.util.*;

public final class Main {

    public static void main(String... args) throws Exception {
        // Input file path and output directory passed on the command line
        String inputFile = args[0];
        String outputDir = args[1];
        iterate(inputFile, outputDir);
    }

    public static void iterate(String input, String output) throws Exception {
        // Because this runs on the cluster (hadoop jar ...), the Configuration is
        // automatically populated with the cluster's HDFS entry point, so the
        // FileSystem obtained from it operates directly on HDFS.
        Configuration conf = new Configuration();

        Path outputPath = new Path(output);
        outputPath.getFileSystem(conf).delete(outputPath, true);
        outputPath.getFileSystem(conf).mkdirs(outputPath);

        // Build the initial input file
        Path inputPath = new Path(outputPath, "input.txt");

        // Create the file and get back the number of nodes
        int numNodes = createInputFile(new Path(input), inputPath);

        int iter = 1;
        double desiredConvergence = 0.01;

        while (true) {
            // A Path built this way is outputPath plus the iteration number
            Path jobOutputPath = new Path(outputPath, String.valueOf(iter));

            System.out.println("======================================");
            System.out.println("=  Iteration:    " + iter);
            System.out.println("=  Input path:   " + inputPath);
            System.out.println("=  Output path:  " + jobOutputPath);
            System.out.println("======================================");

            // Run one MapReduce pass
            if (calcPageRank(inputPath, jobOutputPath, numNodes) < desiredConvergence) {
                System.out.println("Convergence is below " + desiredConvergence + ", we're done");
                break;
            }
            inputPath = jobOutputPath;
            iter++;
        }
    }

    // Copies the contents of `file` into `targetFile`, adding the initial PageRank value
    public static int createInputFile(Path file, Path targetFile) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = file.getFileSystem(conf);

        int numNodes = getNumNodes(file);
        double initialPageRank = 1.0 / (double) numNodes;

        // fs.create() builds the file for the given Path and returns its output stream
        OutputStream os = fs.create(targetFile);
        // Line iterator over the source file's stream
        LineIterator iter = IOUtils.lineIterator(fs.open(file), "UTF8");

        while (iter.hasNext()) {
            String line = iter.nextLine();
            // Split the line into the node name and its adjacent nodes
            String[] parts = StringUtils.split(line);
            // Build the Node object
            Node node = new Node()
                    .setPageRank(initialPageRank)
                    .setAdjacentNodeNames(Arrays.copyOfRange(parts, 1, parts.length));
            IOUtils.write(parts[0] + '\t' + node.toString() + '\n', os);
        }
        os.close();
        return numNodes;
    }

    // Get the number of nodes, i.e. the number of lines in the file
    public static int getNumNodes(Path file) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = file.getFileSystem(conf);
        return IOUtils.readLines(fs.open(file), "UTF8").size();
    }

    // Run one MapReduce pass and return the convergence value
    public static double calcPageRank(Path inputPath, Path outputPath, int numNodes)
            throws Exception {
        Configuration conf = new Configuration();
        conf.setInt(Reduce.CONF_NUM_NODES_GRAPH, numNodes);

        Job job = Job.getInstance(conf);
        job.setJarByClass(Main.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        // Both key and value are text, so use KeyValueTextInputFormat,
        // which splits each line at the first separator into key and value
        job.setInputFormatClass(KeyValueTextInputFormat.class);

        // Declare the map output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        if (!job.waitForCompletion(true)) {
            throw new Exception("Job failed");
        }

        long summedConvergence = job.getCounters()
                .findCounter(Reduce.Counter.CONV_DELTAS).getValue();
        double convergence =
                ((double) summedConvergence / Reduce.CONVERGENCE_SCALING_FACTOR)
                        / (double) numNodes;

        System.out.println("======================================");
        System.out.println("=  Num nodes:           " + numNodes);
        System.out.println("=  Summed convergence:  " + summedConvergence);
        System.out.println("=  Convergence:         " + convergence);
        System.out.println("======================================");

        return convergence;
    }
}
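To make the counter arithmetic concrete (illustrative numbers only): each reduce() call scales its delta by 1000, truncates it to an int and adds it to the CONV_DELTAS counter. With 4 nodes, if every per-node delta is 0.01 the counter ends up at 40, and calcPageRank computes (40 / 1000.0) / 4 = 0.01, which is not yet below the 0.01 threshold, so another iteration runs.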
Note:
The file-stream handling below uses methods of the IOUtils class from org.apache.commons.io.
Arrays.copyOfRange is also used; it returns a new array containing the given range of the source array, e.g. Arrays.copyOfRange(new String[]{"A", "B", "D"}, 1, 3) returns {"B", "D"}.
    OutputStream os = fs.create(targetFile);
    // Line iterator over the source file's stream
    LineIterator iter = IOUtils.lineIterator(fs.open(file), "UTF8");
    while (iter.hasNext()) {
        String line = iter.nextLine();
        String[] parts = StringUtils.split(line);
        Node node = new Node()
                .setPageRank(initialPageRank)
                .setAdjacentNodeNames(Arrays.copyOfRange(parts, 1, parts.length));
        IOUtils.write(parts[0] + '\t' + node.toString() + '\n', os);
    }

The readLines method returns a List of Strings, one entry per line of the file:
    IOUtils.readLines(fs.open(file), "UTF8").size();

TextOutputFormat accepts key/value pairs of any type; when writing, it automatically calls toString() to convert each object to its string form.
StringUtils is used to split a string into an array:
    String[] parts = StringUtils.splitPreserveAllTokens(value, fieldSeparator);
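For reference, splitPreserveAllTokens keeps empty tokens between consecutive separators, while split() drops them (behavior as documented in commons-lang; a small illustration):

    StringUtils.splitPreserveAllTokens("A\t\tB", '\t');  // ["A", "", "B"]
    StringUtils.split("A\t\tB", '\t');                   // ["A", "B"]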