kafka-broker

瞻前

简单使用

http://ks.netease.com/blog?id=9859

producer

http://ks.netease.com/blog?id=9859

consumer

http://ks.netease.com/blog?id=9860


顾后

1、结构

admin - 主要有个adminUtil比较重要,创建topic的

api - 处理各个消息

client - 仅一个clientUtil,客户端的方法封装

cluster - 包含了cluser、topic、patition、replica的类的封装

common - 公共包

consumer - 消费者相关

controller - kafka的中央控制器,负责topic的创建、消息备份、leader选举等

javaapi - java相关

log - 日志读写相关

message - 接收的消息相关

metrics - 一些统计值

network -  网络相关,接收消息、消息buffer等

producer - 生成者

sserializer - 消息序列化

server - 里面的东西比较杂,主要还是启动相关

tools - 工具类


1、配置

broker.id=3
listeners=PLAINTEXT://:9092
log.dirs=/home/kafkaInstance/quote.s.hhtcex/logs
log.retention.check.interval.ms=300000
log.retention.hours=168
log.segment.bytes=1073741824
num.io.threads=8
num.network.threads=3
num.partitions=1
num.recovery.threads.per.data.dir=1
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
zookeeper.connection.timeout.ms=6000
zookeeper.connect=



2、topic创建

-----创建日志

topic的创建分为两种,一种通过命令行进行创建;一种是自动创建,即producer往kafka写消息时,如果没有topic就会自动创建。

二者的区别:

1、命令行创建,可以指定partition的数目以及备份数

2、自动创建,无法指定partition和备份数,通过配置文件读取即 num.partitions 以及 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test


创建原理:

无论是自动创建还是命令行创建,最后都是通过同样的方式进行创建,只是入口不一样而已:

1、命令行直接敲命令,然后通过TopicCommand的方法调用下层进行创建;而自动创建时producer在每次发送消息时,会请求元数据信息,在获取元数据的时候,server发现没有该topic的时候就会调用底层方法进行创建

2、消息创建分为两个步骤:1、主动的过程和监听的过程:

2.1 主动过程:

在该过程中会调用admin包下的AdminUtils,这个过程主要包含三部分

a、获取节点信息

b、根据partition数量和备份数量分配节点

c、将分配的结果写入zk

注:在该过程中,只是将结果写入zk,这个时候,各个节点还没有正式创建topic


2.2 监听的过程

首先需要了解几个类

KafkaController - 中央控制器

PartitionStateMachine - topic的状态机,负责zk的监听,消息的创建、删除,leader的选举

ReplicaStateMachine - 备份的状态机,负责备份的处理

ReplicaManager - 负责创建leader和follower,调用Partition 

Partition - 实际创建leader和follower,调用LogManager 

LogManager - 负责创建log日志


1、首先topic的生命过程分为:NonExistentPartition → NewPartition → OnlinePartition → OfflinePartition, 这几个过程在PartitionStateMachine中定义。

2、PartitionStateMachine监听zk上/brokers/topics 节点的变化,发现如果有新的节点创建时, 会调用中央控制器的onNewTopicCreation方法,该方法会调用PartitionStateMachine 以及

ReplicaStateMachine 的方法处理消息

3、PartitionStateMachine 负责选举leader,然后将消息发送给api进行leader的创建

4、ReplicaStateMachine 负责将消息发送给api进行备份的创建

5、在KafkaApi的handleLeaderAndIsrRequest方法中调用ReplicaManager 的becomeLeaderOrFollower方法创建leader或follower

6、在ReplicaManager 调用Partition的getOrCreateReplica创建leader或者follower

7、Partition的getOrCreateReplica方法调用LogManager 的createLog方法创建Log对象

在这个流程重,才完成消息的创建


3、kafka消息分类和处理

reactor模式

以上消息依次可分为:

消息的生产

消息的获取

获取该节点各个消息的偏移量

获取消息的元数据

leader和ISR更新的消息

停止备份

更新消息元数据

停止中央控制器

提动消息的偏移量

获取consumer的消息偏移量


4、消息写入kafka

