手把手教你设计一个百万级的消息推送系统

576
发表时间:2020-10-13 17:35

  消息推送技术选择


  为了满足大量的消息推送连接,并同时支持双全职通信,性能必须得到保证。Java技术栈中的选择,一开始自然排除传统IO。


消息推送


  NIO是唯一的选择。其实这个层面的选择并不多。考虑到社区和数据维护,最终选择了Netty。


  最终架构图如下:


  现在看也无所谓。我们一个一个介绍吧。


  消息推送协议解析


  由于它是一个消息系统,自然要与客户端一起定义协议格式。


  常见简单的协议是HTTP,但我们的一个要求是双全职的交互模式,HTTP更为浏览器服务。我们需要的是一个更加精简的协议,以减少许多不必要的数据传输。


  所以我觉得最好在满足业务需求的同时,定制自己的私有协议。在这个场景中,有一个标准的物联网协议。


  如果是其他场景,可以借鉴流行的RPC框架定制私有协议,使得双方的通信更加高效。


  但是根消息推送据这段时间的经验,无论哪种方式,都需要在协议中预留安全相关的位置。与协议相关的内容就不多讨论了,会介绍更具体的应用。


  简单实现


  先考虑如何实现功能,再考虑百万连接的情况。


  注册认证


  做真正的消息上下行之前首先要考虑的是消息推送认证。就像你用微信一样,第一步是登录。无论谁能直接连接到平台。所以第一步是注册。


  比如上面架构图中的注册/认证模块。一般来说,客户端需要通过HTTP请求传递一个唯一的标识符。通过后台身份验证后,它将响应一个令牌,并在Redis或DB中维护令牌和客户端之间的关系。


  客户端也在本地保存这个令牌,将来的每个请求都必须携带这个令牌。一旦该令牌过期,客户端需要再次请求获取令牌。


  认证通过后,客户端会通过TCP直接连接到图中的推送服务器模块。这个模块是为了真正处理消息的上行和下行。消息推送


  保存渠道关系


  在连接和访问之后,有必要在实际处理服务之前维护当前客户端和通道之间的关系。


  假设客户端的唯一标识是手机号,则需要在一个Map中维护手机号和当前频道。


  这类似于前面Spring Boot集成的长连接心跳机制,如下图所示:


  同时,为了通过渠道获取客户端的唯一标消息推送识(手机号),需要在渠道中设置相应的属性:公共静态void putClientId(通道通道消息推送,字符串clientId){


  channel.attr(CLIENT_ID)。set(ClientID);


  {}


  获取手机号码时:


  公共静态字符串getClientId(通道通道){


  返回(字符串)getAttribute(通道,CLIENT _ ID);


  {}


  这样,当我们的客户端离线时,我们可以记录相关日志:


  string TelNo=NettYattrutil.GetClientID(CTX.channel());


  nettysocketholder.remove(TelNo);


  log.info('客户端离线,TelNo='TelNo');


  在这里,需要注意的是,存储客户端和通道之间关系的映射应该预先设置大小(避免频繁扩展),因为它将是使用最频繁、占消息推送用内存最多的对象。


  消息上行


  下一步是上传真实的业务数据。一般来说,第一步是判断上传的消息输入的是什么业务类型。在聊天场景中,可以上传文字、图片、视频等内容。


  因此,我们必须区分并做不同的处理,这与客户协商的协议有关:


  它可以通过消息头中的字段来区分。


  更简单的是JSON消息,它取出一个字段来区分不同的消息。


  不管怎样,只要能分辨出来就行。


  消息解析和服务解耦


  消息被解析后,就可以进行处理了,比如写入数据库,调用其他接口。


  众所周知,在Netty中处理消息通常是在channelRead()方法中。


  在这里,您可以解析消息并区分类型。但是消息推送如果我们的业务逻辑也写在里面,这里的内容将是巨大的。


  即使我们分成几个开发来处理不同的业务,也会有很多冲突和维护困难等问题。因此,将消息解析与业务处理完全分离是非常必要的。


  这时,面向接口的编程开始发挥作用。这里的核心代码与“制作轮子”——cicada(轻量级Web框架)一致。


  首先为处理业务逻辑定义一个接口,然后在解析消息后,通过反射创建一个特定的对象来执行处理功能。这样,不同的业务和开发人员只需要实现这个接口和自己的业务逻辑。


  伪代码如下:


  要了解蝉的具体实现,请点击这里:


  https://github.com/TogetherOS/cicada


  上行还有一点要注意:因为是基于长连接,消息推送客户端需要定期发送心跳包来维持这个连接。


  同时服务器会进行相应的检查,在n个时间间隔没有收到消息后,会主动断开连接,节省资源。


  这可以通过使用空闲状态处理程序来实现。


  消息下行


  当有向上的运动时,也有向下的运动。例如,在聊天场景中,有两个客户端连接到推送服务器,它们直接需要点对点的通信。


  这个时候的过程是:


  将消息发送到服务器。


  服务器收到消息后,知道消息是要发给B的,消息推送需要在内存中找到B的Channel。


  通过b的渠道转发a的消息。


  这是一个向下的过程。甚至管理员也需要向所有在线用户发送系统通知,类似:遍历存储通道关系的Map,逐个发送消息。这也是之前需要存储在Map中的主要原因。


  伪代码如下:


  具体可参考:


  https://github.com/crossoverJie/netty-action/


  分布式方案


  单机版已经实现。现在我们将关注如何实现数百万个连接。


  其实百万连接只是一个形容词,更多的是如何实现一个分布式的方案,可以灵活的横向扩展,支持更多的连接。在这样做之前消息推送,我们必须首先找出我们的独立版本可以支持多少连接。


  影响这一点的因素有很多:


  服务器配置本身。内存、CPU、网卡、Linux支持的最大打开文件数等。


  应用您自己的配置。因为Netty本身需要依赖堆外内存,JVM本身也需要占用一部分内存,比如存储通道关系的大型Map。这一点需要根据自身情况进行调整。


  结合上述情况,我们可以测试单个节点可以支持的最大连接数。单机再怎么优化都有上限,这也是分布式系统解决的主要问题。


  建筑概论


  在谈具体实现之前,我们必须先谈一下上面贴的整体架构图:


  从左边开始。上面提到的注册和认证模块也部署在集群中,并通过预Nginx加载。如前所述,它的主要目的是进行身份验证并向客户端返回令牌。但是在推服务器集群之后,它还有另外一个功能。也就是返回一个可以被当前客户端使用的推送服务器。


  右边的平台一般指的是管理平台,可以查看当前实时在线号码,向指定客户端推送消息等等。推送消息需要通过推送服务器才能消息推送找到真正的推送节点。


  其他中间件如Redis、ZooKeeper、Kafka、MySQL等都是为这些功能准备的,见下面的实现。


  注册发现


  首先,第一个问题是注册发现。如何为客户端选择一个可用的节点是推服务器数量变成多个后首先要解决的问题。


  事实上,本节的内容已经在分布式(I)服务注册和发现中进行了详细描述。所有的推送服务器在启动时都需要在ZooKeeper中注册自己的信息。


  注册认证模块会订阅ZooKeeper中的节点,从而获取最新的服务列表。结构如下:


  下面是一些伪代码:应用程序开始注册ZooKeeper。


  注册认证模块只需要订阅这个ZooKeeper节点:


  路由策略


  既然可以获得所有的服务列表,那么我们如何为客户端选择合适的推送服务器呢?


  该过程侧重于以下几点:


  尽量保证所有节点均匀连接。


  作为重新平衡是添加还是删除节点。


  首先,有几种算法可以确保均衡:


  投票。将每个节点逐个分配给客户端消息推送。但是,新节点的分布会不均匀。


  哈希模模式。类似HashMap,但是也会有轮询的问题。当然,你可以像HashMap一样做一个再平衡,让所有客户端重新连接。然而,这将导致所有连接被中断和重新连接,这有点昂贵。


  由于哈希取模的问题,带来了一致的哈希算法,但部分客户端仍然需要重新平衡。


  体重。每个节点的负载情况可以手动调整,甚至自动调整。基于监控,负载高时会自动降低部分节点的权重,负载低时可以增加权重。


  另一个问题是:当我们重启一些应用程序进行升级时,这个节点上的客户端会做什么?


  因为我们有心跳机制,所以心跳被阻断的时候可以认为这个节点有问题。那么需要重新请求注册认证模块来获得可用的节点。它也适用于弱网络。如果客户端此时正在发送消息,则需要将消息保存在本地,并等待获取新节点后再重新发送。


  有状态连接


  在这种情况下,与无状态的HTTP不同,消息推送我们需要清楚地知道每个客户端和连接之间的关系。


  在上面的独立版本中,我们将这种关系保存在本地缓存中,但是它在分布式环境中显然不起作用。


  例如,当平台向客户端推送消息时,它必须首先知道客户端的通道存储在哪个节点上。


  借助我们之前的经验,引入第三方中间件来存储这种关系是很自然的。


  也就是说,当客户端访问推送服务器时,体系结构图中存放路的Redis需要在Redis中存储当前客户端的唯一标识符和服务节点的ip端口。


  同时,当客户端离线时,必须从Redis中删除连接关系。这样,在理想情况下,每个节点内存中的映射关系应该与Redis中的数据完全相同。


  伪代码如下:


  在这里,当存放路相关时会出现并发问题,所以最好将其更改为Lua脚本。


  推送路由


  想象这样一个场景:如果管理员需要向最近注册的客户端推送系统消息,他会怎么做?


  结合架构图,假设这批有10W个客户端。首先,消息推送我们需要通过Nginx将这些号码发送到平台下的推送路由。


  为了提高效率,这批号码甚至可以再次分散到每个推送路由。得到具体数字后,根据数字启动多线程,得到上一路由Redis中客户端对应的推送服务器。


  然后通过HTTP调用推送服务器进行真正的消息传递(Netty也很支持HTTP协议)。


  推送成功后,需要将结果更新到数据库中,离线客户端可以根据业务重新推送。


  信息流


  也许有些场景非常重视客户端的上游消息,需要持久化,消息量非常大。


  在推服务器里做生意显然不合适,可以选择卡消息推送夫卡去解耦。把所有上行数据直接丢进卡夫卡,不要管它。然后消费者程序取出数据并写入数据库。


  分布式问题


  分布式技术解决了性能问题,但也带来了其他麻烦。


  应用监控


  比如如何在线了解几十个推送服务器节点的健康状况?这时候监控系统就要发挥作用了,我们需要知道每个节点当前的内存使用情况和GC。还有操作系统本身的内存使用,毕竟Netty使用了大量的堆外内存。同时需要监控各个节点当前的上线号以及Redis中的上线号。理论上这两个数应该相等。


  这样,我们就可以知道系统的使用情况,并灵活地维护这些节点的数量。


  日志处理


  日志记录也变得极其重要。例如,当客户端消息推送无法连接时,您需要知道问题出在哪里。


  市场营销方案最好在每个请求中添加一个traceID来记录日志,这样您就可以通过这个日志来检查卡在每个节点中的位置。而ELK必须要用。


     联系电话:
15807157429
手机/微信
 
 

开户/代运营:

18871871197


开户/代运营:

15807157429