异步并发利器——实际项目中使用CompletionService提升系统性能的一次实践

本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

原文链接:blog.ouyangsihai.cn >> 异步并发利器——实际项目中使用CompletionService提升系统性能的一次实践

点击上方“Java知音”,选择“置顶公众号”

技术文章第一时间送达!

场景

随着互联网应用的深入,很多传统行业也都需要接入到互联网。我们公司也是这样,保险核心需要和很多保险中介对接,比如阿里、京东等等。这些公司对于接口服务的性能有些比较高的要求,传统的核心无法满足要求,所以信息技术部领导高瞻远瞩,决定开发互联网接入服务,满足来自性能的需求。

概念

CompletionServiceExecutorBlockingQueue的功能融合在一起,将Callable任务提交给CompletionService来执行,然后使用类似于队列操作的take和poll等方法来获得已完成的结果,而这些结果会在完成时被封装为Future。对于更多的概念,请参阅其他网络文档。

线程池的设计,阿里开发手册说过不要使用Java Executors 提供的默认线程池,因此需要更接近实际的情况来自定义一个线程池,根据多次压测,采用的线程池如下:


  public ExecutorService getThreadPool(){
          return new ThreadPoolExecutor(75,
                  125,
                  180000,
                  TimeUnit.MILLISECONDS,
                  new LinkedBlockingDeque(450),
                  new ThreadPoolExecutor.CallerRunsPolicy());
      }

说明:公司的业务为低频交易,对于单次调用性能要求高,但是并发压力根本不大,所以 阻塞队列已满且线程数达到最大值时所采取的饱和策略为调用者执行。

实现

业务

