真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Springboot項(xiàng)目redisTemplate實(shí)現(xiàn)輕量級消息隊(duì)列的方法

背景

十余年的秦皇島網(wǎng)站建設(shè)經(jīng)驗(yàn),針對設(shè)計(jì)、前端、開發(fā)、售后、文案、推廣等六對一服務(wù),響應(yīng)快,48小時(shí)及時(shí)工作處理。成都營銷網(wǎng)站建設(shè)的優(yōu)勢是能夠根據(jù)用戶設(shè)備顯示端的尺寸不同,自動(dòng)調(diào)整秦皇島建站的顯示方式,使網(wǎng)站能夠適用不同顯示終端,在瀏覽器中調(diào)整網(wǎng)站的寬度,無論在任何一種瀏覽器上瀏覽網(wǎng)站,都能展現(xiàn)優(yōu)雅布局與設(shè)計(jì),從而大程度地提升瀏覽體驗(yàn)。創(chuàng)新互聯(lián)從事“秦皇島網(wǎng)站設(shè)計(jì)”,“秦皇島網(wǎng)站推廣”以來,每個(gè)客戶項(xiàng)目都認(rèn)真落實(shí)執(zhí)行。

公司項(xiàng)目有個(gè)需求, 前端上傳excel文件, 后端讀取數(shù)據(jù)、處理數(shù)據(jù)、返回錯(cuò)誤數(shù)據(jù), 最簡單的方式同步處理, 客戶端上傳文件后一直阻塞等待響應(yīng), 但用戶體驗(yàn)無疑很差, 處理數(shù)據(jù)可能十分耗時(shí), 沒人愿意傻等, 由于項(xiàng)目暫未使用ActiveMQ等消息隊(duì)列中間件, 而redis的lpush和rpop很適合作為一種輕量級的消息隊(duì)列實(shí)現(xiàn), 所以用它完成此次功能開發(fā)

一、本文涉及知識(shí)點(diǎn)

  • excel文件讀寫--阿里easyexcel sdk
  • 文件上傳、下載--騰訊云對象存儲(chǔ)
  • 遠(yuǎn)程服務(wù)調(diào)用--restTemplate
  • 生產(chǎn)者、消費(fèi)者--redisTemplate leftPush和rightPop操作
  • 異步處理數(shù)據(jù)--Executors線程池
  • 讀取網(wǎng)絡(luò)文件流--HttpClient
  • 自定義注解實(shí)現(xiàn)用戶身份認(rèn)證--JWT token認(rèn)證, 攔截器攔截標(biāo)注有@LoginRequired注解的請求入口

當(dāng)然, Java實(shí)現(xiàn)咯

涉及的知識(shí)點(diǎn)比較多, 每一個(gè)知識(shí)點(diǎn)都可以作為專題進(jìn)行學(xué)習(xí)分析, 本文將完整實(shí)現(xiàn)呈現(xiàn)出來, 后期拆分與小伙伴分享學(xué)習(xí)

二、項(xiàng)目目錄結(jié)構(gòu)

Spring boot項(xiàng)目redisTemplate實(shí)現(xiàn)輕量級消息隊(duì)列的方法

說明: 數(shù)據(jù)庫DAO層放到另一個(gè)模塊了, 不是本文重點(diǎn)

三、主要maven依賴

1、easyexcel

1.1.2-beta4

  
   com.alibaba
   easyexcel
   ${easyexcel-latestVersion}
  

JWT

  
   io.jsonwebtoken
   jjwt
   0.7.0
  

redis

  
   org.springframework.boot
   spring-boot-starter-redis
   1.3.5.RELEASE
  

騰訊cos

  
   com.qcloud
   cos_api
   5.4.5
  

四、流程

  1. 用戶上傳文件
  2. 將文件存儲(chǔ)到騰訊cos
  3. 將上傳后的文件id及上傳記錄保存到數(shù)據(jù)庫
  4. redis生產(chǎn)一條導(dǎo)入消息, 即保存文件id到redis
  5. 請求結(jié)束, 返回"處理中"狀態(tài)
  6. redis消費(fèi)消息
  7. 讀取cos文件, 異步處理數(shù)據(jù)
  8. 將錯(cuò)誤數(shù)據(jù)以excel形式上傳至cos, 以供用戶下載, 并更新處理狀態(tài)為"處理完成"
  9. 客戶端輪詢查詢處理狀態(tài), 并可以下載錯(cuò)誤文件
  10. 結(jié)束

