MQ

MQ-01丨消息队列入门

Posted by jiefang on November 1, 2019

01丨为什么需要消息队列?

  • 异步处理
  • 流量控制
  • 服务解耦

异步处理

消息队列被用于实现服务的异步处理。这样做的好处是:

  • 可以更快地返回结果;
  • 减少等待,自然实现了步骤之间的并发,提升系统总体的性能。

流量控制

这种设计的优点是:能根据下游的处理能力自动调节流量,达到“削峰填谷”的作用。但这样做同样是有代价的:

  • 增加了系统调用链环节,导致总体的响应时延变长。
  • 上下游系统都要将同步调用改为异步消息,增加了系统的复杂度。

服务解耦

消息队列的另外一个作用,就是实现系统应用之间的解耦。

总结

消息队列最常被使用的三种场景:异步处理、流量控制和服务解耦。当然,消息队 列的适用范围不仅仅局限于这些场景,还有包括:

  • 作为发布 / 订阅系统实现一个微服务级系统间的观察者模式;
  • 连接流计算任务和数据;
  • 用于将消息广播给大量接收者。

消息队列也有它自身的一些问题和局限性,包括:

  • 引入消息队列带来的延迟问题;
  • 增加了系统的复杂度;
  • 可能产生数据不一致的问题。

02丨该如何选择消息队列?

消息队列选型标准

  • 必须是开源的产品
  • 必须是近年来比较流行并且有一定社区活跃度的产品

作为一款及格的消息队列产品,必须具备的几个特性包括:

  • 消息的可靠传递:确保不丢消息;
  • Cluster:支持集群,确保不会因为某个节点宕机导致服务不可用,当然也不能丢消息;
  • 性能:具备足够好的性能,能满足绝大多数场景的性能要求。

可供选择的消息队列产品

RabbitMQ

优势

  • RabbitMQ 是一个相当轻量级的消息队列,非常容易部署和使用
  • 号称是世界上使用最广泛的开源消息队列
  • 特色的功能是支持非常灵活的路由配置,和其他消息队列不同的 是,它在生产者(Producer)和队列(Queue)之间增加了一个 Exchange 模块。

不足

  • RabbitMQ 对消息堆积的支持并不好,当大量消息积压的时候,会导致 RabbitMQ 的性能急剧下降。
  • RabbitMQ 的性能比较差,每秒钟可以处理几万到十几万条消息
  • RabbitMQ 使用的编程语言 Erlang,难以扩展和二次开发。

RocketMQ

优势

  • RocketMQ 有着不错的性能,稳定性和可靠性;
  • RocketMQ 使用 Java 语言开发,容易扩展或者二次开发。
  • RocketMQ 对在线业务的响应时延做了很多的优化,级别可以毫秒级的响应,如果应用场景很在意响应时延,那应该选择使用 RocketMQ
  • RocketMQ 的性能比 RabbitMQ 要高一个数量级,每秒钟大概能处理几十万条消息。

不足

  • 作为国产的消息队列,相比国外的比较流行的同类产品,在国际上还没有那么流行,与周边生态系统的集成和兼容程度要略逊一筹。

Kafka

当下的Kafka 已经发展为一个非常成熟的消息队列产品,无论在数据可靠性、稳定性和功能特性等方面都可以满足绝大多数场景的需求 Kafka 与周边生态系统的兼容性是最好的没有之一,尤其在大数据和流计算领域,几乎所有的相关开源软件系统都会优先支持 Kafka。 Kafka 使用 Scala 和 Java 语言开发,设计上大量使用了批量和异步的思想,这种设计使得Kafka 能做到超高的性能,Kafka的性能,尤其是异步收发的性能,是三者中最好的,但与RocketMQ并没有量级上的差异,大约每秒钟可以处理几十万条消息。

Kafka 这种异步批量的设计带来的问题是,它的同步收发消息的响应时延比较高,因为当客户端发送一条消息的时候,Kafka并不会立即发送出去,而是要等一会儿攒一批再发送,在它的Broker中,很多地方都会使用这种“先攒一波再一起处理”的设计。当你的业务场景中,每秒钟消息数量没有那么多的时候,Kafka 的时延反而会比较高。所以,Kafka不太适合在线业务场景

03丨消息模型

主题和队列的区别

队列模型

队列是先进先出(FIFO, First-In-First-Out)的线性表(Linear List)。在具体应用中通常用链表或者数组来实现。队列只允许在后端(称为 rear)进行插入操作,在前端(称为 front)进行删除操作。这个定义里面包含几个关键点,第一个是先进先出,这里面隐含着的一个要求是,在消息入队出队过程中,需要保证这些消息严格有序,按照什么顺序写进队列,必须按照同样的顺序从队列中读出来。不过,队列是没有“读”这个操作的,“读”就是出队,也就是从队列中“删除”这条消息。早期的消息队列,就是按照“队列”的数据结构来设计的。

