「微服务架构」跨多个微服务的数据架构模式

kafka的诞生,是为了解决linkedin的数据管道问题,起初linkedin采用了ActiveMQ来进行数据交换,大约是在2010年前后,那时的ActiveMQ还远远无法满足linkedin对数据传递系统的要求。

在微服务中,一个逻辑上原子作可以经常跨越多个微服务。即使是单片系统也可能使用多个数据库或消息传递解决方案。使用多个的数据存储解决方案,如果其中一个分布式流程参与者出现故障,我们就会面临数据不一致的风险 - 例如在未下订单的情况下向客户收费或未通知客户订单成功。在本文中,我想分享一些我为使微服务之间的数据最终保持一致而学到的技术。

kafka架构图_kafka的架构包括哪些组件kafka架构图_kafka的架构包括哪些组件


kafka架构图_kafka的架构包括哪些组件


为什么实现这一目标如此具有挑战性?只要我们有多个存储数据的地方(不在单个数据库中),就不能自动解决一致性问题,工程师在设计系统时需要注意一致性。 目前,在我看来,业界还没有一个广为人知的解决方案,可以在多个不同的数据源中自动更新数据 - 我们可能不应该等待很快就能获得一个。

以自动且无障碍的方式解决该问题的一种尝试是实现两阶段提交(2PC)模式的XA协议。但在现代高规模应用中(特别是在云环境中),2PC似乎表现不佳。为了消除2PC的缺点,我们必须交易ACID for BASE并根据要求以不同方式覆盖一致性问题。

在多个微服务中处理一致性问题的最着名的方法是Saga模式。 您可以将Sagas视为多个事务的应用程序级分布式协调 。 根据用例和要求,您可以优化自己的Saga实施。 相反,XA协议试图涵盖所有场景。 Saga模式也不是新的。 它在过去已知并用于ESB和SOA体系结构中。 ,它成功地转变为微服务世界。 跨越多个服务的每个原子业务作可能包含技术级别的多个事务。 Saga Pattern的关键思想是 能够回滚其中一个单独的交易 。 众所周知,开箱即用的已经提交的单个事务无法进行回滚。 但这是通过引入 补偿作来 实现的 - 通过引入“取消”作。

除了 取消 之外,您还应该考虑使您的服务具有 幂等性 ,以便在出现故障时重试或重新启动某些作。 应故障,并应积极主动地应对故障。

如果在进程的中间 负责调用补偿作的系统崩溃或重新启动 ,该怎么办? 在这种情况下,用户可能会收到错误消息,并Kafka是一个分布式消息队列,具有高性能、持久化、多副本备份、横向扩展能力。生产者往队列里写消息,消费者从队列里取消息进行业务逻辑。一般在架构设计中起到解耦、削峰、异步处理的作用。且应该触发补偿逻辑,或者 - 当处理异步用户请求时,应该恢复执行逻辑。

要查找崩溃的事务并恢复作或应用补偿,我们需要协调来自多个服务的数据。 对账

是在金融领域工作的工程师所熟悉的技术。你有没有想过银行如何确保你的资金转移不会丢失,或者两个不同的银行之间如何汇款?快速回答是对账。

回到微服务,使用相同的原则,我们可以在一些 动作触发器 上协调来自多个服务的数据。当检测到故障时,可以按或由系统触发作。最简单的方法是运行逐记录比较。可以通过 比较聚合值来 优化该过程。在这种情况下,其中一个系统将成为每条记录的真实来源。

想象一下多步骤交易。如何在对帐期间确定哪些事务可能已失败以及哪些步骤失败?一种解决方案是 检查每个事务的状态 。在某些情况下,此功能不可用(想象一下发送电子邮件或生成其他类型消息的无状态邮件服务)。在其他一些情况下,您可能希望立即了解事务状态,尤其是在具有许多步骤的复杂方案中。例如,预订航班,酒店和转机的多步订单。

在这些情况下,日志可以提供帮助。 记录是一种简单但功能强大的技术 。 许多分布式系统依赖于日志 。 “ 预写日志记录 ”是数据库在内部实现事务行为或维护副本之间一致性的方式。相同的技术可以应用于微服务设计。在进行实际数据更改之前,服务会写入有关其进行更改的意图的日志条目。实际上, 日志可以是协调服务所拥有的数据库中的表或 。

日志不仅可用于 恢复事务处理 ,还可用于为系统用户,客户或支持团队提供 可见性 。但是,在简单方案中,服务日志可能是冗余的, 状态端点或状态字段 就足够了。

到目前为止,您可能认为s只是编配(orchestration )方案的一部分。但是s也可以用于编排(choreography ),每个微服务只知道过程的一部分。 Sagas包括处理分布式事务的正流和负流的知识。在编排(choreography )中,每个分布式事务参与者都具有这种知识。

到目前为止描述的一致性解决方案并不容易。他们确实很复杂。但有一种更简单的方法: 一次修改一个数据源 。我们可以将这两个步骤分开,而不是改变服务的状态并在一个过程中发出。