投保业务主要涉及这几个大的方面:投保校验、核保校验、保费试算

  • **投保校验:******最主要的是要查询客户黑名单和风险等级,都是千万级的表。而且投保人和被保人都需要校验
  • **核保校验:******除了常规的核保规则校验,查询千万级的大表,还需要调用外部智能核保接口获得用户的风险等级,投保人和被保人都需要校验
  • **保费试算:******需要计算每个险种的保费
  • 核保校验:****除了常规的核保规则校验,查询千万级的大表,还需要调用外部智能核保接口获得用户的风险等级,投保人和被保人都需要校验

    设计

    根据上面的业务,如果串行执行的话,单次性能肯定不高,所以考虑多线程异步执行获得校验结果,再对结果综合判断

  • **投保校验:**采用一个线程(也可以根据投保人和被保人数量来采用几个线程)
  • **核保校验:**
    • **常规校验:**采用一个线程
    • **外部调用:**有几个用户(指投保人和被保人)就采用几个线程
    • 核保校验:

      外部调用:有几个用户(指投保人和被保人)就采用几个线程

      保费计算:有几个险种就采用几个线程,最后合并得到整个的保费

      代码

      以下代码是样例,实际逻辑已经去掉

      先创建投保、核保(常规、外部调用)、保费计算4个业务服务类:

      投保服务类:InsuranceVerificationServiceImpl,假设耗时50ms

      
          @Service
          public class InsuranceVerificationServiceImpl implements InsuranceVerificationService {
              private static final Logger logger = LoggerFactory.getLogger(InsuranceVerificationServiceImpl.class);
              @Override
              public TaskResponseModelObject insuranceCheck(String key, PolicyModel policyModel) {
                  try {
                      //假设耗时50ms
                      Thread.sleep(50);            
                      return TaskResponseModel.success().setKey(key).setData(policyModel);
                  } catch (InterruptedException e) {
                      logger.warn(e.getMessage());            
                      return TaskResponseModel.failure().setKey(key).setResultMessage(e.getMessage());
                  }
              }
          }
      

      核保常规校验服务类:UnderwritingCheckServiceImpl,假设耗时50ms

      
          @Service
          public class UnderwritingCheckServiceImpl implements UnderwritingCheckService {
              private static final Logger logger = LoggerFactory.getLogger(UnderwritingCheckServiceImpl.class);
              @Override
              public TaskResponseModelObject underwritingCheck(String key, PolicyModel policyModel) {
                  try {
                      //假设耗时50ms
                      Thread.sleep(50);            
                      return TaskResponseModel.success().setKey(key).setData(policyModel);
                  } catch (InterruptedException e) {
                      logger.warn(e.getMessage());            
                      return TaskResponseModel.failure().setKey(key).setResultMessage(e.getMessage());
                  }
              }
          }
      

      核保外部调用服务类:ExternalCallServiceImpl,假设耗时200ms

      
          @Service
          public class ExternalCallServiceImpl implements ExternalCallService {
              private static final Logger logger = LoggerFactory.getLogger(ExternalCallServiceImpl.class);
              @Override
              public TaskResponseModelObject externalCall(String key, Insured insured) {
                  try {
                      //假设耗时200ms
                      Thread.sleep(200);
                      ExternalCallResultModel externalCallResultModel = new ExternalCallResultModel();
                      externalCallResultModel.setIdcard(insured.getIdcard());
                      externalCallResultModel.setScore(200);
                      return TaskResponseModel.success().setKey(key).setData(externalCallResultModel);
                  } catch (InterruptedException e) {
                      logger.warn(e.getMessage());
                      return TaskResponseModel.failure().setKey(key).setResultMessage(e.getMessage());
                  }
              }
          }
      

      试算服务类:TrialCalculationServiceImpl,假设耗时50ms

      
          @Service
          public class TrialCalculationServiceImpl implements TrialCalculationService {
              private static final Logger logger = LoggerFactory.getLogger(TrialCalculationServiceImpl.class);
              @Override
              public TaskResponseModelObject trialCalc(String key, Risk risk) {
                  try {
                      //假设耗时50ms
                      Thread.sleep(50);
                      return TaskResponseModel.success().setKey(key).setData(risk);
                  } catch (InterruptedException e) {
                      logger.warn(e.getMessage());
                      return TaskResponseModel.failure().setKey(key).setResultMessage(e.getMessage());
                  }
              }
          }
      

      统一返回接口类:TaskResponseModel, 上面4个服务的方法统一返回TaskResponseModel

      
        @Data
        @ToString
        @NoArgsConstructor
        @AllArgsConstructor
        @EqualsAndHashCode
        @Accessors(chain = true)
        public class TaskResponseModelT extends Object implements Serializable {
            private String key;           //唯一调用标志
            private String resultCode;    //结果码
            private String resultMessage; //结果信息
            private T data;               //业务处理结果
      
            public static TaskResponseModelObject success() {
                TaskResponseModelObject taskResponseModel = new TaskResponseModel();
                taskResponseModel.setResultCode("200");
                return taskResponseModel;
            }
            public static TaskResponseModelObject failure() {
                TaskResponseModelObject taskResponseModel = new TaskResponseModel();
                taskResponseModel.setResultCode("400");
                return taskResponseModel;
            }
        }
      

      注:

    • **key**为这次调用的唯一标识,由调用者传进来
    • **resultCode**结果码,200为成功,400表示有异常
    • **resultMessage**信息,表示不成功或者异常信息
    • **data**业务处理结果,如果成功的话
    • 这些服务类都是单例模式
    • resultCode结果码,200为成功,400表示有异常

      data业务处理结果,如果成功的话

      要使用用CompletionService的话,需要创建实现了Callable接口的线程

      投保Callable:

      
          @Data
          @AllArgsConstructor
          public class InsuranceVerificationCommand implements CallableTaskResponseModelObject {
              private String key;
              private PolicyModel policyModel;
              private final InsuranceVerificationService insuranceVerificationService;
              @Override
              public TaskResponseModelObject call() throws Exception {
                  return insuranceVerificationService.insuranceCheck(key, policyModel);
              }
          }
      

      核保常规校验Callable:

      
          @Data
          @AllArgsConstructor
          public class UnderwritingCheckCommand implements CallableTaskResponseModelObject {
              private String key;
              private PolicyModel policyModel;
              private final UnderwritingCheckService underwritingCheckService;
              @Override
              public TaskResponseModelObject call() throws Exception {
                  return underwritingCheckService.underwritingCheck(key, policyModel);
              }
          }
      

      核保外部调用Callable:

      
          @Data
          @AllArgsConstructor
          public class ExternalCallCommand implements CallableTaskResponseModelObject {
              private String key;
              private Insured insured;
              private final ExternalCallService externalCallService;
              @Override
              public TaskResponseModelObject call() throws Exception {
                  return externalCallService.externalCall(key, insured);
              }
          }
      

      试算调用Callable:

      
          @Data
          @AllArgsConstructor
          public class TrialCalculationCommand implements CallableTaskResponseModelObject {
              private String key;
              private Risk risk;
              private final TrialCalculationService trialCalculationService;
              @Override
              public TaskResponseModelObject call() throws Exception {
                  return trialCalculationService.trialCalc(key, risk);
              }
          }
      

    • 每一次调用,需要创建这4种**Callable******
    • 返回统一接口**TaskResopnseModel**
    • 返回统一接口TaskResopnseModel

      异步执行的类:TaskExecutor

      
        @Component
        public class TaskExecutor {
            private static final Logger logger = LoggerFactory.getLogger(TaskExecutor.class);
            //线程池
            private final ExecutorService executorService;
      
            public TaskExecutor(ExecutorService executorService) {
                this.executorService = executorService;
            }
      
            //异步执行,获取所有结果后返回
            public ListTaskResponseModelObject execute(ListCallableTaskResponseModelObject commands) {
                //创建异步执行对象
                CompletionServiceTaskResponseModelObject completionService = new ExecutorCompletionService(executorService);
                for (CallableTaskResponseModelObject command : commands) {
                    completionService.submit(command);
                }
                //获取所有异步执行线程的结果
                int taskCount = commands.size();
                ListTaskResponseModelObject params = new ArrayList(taskCount);
                try {
                    for (int i = 0; i  taskCount; i++) {
                        FutureTaskResponseModelObject future = completionService.take();
                        params.add(future.get());
                    }
                } catch (InterruptedException | ExecutionException e) {
                    //异常处理
                    params.clear();
                    params.add(TaskResponseModel.failure().setKey("error").setResultMessage("异步执行线程错误"));
                }
                //返回,如果执行中发生error, 则返回相应的key值:error
                return params;
            }
        }
      

    • 为单例模式
    • 接收参数为****ListCallableTaskResponseModelObject,也就是上面定义的4种Callable的列表
    • 返回ListTaskResponseModelObject,也就是上面定义4种Callable返回的结果列表
    • 我们的业务是对返回结果统一判断,业务返回结果有因果关系
    • 如果线程执行有异常,也返回ListTaskResponseModel,这个时候列表中只有一个**TaskResponseModel**,**key**为error, 后续调用者可以通过这个来判断线程是否执行成功;
    • 接收参数为****ListCallableTaskResponseModelObject,也就是上面定义的4种Callable的列表

      我们的业务是对返回结果统一判断,业务返回结果有因果关系

      调用方:CompletionServiceController

      
        @RestController
        public class CompletionServiceController {
            //投保key
            private static final String INSURANCE_KEY = "insurance_";
            //核保key
            private static final String UNDERWRITING_KEY = "underwriting_";
            //外部调用key
            private static final String EXTERNALCALL_KEY = "externalcall_";
            //试算key
            private static final String TRIA_KEY = "trial_";
      
            private static final Logger logger = LoggerFactory.getLogger(CompletionServiceController.class);
      
            private final ExternalCallService externalCallService;
            private final InsuranceVerificationService insuranceVerificationService;
            private final TrialCalculationService trialCalculationService;
            private final UnderwritingCheckService underwritingCheckService;
            private final TaskExecutor taskExecutor;
      
            public CompletionServiceController(ExternalCallService externalCallService, InsuranceVerificationService insuranceVerificationService, TrialCalculationService trialCalculationService, UnderwritingCheckService underwritingCheckService, TaskExecutor taskExecutor) {
                this.externalCallService = externalCallService;
                this.insuranceVerificationService = insuranceVerificationService;
                this.trialCalculationService = trialCalculationService;
                this.underwritingCheckService = underwritingCheckService;
                this.taskExecutor = taskExecutor;
            }
      
            //多线程异步并发接口
            @PostMapping(value = "/async", headers = "Content-Type=application/json;charset=UTF-8")
            public String asyncExec(@RequestBody PolicyModel policyModel) {
                long start = System.currentTimeMillis();
      
                asyncExecute(policyModel);
                logger.info("异步总共耗时:" + (System.currentTimeMillis() - start));
                return "ok";
            }
      
            //串行调用接口
            @PostMapping(value = "/sync", headers = "Content-Type=application/json;charset=UTF-8")
            public String syncExec(@RequestBody PolicyModel policyModel) {
                long start = System.currentTimeMillis();
                syncExecute(policyModel);
                logger.info("同步总共耗时:" + (System.currentTimeMillis() - start));
                return "ok";
            }
            private void asyncExecute(PolicyModel policyModel) {
                ListCallableTaskResponseModelObject baseTaskCallbackList = new ArrayList();
                //根据被保人外部接口调用
                for (Insured insured : policyModel.getInsuredList()) {
                    ExternalCallCommand externalCallCommand = new ExternalCallCommand(EXTERNALCALL_KEY + insured.getIdcard(), insured, externalCallService);
                    baseTaskCallbackList.add(externalCallCommand);
                }
                //投保校验
                InsuranceVerificationCommand insuranceVerificationCommand = new InsuranceVerificationCommand(INSURANCE_KEY, policyModel, insuranceVerificationService);
                baseTaskCallbackList.add(insuranceVerificationCommand);
                //核保校验
                UnderwritingCheckCommand underwritingCheckCommand = new UnderwritingCheckCommand(UNDERWRITING_KEY, policyModel, underwritingCheckService);
                baseTaskCallbackList.add(underwritingCheckCommand);
                //根据险种进行保费试算
                for(Risk risk : policyModel.getRiskList()) {
                    TrialCalculationCommand trialCalculationCommand = new TrialCalculationCommand(TRIA_KEY + risk.getRiskcode(), risk, trialCalculationService);
                    baseTaskCallbackList.add(trialCalculationCommand);
                }
                ListTaskResponseModelObject results = taskExecutor.execute(baseTaskCallbackList);
                for (TaskResponseModelObject t : results) {
                    if (t.getKey().equals("error")) {
                        logger.warn("线程执行失败");
                        logger.warn(t.toString());
                    }
                    logger.info(t.toString());
                }
      
            }
            private void syncExecute(PolicyModel policyModel) {
                //根据被保人外部接口调用
                for (Insured insured : policyModel.getInsuredList()) {
                    TaskResponseModelObject externalCall = externalCallService.externalCall(insured.getIdcard(), insured);
                    logger.info(externalCall.toString());
                }
                //投保校验
                TaskResponseModelObject insurance = insuranceVerificationService.insuranceCheck(INSURANCE_KEY, policyModel);
                logger.info(insurance.toString());
                //核保校验
                TaskResponseModelObject underwriting = underwritingCheckService.underwritingCheck(UNDERWRITING_KEY, policyModel);
                logger.info(underwriting.toString());
                //根据险种进行保费试算
                for(Risk risk : policyModel.getRiskList()) {
                    TaskResponseModelObject risktrial = trialCalculationService.trialCalc(risk.getRiskcode(), risk);
                    logger.info(risktrial.toString());
                }
      
            }
        }
      

      1.为测试方便,提供两个接口调用:一个是串行执行,一个是异步并发执行

      2.在异步并发执行函数asyncExecute中:

    • 根据有多少个被保人,创建多少个外部调用的Callable实例,**key**值为**EXTERNALCALL_KEY + insured.getIdcard()**,在一次保单投保调用中,每一个被保人**Callable**的**key**是不一样的。
    • 根据有多少个险种,创建多少个试算的**Callable**实例,**key**为**TRIA_KEY + risk.getRiskcode()**,在一次保单投保调用中,每一个险种的**Callable**的key是不一样的
    • 创建投保校验的**Callable**实例,业务上只需要一个
    • 创建核保校验的**Callable**实例,业务上只需要一个
    • 将Callable列表传入到**TaskExecutor**执行异步并发调用
    • 根据返回结果来判断,通过判断返回的**TaskResponseModel**的**key**值可以知道是哪类业务校验,分别进行判断,还可以交叉判断(公司的业务就是要交叉判断)
    • 根据有多少个险种,创建多少个试算的Callable实例,keyTRIA_KEY + risk.getRiskcode(),在一次保单投保调用中,每一个险种的Callable的key是不一样的

      创建核保校验的Callable实例,业务上只需要一个

      根据返回结果来判断,通过判断返回的TaskResponseModelkey值可以知道是哪类业务校验,分别进行判断,还可以交叉判断(公司的业务就是要交叉判断)

      验证

      验证数据:

      
      {
      "insuredList":
      [{"idcard":"laza","name":"320106"},
      {"idcard":"ranran","name":"120102"}],
      "policyHolder":"lazasha","policyNo":"345000987","riskList":
      [{"mainFlag":1,"premium":300,"riskcode":"risk001","riskname":"险种一"},
      {"mainFlag":0,"premium":400,"riskcode":"risk002","riskname":"险种二"}]
      }
      

      上面数据表明:有两个被保人,两个险种。按照我们上面的定义,会调用两次外部接口,两次试算,一次投保,一次核保。而在样例代码中,一次外部接口调用耗时为200ms, 其他都为50ms.

      本地开发的配置为8C16G:

    • 同步串行接口调用计算:2 * 200 + 2 * 50 + 50 + 50 = 600ms
    • 多线程异步执行调用计算:按照多线程并发执行原理,取耗时最长的200ms
    • 多线程异步执行调用计算:按照多线程并发执行原理,取耗时最长的200ms

      验证:同步接口

      异步并发利器:实际项目中使用CompletionService提升系统性能的一次实践

      输出耗时:可以看到耗时601ms

      异步并发利器:实际项目中使用CompletionService提升系统性能的一次实践

      验证:多线程异步执行接口

      异步并发利器:实际项目中使用CompletionService提升系统性能的一次实践

      输出耗时:可以看到为204ms

      异步并发利器:实际项目中使用CompletionService提升系统性能的一次实践

      结果:基本和我们的预期相符合。

      结束

      这是将实际生产中的例子简化出来,具体生产的业务比较复杂,不便于展示。

      实际情况下,原来的接口需要1000ms以上才能完成单次调用,有的需要2000-3000ms。现在的接口,在生产两台8c16g的虚拟机, 经过4个小时的简单压测能够支持2000用户并发,单次返回时长为350ms左右,服务很稳定,完全能够满足公司的业务发展需求。

      提供的这个是可以运行的列子,代码在:https://github.com/lazasha111211/completionservice-demo.git

      END

      Java面试题专栏

      异步并发利器:实际项目中使用CompletionService提升系统性能的一次实践

      我知道你 “在看异步并发利器:实际项目中使用CompletionService提升系统性能的一次实践

      原文始发于微信公众号(Java知音):

    本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

    本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

    原文链接:blog.ouyangsihai.cn >> 异步并发利器——实际项目中使用CompletionService提升系统性能的一次实践


     上一篇
    【加精】【月报】Java知音的一月汇总 【加精】【月报】Java知音的一月汇总
    姗姗来迟的一月精选文章汇总,中间隔了春节,没有及时整理,今天补上~ 往期:Java知音的十月: Java知音的十一月: Java知音的十二月:     基础不牢,地动山摇Spring全家桶实用就完了 我知道你 “在看” 原文始发于微信公
    下一篇 
    这些面试中经常被问到的线程池问题,你都能回答的上来吗? 这些面试中经常被问到的线程池问题,你都能回答的上来吗?
    点击上方“Java知音”,选择“置顶公众号” 技术文章第一时间送达! 作者:Real_man juejin.im/post/5e435ac3f265da57537ea7ba 想要进阶自己的开发水平,JDK源码中一些优秀的设计