队列模型

发布-订阅模型

在发布 - 订阅模型中,消息的发送方称为发布者(Publisher),消息的接收方称为订阅者(Subscriber),服务端存放消息的容器称为主题(Topic)。发布者将消息发送到主题中,订阅者在接收消息之前需要先“订阅主题”。“订阅”在这里既是一个动作,同时还可以认为是主题在消费时的一个逻辑副本,每份订阅中,订阅者都可以接收到主题的所有消息。

对比这两种模型,生产者就是发布者,消费者就是订阅者,队列就是主题,并没有本质的区别。它们最大的区别其实就是,一份消息数据能不能被消费多次的问题 发布-订阅模型

RabbitMQ 的消息模型

RabbitMQ是少数依然坚持使用队列模型的产品之一。那它是怎么解决多个消费者的问题呢?在RabbitMQ中,Exchange位于生产者和队列之间,生产者并不关心将消息发送给哪个队列,而是将消息发送给 Exchange,由 Exchange 上配置的策略来决定将消息投递到哪些队列中。 RabbitMQ 的消息模型 同一份消息如果需要被多个消费者来消费,需要配置Exchange将消息发送到多个队列,每个队列中都存放一份完整的消息数据,可以为一个消费者提供消费服务。这也可以变相地实现新发布-订阅模型中,“一份消息数据可以被多个订阅者来多次消费”这样功能。

RocketMQ 的消息模型

RocketMQ 使用的消息模型是标准的发布 - 订阅模型,在 RocketMQ 也有队列(Queue)这个概念。每个主题包含多个队列,通过多个队列来实现多实例并行生产和消费。需要注意的是,RocketMQ 只在队列上保证消息的有序性,主题层面是无法保证消息的严格顺序的

消费组

RocketMQ 中,订阅者的概念是通过消费组(ConsumerGroup)来体现的。每个消费组都消费主题中一份完整的消息,不同消费组之间消费进度彼此不受影响,也就是说,一条消息被 Consumer Group1 消费过,也会再给 Consumer Group2 消费。消费组中包含多个消费者,同一个组内的消费者是竞争消费的关系,每个消费者负责消费组内的一部分消息。如果一条消息被消费者Consumer1消费了,那同组的其他消费者就不会再收到这条消息。

消费位置

在 Topic 的消费过程中,由于消息需要被不同的组进行多次消费,所以消费完的消息并不会立即被删除,这就需要RocketMQ为每个消费组在每个队列上维护一个消费位置(Consumer Offset),这个位置之前的消息都被消费过,之后的消息都没有被消费过,每成功消费一条消息,消费位置就加一。这个消费位置是非常重要的概念,我们在使用消息队列的时候,丢消息的原因大多是由于消费位置处理不当导致的。

RocketMQ 的消息模型

Kafka 的消息模型

Kafka 的消息模型和 RocketMQ 是完全一样的,所有 RocketMQ中对应的概念,和生产消费过程中的确认机制,都完全适用于 Kafka。唯一的区别是,在Kafka中,队列这个概念的名称不一样,Kafka 中对应的名称是“分区(Partition)”,含义和功能是没有任何区别的。

04丨事务消息实现分布式事务

消息队列中的“事务”,主要解决的是消息生产者和消息消费者的数据一致性问题。

分布式事务

分布式事务就是要在分布式系统中的实现事务。在分布式系统中,在保证可用性和不严重牺牲性能的前提下,光是要实现数据的一致性就已经非常困难了,所以出现了很多“残血版”的一致性,比如顺序一致性、最终一致性等等。

事务消息适用的场景主要是那些需要异步更新数据,并且对数据实时性要求不太高的场景。

如何实现分布式事务

事务消息需要消息队列提供相应的功能才能实现,Kafka 和 RocketMQ 都提供了事务相关功能。 消息队列实现分布式事务

首先,订单系统在消息队列上开启一个事务。然后订单系统给消息服务器发送一个“半消息”,这个半消息不是说消息内容不完整,它包含的内容就是完整的消息内容,半消息和普通消息的唯一区别是,在事务提交之前,对于消费者来说,这个消息是不可见的。

半消息发送成功后,订单系统就可以执行本地事务了,在订单库中创建一条订单记录,并提交订单库的数据库事务。然后根据本地事务的执行结果决定提交或者回滚事务消息。