但是,有时候不需要特定的框架。一些数据库提供了一种友好的方式来拖尾其作日志,例如MongoDB Oplog。如果数据库中没有此类功能,则可以通过时间戳轮询更改,或使用上次处理的不可变记录ID查询更改。避免不一致的关键是使数据更改通知成为一个单独的过程。在这种情况下,数据库记录是 单一的事实来源 。只有在首先发生变化时才会捕获更改。

更改数据捕获的缺点是业务逻辑的分离。更改捕获过程很可能与更改逻辑本身分开存在于您的代码库中 - 这很不方便。最知名的变更数据捕获应用程序是与域无关的变更,例如与数据仓库共享数据。对于域,采用不同的机制,例如明确发送。

让我们来看看颠倒的单一事实来源。如果不是先写入数据库,而是先触发一个,然后与自己和其他服务共享。在这种情况下,成为事实的来源。这将是一种源的形式,其中我们自己的服务状态有效地成为读取模型,并且每个都是写入模型。

“优先”方法面临的挑战也是CQRS本身的挑战。想象一下,在下订单之前,我们想要检查商品的可用性。如果两个实例同时收到同一项目的订单怎么办?两者都将同时检查读取模型中的库存并发出订单。如果没有某种覆盖方案,我们可能会遇到麻烦。

“优先”方法的另一个挑战是任何驱动架构的挑战 - 的顺序。多个并发消费者以错误的顺序处理可能会给我们带来另一种一致性问题,例如处理尚未创建的客户的订单。

诸如Kafka或AWS Kinesis之类的数据流解决方案可以保证将按顺序处理与单个实体相关的(例如,仅在创建用户之后为客户创建订单)。例如,在Kafka中,您可以按用户ID对主题进行分区,以便与单个用户相关的所有将由分配给该分区的单个使用者处理,从而允许按顺序处理它们。相反,在Message Brokers中,消息队列具有一个订单,但是多个并发消费者在给定顺序中进行消息处理(如果不是不可能的话)。在这种情况下,您可能会遇到并发问题。

实际上,在需要线性化的情况下或在具有许多数据约束的情况(例如性检查)中,难以实现“优先”方法。但它在其他情况下确实很有用。但是,由于其异步性质,仍然需要解决并发和竞争条件的挑战。

有许多方法可以将系统拆分为多个服务。我们努力将单独的微服务与单独的域匹配。但域名有多细化?有时很难将域与子域或聚合根区分开来。没有简单的规则来定义您的微服务拆分。

数据的原子更新需要两个不同系统之间达成共识,如果单个值为0或1则达成协议。当涉及到微服务时,它归结为两个参与者之间的一致性问题,并且因而在版本中,kafka将分组消费的位移数据存储在了一个特殊的topic中,即__consumer_offsets,由于每个分组group的位移信息都会提交到该topic,因而kafka默认为其设置了非常多的分区,也即50个分区。所有实际解决方案都遵循一条经验法则:

在给定时刻,对于每个数据记录,您需要找到系统信任的数据源

事实的来源可能是,数据库或其中一项服务。实现微服务系统的一致性是开发人员的。我的方法如下:

请问kafka和rabbitmq有啥区别啊?

1、顺序读写

RabbitMQ,遵循AMQP协议,由内在高并发的erlang语言开发,用在实时的对可靠性要求比较高的消息传递上。

kafka和RabbitMQ的区别还是挺大的:

RabbitMQ遵循AMQP协议,RabbitMQ的broker由Exchange,Binding,queue组成,其中exchange和binding组成了消息的路由键;客户端Producer通过连接channel和server进行通信,Consumer从queue获取消息进行消费(长连接,queue有消息会推送到consumer端,consumer循环从输入流读取数据)。rabbitMQ以broker为中心;有消息的确认机制。

kafka遵从一般的MQ结构,producer,broker,consumer,以consumer为中心,消息的消费信息保存的客户端consumer上,consumer根据消费的点,从broker上批量pull数据;无消息确认机制。

2、吞吐量

rabbitMQ在吞吐量方面稍逊于kafka,他们的出发点不一样,rabbitMQ支持对消息的可靠的传递,支持事务,不支持批量的作;基于存储的可靠性的要求存储可以采用内存或者硬盘。

kafka具有高的吞吐量,内部采用消息的批量处理,zero-copy机制,数据的存储和获取是本地磁盘顺序批量作,具有O(1)的复杂度,消息处理的效率很高。

3在实际应用中,Kafka高并发实现具有以下优势和劣势:、可用性

rabbitMQ支持or的queue,主queue失效,or queue接管。

4、先接收生产者发来的消息,再落入磁盘。集群负载均衡

kafka采用zookeeper对集群中的broker、consumer进行管理,可以注册topic到zookeeper上;通过zookeeper的协调机制,producer保存对应topic的broker信息,可以随机或者轮询发送到broker上;并且producer可以基于语义指定分片,消息发送到broker的某分片上。

kafka低版本的怎么用ja查询给定broker上所有的日志目录信息?

OpenTelemetry 收集器端点

1. 日志存储格式

比如:读取文件,再用socket发送出去

版本的kafka日志是以批为单位进行日志存储的,所谓的批指的是kafka会将多条日志压缩到同一个batch中,然后以batch为单位进行后续的诸如索引的创建和消息的查询等工作。

