瞻前
简单使用
http://ks.netease.com/blog?id=9859
producer
http://ks.netease.com/blog?id=9859
consumer
http://ks.netease.com/blog?id=9860
顾后
1、结构
admin - 主要有个adminUtil比较重要,创建topic的
api - 处理各个消息
client - 仅一个clientUtil,客户端的方法封装
cluster - 包含了cluser、topic、patition、replica的类的封装
common - 公共包
consumer - 消费者相关
controller - kafka的中央控制器,负责topic的创建、消息备份、leader选举等
javaapi - java相关
log - 日志读写相关
message - 接收的消息相关
metrics - 一些统计值
network - 网络相关,接收消息、消息buffer等
producer - 生成者
sserializer - 消息序列化
server - 里面的东西比较杂,主要还是启动相关
tools - 工具类
1、配置
broker.id=3
listeners=PLAINTEXT://:9092
log.dirs=/home/kafkaInstance/quote.s.hhtcex/logs
log.retention.check.interval.ms=300000
log.retention.hours=168
log.segment.bytes=1073741824
num.io.threads=8
num.network.threads=3
num.partitions=1
num.recovery.threads.per.data.dir=1
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
zookeeper.connection.timeout.ms=6000
zookeeper.connect=
2、topic创建
-----创建日志
topic的创建分为两种,一种通过命令行进行创建;一种是自动创建,即producer往kafka写消息时,如果没有topic就会自动创建。
二者的区别:
1、命令行创建,可以指定partition的数目以及备份数
2、自动创建,无法指定partition和备份数,通过配置文件读取即 num.partitions 以及 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test
创建原理:
无论是自动创建还是命令行创建,最后都是通过同样的方式进行创建,只是入口不一样而已:
1、命令行直接敲命令,然后通过TopicCommand的方法调用下层进行创建;而自动创建时producer在每次发送消息时,会请求元数据信息,在获取元数据的时候,server发现没有该topic的时候就会调用底层方法进行创建
2、消息创建分为两个步骤:1、主动的过程和监听的过程:
2.1 主动过程:
在该过程中会调用admin包下的AdminUtils,这个过程主要包含三部分
a、获取节点信息
b、根据partition数量和备份数量分配节点
c、将分配的结果写入zk
注:在该过程中,只是将结果写入zk,这个时候,各个节点还没有正式创建topic
2.2 监听的过程
首先需要了解几个类
KafkaController - 中央控制器
PartitionStateMachine - topic的状态机,负责zk的监听,消息的创建、删除,leader的选举
ReplicaStateMachine - 备份的状态机,负责备份的处理
ReplicaManager - 负责创建leader和follower,调用Partition
Partition - 实际创建leader和follower,调用LogManager
LogManager - 负责创建log日志
1、首先topic的生命过程分为:NonExistentPartition → NewPartition → OnlinePartition → OfflinePartition, 这几个过程在PartitionStateMachine中定义。
2、PartitionStateMachine监听zk上/brokers/topics 节点的变化,发现如果有新的节点创建时, 会调用中央控制器的onNewTopicCreation方法,该方法会调用PartitionStateMachine 以及
ReplicaStateMachine 的方法处理消息
3、PartitionStateMachine 负责选举leader,然后将消息发送给api进行leader的创建
4、ReplicaStateMachine 负责将消息发送给api进行备份的创建
5、在KafkaApi的handleLeaderAndIsrRequest方法中调用ReplicaManager 的becomeLeaderOrFollower方法创建leader或follower
6、在ReplicaManager 调用Partition的getOrCreateReplica创建leader或者follower
7、Partition的getOrCreateReplica方法调用LogManager 的createLog方法创建Log对象
在这个流程重,才完成消息的创建
3、kafka消息分类和处理
reactor模式
以上消息依次可分为:
消息的生产
消息的获取
获取该节点各个消息的偏移量
获取消息的元数据
leader和ISR更新的消息
停止备份
更新消息元数据
停止中央控制器
提动消息的偏移量
获取consumer的消息偏移量
4、消息写入kafka
1、producer维持一个producerpool, 该池里面维持与各个broker的连接
2、producer每次写消息时,首先获取消息的元数据,即topic的partition的leader在哪个节点上,然后将消息发送给对应的节点
3、在api中对应上面的第一中消息类型,在处理消息时,主要调用log包的log类进行消息写入
4、在消息写入时,消息是存放在段中的,每次写的时候段会append该消息,然后判断段是否应该flush
5、消息从kafka读出
1、consumer从zk中获取到消息存放在哪些节点上,然后给这些节点发送抓取消息的请求,如上图的第二种消息。
2、broker接收到消息请求后,首选会拉取数据,然后由两种处理形式:
1、数据是否需要延期返回,在request里面可以设置一个maxWait 的延期时间,如果没设置那么将当前抓取的数据直接返回;
2、如果设置了延期时间,会将本次请求放入延期处理的队列中(底层用户java的delayed接口),然后阻塞获取到期的任务,然后再次抓取数据,并将数据返回
6、日志记录和获取
在目前的版本,kafka存放的日志包含两部分内容,index日志和log日志:log日志记录produce发送给kafka的数据内容,index日志记录log日志的文件位置
这里两个类
Log : 这里需要区分log日志,这里的log是封装了kafka的日志处理,包含了对日志的抓取和写入,里面核心是segments,一个ConcurrentSkipListMap<Long, LogSegment>
LogSegment: 日志的载体,里面包含了真正的log文件(FileMessageSet)和index(OffsetIndex)文件的读写
每次写消息时,LogSegment会将数据append,然后校验每个段的数据和上次刷新的时间,然后做一次flush
每次读数据时,从Log的segments中获取所有文件的索引,然后根据偏移量进行数据读取
文件真实的写入磁盘等操作底层走的是FileChannel以及MappedByteBuffer
可以直接从kafka里面看.log里面的数据内容,对于内容是无影响的但是,对于每条数据,除了内容日志还有kafka增加的数据。
可以通过:
bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files .log文件 --print->该命令可以解析.log内容,解析后的内容如下:
Starting offset: 11239943
offset: 11239943 position: 0 isvalid: true payloadsize: 1094 magic: 0 compresscodec: NoCompressionCodec crc: 4034970652 keysize: 0 key: payload: ["{\"cruRight\":1987.54,\"curCapital\":1907.54,\"customerId\":\"777180110684860\",\"deliveryMargin\":0.0,\"fee\":-1.46,\"floatLoss\":-80.0,\"infos\":{\"AU10kg\":{\"buyHoldFunds\":2080.0,\"buyQuantity\":1.0,\"commodityId\":\"AU10kg\",\"floatLoss\":-80.0,\"netQuantity\":1.0,\"sellHoldFunds\":0.0,\"sellQuantity\":0.0}},\"lastCapital\":1000.0,\"margin\":166.4,\"memberId\":\"001777777\",\"riskRate\":11.4636,\"todayClosePl\":0.0,\"todayFundio\":989.0}","{\"cruRight\":487370.39,\"curCapital\":477202.39,\"customerId\":\"777171226766359\",\"deliveryMargin\":0.0,\"fee\":-39.42,\"floatLoss\":-10168.0,\"infos\":{\"AU10kg\":{\"buyHoldFunds\":458040.0,\"buyQuantity\":224.0,\"commodityId\":\"AU10kg\",\"floatLoss\":-10040.0,\"netQuantity\":224.0,\"sellHoldFunds\":0.0,\"sellQuantity\":0.0},\"AU1kg\":{\"buyHoldFunds\":10200.0,\"buyQuantity\":5.0,\"commodityId\":\"AU1kg\",\"floatLoss\":-128.0,\"netQuantity\":3.0,\"sellHoldFunds\":4080.0,\"sellQuantity\":2.0}},\"lastCapital\":486537.81,\"margin\":38542.05,\"memberId\":\"001777777\",\"riskRate\":12.3813,\"todayClosePl\":0.0,\"todayFundio\":872.0}"]
网易云新用户大礼包:https://www.163yun.com/gift
本文来自网易实践者社区,经作者何涵授权发布。