如果订单创建成功,那就提交事务消息,购物车系统就可以消费到这条消息继续后续的流程。如果订单创建失败,那就回滚事务消息,购物车系统就不会收到这条消息。这样就基本实现了“要么都成功,要么都失败”的一致性要求。

第四步提交事务消息时失败了怎么办? Kafka 的解决方案比较简单粗暴,直接抛出异常,让用户自行处理

RocketMQ 中的分布式事务实现

在 RocketMQ 中的事务实现中,增加了事务反查的机制来解决事务消息提交失败的问题。如果 Producer也就是订单系统,在提交或者回滚事务消息时发生网络异常,RocketMQ的 Broker 没有收到提交或者回滚的请求,Broker 会定期去 Producer 上反查这个事务对应的本地事务的状态,然后根据反查结果决定提交或者回滚这个事务。为了支撑这个事务反查机制,我们的业务代码需要实现一个反查本地事务状态的接口,告知RocketMQ 本地事务是成功还是失败。

RocketMQ分布式事务

05丨如何确保消息不会丢失?

检测消息丢失的方法

可以利用消息队列的有序性来验证是否有消息丢失。原理非常简单,在 Producer 端, 给每个发出的消息附加一个连续递增的序号,然后在 Consumer 端来检查这个序号的 连续性。 有几个问题需要你注意:

  • 像 Kafka 和 RocketMQ 这样的消息队列,它是不保证在 Topic 上的严格顺序的,只能保证分区上的消息是有序的,所以我们在发消息的时候必须要指定分区,并且,在每个分区单独检测消息序号的连续性。
  • 系统中 Producer 是多实例的,由于并不好协调多个 Producer 之间的发送顺序,所以也需要每个 Producer 分别生成各自的消息序号,并且需要附加上 Producer 的标识,在 Consumer 端按照每个 Producer 分别来检测序号的连续性。
  • Consumer 实例的数量最好和分区数量一致,做到 Consumer 和分区一一对应,这样会较方便地在 Consumer 内检测消息序号的连续性。

确保消息可靠传递

消息的生产消费

  • 生产阶段: 在这个阶段,从消息在 Producer 创建出来,经过网络传输发送到 Broker端。
  • 存储阶段: 在这个阶段,消息在Broker端存储,如果是集群,消息会在这个阶段被复制到其他的副本上。
  • 消费阶段: 在这个阶段,Consumer 从 Broker 上拉取消息,经过网络传输发送到Consumer 上。

生产阶段

在生产阶段,消息队列通过最常用的请求确认机制,来保证消息的可靠传递:当你的代码调用发消息方法时,消息队列的客户端会把消息发送到 Broker,Broker 收到消息后,会给客户端返回一个确认响应,表明消息已经收到了。客户端收到响应后,完成了一次正常消息的发送。

只要 Producer 收到了 Broker 的确认响应,就可以保证消息在生产阶段不会丢失。你在编写发送消息代码时,需要注意,正确处理返回值或者捕获异常,就可以保证这个阶段的消息不会丢失。以 Kafka 为例,我们看一下如何可靠地发送消息:

1
2
3
4
5
6
7
try {
    RecordMetadata metadata = producer.send(record).get();
    System.out.println(" 消息发送成功。");
}catch (Throwable e) {
    System.out.println(" 消息发送失败!");
    System.out.println(e);
}

异步发送时,则需要在回调方法里进行检查。这个地方是需要特别注意的,很多丢消息的原因就是,我们使用了异步发送,却没有在回调中检查发送结果。

1
2
3
4
5
6
7
8
producer.send(record, (metadata, exception) -> {
    if (metadata != null) {
        System.out.println(" 消息发送成功。");
    }else {
        System.out.println(" 消息发送失败!");
        System.out.println(exception);
    }
});

存储阶段

在存储阶段正常情况下,只要Broker在正常运行,就不会出现丢失消息的问题,但是如果Broker 出现了故障,比如进程死掉了或者服务器宕机了,还是可能会丢失消息的。 如果对消息的可靠性要求非常高,可以通过配置 Broker 参数来避免因为宕机丢消息。

对于单个节点的 Broker,需要配置Broker参数,在收到消息后,将消息写入磁盘后再给Producer 返回确认响应,这样即使发生宕机,由于消息已经被写入磁盘,就不会丢失消息,恢复后还可以继续消费。例如,在 RocketMQ 中,需要将刷盘方式 flushDiskType 配置为SYNC_FLUSH同步刷盘

如果是 Broker 是由多个节点组成的集群,需要将Broker集群配置成:至少将消息发送到2 个以上的节点,再给客户端回复发送确认响应。这样当某个Broker宕机时,其他的Broker 可以替代宕机的 Broker,也不会发生消息丢失。

消费阶段

