真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Java/Web如何調(diào)用Hadoop進(jìn)行MapReduce

這篇文章主要為大家展示了“Java/Web如何調(diào)用Hadoop進(jìn)行MapReduce”,內(nèi)容簡而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領(lǐng)大家一起研究并學(xué)習(xí)一下“Java/Web如何調(diào)用Hadoop進(jìn)行MapReduce”這篇文章吧。

創(chuàng)新互聯(lián)公司專業(yè)為企業(yè)提供汝南網(wǎng)站建設(shè)、汝南做網(wǎng)站、汝南網(wǎng)站設(shè)計(jì)、汝南網(wǎng)站制作等企業(yè)網(wǎng)站建設(shè)、網(wǎng)頁設(shè)計(jì)與制作、汝南企業(yè)網(wǎng)站模板建站服務(wù),十載汝南做網(wǎng)站經(jīng)驗(yàn),不只是建網(wǎng)站,更提供有價(jià)值的思路和整體網(wǎng)絡(luò)服務(wù)。

Hadoop環(huán)境搭建詳見此文章https://www.jb51.net/article/33649.htm。

我們已經(jīng)知道Hadoop能夠通過Hadoop jar ***.jar input output的形式通過命令行來調(diào)用,那么如何將其封裝成一個服務(wù),讓Java/Web來調(diào)用它?使得用戶可以用方便的方式上傳文件到Hadoop并進(jìn)行處理,獲得結(jié)果。首先,***.jar是一個Hadoop任務(wù)類的封裝,我們可以在沒有jar的情況下運(yùn)行該類的main方法,將必要的參數(shù)傳遞給它。input 和output則將用戶上傳的文件使用Hadoop的JavaAPI put到Hadoop的文件系統(tǒng)中。然后再通過Hadoop的JavaAPI 從文件系統(tǒng)中取得結(jié)果文件。

搭建JavaWeb工程。本文使用Spring、SpringMVC、MyBatis框架, 當(dāng)然,這不是重點(diǎn),就算沒有使用任何框架也能實(shí)現(xiàn)。

項(xiàng)目框架如下:

Java/Web如何調(diào)用Hadoop進(jìn)行MapReduce

項(xiàng)目中使用到的jar包如下:

Java/Web如何調(diào)用Hadoop進(jìn)行MapReduceJava/Web如何調(diào)用Hadoop進(jìn)行MapReduce

在Spring的配置文件中,加入

 
    
    
    

使得項(xiàng)目支持文件上傳。

新建一個login.jsp 點(diǎn)擊登錄后進(jìn)入user/login

Java/Web如何調(diào)用Hadoop進(jìn)行MapReduce

user/login中處理登錄,登錄成功后,【在Hadoop文件系統(tǒng)中創(chuàng)建用戶文件夾】,然后跳轉(zhuǎn)到console.jsp

package com.chenjie.controller; 
 
import java.io.IOException; 
  
import javax.annotation.Resource; 
 
import javax.servlet.http.HttpServletRequest; 
 
import javax.servlet.http.HttpServletResponse; 
 
import org.apache.hadoop.conf.Configuration; 
 
import org.apache.hadoop.fs.FileSystem; 
 
import org.apache.hadoop.fs.Path; 
 
import org.springframework.stereotype.Controller; 
 
import org.springframework.web.bind.annotation.RequestMapping; 

import com.chenjie.pojo.JsonResult; 
 
import com.chenjie.pojo.User; 
 
import com.chenjie.service.UserService; 
 
import com.chenjie.util.AppConfig; 
 
import com.google.gson.Gson; 
/** 
 
 * 用戶請求控制器 
 
 * 
 
 * @author Chen 
 
 * 
 
 */ 
 
@Controller 
 
// 聲明當(dāng)前類為控制器 
 
@RequestMapping("/user") 
 
// 聲明當(dāng)前類的路徑 
 
public class UserController { 
 
  @Resource(name = "userService") 
 
  private UserService userService;// 由Spring容器注入一個UserService實(shí)例 
  /** 
 
   * 登錄 
 
   * 
 
   * @param user 
 
   *      用戶 
 
   * @param request 
 
   * @param response 
 
   * @throws IOException 
 
   */ 
 
  @RequestMapping("/login") 
 
  // 聲明當(dāng)前方法的路徑 
 
