SpringBoot项目——RedisTemplate实现轻量级消息队列

本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

原文链接:blog.ouyangsihai.cn >> SpringBoot项目——RedisTemplate实现轻量级消息队列

点击上方“Java知音”,选择“置顶公众号”

技术文章第一时间送达!

作者:wangzaiplus www.jianshu.com/p/0c684076367e

背景

公司项目有个需求, 前端上传excel文件, 后端读取数据、处理数据、返回错误数据, 最简单的方式同步处理, 客户端上传文件后一直阻塞等待响应, 但用户体验无疑很差, 处理数据可能十分耗时, 没人愿意傻等, 由于项目暂未使用ActiveMQ等消息队列中间件, 而redis的lpush和rpop很适合作为一种轻量级的消息队列实现, 所以用它完成此次功能开发

一、本文涉及知识点

  • excel文件读写--阿里easyexcel sdk
  • 文件上传、下载--腾讯云对象存储
  • 远程服务调用--restTemplate
  • 生产者、消费者--redisTemplate leftPush和rightPop操作
  • 异步处理数据--Executors线程池
  • 读取网络文件流--HttpClient
  • 自定义注解实现用户身份认证--JWT token认证, 拦截器拦截标注有@LoginRequired注解的请求入口
  • 文件上传、下载–腾讯云对象存储

    生产者、消费者–redisTemplate leftPush和rightPop操作

    读取网络文件流–HttpClient

    当然, Java实现咯

    涉及的知识点比较多, 每一个知识点都可以作为专题进行学习分析, 本文将完整实现呈现出来, 后期拆分与小伙伴分享学习

    二、项目目录结构

    SpringBoot项目:RedisTemplate实现轻量级消息队列

    说明: 数据库DAO层放到另一个模块了, 不是本文重点

    三、主要maven依赖

    1.easyexcel

    
    easyexcel-latestVersion1.1.2-beta4/easyexcel-latestVersion
    
            dependency
                groupIdcom.alibaba/groupId
                artifactIdeasyexcel/artifactId
                version${easyexcel-latestVersion}/version
            /dependency
    

    2.JWT

    
            dependency
                groupIdio.jsonwebtoken/groupId
                artifactIdjjwt/artifactId
                version0.7.0/version
            /dependency
    

    3.redis

    
            dependency
                groupIdorg.springframework.boot/groupId
                artifactIdspring-boot-starter-redis/artifactId
                version1.3.5.RELEASE/version
            /dependency
    

    4.腾讯cos

    
            dependency
                groupIdcom.qcloud/groupId
                artifactIdcos_api/artifactId
                version5.4.5/version
            /dependency
    

    四、流程

  • 用户上传文件
  • 将文件存储到腾讯cos
  • 将上传后的文件id及上传记录保存到数据库
  • redis生产一条导入消息, 即保存文件id到redis
  • 请求结束, 返回"处理中"状态
  • redis消费消息
  • 读取cos文件, 异步处理数据
  • 将错误数据以excel形式上传至cos, 以供用户下载, 并更新处理状态为"处理完成"
  • 客户端轮询查询处理状态, 并可以下载错误文件
  • 结束
  • 将文件存储到腾讯cos

    redis生产一条导入消息, 即保存文件id到redis

    redis消费消息

    将错误数据以excel形式上传至cos, 以供用户下载, 并更新处理状态为”处理完成”

    结束

    五、实现效果

    1.上传文件

    SpringBoot项目:RedisTemplate实现轻量级消息队列

    2.数据库导入记录

    SpringBoot项目:RedisTemplate实现轻量级消息队列

    3.导入的数据

    SpringBoot项目:RedisTemplate实现轻量级消息队列

    4.下载错误文件

    SpringBoot项目:RedisTemplate实现轻量级消息队列

    5.错误数据提示

    SpringBoot项目:RedisTemplate实现轻量级消息队列

    6.查询导入记录

    SpringBoot项目:RedisTemplate实现轻量级消息队列

    六、代码实现

    1、导入excel控制层

    
        @LoginRequired
        @RequestMapping(value = "doImport", method = RequestMethod.POST)
        public JsonResponse doImport(@RequestParam("file") MultipartFile file, HttpServletRequest request) {
            PLUser user = getUser(request);
            return orderImportService.doImport(file, user.getId());
        }
    

    2、service层

    
        @Override
        public JsonResponse doImport(MultipartFile file, Integer userId) {
            if (null == file || file.isEmpty()) {
                throw new ServiceException("文件不能为空");
            }
    
            String filename = file.getOriginalFilename();
            if (!checkFileSuffix(filename)) {
                throw new ServiceException("当前仅支持xlsx格式的excel");
            }
    
            // 存储文件
            String fileId = saveToOss(file);
            if (StringUtils.isBlank(fileId)) {
                throw new ServiceException("文件上传失败, 请稍后重试");
            }
    
            // 保存记录到数据库
            saveRecordToDB(userId, fileId, filename);
    
            // 生产一条订单导入消息
            redisProducer.produce(RedisKey.orderImportKey, fileId);
    
            return JsonResponse.ok("导入成功, 处理中...");
        }
    
        /**
         * 校验文件格式
         * @param fileName
         * @return
         */
        private static boolean checkFileSuffix(String fileName) {
            if (StringUtils.isBlank(fileName) || fileName.lastIndexOf(".") = 0) {
                return false;
            }
    
            int pointIndex = fileName.lastIndexOf(".");
            String suffix = fileName.substring(pointIndex, fileName.length()).toLowerCase();
            if (".xlsx".equals(suffix)) {
                return true;
            }
    
            return false;
        }
    
       /**
         * 将文件存储到腾讯OSS
         * @param file
         * @return
         */
        private String saveToOss(MultipartFile file) {
            InputStream ins = null;
            try {
                ins = file.getInputStream();
            } catch (IOException e) {
                e.printStackTrace();
            }
    
            String fileId;
            try {
                String originalFilename = file.getOriginalFilename();
                File f = new File(originalFilename);
                inputStreamToFile(ins, f);
                FileSystemResource resource = new FileSystemResource(f);
    
                MultiValueMapString, Object param = new LinkedMultiValueMap();
                param.add("file", resource);
    
                ResponseResult responseResult = restTemplate.postForObject(txOssUploadUrl, param, ResponseResult.class);
                fileId = (String) responseResult.getData();
            } catch (Exception e) {
                fileId = null;
            }
    
            return fileId;
        }
    

    3、redis生产者

    
    @Service
    public class RedisProducerImpl implements RedisProducer {
    
        @Autowired
        private RedisTemplate redisTemplate;
    
        @Override
        public JsonResponse produce(String key, String msg) {
            MapString, String map = Maps.newHashMap();
            map.put("fileId", msg);
            redisTemplate.opsForList().leftPush(key, map);
            return JsonResponse.ok();
        }
    
    }
    

    4、redis消费者

    
    @Service
    public class RedisConsumer {
    
        @Autowired
        public RedisTemplate redisTemplate;
    
        @Value("${txOssFileUrl}")
        private String txOssFileUrl;
    
        @Value("${txOssUploadUrl}")
        private String txOssUploadUrl;
    
        @PostConstruct
        public void init() {
            processOrderImport();
        }
    
        /**
         * 处理订单导入
         */
        private void processOrderImport() {
            ExecutorService executorService = Executors.newCachedThreadPool();
            executorService.execute(() - {
                while (true) {
                    Object object = redisTemplate.opsForList().rightPop(RedisKey.orderImportKey, 1, TimeUnit.SECONDS);
                    if (null == object) {
                        continue;
                    }
                    String msg = JSON.toJSONString(object);
                    executorService.execute(new OrderImportTask(msg, txOssFileUrl, txOssUploadUrl));
                }
            });
        }
    
    }
    

    5、处理任务线程类

    
    public class OrderImportTask implements Runnable {
        public OrderImportTask(String msg, String txOssFileUrl, String txOssUploadUrl) {
            this.msg = msg;
            this.txOssFileUrl = txOssFileUrl;
            this.txOssUploadUrl = txOssUploadUrl;
        }
    }
    
        /**
         * 注入bean
         */
        private void autowireBean() {
            this.restTemplate = BeanContext.getApplicationContext().getBean(RestTemplate.class);
            this.transactionTemplate = BeanContext.getApplicationContext().getBean(TransactionTemplate.class);
            this.orderImportService = BeanContext.getApplicationContext().getBean(OrderImportService.class);
        }
    
        @Override
        public void run() {
            // 注入bean
            autowireBean();
    
            JSONObject jsonObject = JSON.parseObject(msg);
            String fileId = jsonObject.getString("fileId");
    
            MultiValueMapString, Object param = new LinkedMultiValueMap();
            param.add("id", fileId);
    
            ResponseResult responseResult = restTemplate.postForObject(txOssFileUrl, param, ResponseResult.class);
            String fileUrl = (String) responseResult.getData();
            if (StringUtils.isBlank(fileUrl)) {
                return;
            }
    
            InputStream inputStream = HttpClientUtil.readFileFromURL(fileUrl);
            ListObject list = ExcelUtil.read(inputStream);
            process(list, fileId);
        }
    
        /**
         * 将文件上传至oss
         * @param file
         * @return
         */
        private String saveToOss(File file) {
            String fileId;
            try {
                FileSystemResource resource = new FileSystemResource(file);
                MultiValueMapString, Object param = new LinkedMultiValueMap();
                param.add("file", resource);
    
                ResponseResult responseResult = restTemplate.postForObject(txOssUploadUrl, param, ResponseResult.class);
                fileId = (String) responseResult.getData();
            } catch (Exception e) {
                fileId = null;
            }
            return fileId;
        }
    

    说明: 处理数据的业务逻辑代码就不用贴了

    6、上传文件到cos

    
        @RequestMapping("/txOssUpload")
        @ResponseBody
        public ResponseResult txOssUpload(@RequestParam("file") MultipartFile file) throws UnsupportedEncodingException {
            if (null == file || file.isEmpty()) {
                return ResponseResult.fail("文件不能为空");
            }
    
            String originalFilename = file.getOriginalFilename();
            originalFilename = MimeUtility.decodeText(originalFilename);// 解决中文乱码问题
            String contentType = getContentType(originalFilename);
            String key;
    
            InputStream ins = null;
            File f = null;
    
            try {
                ins = file.getInputStream();
                f = new File(originalFilename);
                inputStreamToFile(ins, f);
                key = iFileStorageClient.txOssUpload(new FileInputStream(f), originalFilename, contentType);
            } catch (Exception e) {
                return ResponseResult.fail(e.getMessage());
            } finally {
                if (null != ins) {
                    try {
                        ins.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                if (f.exists()) {// 删除临时文件
                    f.delete();
                }
            }
    
            return ResponseResult.ok(key);
        }
    
        public static void inputStreamToFile(InputStream ins,File file) {
            try {
                OutputStream os = new FileOutputStream(file);
                int bytesRead = 0;
                byte[] buffer = new byte[8192];
                while ((bytesRead = ins.read(buffer, 0, 8192)) != -1) {
                    os.write(buffer, 0, bytesRead);
                }
                os.close();
                ins.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        public String txOssUpload(FileInputStream inputStream, String key, String contentType) {
            key = Uuid.getUuid() + "-" + key;
            OSSUtil.txOssUpload(inputStream, key, contentType);
            try {
                if (null != inputStream) {
                    inputStream.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            return key;
        }
    
        public static void txOssUpload(FileInputStream inputStream, String key, String contentType) {
            ObjectMetadata objectMetadata = new ObjectMetadata();
            try{
                int length = inputStream.available();
                objectMetadata.setContentLength(length);
            }catch (Exception e){
                logger.info(e.getMessage());
            }
            objectMetadata.setContentType(contentType);
            cosclient.putObject(txbucketName, key, inputStream, objectMetadata);
        }
    

    7、下载文件

    
        /**
         * 腾讯云文件下载
         * @param response
         * @param id
         * @return
         */
        @RequestMapping("/txOssDownload")
        public Object txOssDownload(HttpServletResponse response, String id) {
            COSObjectInputStream cosObjectInputStream = iFileStorageClient.txOssDownload(id, response);
            String contentType = getContentType(id);
            FileUtil.txOssDownload(response, contentType, cosObjectInputStream, id);
            return null;
        }
    
        public static void txOssDownload(HttpServletResponse response, String contentType, InputStream fileStream, String fileName) {
            FileOutputStream fos = null;
            response.reset();
            OutputStream os = null;
            try {
                response.setContentType(contentType + "; charset=utf-8");
                if(!contentType.equals(PlConstans.FileContentType.image)){
                    try {
                        response.setHeader("Content-Disposition", "attachment; filename=" + new String(fileName.getBytes("UTF-8"), "ISO8859-1"));
                    } catch (UnsupportedEncodingException e) {
                        response.setHeader("Content-Disposition", "attachment; filename=" + fileName);
                        logger.error("encoding file name failed", e);
                    }
                }
    
                os = response.getOutputStream();
    
                byte[] b = new byte[1024 * 1024];
                int len;
                while ((len = fileStream.read(b))  0) {
                    os.write(b, 0, len);
                    os.flush();
                    try {
                        if(fos != null) {
                            fos.write(b, 0, len);
                            fos.flush();
                        }
                    } catch (Exception e) {
                        logger.error(e.getMessage());
                    }
                }
            } catch (IOException e) {
                IOUtils.closeQuietly(fos);
                fos = null;
            } finally {
                IOUtils.closeQuietly(os);
                IOUtils.closeQuietly(fileStream);
                if(fos != null) {
                    IOUtils.closeQuietly(fos);
                }
            }
        }
    

    8、读取网络文件流

    
        /**
         * 读取网络文件流
         * @param url
         * @return
         */
        public static InputStream readFileFromURL(String url) {
            if (StringUtils.isBlank(url)) {
                return null;
            }
    
            HttpClient httpClient = new DefaultHttpClient();
            HttpGet methodGet = new HttpGet(url);
            try {
                HttpResponse response = httpClient.execute(methodGet);
                if (response.getStatusLine().getStatusCode() == 200) {
                    HttpEntity entity = response.getEntity();
                    return entity.getContent();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            return null;
        }
    

    9、ExcelUtil

    
        /**
         * 读excel
         * @param inputStream 文件输入流
         * @return list集合
         */
        public static ListObject read(InputStream inputStream) {
            return EasyExcelFactory.read(inputStream, new Sheet(1, 1));
        }
    
        /**
         * 写excel
         * @param data list数据
         * @param clazz
         * @param saveFilePath 文件保存路径
         * @throws IOException
         */
        public static void write(List? extends BaseRowModel data, Class? extends BaseRowModel clazz, String saveFilePath) throws IOException {
            File tempFile = new File(saveFilePath);
            OutputStream out = new FileOutputStream(tempFile);
            ExcelWriter writer = EasyExcelFactory.getWriter(out);
            Sheet sheet = new Sheet(1, 3, clazz, "Sheet1", null);
            writer.write(data, sheet);
            writer.finish();
            out.close();
        }
    

    说明: 至此, 整个流程算是完整了, 下面将其他知识点代码也贴出来参考

    七、其他

    1、@LoginRequired注解

    
    /**
     * 在需要登录验证的Controller的方法上使用此注解
     */
    @Target({ElementType.METHOD})
    @Retention(RetentionPolicy.RUNTIME)
    public @interface LoginRequired {
    }
    

    2、MyControllerAdvice

    
    @ControllerAdvice
    public class MyControllerAdvice {
    
        @ResponseBody
        @ExceptionHandler(TokenValidationException.class)
        public JsonResponse tokenValidationExceptionHandler() {
            return JsonResponse.loginInvalid();
        }
    
        @ResponseBody
        @ExceptionHandler(ServiceException.class)
        public JsonResponse serviceExceptionHandler(ServiceException se) {
            return JsonResponse.fail(se.getMsg());
        }
    
        @ResponseBody
        @ExceptionHandler(Exception.class)
        public JsonResponse exceptionHandler(Exception e) {
            e.printStackTrace();
            return JsonResponse.fail(e.getMessage());
        }
    
    }
    

    3、AuthenticationInterceptor

    
    public class AuthenticationInterceptor implements HandlerInterceptor {
    
        private static final String CURRENT_USER = "user";
    
        @Autowired
        private UserService userService;
    
        @Override
        public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
            // 如果不是映射到方法直接通过
            if (!(handler instanceof HandlerMethod)) {
                return true;
            }
            HandlerMethod handlerMethod = (HandlerMethod) handler;
            Method method = handlerMethod.getMethod();
    
            // 判断接口是否有@LoginRequired注解, 有则需要登录
            LoginRequired methodAnnotation = method.getAnnotation(LoginRequired.class);
            if (methodAnnotation != null) {
                // 验证token
                Integer userId = JwtUtil.verifyToken(request);
                PLUser plUser = userService.selectByPrimaryKey(userId);
                if (null == plUser) {
                    throw new RuntimeException("用户不存在,请重新登录");
                }
                request.setAttribute(CURRENT_USER, plUser);
                return true;
            }
            return true;
        }
    
        @Override
        public void postHandle(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object o, ModelAndView modelAndView) throws Exception {
        }
    
        @Override
        public void afterCompletion(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object o, Exception e) throws Exception {
        }
    }
    

    4、JwtUtil

    
        public static final long EXPIRATION_TIME = 2592_000_000L; // 有效期30天
        public static final String SECRET = "pl_token_secret";
        public static final String HEADER = "token";
        public static final String USER_ID = "userId";
    
        /**
         * 根据userId生成token
         * @param userId
         * @return
         */
        public static String generateToken(String userId) {
            HashMapString, Object map = new HashMap();
            map.put(USER_ID, userId);
            String jwt = Jwts.builder()
                    .setClaims(map)
                    .setExpiration(new Date(System.currentTimeMillis() + EXPIRATION_TIME))
                    .signWith(SignatureAlgorithm.HS512, SECRET)
                    .compact();
            return jwt;
        }
    
        /**
         * 验证token
         * @param request
         * @return 验证通过返回userId
         */
        public static Integer verifyToken(HttpServletRequest request) {
            String token = request.getHeader(HEADER);
            if (token != null) {
                try {
                    MapString, Object body = Jwts.parser()
                            .setSigningKey(SECRET)
                            .parseClaimsJws(token)
                            .getBody();
    
                    for (Map.Entry entry : body.entrySet()) {
                        Object key = entry.getKey();
                        Object value = entry.getValue();
                        if (key.toString().equals(USER_ID)) {
                            return Integer.valueOf(value.toString());// userId
                        }
                    }
                    return null;
                } catch (Exception e) {
                    logger.error(e.getMessage());
                    throw new TokenValidationException("unauthorized");
                }
            } else {
                throw new TokenValidationException("missing token");
            }
        }
    

    END

    Java面试题专栏

    SpringBoot项目:RedisTemplate实现轻量级消息队列

    我知道你 “在看SpringBoot项目:RedisTemplate实现轻量级消息队列

    原文始发于微信公众号(Java知音):

    本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

    本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

    原文链接:blog.ouyangsihai.cn >> SpringBoot项目——RedisTemplate实现轻量级消息队列


     上一篇
    Git 从入门到精通,这篇包教包会! Git 从入门到精通,这篇包教包会!
    点击上方“Java知音”,选择“置顶公众号” 技术文章第一时间送达! 作者:静默虚空 juejin.im/post/5c8296f85188257e3941b2d4 简介Git 是什么Git 是一个开源的分布式版本控制系统。
    下一篇 
    SpringBoot 优雅的配置拦截器方式 SpringBoot 优雅的配置拦截器方式
    点击上方“后端技术精选”,选择“置顶公众号” 技术文章第一时间送达! 作者:攻城狮-飞牛  my.oschina.net/bianxin/blog/2876640 其实spring boot拦截器的配置方式和springMV