对于每个批次而言,其默认大小为4KB,并且保存了整个批次的起始位移和时间戳等元数据信息,而对于每条消息而言,其位移和时间戳等元数据存储的则是相对于整个批次的元数据的增量,通过这种方式,kafka能够减少每条消息中数据占用的磁盘空间。

这里我们首先展示一下每个批次的数据格式:

起始位移:占用8字节,其存储了当前batch中条消息的位移;

长度:占用了4个字节,其存储了整个batch所占用的磁盘空间的大小,通过该字段,kafka在进行消息遍历的时候,可以快速的跳跃到下一个batch进行数据读取;

分区leader版本号:记录了当前消息所在分区的leader的版本,主要用于进行一些数据版本的校验和转换工作;

CRC:对当前整个batch的数据的CRC校验码,主要是用于对数据进行错校验的;

属性:占用2个字节,这个字段的3位记录了当前batch中消息的压缩方式,现在主要有GZIP、LZ4和Snappy三种。第4位记录了时间戳的类型,第5和6位记录了新版本引入的事务类型和控制类型;

位移增量:的消息的位移相对于条消息的增量;

起始时间戳:占用8个字节,记录了batch中条消息的时间戳;

时间戳:占用8个字节,记录了batch中的一条消息的时间戳;

PID、producer epoch和起始序列号:这三个参数主要是为了实现事务和幂等性而使用的,其中PID和producer epoch用于确定当前producer是否合法,而起始序列号则主要用于进行消息的幂等校验;

消息个数:占用4个字节,记录当前batch中所有消息的个数;

通过上面的介绍可以看出,每个batch的头部数据中占用的字节数固定为61个字节,可变部分主要是与具体的消息有关,下面我们来看一下batch中每条消息的格式:

这里的消息的头部数据就与batch的大不相同,可以看到,其大部分数据的长度都是可变的。既然是可变的,这里我们需要强调两个问题:

1、对于数字的存储,kafka采用的是Zig-Zag的存储方式,也即负数并不会使用补码的方式进行编码,而是将其转换为对应的正整数,比如-1映射为1、1映射为2、-2映射为3、2映射为4,关系图如下所示:

通过图可以看出,在对数据反编码的时候,我们只需要将对应的整数转换成其原始值即可;

2、在使用Zig-Zag编码方式的时候,每个字节为128,而其中一半要存储正数,一半要存储负数,还有一个0,也就是说每个字节能够表示的整数为64,此时如果有大于64的数字,kafka就会使用多个字节进行存储。

而这多个字节的表征方式是通过将每个字节的位作为保留位来实现的,如果位为1,则表示需要与后续字节共同表征目标数字,如果位为0,则表示当前位即可表示目标数字。

对于上面的每条消息的格式,除了消息key和value相关的字段,其还有属性字段和header,属性字段的主要作用是存储当前消息key和value的压缩方式,而header则供给用户进行添加一些动态的属性,从而实现一些定制化的工作。

通过对kafka消息日志的存储格式我们可以看出,其使用batch的方式将一些公共信息进行提取,从而保证其只需要存储一份,虽然看起来每个batch的头部信息比较多,但其平摊到每条消息上之后使用的字节更少了;

在消息层面,kafka使用了数据增量的方式和Zig-Zag编码方式对数据进行的压缩,从而极大地减少其占用的字节数。总体而言,这种存储方式极大的减少了kafka占用的磁盘空间大小。

2. 日志存储方式

这里我们需要注意的是,图中对于分区日志的存储,当前broker只会存储分配给其的分区的日志,比如图中的connect-status就只有分区1和分区4的目录,而没有分区2和分区3的目录,这是因为这些分区被分配在了集群的其他上。

在每个分区日志目录中,存在有三种类型的日志文件,即后缀分别为log、index和timeindex的文件。其中log文件就是真正存储消息日志的文件,index文件存储的是消息的位移索引数据,而timeindex文件则存储的是时间索引数据。

从图中可以看出,每种类型的日志文件都是分段的,这里关于分段的规则主要有如下几点需要说明:

在为日志进行分段时,每个文件的文件名都是以该段中条消息的位移的偏移量来命名的;

kafka会在每个log文件的大小达到1G的时候关闭该文件,而新开一个文件进行数据的写入。可以看到,图中除了的log文件外,其余的log文件的大小都是1G;

对于index文件和timeindex文件,在每个log文件进行分段之后,这两个索引文件也会进行分段,这也就是它们的文件名与log文件一致的原因;

kafka日志的留存时间默认是7天,也就是说,kafka会删除存储时间超过7天的日志,但是对于某些文件,其部分日志存储时间未达到7天,部分达到了7天,此时还是会保留该文件,直至其所有的消息都超过留存时间;

3. 索引文件

kafka主要有两种类型的索引文件:位移索引文件和时间戳索引文件。位移索引文件中存储的是消息的位移与该位移所对应的消息的物理地址;时间戳索引文件中则存储的是消息的时间戳与该消息的位移值。

1、由于kafka消息都是以batch的形式进行存储,因而索引文件中索引元素的最小单元是batch,也就是说,通过位移索引文件能够定位到消息所在的batch,而没法定位到消息在batch中的具置,查找消息的时候,还需要进一步对batch进行遍历;