  public String login(User user, HttpServletRequest request, 
 
      HttpServletResponse response) throws IOException { 
 
    response.setContentType("application/json");// 設(shè)置響應(yīng)內(nèi)容格式為json 
 
    User result = userService.login(user);// 調(diào)用UserService的登錄方法 
 
    request.getSession().setAttribute("user", result); 
 
    if (result != null) { 
 
      createHadoopFSFolder(result); 
 
      return "console"; 
 
    } 
 
    return "login"; 
 
  } 
 
  public void createHadoopFSFolder(User user) throws IOException { 
 
    Configuration conf = new Configuration(); 
 
    conf.addResource(new Path("/opt/hadoop-1.2.1/conf/core-site.xml")); 
 
    conf.addResource(new Path("/opt/hadoop-1.2.1/conf/hdfs-site.xml")); 
 
 
 
    FileSystem fileSystem = FileSystem.get(conf); 
 
    System.out.println(fileSystem.getUri()); 
 
 
 
    Path file = new Path("/user/" + user.getU_username()); 
 
    if (fileSystem.exists(file)) { 
 
      System.out.println("haddop hdfs user foler exists."); 
 
      fileSystem.delete(file, true); 
 
      System.out.println("haddop hdfs user foler delete success."); 
 
    } 
 
    fileSystem.mkdirs(file); 
 
    System.out.println("haddop hdfs user foler creat success."); 
 
  } 
}

console.jsp中進(jìn)行文件上傳和任務(wù)提交、

Java/Web如何調(diào)用Hadoop進(jìn)行MapReduce

文件上傳和任務(wù)提交:

package com.chenjie.controller; 
 
import java.io.File; 
import java.io.IOException; 
import java.net.InetSocketAddress; 
import java.net.URI; 
import java.util.ArrayList; 
import java.util.Iterator; 
import java.util.List; 
 
import javax.servlet.http.HttpServletRequest; 
import javax.servlet.http.HttpServletResponse; 
 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FSDataInputStream; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.mapred.JobClient; 
import org.apache.hadoop.mapred.JobConf; 
import org.apache.hadoop.mapred.JobID; 
import org.apache.hadoop.mapred.JobStatus; 
import org.apache.hadoop.mapred.RunningJob; 
import org.springframework.stereotype.Controller; 
import org.springframework.web.bind.annotation.RequestMapping; 
import org.springframework.web.multipart.MultipartFile; 
import org.springframework.web.multipart.MultipartHttpServletRequest; 
import org.springframework.web.multipart.commons.CommonsMultipartResolver; 
 
import com.chenjie.pojo.User; 
import com.chenjie.util.Utils; 
 
@Controller 
// 聲明當(dāng)前類為控制器 
@RequestMapping("/hadoop") 
// 聲明當(dāng)前類的路徑 
public class HadoopController { 
 
  @RequestMapping("/upload") 
  // 聲明當(dāng)前方法的路徑 
  //文件上傳 
  public String upload(HttpServletRequest request, 
      HttpServletResponse response) throws IOException { 
    List fileList = (List) request.getSession() 
        .getAttribute("fileList");//得到用戶已上傳文件列表 
    if (fileList == null) 
      fileList = new ArrayList();//如果文件列表為空,則新建 
    User user = (User) request.getSession().getAttribute("user"); 
    if (user == null) 
      return "login";//如果用戶未登錄,則跳轉(zhuǎn)登錄頁面 
    CommonsMultipartResolver multipartResolver = new CommonsMultipartResolver( 
        request.getSession().getServletContext());//得到在Spring配置文件中注入的文件上傳組件 
    if (multipartResolver.isMultipart(request)) {//如果請求是文件請求 
      MultipartHttpServletRequest multiRequest = (MultipartHttpServletRequest) request; 
 
      Iterator iter = multiRequest.getFileNames();//得到文件名迭代器 
      while (iter.hasNext()) { 
        MultipartFile file = multiRequest.getFile((String) iter.next()); 
        if (file != null) { 
          String fileName = file.getOriginalFilename(); 
          File folder = new File("/home/chenjie/CJHadoopOnline/" 
              + user.getU_username()); 
          if (!folder.exists()) { 
            folder.mkdir();//如果文件不目錄存在,則在服務(wù)器本地創(chuàng)建 
          } 
          String path = "/home/chenjie/CJHadoopOnline/" 
              + user.getU_username() + "/" + fileName; 
 
          File localFile = new File(path); 
 
          file.transferTo(localFile);//將上傳文件拷貝到服務(wù)器本地目錄 
          // fileList.add(path); 
        } 
        handleUploadFiles(user, fileList);//處理上傳文件 
      } 
 
    } 
    request.getSession().setAttribute("fileList", fileList);//將上傳文件列表保存在Session中 
    return "console";//返回console.jsp繼續(xù)上傳文件 
  } 
 