消费阶段采用和生产阶段类似的确认机制来保证消息的可靠传递,客户端从 Broker 拉取消息后,执行用户的消费业务逻辑,成功后,才会给Broker发送消费确认响应。如果Broker 没有收到消费确认响应,下次拉消息的时候还会返回同一条消息,确保消息不会在网络传输过程中丢失,也不会因为客户端在执行消费逻辑中出错导致丢失。

在编写消费代码时需注意:不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认

小结

  • 在生产阶段,你需要捕获消息发送的错误,并重发消息。
  • 在存储阶段,你可以通过配置刷盘和复制相关的参数,让消息写入到多个副本的磁盘上,来确保消息不会因为某个 Broker 宕机或者磁盘损坏而丢失。
  • 在消费阶段,你需要在处理完全部消费业务逻辑之后,再发送消费确认。

06丨如何处理消费过程中的重复消息

消息重复的情况必然存在

在 MQTT 协议中,给出了三种传递消息时能够提供的服务质量标准,这三种服务质量从低 到高依次是:

  • At most once: 至多一次。消息在传递时,最多会被送达一次。换一个说法就是,没什么消息可靠性保证,允许丢消息。一般都是一些对消息可靠性要求不太高的监控场景使用,比如每分钟上报一次机房温度数据,可以接受数据少量丢失。
  • At least once: 至少一次。消息在传递时,至少会被送达一次。也就是说,不允许丢消息,但是允许有少量重复消息出现。
  • Exactly once:恰好一次。消息在传递时,只会被送达一次,不允许丢失也不允许重复,这个是最高的等级。

常用的绝大部分消息队列提供的服务质量都是 At least once,包括 RocketMQ、RabbitMQ 和Kafka 都是这样。也就是说消息队列很难保证消息不重复。

用幂等性解决重复消息问题

一般解决重复消息的办法是,在消费端,让我们消费消息的操作具备幂等性。

幂等(Idempotence)被用来描述一个操作、方法或者服务。一个幂等操作的特点是,其任意多次执行所产生的影响均与一次执行的影响相同

从对系统的影响结果来说:At least once + 幂等消费 = Exactly once。 那么如何实现幂等操作呢?最好的方式就是,从业务逻辑设计上入手,将消费的业务逻辑设计成具备幂等性的操作。

利用数据库的唯一约束实现幂等

不光是可以使用关系型数据库,只要是支持类似“INSERT IF NOT EXIST”语义的存储类系统都可以用于实现幂等,比如,你可以用 Redis 的 SETNX 命令来替代数据库中的唯一约束,来实现幂等消费。

为更新的数据设置前置条件

另外一种实现幂等的思路是,给数据变更设置一个前置条件,如果满足条件就更新数据,否则拒绝更新数据,在更新数据的时候,同时变更前置条件中需要判断的数据。这样,重复执行这个操作时,由于第一次更新数据的时候已经变更了前置条件中需要判断的数据,不满足前置条件,则不会重复执行更新数据操作。

记录并检查操作

如果上面提到的两种实现幂等方法都不能适用于你的场景,我们还有一种通用性最强,适用范围最广的实现幂等性方法:记录并检查操作,也称为“Token 机制或者 GUID(全局唯一ID)机制”,实现的思路特别简单:在执行数据更新操作之前,先检查一下是否执行过这个更新操作。

具体的实现方法是,在发送消息时,给每条消息指定一个全局唯一的 ID,消费时,先根据这个ID检查这条消息是否有被消费过,如果没有消费过,才更新数据,然后将消费状态置为已消费。

首先,给每个消息指定一个全局唯一的 ID 就是一件不那么简单的事儿,方法有很多,但都不太好同时满足简单、高可用和高性能,或多或少都要有些牺牲。更加麻烦的是,在“检查消费状态,然后更新数据并且设置消费状态”中,三个操作必须作为一组操作保证原子性,才能真正实现幂等,否则就会出现 Bug。

07丨消息积压了该如何处理

优化性能来避免消息积压

在使用消息队列的系统中,对于性能的优化,主要体现在生产者和消费者这一收一发两部分的业务逻辑中。对于绝大多数使用消息队列的业务来说,消息队列本身的处理能力要远大于业务系统的处理能力。主流消息队列的单个节点,消息收发的性能可以达到每秒钟处理几万至几十万条消息的水平,还可以通过水平扩展 Broker 的实例数成倍地提升处理能力。

而一般的业务系统需要处理的业务逻辑远比消息队列要复杂,单个节点每秒钟可以处理几百到几千次请求,已经可以算是性能非常好的了。

所以,对于消息队列的性能优化,我们更关注的是,在消息的收发两端,我们的业务代码怎么和消息队列配合,达到一个最佳的性能

