从零开始开发IM(即时通讯)服务端(二)附源码

本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

原文链接:blog.ouyangsihai.cn >> 从零开始开发IM(即时通讯)服务端(二)附源码

点击上方“Java知音”,选择“置顶公众号”

技术文章第一时间送达!

作者:yuanrw

juejin.im/post/5d73af31f265da0394022e45

juejin.im/post/5d73af31f265da0394022e45

本篇将带大家从零开始搭建一个轻量级的IM服务端,IM的整体设计思路和架构在我的上篇博客中已经讲过了,没看过的同学请点击 。

这篇将给大家带来更多的细节实现。我将从三个方面来阐述如何构建一个完整可靠的IM系统。

  • 可靠性
  • 安全性
  • 存储设计
  • 可靠性

    什么是可靠性?对于一个IM系统来说,可靠的定义至少是不丢消息、消息不重复、不乱序,满足这三点,才能说有一个好的聊天体验。

    不丢消息

    我们先从不丢消息开始讲起。

    首先复习一下上一篇设计的服务端架构:

    从零开始开发IM(即时通讯)服务端(二)附源码

    我们先从一个简单例子开始思考:当Alice给Bob发送一条消息时,可能要经过这样一条链路:

    从零开始开发IM(即时通讯)服务端(二)附源码
  • client--connecter
  • connector--transfer
  • transfer--connector
  • connector--client
  • 在这整个链路中的每个环节都有可能出问题,虽然tcp协议是可靠的,但是它只能保证链路层的可靠,无法保证应用层的可靠。

    例如在第一步中,connector收到了从client发出的消息,但是转发给transfer失败,那么这条消息Bob就无法收到,而Alice也不会意识到消息发送失败了。

    如果Bob状态是离线,那么消息链路就是:

  • client--connector
  • connector--transfer
  • transfer--mq
  • 如果在第三步中,transfer收到了来自connector的消息,但是离线消息入库失败,那么这个消息也是传递失败了。

    为了保证应用层的可靠,我们必须要有一个ack机制,使发送方能够确认对方收到了这条消息。

    具体的实现,我们模仿tcp协议做一个应用层的ack机制。

    tcp的报文是以字节(byte)为单位的,而我们以message单位。

    从零开始开发IM(即时通讯)服务端(二)附源码

    发送方每次发送一个消息,就要等待对方的ack回应,在ack确认消息中应该带有收到的id以便发送方识别。

    其次,发送方需要维护一个等待ack的队列。每次发送一个消息之后,就将消息和一个计时器入队。

    另外存在一个线程一直轮询队列,如果有超时未收到ack的,就取出消息重发。

    超时未收到ack的消息有两种处理方式:

  • 和tcp一样不断发送直到收到ack为止。
  • 设定一个最大重试次数,超过这个次数还没收到ack,就使用失败机制处理,节约资源。例如如果是connector长时间未收到client的ack,那么可以主动断开和客户端的连接,剩下未发送的消息就作为离线消息入库,客户端断连后尝试重连服务器即可。
  • 不重复、不乱序

    有的时候因为网络原因可能导致ack收到较慢,发送方就会重复发送,那么接收方必须有一个去重机制。

    去重的方式是给每个消息增加一个唯一id。这个唯一id并不一定是全局的,只需要在一个会话中唯一即可。例如某两个人的会话,或者某一个群。如果网络断连了,重新连接后,就是新的会话了,id会重新从0开始。

    接收方需要在当前会话中维护收到的最后一个消息的id,叫做lastId。
    每次收到一个新消息, 就将id与lastId作比较看是否连续,如果不连续,就放入一个暂存队列 queue中稍后处理。

    例如:

  • 当前会话的lastId=1,接着服务器收到了消息msg(id=2),可以判断收到的消息是连续的,就处理消息,将lastId修改为2。
  • 但是如果服务器收到消息msg(id=3),就说明消息乱序到达了,那么就将这个消息入队,等待lastId变为2后,(即服务器收到消息msg(id=2)并处理完了),再取出这个消息处理。
  • 因此,判断消息是否重复只需要判断 msgIdlastId && !queue.contains(msgId)即可。如果收到重复的消息,可以判断是ack未送达,就再发送一次ack。

    接收方收到消息后完整的处理流程如下:

    从零开始开发IM(即时通讯)服务端(二)附源码

    伪代码如下:

    
    class ProcessMsgNode{
        /**
         * 接收到的消息
         */
        private Message message;
        /**
         * 处理消息的方法
         */
        private ConsumerMessage consumer;
    }
    
    public CompletableFutureVoid offer(Long id,Message     message,ConsumerMessage consumer) {
        if (isRepeat(id)) {
        //消息重复
            sendAck(id);
            return null;
        }
        if (!isConsist(id)) {
        //消息不连续
            notConsistMsgMap.put(id, new ProcessMsgNode(message, consumer));
            return null;
        }
        //处理消息
        return process(id, message, consumer);
    }
    
    private CompletableFutureVoid process(Long id, Message message, ConsumerMessage consumer) {
        return CompletableFuture
            .runAsync(() - consumer.accept(message))
            .thenAccept(v - sendAck(id))
            .thenAccept(v - lastId.set(id))
            .thenComposeAsync(v - {
                Long nextId = nextId(id);
                if (notConsistMsgMap.containsKey(nextId)) {
                    //队列中有下个消息
                    ProcessMsgNode node = notConsistMsgMap.get(nextId);
                    return process(nextId, node.getMessage(), consumer);
                } else {
                    //队列中没有下个消息
                    CompletableFutureVoid future = new CompletableFuture();
                    future.complete(null);
                    return future;
                }
            })
            .exceptionally(e - {
                logger.error("[process received msg] has error", e);
                return null;
            });
    }
    

    安全性

    无论是聊天记录还是离线消息,肯定都会在服务端存储备份,那么消息的安全性,保护客户的隐私也至关重要。

    因此所有的消息都必须要加密处理。

    在存储模块里,维护用户信息和关系链有两张基础表,分别是im_user用户表和im_relation关系链表。

    1.im_user表用于存放用户常规信息,例如用户名密码等,结构比较简单。

    2.im_relation表用于记录好友关系,结构如下:

    
    CREATE TABLE `im_relation` (
      `id` bigint(20) COMMENT '关系id',
      `user_id1` varchar(100) COMMENT '用户1id',
      `user_id2` varchar(100) COMMENT '用户2id',
      `encrypt_key` char(33) COMMENT 'aes密钥',
      `gmt_create` timestamp DEFAULT CURRENT_TIMESTAMP,
      `gmt_update` timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, 
      PRIMARY KEY (`id`),
      UNIQUE KEY `USERID1_USERID2` (`user_id1`,`user_id2`)
    );
    

    3.user_id1和user_id2是互为好友的用户id,为了避免重复,存储时按照user_id1user_id2的顺序存,并且加上联合索引。

    4.encrypt_key是随机生成的密钥。当客户端登录时,就会从数据库中获取该用户的所有的relation,存在内存中,以便后续加密解密。

    5.当客户端给某个好友发送消息时,取出内存中该关系的密钥,加密后发送。同样,当收到一条消息时,取出相应的密钥解密。

    客户端完整登录流程如下:

    从零开始开发IM(即时通讯)服务端(二)附源码
  • client调用rest接口登录。
  • client调用rest接口获取该用户所有relation。
  • client向connector发送greet消息,通知上线。
  • connector拉取离线消息推送给client。
  • connector更新用户session。
  • 那为什么connector要先推送离线消息再更新session呢?我们思考一下如果顺序倒过来会发生什么:

  • 用户Alice登录服务器
  • connector更新session
  • 推送离线消息
  • 此时Bob发送了一条消息给Alice
  • 如果离线消息还在推送的过程中,Bob发送了新消息给Alice,服务器获取到Alice的session,就会立刻推送。这时新消息就有可能夹在一堆离线消息当中推过去了,那这时,Alice收到的消息就乱序了。

    而我们必须保证离线消息的顺序在新消息之前。

    那么如果先推送离线消息,之后才更新session。在离线消息推送的过程中,Alice的状态就是“未上线”,这时Bob新发送的消息只会入库im_offline,im_offline表中的数据被读完之后才会“上线”开始接受新消息。这也就避免了乱序。

    存储设计

    存储离线消息

    当用户不在线时,离线消息必然要存储在服务端,等待用户上线再推送。理解了上一个小节后,离线消息的存储就非常容易了。增加一张离线消息表im_offline,表结构如下:

    
    CREATE TABLE `im_offline` (
      `id` int(11) COMMENT '主键',
      `msg_id` bigint(20) COMMENT '消息id',
      `msg_type` int(2) COMMENT '消息类型',
      `content` varbinary(5000) COMMENT '消息内容',
      `to_user_id` varchar(100) COMMENT '收件人id',
      `has_read` tinyint(1) COMMENT '是否阅读',
      `gmt_create` timestamp COMMENT '创建时间',
      PRIMARY KEY (`id`)
    );
    

    msg_type用于区分消息类型(chat,ack),content加密后的消息内容以byte数组的形式存储。

    用户上线时按照条件to_user_id=用户id拉取记录即可。

    防止离线消息重复推送

    我们思考一下多端登录的情况,Alice有两台设备同时登陆,在这种并发的情况下,我们就需要某种机制来保证离线消息只被读取一次。

    这里利用CAS机制来实现:

    1.首先取出所有has_read=false的字段。

    2.检查每条消息的has_read值是否为false,如果是,则改为true。这是原子操作。

    
    update im_offline set has_read = true where id = ${msg_id} and has_read = false
    

    3.修改成功则推送,失败则不推送。

    相信到这里,同学们已经可以自己动手搭建一个完整可用的IM服务端了。更多问题欢迎评论区留言~~

    GitHub地址,如果觉得有用,记得给个star:

    https://github.com/yuanrw/IM

    推荐阅读(点击即可跳转阅读)

    1.

    2.

    3.

    4.

    5.

    觉得不错?欢迎转发分享给更多人

    从零开始开发IM(即时通讯)服务端(二)附源码

    我知道你 “在看从零开始开发IM(即时通讯)服务端(二)附源码

    原文始发于微信公众号(Java知音):

    本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

    本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

    原文链接:blog.ouyangsihai.cn >> 从零开始开发IM(即时通讯)服务端(二)附源码


     上一篇
    从零开始开发IM(即时通讯)服务端(一)附源码 从零开始开发IM(即时通讯)服务端(一)附源码
    点击上方“Java知音”,选择“置顶公众号” 技术文章第一时间送达! 作者:yuanrw juejin.im/post/5d6b3949f265da03c34c13e5 juejin.im/post/5
    2021-04-05
    下一篇 
    Redis布隆过滤器 Redis布隆过滤器
    正文 场景在项目开发中,我们经常会遇到去重问题。比如:判断一个人有没有浏览过一篇文章,判断一个人当天是否登录过某个系统,判断一个ip是否发过一个请求,等等。 比较容易想到的是使用set来实现这个功能。但如果数据量较大,使用set会非常消耗内
    2021-04-05