2、位移索引文件中记录的位移值并不是消息真正的位移值,而是该位移相对于该位移索引文件的起始位移的偏移量,通过这种方式能够极大的减小位移索引文件的大小。

如下图所示为一个位移索引文件的格式示意图:

如下则是具体的位移索引文件的示例:

关于时间戳索引文件,由于时间戳的变化比位移的变化幅度要大一些,其即使采用了增量的方式存储时间戳索引,但也没法有效地使用Zig-Zag方式对数据进行编码,因而时间戳索引文件是直接存储的消息的时间戳数据,

但是对于时间戳索引文件中存储的位移数据,由于其变化幅度不大,因而其还是使用相对位移的方式进行的存储,并且这种存储方式也可以直接映射到位移索引文件中而无需进行计算。如下图所示为时间戳索引文件的格式图:

如下则是时间戳索引文件的一个存储示例:

可以看到,如果需要通过时间戳来定位消息,就需要首先在时间戳索引文件中定位到具体的位移,然后通过位移在位移索引文件中定位到消息的具体物理地址。

4. 日志压缩

所谓的日志压缩功能,其主要是针对这样的场景的,比如对某个用户的邮箱数据进行修改MSK的计价方式是以Kafka Broker以及配置存储每小时计价,MSK的数据传输费用与原本的AWS数据传输相同,而集群所使用的Zookeeper,还有区域集群的Broker和Zookeeper互传数据是不额外收费的。现在用户已经可以在大部分的AWS区域使用到MSK服务,包括北美、与欧洲。,其总共修改了三次,修改过程如下:

email=john@gmail

kafka的日志压缩就是为了解决这个问题而存在的,对于使用相同key的消息,其会只保留的一条消息的记录,而中间过程的消息都会被kafka cleaner给清理掉。

图中K1的数据有V1、V3和V4,经过压缩之后只有V4保留了下来,K2的数据则有V2、V6和V10,压缩之后也只有V10保留了下来;同理可推断其他的Key的数据。

另外需要注意的是,kafka开启日志压缩使用的是log.cleanup.policy,其默认值为delete,也即我们正常使用的策略,如果将其设置为compaction,则开启了日志压缩策略,但是需要注意的是,开启了日志压缩策略并不代表kafka会清理历史数据,只有将log.cleaner.enable设置为true才会定时清理历史数据。

在kafka中,其本身也在使用日志压缩策略,主要体现在kafka消息的偏移量存储。在旧版本中,kafka将每个consumer分组当前消费的偏移量信息保存在zookeeper中,但是由于zookeeper是一款分布式协调工具,其对于读作具有非常高的性能,但是对于写作性能比较低,而consumer的位移提交动作是非常频繁的,这势必会导致zookeeper成为kafka消息消费的瓶颈。

另外,consumer在提交位移时,使用的key为groupId+topic+partition,而值则为当前提交的位移,也就是说,对于每一个分组所消费的topic的partition,其都只会保留的位移。如果consumer需要读取位移,那么只需要按照上述格式组装key,然后在该topic中读取的消息数据即可。

kafka是什么意思

分布式系统通信:Kafka可以作为分布式系统之间的通信协议,用来传输大量的数据和消息。

kafka的意思是:卡夫卡。

在使用kafka时,消息都是推送到某个topic中,然后由producer计算当前消息会发送到哪个partition,在partition中,kafka会为每条消息设置一个偏移量,也就是说,如果要定位一条消息,使用三元组即可。

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Ja编写。Kafka是一种高吞吐量的分布式发布消息系统,它可以处理消费者在网站中的所有动作流数据。

这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多功能的一个关键因素。这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。

kafka的架构师jaykreps对于kafka的名称由来是这样讲的,由于jaykreps非常喜欢franzkafka。并且觉得kafka这个名字很酷,因此取了个和消息传递系统完全不相干的名称kafka,该名字并没有特别的含义。

经常由于各种缺陷而导致消息阻塞或者服务无常访问,为了能够解决这个问题,linkedin决定研发自己的消息传递系基于kafka的架构模式,其会将各个分区平均分配到每个broker中,也就是说每个broker会被分配用来提供一个或多个分区的日志存储服务。在broker上,kafka的日志也是按照partition进行存储的,其会在指定的日志存储目录中为每个topic的partition分别创建一个目录,目录中存储的就是这些分区的日志数据,而目录的名称则会以的格式进行创建。如下是kafka日志的存储目录示意图:统,当时linkedin的首席架构师jaykreps便开始组织团队进行消息传递系统的研发。

arm架构kafka安装

主要作在主机vcapp上进行

编辑config/zookeeper.properties

编辑config/server.prSleuth 是一个由 Spring Cloud 团队管理和维护的项目,旨在将分布式跟踪功能集成到 Spring Boot 应用程序中。它作为一个典型Spring Starter的 . 以下是一些开箱即用的 Sleuth 工具:operties

编辑bin/kafka-run-class.sh