发送端性能优化

发送端业务代码的处理性能,实际上和消息队列的关系不大,因为一般发送端都是先执行自己的业务逻辑,最后再发送消息。如果说,你的代码发送消息的性能上不去,你需要优先检查一下,是不是发消息之前的业务逻辑耗时太多导致的

对于发送消息的业务逻辑,只需要注意设置合适的并发和批量大小,就可以达到很好的发送性能。 Producer 发送消息的过程,Producer 发消息给 Broker,Broker收到消息后返回确认响应,包括步骤的耗时:

  • 发送端准备数据、序列化消息、构造请求等逻辑的时间,也就是发送端在发送网络请求之前的耗时;
  • 发送消息和返回响应在网络传输中的耗时;
  • Broker 处理消息的时延。

消费端性能优化

使用消息队列的时候,大部分的性能问题都出现在消费端,如果消费的速度跟不上发送端生产消息的速度,就会造成消息积压。

在设计系统的时候,一定要保证消费端的消费性能要高于生产端的发送性能,这样的系统才能健康的持续运行

消费端的性能优化除了优化消费业务逻辑以外,也可以通过水平扩容,增加消费端的并发数来提升总体的消费性能。特别需要注意的一点是,在扩容 Consumer 的实例数量的同时,必须同步扩容主题中的分区(也叫队列)数量,确保Consumer 的实例数和分区数量是相等的。如果Consumer的实例数量超过分区数量,这样的扩容实际上是没有效果的。原因我们之前讲过,因为对于消费者来说,在每个分区上实际上只能支持单线程消费。

消息积压了该如何处理?

能导致积压突然增加,最粗粒度的原因,只有两种:要么是发送变快了,要么是消费变慢了

如果是单位时间发送的消息增多,比如说是赶上大促或者抢购,短时间内不太可能优化消费端的代码来提升消费性能,唯一的方法是通过扩容消费端的实例数来提升总体的消费能力

如果短时间内没有足够的服务器资源进行扩容,没办法的办法是,将系统降级,通过关闭一些不重要的业务,减少发送方发送的数据量,最低限度让系统还能正常运转,服务一些重要业务。

如果通过监控发现,无论是发送消息的速度还是消费消息的速度和原来都没什么变化,这时候你需要检查一下你的消费端,是不是消费失败导致的一条消息反复消费这种情况比较多,这种情况也会拖慢整个系统的消费速度

如果监控到消费变慢了,你需要检查你的消费实例,分析一下是什么原因导致消费变慢。优先检查一下日志是否有大量的消费错误,如果没有错误的话,可以通过打印堆栈信息,看一下你的消费线程是不是卡在什么地方不动了,比如触发了死锁或者卡在等待某些资源上了。

08丨网关如何接收服务端的秒杀结果

网关如何接收服务端的秒杀结果

image

如何实现单个队列的并行消费?

比如说,队列中当前有 10 条消息,对应的编号是 0-9,当前的消费位置是 5。同时来了三个消费者来拉消息,把编号为5、6、7的消息分别给三个消费者,每人一条。过了一段时间,三个消费成功的响应都回来了,这时候就可以把消费位置更新为 8 了,这样就实现并行消费。

这是理想的情况。还有可能编号为 6、7 的消息响应回来了,编号 5 的消息响应一直回不来,怎么办?这个位置 5 就是一个消息空洞。为了避免位置 5 把这个队列卡住,可以先把消费位置5这条消息,复制到一个特殊重试队列中,然后依然把消费位置更新为 8,继续消费。再有消费者来拉消息的时候,优先把重试队列中的那条消息给消费者就可以了。

这是并行消费的一种实现方式。需要注意的是,并行消费开销还是很大的,不应该作为一个常规的,提升消费并发的手段,如果消费慢需要增加消费者的并发数,还是需要扩容队列数。

如何保证消息的严格顺序?

主题层面是无法保证严格顺序的,只有在队列上才能保证消息的严格顺序。

如果说,你的业务必须要求全局严格顺序,就只能把消息队列数配置成1,生产者和消费者也只能是一个实例,这样才能保证全局严格顺序。

09丨学习开源代码该如何入手

通过文档来了解开源项目

学习源代码应该从哪儿入手呢?最佳的方式就是先看它的文档

用以点带面的方式来阅读源码

推荐大家阅读源码的方式是,带着问题去读源码,最好是带着问题的答案去读源码

10丨如何使用异步设计提升系统性能

异步设计如何提升系统性能?

同步实现的性能瓶颈采用同步实现的方式,整个服务器的所有线程大部分时间都没有在工作,而是都在等待。