五、實(shí)現(xiàn)效果

上傳文件

Spring boot項(xiàng)目redisTemplate實(shí)現(xiàn)輕量級消息隊(duì)列的方法

數(shù)據(jù)庫導(dǎo)入記錄

Spring boot項(xiàng)目redisTemplate實(shí)現(xiàn)輕量級消息隊(duì)列的方法

導(dǎo)入的數(shù)據(jù)

Spring boot項(xiàng)目redisTemplate實(shí)現(xiàn)輕量級消息隊(duì)列的方法

下載錯(cuò)誤文件

Spring boot項(xiàng)目redisTemplate實(shí)現(xiàn)輕量級消息隊(duì)列的方法

錯(cuò)誤數(shù)據(jù)提示

Spring boot項(xiàng)目redisTemplate實(shí)現(xiàn)輕量級消息隊(duì)列的方法

查詢導(dǎo)入記錄

Spring boot項(xiàng)目redisTemplate實(shí)現(xiàn)輕量級消息隊(duì)列的方法

六、代碼實(shí)現(xiàn)

1、導(dǎo)入excel控制層

  @LoginRequired
  @RequestMapping(value = "doImport", method = RequestMethod.POST)
  public JsonResponse doImport(@RequestParam("file") MultipartFile file, HttpServletRequest request) {
    PLUser user = getUser(request);
    return orderImportService.doImport(file, user.getId());
  }

2、service層

  @Override
  public JsonResponse doImport(MultipartFile file, Integer userId) {
    if (null == file || file.isEmpty()) {
      throw new ServiceException("文件不能為空");
    }

    String filename = file.getOriginalFilename();
    if (!checkFileSuffix(filename)) {
      throw new ServiceException("當(dāng)前僅支持xlsx格式的excel");
    }

    // 存儲(chǔ)文件
    String fileId = saveToOss(file);
    if (StringUtils.isBlank(fileId)) {
      throw new ServiceException("文件上傳失敗, 請稍后重試");
    }

    // 保存記錄到數(shù)據(jù)庫
    saveRecordToDB(userId, fileId, filename);

    // 生產(chǎn)一條訂單導(dǎo)入消息
    redisProducer.produce(RedisKey.orderImportKey, fileId);

    return JsonResponse.ok("導(dǎo)入成功, 處理中...");
  }

  /**
   * 校驗(yàn)文件格式
   * @param fileName
   * @return
   */
  private static boolean checkFileSuffix(String fileName) {
    if (StringUtils.isBlank(fileName) || fileName.lastIndexOf(".") <= 0) {
      return false;
    }

    int pointIndex = fileName.lastIndexOf(".");
    String suffix = fileName.substring(pointIndex, fileName.length()).toLowerCase();
    if (".xlsx".equals(suffix)) {
      return true;
    }

    return false;
  }

  /**
   * 將文件存儲(chǔ)到騰訊OSS
   * @param file
   * @return
   */
  private String saveToOss(MultipartFile file) {
    InputStream ins = null;
    try {
      ins = file.getInputStream();
    } catch (IOException e) {
      e.printStackTrace();
    }

    String fileId;
    try {
      String originalFilename = file.getOriginalFilename();
      File f = new File(originalFilename);
      inputStreamToFile(ins, f);
      FileSystemResource resource = new FileSystemResource(f);

      MultiValueMap param = new LinkedMultiValueMap<>();
      param.add("file", resource);

      ResponseResult responseResult = restTemplate.postForObject(txOssUploadUrl, param, ResponseResult.class);
      fileId = (String) responseResult.getData();
    } catch (Exception e) {
      fileId = null;
    }

    return fileId;
  }

3、redis生產(chǎn)者

@Service
public class RedisProducerImpl implements RedisProducer {

  @Autowired
  private RedisTemplate redisTemplate;

  @Override
  public JsonResponse produce(String key, String msg) {
    Map map = Maps.newHashMap();
    map.put("fileId", msg);
    redisTemplate.opsForList().leftPush(key, map);
    return JsonResponse.ok();
  }

}