将安装目录拷贝到在主要业务作中,我们修改自己的服务状态,而单独的进程可靠地捕获更改并生成。这种技术称为变更数据捕获(CDC)。实现此方法的一些技术是Kafka Connect或Debezium。剩余主机的对应目录中

启动zookeeper

启处理这些情况的常用方法是乐观并发:将读取模型版本放入中,如果读取模型已在消费者端更新,则在消费者端忽略它。另一种解决方案是使用悲观并发控制,例如在检查项目可用性时为项目创建锁定。动kafka

kafka记录hive中字段变化

消息队列:Kafka是一种高效的消息队列系统,可以用来实现实时消息的发布和。

从数据优势上游到数据下游,大致可以分为:数据采集 -> 数据清洗 -> 数据存储 -> 数据分析统计 -> 数据可视化。

高吞吐:Kafka拥有很高的吞吐量,即使是在单性能比较低下的商用集群中,也能保证单每秒10万条消息的传输。高容错:Kafka在设计上支持多分区、多副本的策略,拥有很强的容错性。

安全正成为系统选型不得不考虑的问题,Kafka由于其安全机制的匮乏,也导致其在数据敏感行业的部署存在的安全隐患。本文将围绕Kafka,先介绍其整体架构和关键概念,再深入分析其架构之中存在的安全问题,分享下Transwarp在Kafka安全性上所做的工作及其使用方法。

适用场景:

hive 构建在基于静态批处理的Hadoop 之上,Hadoop 通常都有较高的延迟并且在作业提交和调度的时候需要大量的开销。因此,hive 并不能够在大规模数据集上实现低延迟快速的查询,例如,hive 在几百MB 的数据集上执行查询一般有分钟级的时间延迟。

因此,hive 并不适合那些需要高实时性的应用,例如,联机事务处理(OLTP)。hive 查询作过程严格遵守Hadoop MapReduce 的作业执行模型,hive 将用户的hiveSQL 语句通过解释器转换为MapReduce 作业提交到Hadoop 集群上。

Kafka零拷贝

简称mmap,简单描述其作用就是:将磁盘文件映射到内存, 用户通过修改内存就能修改磁盘文件。

用kafka做存储层,为什么呢?一大堆可以做数据存储的 MySQL、MongoDB、HDFS……

6. 总结

因为kafka数据是持久化磁盘的,还速度快;还可靠、支持分布式……

啥!用了磁盘,还速度快!!!

没错,kafka就是速度,本文将探究kafka性能背后的秘密。

首先要有个概念,kafka高性能的背后,是多方面协同后、最终的结果,kafka从宏观架构、分布式partition存储、ISR数据同步、以及“无孔不入”的高效利用磁盘/作系统特性,这些多方面的协同,是kafka成为性能的必然结果。

本文将从kafka零拷贝,探究其是如何“无孔不入”的高效利用磁盘/作系统特性的。

零拷贝并不是不需要拷贝,而是减少不必要的拷贝次数。通常是说在IO读写过程中。

实际上,零拷贝是有广义和狭义之分,目前我们通常听到的零拷贝,包括上面这个定义 减少不必要的拷贝次数 都是广义上的零拷贝。其实了解到这点就足够了。

我们知道,减少不必要的拷贝次数,就是为了提高效率。那零拷贝之前,是怎样的呢?

传统方式实现:

1、次:将磁盘文件,读取到作系统内核缓冲区;

2、第二次:将内核缓冲区的数据,copy到application应用程序的buffer;

3、第三步:将application应用程序buffer中的数据,copy到socket网络发送缓冲区(属于作系统内核的缓冲区);

4、第四次:将socket buffer的数据,copy到网卡,由网卡进行网络传输。

传统方式,读取磁盘文件并进行网络发送,经过的四次数据copy是非常繁琐的。实际IO读写,需要进行IO中断,需要CPU响应中断(带来上下文切换),尽管后来引入DMA来接管CPU的中断请求,但四次copy是存在“不必要的拷贝”的。

重新思考传统IO方式,会注意到实际上并不需要第二个和第三个数据副本。应用程序除了缓存数据并将其传输回套接字缓冲区之外什么都不做。相反,数据可以直接从读缓冲区传输到套接字缓冲区。

显然,第二次和第三次数据copy 其实在这种场景下没有什么帮助反而带来开销,这也正是零拷rabbitMQ的负载均衡需要单独的loadbalancer进行支持。贝出现的意义。

这种场景 :是指读取磁盘文件后,不需要做其他处理,直接用网络发送出去。试想,如果读取磁盘的数据需要用程序进一步处理的话,必须要经过第二次和第三次数据copy,让应用程序在内存缓冲区处理。

kafka作为MQ也好,作为存储层也好,无非是两个重要功能,一是Producer生产的数据存到broker,二是 Consumer从broker读取数据;我们把它简化成如下两个过程:

1、网络数据持久化到磁盘 (Producer 到 Broker)

2、磁盘文件通过网络发送(Broker 到 Consumer)

下面,先给出“kafka用了磁盘,还速度快”的结论

磁盘顺序读或写的速度400M/s,能够发挥磁盘的速度。

随机读写,磁盘速度慢的时候十几到几百K/s。这就看出了距。

kafka将来自Producer的数据,顺序追加在partition,partition就是一个文件,以此实现顺序写入。