采用异步实现解决等待问题

异步化实现后,整个流程的时序和同步实现是完全一样的,区别只是在线程模型上由同步顺序调用改为了异步调用和回调的机制。

简单实用的异步框架: CompletableFuture

Java 中比较常用的异步框架有 Java8 内置的CompletableFuture和 ReactiveX 的RxJava.

小结

简单的说,异步思想就是,当我们要执行一项比较耗时的操作时,不去等待操作结束,而是给这个操作一个命令:“当操作完成后,接下来去执行什么。”

11丨如何实现高性能的异步网络传输

同步网络IO模型

每个连接都需要阻塞一个线程来等待数据,大量的连接数就会需要相同数量的数据接收线程。当这些 TCP 连接都在进行数据收发的时候,会导致什么情况呢?对,会有大量的线程来抢占 CPU 时间,造成频繁的 CPU 上下文切换,导致 CPU 的负载升高,整个系统的性能就会比较慢。

使用 Netty 来实现异步网络通信

异步网络IO模型

使用 NIO 来实现异步网络通信

NIO上线异步网络

12丨序列化与反序列化

要想使用网络框架的 API 来传输结构化的数据,必须得先实现结构化的数据与字 节流之间的双向转换。这种将结构化数据转换成字节流的过程,我们称为序列化,反过来转换,就是反序列化。

序列化的用途除了用于在网络上传输数据以外,另外的一个重要用途是,将结构化数据保存在文件中,因为在文件内保存数据的形式也是二进制序列,和网络传输过程中的数据是一样的,所以序列化同样适用于将结构化数据保存在文件中。

该选择哪种序列化实现?

面对这么多种序列化实现,我们该如何选择呢?需要权衡几个因素:

  • 序列化后的数据最好是易于人类阅读的;
  • 实现的复杂度是否足够低;
  • 序列化和反序列化的速度越快越好;
  • 序列化后的信息密度越大越好,也就是说,同样的一个结构化数据,序列化之后占用的存储空间越小越好;

像 JSON、XML 这些序列化方法,可读性最好,但信息密度也最低。像Kryo、Hessian 这些通用的二进制序列化实现,适用范围广,使用简单,性能比 JSON、XML 要好一些,但是肯定不如专用的序列化实现。

实现高性能的序列化和反序列化

13丨传输协议:应用程序之间对话的语言

传输协议就是应用程序之间对话的语言

断句

  • 分隔符
  • 前置长度

用双工收发协议提升吞吐量

  • 使用 ID 来标识请求与响应对应关系

发送请求的时候,给每个请求加一个序号,这个序号在本次会话内保证唯一,然后在响应中带上请求的序号,这样就可以把请求和响应对应上了。

14丨内存管理:如何避免内存溢出和频繁的垃圾回收

15丨Kafka如何实现高性能IO

使用批量消息提升服务端处理能力

批量处理是一种非常有效的提升系统吞吐量的方法。在 Kafka 内部,消息都是以“批”为单位处理的。

Kafka的客户端 SDK 在实现消息发送逻辑的时候,采用了异步批量发送的机制。

调用 send() 方法发送一条消息之后,无论你是同步发送还是异步发送,Kafka 都不会立即就把这条消息发送出去。它会先把这条消息,存放在内存中缓存起来,然后选择合适的时机把缓存中的所有消息组成一批,一次性发给 Broker。

在Broker 整个处理流程中,无论是写入磁盘、从磁盘读出来、还是复制到其他副本这些流程中,批消息都不会被解开,一直是作为一条“批消息”来进行处理的

在消费时,消息同样是以批为单位进行传递的,Consumer 从 Broker 拉到一批消息后,在客户端把批消息解开,再一条一条交给用户代码处理。

构建批消息和解开批消息分别在发送端和消费端的客户端完成,不仅减轻了 Broker 的压力,最重要的是减少了 Broker 处理请求的次数,提升了总体的处理能力。

使用顺序读写提升磁盘 IO 性能

对于磁盘来说,它有一个特性,就是顺序读写的性能要远远好于随机读写。 操作系统每次从磁盘读写数据的时候,需要先寻址,也就是先要找到数据在磁盘上的物理位置,然后再进行数据读写。顺序读写相比随机读写省去了大部分的寻址时间,它只要寻址一次,就可以连续地读写下去,所以说,性能要比随机读写要好很多。

Kafka 就是充分利用了磁盘的这个特性。它的存储设计非常简单,对于每个分区,它把从Producer 收到的消息,顺序地写入对应的log文件中,一个文件写满了,就开启一个新的文件这样顺序写下去。消费的时候,也是从某个全局的位置开始,也就是某一个 log 文件中的某个位置开始,顺序地把消息读出来。