1、producer维持一个producerpool, 该池里面维持与各个broker的连接

2、producer每次写消息时,首先获取消息的元数据,即topic的partition的leader在哪个节点上,然后将消息发送给对应的节点

3、在api中对应上面的第一中消息类型,在处理消息时,主要调用log包的log类进行消息写入

4、在消息写入时,消息是存放在段中的,每次写的时候段会append该消息,然后判断段是否应该flush


5、消息从kafka读出

1、consumer从zk中获取到消息存放在哪些节点上,然后给这些节点发送抓取消息的请求,如上图的第二种消息。

2、broker接收到消息请求后,首选会拉取数据,然后由两种处理形式:

        1、数据是否需要延期返回,在request里面可以设置一个maxWait 的延期时间,如果没设置那么将当前抓取的数据直接返回;

        2、如果设置了延期时间,会将本次请求放入延期处理的队列中(底层用户java的delayed接口),然后阻塞获取到期的任务,然后再次抓取数据,并将数据返回


6、日志记录和获取

在目前的版本,kafka存放的日志包含两部分内容,index日志和log日志:log日志记录produce发送给kafka的数据内容,index日志记录log日志的文件位置

这里两个类

Log : 这里需要区分log日志,这里的log是封装了kafka的日志处理,包含了对日志的抓取和写入,里面核心是segments,一个ConcurrentSkipListMap<Long, LogSegment>

LogSegment: 日志的载体,里面包含了真正的log文件(FileMessageSet)和index(OffsetIndex)文件的读写


每次写消息时,LogSegment会将数据append,然后校验每个段的数据和上次刷新的时间,然后做一次flush

每次读数据时,从Log的segments中获取所有文件的索引,然后根据偏移量进行数据读取

文件真实的写入磁盘等操作底层走的是FileChannel以及MappedByteBuffer


可以直接从kafka里面看.log里面的数据内容,对于内容是无影响的但是,对于每条数据,除了内容日志还有kafka增加的数据。

可以通过:

bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files .log文件  --print->该命令可以解析.log内容,解析后的内容如下:

Starting offset: 11239943
offset: 11239943 position: 0 isvalid: true payloadsize: 1094 magic: 0 compresscodec: NoCompressionCodec crc: 4034970652 keysize: 0 key: payload: ["{\"cruRight\":1987.54,\"curCapital\":1907.54,\"customerId\":\"777180110684860\",\"deliveryMargin\":0.0,\"fee\":-1.46,\"floatLoss\":-80.0,\"infos\":{\"AU10kg\":{\"buyHoldFunds\":2080.0,\"buyQuantity\":1.0,\"commodityId\":\"AU10kg\",\"floatLoss\":-80.0,\"netQuantity\":1.0,\"sellHoldFunds\":0.0,\"sellQuantity\":0.0}},\"lastCapital\":1000.0,\"margin\":166.4,\"memberId\":\"001777777\",\"riskRate\":11.4636,\"todayClosePl\":0.0,\"todayFundio\":989.0}","{\"cruRight\":487370.39,\"curCapital\":477202.39,\"customerId\":\"777171226766359\",\"deliveryMargin\":0.0,\"fee\":-39.42,\"floatLoss\":-10168.0,\"infos\":{\"AU10kg\":{\"buyHoldFunds\":458040.0,\"buyQuantity\":224.0,\"commodityId\":\"AU10kg\",\"floatLoss\":-10040.0,\"netQuantity\":224.0,\"sellHoldFunds\":0.0,\"sellQuantity\":0.0},\"AU1kg\":{\"buyHoldFunds\":10200.0,\"buyQuantity\":5.0,\"commodityId\":\"AU1kg\",\"floatLoss\":-128.0,\"netQuantity\":3.0,\"sellHoldFunds\":4080.0,\"sellQuantity\":2.0}},\"lastCapital\":486537.81,\"margin\":38542.05,\"memberId\":\"001777777\",\"riskRate\":12.3813,\"todayClosePl\":0.0,\"todayFundio\":872.0}"]



网易云新用户大礼包:https://www.163yun.com/gift

本文来自网易实践者社区,经作者何涵授权发布。