顺序读写,是kafka利用磁盘特性的一个重要体现。

2、零拷贝 sendfile(in,out)

数据直接在内核完成输入和输出,不需要拷贝到用户空间再写出去。

kafka数据写入磁盘前,数据先写到进程的内存空间。

3、mmap文件映射

虚拟映射只支持文件;

在进程 的非堆内存开辟一块内存空间,和OS内核空间的一块内存进行映射,

kafka数据写入、是写入这块内存空间,但实际这块内存和OS内核内存有映射,也就是相当于写在内核内存空间了,且这块内核空间、内核直接能够访问到,直接落入磁盘。

这里,我们需要清楚的是:内核缓冲区的数据,flush就能完成落盘。

我们来重点探究 kafka两个重要过程、以及是如何利用两个零拷贝技术sendfile和mmap的。

传统方式实现:

实际会经过四次copy,如下图的四个箭头。

数据落盘通常都是非实时的,kafka生产者数据持久化也是如此。Kafka的数据 并不是实时的写入硬盘 ,它充分利用了现代作系统分页存储来利用内存提高I/O效率。

对于kafka来说,Producer生产的数据存到broker,这个过程读取到socket buffer的网络数据,其实可以直接在OS内核缓冲区,完成落盘。并没有必要将socket buffer的网络数据,读取到应用进程缓冲区;在这里应用进程缓冲区其实就是broker,broker收到生产者的数据,就是为了持久化。

在此 特殊场景 下:接收来自socket buffer的网络数据,应用进程不需要中间处理、直接进行持久化时。——可以使用mmap内存文件映射。

它的工作原理是直接利用作系统的Page来实现文件到物理内存的直接映射。完成映射之后你对物理内存的作会被同步到硬盘上(作系统在适当的时候)。

通过mmap,进程像读写硬盘一样读写内存(当然是虚拟机内存),也不必关心内存的大小有虚拟内存为我们兜底。

使用这种方式可以获取很大的I/O提升,省去了用户空间到内核空间的开销。

mmap也有一个很明显的缺陷——不可靠,写到mmap中的数据并没有被真正的写到硬盘,作系统会在程序主动调用flush的时候才把数据真正的写到硬盘。Kafka提供了一个参数——producer.type来控制是不是主动flush;如果Kafka写入到mmap之后就立即flush然后再返回Producer叫同步(sync);写入mmap之后立即返回Producer不调用flush叫异步(async)。

Ja NIO,提供了一个 MappedByteBuffer 类可以用来实现内存映射。

MappedByteBuffer只能通过调用FileChannel的map()取得,再没有其他方式。

FileChannel.map()是抽象方法,具体实现是在 FileChannelImpl.c 可自行查看JDK源码,其map0()方法就是调用了Linux内核的mmap的API。

使用 MappedByteBuffer类要注意的是:mmap的文件映射,在full gc时才会进行释放。当close时,需要手动清除内存映射文件,可以反射调用sun.misc.Cleaner方法。

传统方式实现:

先读取磁盘、再用socket发送,实际也是进过四次copy。

而 Linux 2.4+ 内核通过 sendfile 系统调用,提供了零拷贝。磁盘数据通过 DMA 拷贝到内核态 Buffer 后,直接通过 DMA 拷贝到 NIC Buffer(socket buffer),无需 CPU 拷贝。这也是零拷贝这一说法的来源。除了减少数据拷贝外,因为整个读文件 - 网络发送由一个 sendfile 调用完成,整个过程只有两次上下文切换,因此大大提高了性能。零拷贝过程如下图所示。

相比于文章开始,对传统IO 4步拷贝的分析,sendfile将第二次、第三次拷贝,一步完成。

其实这项零拷贝技术,直接从内核空间(DMA的)到内核空间(Socket的)、然后发送网卡。

应用的场景非常多,如Tomcat、Nginx、Apache等web返回静态资源等,将数据用网络发送出去,都运用了sendfile。

简单理解 sendfile(in,out)就是,磁盘文件读取到作系统内核缓冲区后、直接扔给网卡,发送网络数据。

Ja NIO对sendfile的支持 就是FileChannel.transferTo()/transferFrom()。

fileChannel.transferTo( ition, count, socketChannel);

把磁盘文件读取OS内核缓冲区后的fileChannel,直接转给socketChannel发送;底层就是sendfile。消费者从broker读取数据,就是由此实现。

具体来看,Kafka 的数据传输通过 TransportLayer 来完成,其子类 PlaintextTransportLayer 通过Ja NIO 的 FileChannel 的 transferTo 和 transferFrom 方法实现零拷贝。

注: transferTo 和 transferFrom 并不保证一定能使用零拷贝。实际上是否能使用零拷贝与作系统相关,如果作系统提供 sendfile 这样的零拷贝系统调用,则这两个方通过这样的系统调用充分利用零拷贝的优势,否则并不能通过这两个方法本身实现零拷贝。

总的来说Kafka快的原因:

1、partition顺序读写,充分利用磁盘特性,这是基础;

2、Producer生产的数据持久化到broker,采用mmap文件映射,实现顺序的快速写入;