利用 PageCache 加速消息读写

在 Kafka 中,它会利用 PageCache 加速消息读写。 Kafka 在读写消息文件的时候,充分利用了PageCache的特性。一般来说,消息刚刚写入到服务端就会被消费,按照LRU的“优先清除最近最少使用的页”这种策略,读取的时候,对于这种刚刚写入的PageCache,命中的几率会非常高。也就是说,大部分情况下,消费读消息都会命中 PageCache,带来的好处有两个:一个是读取的速度会非常快,另外一个是,给写入消息让出磁盘的IO资源,间接也提升了写入的性能。

ZeroCopy:零拷贝技术

在服务端,处理消费的大致逻辑是这样的:首先,从文件中找到消息数据,读到内存中;然后,把消息通过网络发给客户端。这个过程中,数据实际上做了 2 次或者 3 次复制:

  • 从文件复制数据到 PageCache 中,如果命中PageCache,这一步可以省掉;
  • 从 PageCache 复制到应用程序的内存空间中,也就是我们可以操作的对象所在的内存;
  • 从应用程序的内存空间复制到Socket的缓冲区,这个过程就是我们调用网络应用框架的 API 发送数据的过程;

Kafka 使用零拷贝技术可以把这个复制次数减少一次,上面的2、3步骤两次复制合并成一次复制。直接从PageCache中把数据复制到Socket缓冲区中,这样不仅减少一次数据复制,更重要的是,由于不用把数据复制到用户内存空间,DMA 控制器可以直接完成数据复制,不需要 CPU 参与,速度更快。

小结

关键的技术点:

  • 使用批量处理的方式来提升系统吞吐能力。
  • 基于磁盘文件高性能顺序读写的特性来设计的存储结构。
  • 利用操作系统的 PageCache 来缓存数据,减少 IO 并提升读性能。
  • 使用零拷贝技术加速消费流程。

16丨缓存策略:如何使用缓存来减少磁盘IO

使用内存作为缓存来加速应用程序的访问速度,是几乎所有高性能系统都会采用的方法。

缓存的思想很简单,就是把低速存储的数据,复制一份副本放到高速的存储中,用来加速数据的访问。

选择只读缓存还是读写缓存?

使用缓存,首先你就会面临选择读缓存还是读写缓存的问题。他们唯一的区别就是,在更新数据的时候,是否经过缓存。

Kafka 使用的 PageCache,它就是一个非常典型的读写缓存

PageCache

操作系统会利用系统空闲的物理内存来给文件读写做缓存,这个缓存叫做 PageCache。应用程序在写文件的时候,操作系统会先把数据写入到 PageCache 中,数据在成功写到PageCache 之后,对于用户代码来说,写入就结束了。 然后,操作系统再异步地把数据更新到磁盘的文件中。应用程序在读文件的时候,操作系统也是先尝试从 PageCache中寻找数据,如果找到就直接返回数据,找不到会触发一个缺页中断,然后操作系统把数据从文件读取到PageCache中,再返回给应用程序。

在数据写到 PageCache 中后,它并不是同时就写到磁盘上了,这中间是有一个延迟的。操作系统可以保证,即使是应用程序意外退出了,操作系统也会把这部分数据同步到磁盘上。但是,如果服务器突然掉电了,这部分数据就丢失了。

读写缓存的这种设计,它天然就是不可靠的,是一种牺牲数据一致性换取性能的设计

为什么 Kafka 可以使用 PageCache 来提升它的性能呢?

  • 消息队列它的读写比例大致是1:1,因为,大部分我们用消息队列都是一收一发这样使用。这种读写比例,只读缓存既无法给写加速,读的加速效果也有限,并不能提升多少性能。
  • Kafka 它并不是只靠磁盘来保证数据的可靠性,它更依赖的是,在不同节点上的多副本来解决数据可靠性问题,这样即使某个服务器掉电丢失一部分文件内容,它也可以从其他节点上找到正确的数据,不会丢消息。
  • PageCache 这个读写缓存是操作系统实现的,Kafka 只要按照正确的姿势来使用就好了,不涉及到实现复杂度的问题。

缓存更新策略

  • 更新数据的同时去更新缓存
  • 是定期来更新全部缓存
  • 给缓存中的每个数据设置一个有效期,让它自然过期以达到更新的目的

缓存置换策略

最经典也是最实用的算法就是 LRU 算法,也叫最近最少使用算法。LRU 的算法原理非常简单,它总是把最长时间未被访问的数据置换出去。

17丨如何正确使用锁保护共享数据,协调异步线程

锁的原理是这样的:任何时间都只能有一个线程持有锁,只有持有锁的线程才能访问被锁保护的资源