4、redis消費(fèi)者

@Service
public class RedisConsumer {

  @Autowired
  public RedisTemplate redisTemplate;

  @Value("${txOssFileUrl}")
  private String txOssFileUrl;

  @Value("${txOssUploadUrl}")
  private String txOssUploadUrl;

  @PostConstruct
  public void init() {
    processOrderImport();
  }

  /**
   * 處理訂單導(dǎo)入
   */
  private void processOrderImport() {
    ExecutorService executorService = Executors.newCachedThreadPool();
    executorService.execute(() -> {
      while (true) {
        Object object = redisTemplate.opsForList().rightPop(RedisKey.orderImportKey, 1, TimeUnit.SECONDS);
        if (null == object) {
          continue;
        }
        String msg = JSON.toJSONString(object);
        executorService.execute(new OrderImportTask(msg, txOssFileUrl, txOssUploadUrl));
      }
    });
  }

}

5、處理任務(wù)線程類

public class OrderImportTask implements Runnable {
  public OrderImportTask(String msg, String txOssFileUrl, String txOssUploadUrl) {
    this.msg = msg;
    this.txOssFileUrl = txOssFileUrl;
    this.txOssUploadUrl = txOssUploadUrl;
  }
}

  /**
   * 注入bean
   */
  private void autowireBean() {
    this.restTemplate = BeanContext.getApplicationContext().getBean(RestTemplate.class);
    this.transactionTemplate = BeanContext.getApplicationContext().getBean(TransactionTemplate.class);
    this.orderImportService = BeanContext.getApplicationContext().getBean(OrderImportService.class);
  }

  @Override
  public void run() {
    // 注入bean
    autowireBean();

    JSONObject jsonObject = JSON.parseObject(msg);
    String fileId = jsonObject.getString("fileId");

    MultiValueMap param = new LinkedMultiValueMap<>();
    param.add("id", fileId);

    ResponseResult responseResult = restTemplate.postForObject(txOssFileUrl, param, ResponseResult.class);
    String fileUrl = (String) responseResult.getData();
    if (StringUtils.isBlank(fileUrl)) {
      return;
    }

    InputStream inputStream = HttpClientUtil.readFileFromURL(fileUrl);
    List list = ExcelUtil.read(inputStream);
    process(list, fileId);
  }

  /**
   * 將文件上傳至oss
   * @param file
   * @return
   */
  private String saveToOss(File file) {
    String fileId;
    try {
      FileSystemResource resource = new FileSystemResource(file);
      MultiValueMap param = new LinkedMultiValueMap<>();
      param.add("file", resource);

      ResponseResult responseResult = restTemplate.postForObject(txOssUploadUrl, param, ResponseResult.class);
      fileId = (String) responseResult.getData();
    } catch (Exception e) {
      fileId = null;
    }
    return fileId;
  }

說明: 處理數(shù)據(jù)的業(yè)務(wù)邏輯代碼就不用貼了