  @RequestMapping("/wordcount") 
  //調(diào)用Hadoop進(jìn)行mapreduce 
  public void wordcount(HttpServletRequest request, 
      HttpServletResponse response) { 
    System.out.println("進(jìn)入controller wordcount "); 
    User user = (User) request.getSession().getAttribute("user"); 
    System.out.println(user); 
    // if(user == null) 
    // return "login"; 
    WordCount c = new WordCount();//新建單詞統(tǒng)計(jì)任務(wù) 
    String username = user.getU_username(); 
    String input = "hdfs://chenjie-virtual-machine:9000/user/" + username 
        + "/wordcountinput";//指定Hadoop文件系統(tǒng)的輸入文件夾 
    String output = "hdfs://chenjie-virtual-machine:9000/user/" + username 
        + "/wordcountoutput";//指定Hadoop文件系統(tǒng)的輸出文件夾 
    String reslt = output + "/part-r-00000";//默認(rèn)輸出文件 
    try { 
      Thread.sleep(3*1000); 
      c.main(new String[] { input, output });//調(diào)用單詞統(tǒng)計(jì)任務(wù) 
      Configuration conf = new Configuration();//新建Hadoop配置 
      conf.addResource(new Path("/opt/hadoop-1.2.1/conf/core-site.xml"));//添加Hadoop配置,找到Hadoop部署信息 
      conf.addResource(new Path("/opt/hadoop-1.2.1/conf/hdfs-site.xml"));//Hadoop配置,找到文件系統(tǒng) 
 
      FileSystem fileSystem = FileSystem.get(conf);//得打文件系統(tǒng) 
      Path file = new Path(reslt);//找到輸出結(jié)果文件 
      FSDataInputStream inStream = fileSystem.open(file);//打開 
      URI uri = file.toUri();//得到輸出文件路徑 
      System.out.println(uri); 
      String data = null; 
      while ((data = inStream.readLine()) != null) { 
        //System.out.println(data); 
        response.getOutputStream().println(data);//講結(jié)果文件寫回用戶網(wǎng)頁 
      } 
//     InputStream in = fileSystem.open(file); 
//     OutputStream out = new FileOutputStream("result.txt"); 
//     IOUtils.copyBytes(in, out, 4096, true); 
      inStream.close(); 
    } catch (Exception e) { 
      System.err.println(e.getMessage()); 
    } 
  } 
 
  @RequestMapping("/MapReduceStates") 
  //得到MapReduce的狀態(tài) 
  public void mapreduce(HttpServletRequest request, 
      HttpServletResponse response) { 
    float[] progress=new float[2]; 
    try { 
      Configuration conf1=new Configuration(); 
      conf1.set("mapred.job.tracker", Utils.JOBTRACKER); 
       
      JobStatus jobStatus = Utils.getJobStatus(conf1); 
//     while(!jobStatus.isJobComplete()){ 
//       progress = Utils.getMapReduceProgess(jobStatus); 
//       response.getOutputStream().println("map:" + progress[0] + "reduce:" + progress[1]); 
//       Thread.sleep(1000); 
//     } 
      JobConf jc = new JobConf(conf1); 
       
      JobClient jobClient = new JobClient(jc); 
      JobStatus[] jobsStatus = jobClient.getAllJobs();  
      //這樣就得到了一個JobStatus數(shù)組,隨便取出一個元素取名叫jobStatus  
      jobStatus = jobsStatus[0];  
      JobID jobID = jobStatus.getJobID(); //通過JobStatus獲取JobID  
      RunningJob runningJob = jobClient.getJob(jobID); //通過JobID得到RunningJob對象  
      runningJob.getJobState();//可以獲取作業(yè)狀態(tài),狀態(tài)有五種,為JobStatus.Failed 、JobStatus.KILLED、JobStatus.PREP、JobStatus.RUNNING、JobStatus.SUCCEEDED  
      jobStatus.getUsername();//可以獲取運(yùn)行作業(yè)的用戶名。  
      runningJob.getJobName();//可以獲取作業(yè)名。  
      jobStatus.getStartTime();//可以獲取作業(yè)的開始時(shí)間,為UTC毫秒數(shù)。  
      float map = runningJob.mapProgress();//可以獲取Map階段完成的比例,0~1,  
      System.out.println("map=" + map); 
      float reduce = runningJob.reduceProgress();//可以獲取Reduce階段完成的比例。 
      System.out.println("reduce="+reduce); 
      runningJob.getFailureInfo();//可以獲取失敗信息。  
      runningJob.getCounters();//可以獲取作業(yè)相關(guān)的計(jì)數(shù)器,計(jì)數(shù)器的內(nèi)容和作業(yè)監(jiān)控頁面上看到的計(jì)數(shù)器的值一樣。  
       
       
    } catch (IOException e) { 
      progress[0] = 0; 
      progress[1] = 0; 
    } 
   
    request.getSession().setAttribute("map", progress[0]); 
    request.getSession().setAttribute("reduce", progress[1]); 
  } 
   