3、Customer从broker读取数据,采用sendfile,将磁盘文件读到OS内核缓冲区后,直接转到socket buffer进行网络发送。

1、都是Linux内核提供、实现零拷贝的API;

2、sendfile 是将读到内核空间的数据,转到socket buffer,进行网络发送;

3、mmap将磁盘文件映射到内存,支持读和写,对内存的作会反映在磁盘文件上。

RocketMQ 在消费消息时,使用了 mmap。kafka 使用了 sendFile。

AWS正式发布Kafka云服务,不用再为配置复杂心了

先读取、再1、架构模型发送,实际经过1~4四次copy。

AWS在re:Invent 20会上首先发布了托管Apache Kafka消息队列服务(Amazon Mad Streaming for Apache Kafka,MSK)的消息,现在已经从预览成为正式服务。

图中消息批次的每个元数据都有固定的长度大小,而只有面的消息个数的是可变的。如下是batch中主要的属性的含义:

Apache Kafka是一个分布式的消息队列系统,其使用发布以及的架构,将产生的流数据的应用与利用流数据的角色分离。Apache Kafka让使用者可以捕捉如消息队列、交易、物联网等,或是应用与日志等流数据,还能实时进行分析,连续不间断地转换数据,并再将收到的数据经过处理后,分发到其他的数据湖和数据库中。

AWS提到,用户在生产环境中要配置Apache Kafka,需要克服一些障碍,特别是在后续的管理以及规模扩展工作上,而现在AWS正式推出的MSK服务,则由AWS负责管理任务,让用户可以简单地配置使用,而且由于近几个版本的Kafka,都需要与协调程序Zookeeper共同使用,因此MSK服务也只要简单地设定,就能让Kafka与ZooKeeper一同运行。

使用MSK服务,用户可以在几分钟内创建集群,并使用AWS身分管理与访问控制IAM管理集群作,也能通过ACM(AWS Certificate Mar)完全托管的TLS私密凭证颁发机构授权客户端,以TLS加密数据,并使用KMS(AWS Key Mament Serv)中的密钥加密其他数据。当发生故障时,MSK还会替换故障机器,自动执行修补,用户可以从Amazon CloudWatch中,服务的状态指标。

AWS表示,MSK与Kafka 1.1.1和2.1.0版本完全兼容,因此用户可以在AWS直接执行原本的Kafka应用以及工具,而不需要修改任何的代码,用户能使用开源工具MirrorMaker,将数据从现有的Kafka集群直接迁移到MSK上。

OpenTelemetry、Spring Cloud Sleuth、Kafka、Jager实现分布式跟踪

如下图所示为一个分区的消息日志数据:

分布式跟踪可让您深入了解特定服务在分布式软件系统中作为整体的一部分是如何执行的。它跟踪和记录从起点到目的地的请求以及它们经过的系统。

kafka使用这种编码方式的优点在于,大部分的数据增量都是非常小的数字,因此使用一个字节即可保存,这比直接使用原始类型的数据要节约大概七倍的内存。

在本文中,我们将使用 OpenTelemetry、Spring Cloud Sleuth、Kafka 和 Jaeger 在三个 Spring Boot 微服务 中实现分布式跟踪。

我们先来看看分布式中的一些基本术语。

跨度:表示系统内的单个工作单元。跨度可以相互嵌套以模拟工作的分解。例如,一个跨度可能正在调用一个 REST 端点,然后另一个子跨度可能是该端点调用另一个,等等在不同的服务中。

Trace:所有共享相同根跨度的跨度,或者更简单地说,将所有跨度创建为原始请求的直接结果。跨度的层次结构(每个跨度在根跨度旁边都有自己的父跨度)可用于形成有向无环图,显示请求在通过各种组件时的路径。

OpenTelemetry ,也简称为 OTel,是一个供应商中立的开源 Observability 框架,用于检测、生成、收集和导出遥测数据,例如 跟踪 、 指标 和 日志 。作为 云原生 计算基金会 (CNCF) 的孵化项目,OTel 旨在提供与供应商无关的统一库和 API 集——主要用于收集数据并将其传输到某处。OTel 正在成为生成和管理遥测数据的世界标准,并被广泛采用。

Sleuth 添加了一个,以确保在请求中传递所有跟踪信息。每次调用时,都会创建一个新的 Span。它在收到响应后关闭。

Sleuth 能够跟踪您的请求和消息,以便您可以将该通信与相应的日志条目相关联。您还可以将跟踪信息导出到外部系统以可视化延迟。

Jaeger 最初由 Uber 的团队构建,然后于 2015 年开源。它于 2017 年被接受为云原生孵化项目,并于 2019 年毕业。作为 CNCF 的一部分,Jaeger 是云原生 架构 中公认的项目。它的源代码主要是用 Go 编写的。Jaeger 的架构包括:

与 Jaeger 类似,Zipkin 在其架构中也提供了相同的组件集。尽管 Zipkin 是一个较老的项目,但 Jaeger 具有更现代和可扩展的设计。对于此示例,我们选择 Jaeger 作为后端。

让我们设计三个 Spring Boot 微服务:

这三个微服务旨在:

这是为了观察 OpenTelemetry 如何结合 Spring Cloud Sleuth 处理代码的自动检测以及生成和传输跟踪数据。上面的虚线捕获了微服务导出的跟踪数据的路径,通过OTLP(OpenTelemetry Protocol)传输到OpenTelemetry Collector,收集器依次处理并将跟踪数据导出到后端Jaeger进行存储和查询。

使用 monorepo,我们的项目结构如下:

第 1 步:添加 POM 依赖项

这是使用 OTel 和 Spring Cloud Sleuth 实现分布式跟踪的关键。我们的目标是不必手动检测我们的代码,因此我们依靠这些依赖项来完成它们设计的工作——自动检测我们的代码,除了跟踪实现、将遥测数据导出到 OTel 收集器等。

第 2 步:OpenTelemetry 配置

对于每个微服务,我们需要在其中添加以下配置application.yml(请参阅下面部分中的示例片段)。spring.sleuth.o.exporter.otlp.endpoint主要是配置OTel Collector端点。它告诉导出器,在我们的例子中是 Sleuth,通过 OTLP 将跟踪数据发送到指定的收集器端点://o-collector:4317。注意o-collector端点 URL 来自o-collector图像的 docker-come 服务。

跟踪数据概率抽样

spring.sleuth.o.config.trace-id-ratio-based属性定义了跟踪数据的采样概率。它根据提供给采样器的分数对一部分迹线进行采样。概率抽样允许 OpenTelemetry 跟踪用户通过使用随机抽样技术降低跨度收集成本。如果该比率小于 1.0,则某些迹线将不会被导出。对于此示例,我们将采样配置为 1.0、。

有关其他 OTel SConsumer从broker读取数据时,因为自带了偏移量,接着上次读取的位置继续读,以此实现顺序读。pring Cloud Sleuth 属性,请参阅常见应用程序属性。

OpenTelemetry 配置文件

我们需要项目根目录下的 OTel 配置文件o-config.yaml。内容如下。此配置文件定义了 OTel 接收器、处理器和导出器的行为。正如我们所看到的,我们定义了我们的接收器来 gRPC 和 HTTP,处理器使用批处理和导出器作为 jaeger 和日志记录。

第 3 步:docker-come 将所有内容串在一起

让我们看看我们需要启动哪些 docker 容器来运行这三个微服务并观察它们的分布式跟踪,前三个微服务在上面的部分中进行了解释。

运行docker-come up -d以调出所有九个容器:

第 4 步:数据在行动

快乐之路

现在,让我们启动customer-serv-bff流程的入口点,以创建新客户。

启动 Jaeger UI, [= Traces按钮,这是我们看到的创建客户跟踪:它跨越三个服务,总共跨越六个,持续时间 82.35 毫秒。

除了 Trace Timeline 视图(上面的屏幕截图),Jaeger 还提供了一个图形视图(Trace Graph在右上角的下拉菜单中选择):

三个微服务在 docker 中的日志输出显示相同的跟踪 id,以红色突出显示,并根据其应用程序名称显示不同的跨度 id(应用程序名称及其对应的跨度 id 以匹配的颜色突出显示)。在 的情况下customer-serv,相同的 span id 从 REST API 请求传递到 Kafka 发布者请求。

customer-serv让我们在 docker 中暂停我们的PostgreSQL 数据库,然后重复从customer-serv-bff. 500 internal server error正如预期的那样,我们得到了。检查 Jaeger,我们看到以下跟踪,异常堆栈跟踪抱怨SocketTimeoutException,再次如预期的那样。

识别长期运行的跨度

在这个故事中,我们从 OpenTelemetry、Spring Cloud Sleuth 和 Jaeger 的角度解压了分布式跟踪,验证了 REST API 调用和 Kafka pub/sub 中分布式跟踪的自动检测。我希望这个故事能让你更好地理解这些跟踪框架和工具,尤其是 OpenTelemetry,以及它如何从根本上改变我们在 分布式系统 中进行可观察性的方式。

消息队列(三)kafka的一致性和失败处理策略

但是需要注意的是,kafka并不会清理当前处于活跃状态的日志文件中的消息记录。所谓当前处于活跃状态的日志文件,也就是当前正在写入数据的日志文件。如下图所示为一个kafka进行日志压缩的示例图:

处理消息需要是幂等的,消息的生产方和接收方都需要做到幂等性;发送放需要添加一个定时器来遍历重推未处理的消息,避免消息丢失,造成的事务执行断裂。

kafka的broker支持主备模式。

如果要保证一致性,需要生产者在失败后重试,不过重试又会导致消息重复的问题,一个解决方案是每个消息给一个的id,通过服务端的主动去重来避免重复消息的问题,不过这一机制目前Kafka还未实现。

这时rahbitMQ会立即将消息删除,这种情况下如果消费者出现异常而没能处理该消息(但是消息队列那边已经认为消息被消费了),就会丢失该消息。至于解决方案,采用手动确认消息即可。

Kafka存储机制此时Producer端生产的消息会不断追加到log文件末尾,这样文件就会越来1. Kafka是什么?越大,为了防止log文件过大导致数据定位效率低下,那么Kafka采取了分片和索引机制。