避免滥用锁

那是不是遇到这种情况都要用锁解决呢?我分享一下我个人使用锁的第一条原则:如果能不用锁,就不用锁;如果你不确定是不是应该用锁,那也不要用锁

只有在并发环境中,共享资源不支持并发访问,或者说并发访问共享资源会导致系统错误的情况下,才需要使用锁

锁的用法

  • 在访问共享资源之前,先获取锁
  • 如果获取锁成功,就可以访问共享资源了
  • 最后,需要释放锁,以便其他线程继续访问共享资源

注意:

  • 使用完锁,一定要释放它
  • 避免死锁

如何避免死锁?

  • 避免滥用锁,程序里用的锁少,写出死锁 Bug 的几率自然就低;
  • 对于同一把锁,加锁和解锁必须要放在同一个方法中,这样一次加锁对应一次解锁,代码清晰简单,便于分析问题;
  • 尽量避免在持有一把锁的情况下,去获取另外一把锁,就是要尽量避免同时持有多把锁;
  • 如果需要持有多把锁,一定要注意加解锁的顺序,解锁的顺序要和加锁顺序相反;
  • 程序中所有的锁排一个顺序,在所有需要加锁的地方,按照同样的顺序加解锁;

18丨 如何用硬件同步原语(CAS)替代锁?

什么是硬件同步原语?

硬件同步原语(Atomic Hardware Primitives)是由计算机硬件提供的一组原子操作,我们比较常用的原语主要是 CAS 和 FAA 这两种。

CAS(Compare and Swap):先比较一下变量 p 当前的值是不是等于 old,如果等于,那就把变量 p 赋值为 new,并返回 true,否则就不改变变量 p,并返回 false。

FAA(Fetch and Add):先获取变量 p 当前的值 value,然后给变量 p 增加 inc,最后返回变量 p 之前的值 value。

19丨数据压缩

对于 Kafka 来说,使用数据压缩,提升了大概几十倍的吞吐量。 当然,在实际生产时,不太可能达到这么高的压缩率,但是合理地使用数据压缩,仍然可以做到提升数倍的吞吐量。

数据压缩不仅能节省存储空间,还可以用于提升网络传输性能

什么情况适合使用数据压缩?

网络传输数据,对比:

  • 不压缩直接传输需要的时间是: 传输未压缩数据的耗时。
  • 使用数据压缩需要的时间是: 压缩耗时 + 传输压缩数据耗时 + 解压耗时。

压缩和解压的操作都是计算密集型的操作,非常耗费CPU资源。如果你的应用处理业务逻辑就需要耗费大量的CPU资源,就不太适合再进行压缩和解压。

如果你的系统的瓶颈是磁盘的 IO 性能,CPU 资源又很闲,这种情况就非常适 合在把数据写入磁盘前先进行压缩。

如果你的系统读写比严重不均衡,你还要考虑,每读一次数据就要解压一次是不是划算。 压缩本质是资源的置换,是一个时间换空间,或者说是 CPU资源换存储资源的游戏

应该选择什么压缩算法?

目前常用的压缩算法包括:ZIP,GZIP,SNAPPY,LZ4等等。

选择压缩算法的时候,主要需要考虑数据的压缩率和压缩耗时。一般来说,压缩率越高的算法,压缩耗时也越高。如果是对性能要求高的系统,可以选择压缩速度快的算法,比如 LZ4;如果需要更高的压缩比,可以考虑 GZIP 或者压缩率更高的 XZ 等算法。

如何选择合适的压缩分段?

在压缩时,给定的被压缩数据它必须有确定的长度,或者说,是有头有尾的,不能是一个无限的数据流,如果要对流数据进行压缩,那必须把流数据划分成多个帧,一帧一帧的分段压缩。

需要根据业务,选择合适的压缩分段,在压缩率、压缩速度和解压浪费之间找 到一个合适的平衡。

Kafka 是如何处理消息压缩的?

  • Kafka 是否开启压缩,这是可以配置,它也支持配置使用哪一种压缩算法。原因我们在上面说过,不同的业务场景是否需要开启压缩,选择哪种压缩算法是不能一概而论的。所以,Kafka 的设计者把这个选择权交给使用者。
  • 在开启压缩时,Kafka选择一批消息一起压缩,每一个批消息就是一个压缩分段。使用者也可以通过参数来控制每批消息的大小。
  • 在 Kafka 中,生产者生成一个批消息发给服务端,在服务端中是不会拆分批消息的。那按照批来压缩,意味着,在服务端也不用对这批消息进行解压,可以整批直接存储,然后整批发送给消费者。最后,批消息由消费者进行解压。