6、上傳文件到cos

  @RequestMapping("/txOssUpload")
  @ResponseBody
  public ResponseResult txOssUpload(@RequestParam("file") MultipartFile file) throws UnsupportedEncodingException {
    if (null == file || file.isEmpty()) {
      return ResponseResult.fail("文件不能為空");
    }

    String originalFilename = file.getOriginalFilename();
    originalFilename = MimeUtility.decodeText(originalFilename);// 解決中文亂碼問題
    String contentType = getContentType(originalFilename);
    String key;

    InputStream ins = null;
    File f = null;

    try {
      ins = file.getInputStream();
      f = new File(originalFilename);
      inputStreamToFile(ins, f);
      key = iFileStorageClient.txOssUpload(new FileInputStream(f), originalFilename, contentType);
    } catch (Exception e) {
      return ResponseResult.fail(e.getMessage());
    } finally {
      if (null != ins) {
        try {
          ins.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
      if (f.exists()) {// 刪除臨時(shí)文件
        f.delete();
      }
    }

    return ResponseResult.ok(key);
  }

  public static void inputStreamToFile(InputStream ins,File file) {
    try {
      OutputStream os = new FileOutputStream(file);
      int bytesRead = 0;
      byte[] buffer = new byte[8192];
      while ((bytesRead = ins.read(buffer, 0, 8192)) != -1) {
        os.write(buffer, 0, bytesRead);
      }
      os.close();
      ins.close();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  public String txOssUpload(FileInputStream inputStream, String key, String contentType) {
    key = Uuid.getUuid() + "-" + key;
    OSSUtil.txOssUpload(inputStream, key, contentType);
    try {
      if (null != inputStream) {
        inputStream.close();
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
    return key;
  }

  public static void txOssUpload(FileInputStream inputStream, String key, String contentType) {
    ObjectMetadata objectMetadata = new ObjectMetadata();
    try{
      int length = inputStream.available();
      objectMetadata.setContentLength(length);
    }catch (Exception e){
      logger.info(e.getMessage());
    }
    objectMetadata.setContentType(contentType);
    cosclient.putObject(txbucketName, key, inputStream, objectMetadata);
  }

7、下載文件

  /**
   * 騰訊云文件下載
   * @param response
   * @param id
   * @return
   */
  @RequestMapping("/txOssDownload")
  public Object txOssDownload(HttpServletResponse response, String id) {
    COSObjectInputStream cosObjectInputStream = iFileStorageClient.txOssDownload(id, response);
    String contentType = getContentType(id);
    FileUtil.txOssDownload(response, contentType, cosObjectInputStream, id);
    return null;
  }

  public static void txOssDownload(HttpServletResponse response, String contentType, InputStream fileStream, String fileName) {
    FileOutputStream fos = null;
    response.reset();
    OutputStream os = null;
    try {
      response.setContentType(contentType + "; charset=utf-8");
      if(!contentType.equals(PlConstans.FileContentType.image)){
        try {
          response.setHeader("Content-Disposition", "attachment; filename=" + new String(fileName.getBytes("UTF-8"), "ISO8859-1"));
        } catch (UnsupportedEncodingException e) {
          response.setHeader("Content-Disposition", "attachment; filename=" + fileName);
          logger.error("encoding file name failed", e);
        }
      }

      os = response.getOutputStream();

      byte[] b = new byte[1024 * 1024];
      int len;
      while ((len = fileStream.read(b)) > 0) {
        os.write(b, 0, len);
        os.flush();
        try {
          if(fos != null) {
            fos.write(b, 0, len);
            fos.flush();
          }
        } catch (Exception e) {
          logger.error(e.getMessage());
        }
      }
    } catch (IOException e) {
      IOUtils.closeQuietly(fos);
      fos = null;
    } finally {
      IOUtils.closeQuietly(os);
      IOUtils.closeQuietly(fileStream);
      if(fos != null) {
        IOUtils.closeQuietly(fos);
      }
    }
  }

8、讀取網(wǎng)絡(luò)文件流

  /**
   * 讀取網(wǎng)絡(luò)文件流
   * @param url
   * @return
   */
  public static InputStream readFileFromURL(String url) {
    if (StringUtils.isBlank(url)) {
      return null;
    }

    HttpClient httpClient = new DefaultHttpClient();
    HttpGet methodGet = new HttpGet(url);
    try {
      HttpResponse response = httpClient.execute(methodGet);
      if (response.getStatusLine().getStatusCode() == 200) {
        HttpEntity entity = response.getEntity();
        return entity.getContent();
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
    return null;
  }

9、ExcelUtil

  /**
   * 讀excel
   * @param inputStream 文件輸入流
   * @return list集合
   */
  public static List read(InputStream inputStream) {
    return EasyExcelFactory.read(inputStream, new Sheet(1, 1));
  }

  /**
   * 寫excel
   * @param data list數(shù)據(jù)
   * @param clazz
   * @param saveFilePath 文件保存路徑
   * @throws IOException
   */
  public static void write(List<? extends BaseRowModel> data, Class<? extends BaseRowModel> clazz, String saveFilePath) throws IOException {
    File tempFile = new File(saveFilePath);
    OutputStream out = new FileOutputStream(tempFile);
    ExcelWriter writer = EasyExcelFactory.getWriter(out);
    Sheet sheet = new Sheet(1, 3, clazz, "Sheet1", null);
    writer.write(data, sheet);
    writer.finish();
    out.close();
  }

說明: 至此, 整個(gè)流程算是完整了, 下面將其他知識(shí)點(diǎn)代碼也貼出來參考

七、其他

1、@LoginRequired注解

/**
 * 在需要登錄驗(yàn)證的Controller的方法上使用此注解
 */
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface LoginRequired {
}

2、MyControllerAdvice

@ControllerAdvice
public class MyControllerAdvice {

  @ResponseBody
  @ExceptionHandler(TokenValidationException.class)
  public JsonResponse tokenValidationExceptionHandler() {
    return JsonResponse.loginInvalid();
  }

  @ResponseBody
  @ExceptionHandler(ServiceException.class)
  public JsonResponse serviceExceptionHandler(ServiceException se) {
    return JsonResponse.fail(se.getMsg());
  }

  @ResponseBody
  @ExceptionHandler(Exception.class)
  public JsonResponse exceptionHandler(Exception e) {
    e.printStackTrace();
    return JsonResponse.fail(e.getMessage());
  }

}

3、AuthenticationInterceptor

public class AuthenticationInterceptor implements HandlerInterceptor {

  private static final String CURRENT_USER = "user";

  @Autowired
  private UserService userService;

  @Override
  public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
    // 如果不是映射到方法直接通過
    if (!(handler instanceof HandlerMethod)) {
      return true;
    }
    HandlerMethod handlerMethod = (HandlerMethod) handler;
    Method method = handlerMethod.getMethod();

    // 判斷接口是否有@LoginRequired注解, 有則需要登錄
    LoginRequired methodAnnotation = method.getAnnotation(LoginRequired.class);
    if (methodAnnotation != null) {
      // 驗(yàn)證token
      Integer userId = JwtUtil.verifyToken(request);
      PLUser plUser = userService.selectByPrimaryKey(userId);
      if (null == plUser) {
        throw new RuntimeException("用戶不存在,請重新登錄");
      }
      request.setAttribute(CURRENT_USER, plUser);
      return true;
    }
    return true;
  }

  @Override
  public void postHandle(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object o, ModelAndView modelAndView) throws Exception {
  }

  @Override
  public void afterCompletion(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object o, Exception e) throws Exception {
  }
}

4、JwtUtil

  public static final long EXPIRATION_TIME = 2592_000_000L; // 有效期30天
  public static final String SECRET = "pl_token_secret";
  public static final String HEADER = "token";
  public static final String USER_ID = "userId";

  /**
   * 根據(jù)userId生成token
   * @param userId
   * @return
   */
  public static String generateToken(String userId) {
    HashMap map = new HashMap<>();
    map.put(USER_ID, userId);
    String jwt = Jwts.builder()
        .setClaims(map)
        .setExpiration(new Date(System.currentTimeMillis() + EXPIRATION_TIME))
        .signWith(SignatureAlgorithm.HS512, SECRET)
        .compact();
    return jwt;
  }

  /**
   * 驗(yàn)證token
   * @param request
   * @return 驗(yàn)證通過返回userId
   */
  public static Integer verifyToken(HttpServletRequest request) {
    String token = request.getHeader(HEADER);
    if (token != null) {
      try {
        Map body = Jwts.parser()
            .setSigningKey(SECRET)
            .parseClaimsJws(token)
            .getBody();

        for (Map.Entry entry : body.entrySet()) {
          Object key = entry.getKey();
          Object value = entry.getValue();
          if (key.toString().equals(USER_ID)) {
            return Integer.valueOf(value.toString());// userId
          }
        }
        return null;
      } catch (Exception e) {
        logger.error(e.getMessage());
        throw new TokenValidationException("unauthorized");
      }
    } else {
      throw new TokenValidationException("missing token");
    }
  }

結(jié)語: OK, 搞定,睡了, 好困

總結(jié)

以上就是這篇文章的全部內(nèi)容了,希望本文的內(nèi)容對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,謝謝大家對創(chuàng)新互聯(lián)的支持。


當(dāng)前文章:Springboot項(xiàng)目redisTemplate實(shí)現(xiàn)輕量級消息隊(duì)列的方法
當(dāng)前鏈接:http://weahome.cn/article/pciejg.html

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部