  //處理文件上傳 
  public void handleUploadFiles(User user, List fileList) { 
    File folder = new File("/home/chenjie/CJHadoopOnline/" 
        + user.getU_username()); 
    if (!folder.exists()) 
      return; 
    if (folder.isDirectory()) { 
      File[] files = folder.listFiles(); 
      for (File file : files) { 
        System.out.println(file.getName()); 
        try { 
          putFileToHadoopFSFolder(user, file, fileList);//將單個文件上傳到Hadoop文件系統(tǒng) 
        } catch (IOException e) { 
          System.err.println(e.getMessage()); 
        } 
      } 
    } 
  } 
 
  //將單個文件上傳到Hadoop文件系統(tǒng) 
  private void putFileToHadoopFSFolder(User user, File file, 
      List fileList) throws IOException { 
    Configuration conf = new Configuration(); 
    conf.addResource(new Path("/opt/hadoop-1.2.1/conf/core-site.xml")); 
    conf.addResource(new Path("/opt/hadoop-1.2.1/conf/hdfs-site.xml")); 
 
    FileSystem fileSystem = FileSystem.get(conf); 
    System.out.println(fileSystem.getUri()); 
 
    Path localFile = new Path(file.getAbsolutePath()); 
    Path foler = new Path("/user/" + user.getU_username() 
        + "/wordcountinput"); 
    if (!fileSystem.exists(foler)) { 
      fileSystem.mkdirs(foler); 
    } 
     
    Path hadoopFile = new Path("/user/" + user.getU_username() 
        + "/wordcountinput/" + file.getName()); 
//   if (fileSystem.exists(hadoopFile)) { 
//     System.out.println("File exists."); 
//   } else { 
//     fileSystem.mkdirs(hadoopFile); 
//   } 
    fileSystem.copyFromLocalFile(true, true, localFile, hadoopFile); 
    fileList.add(hadoopFile.toUri().toString()); 
 
  } 
 
}

啟動Hadoop:

Java/Web如何調(diào)用Hadoop進(jìn)行MapReduce

運(yùn)行結(jié)果:

可以在任意平臺下,登錄該項(xiàng)目地址,上傳文件,得到結(jié)果。

Java/Web如何調(diào)用Hadoop進(jìn)行MapReduce

Java/Web如何調(diào)用Hadoop進(jìn)行MapReduce


Java/Web如何調(diào)用Hadoop進(jìn)行MapReduce

Java/Web如何調(diào)用Hadoop進(jìn)行MapReduce

Java/Web如何調(diào)用Hadoop進(jìn)行MapReduce

Java/Web如何調(diào)用Hadoop進(jìn)行MapReduce

運(yùn)行成功。

以上是“Java/Web如何調(diào)用Hadoop進(jìn)行MapReduce”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對大家有所幫助,如果還想學(xué)習(xí)更多知識,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道!


網(wǎng)站標(biāo)題:Java/Web如何調(diào)用Hadoop進(jìn)行MapReduce
新聞來源:http://weahome.cn/article/pieese.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部