1、server.properties
属性 | 默认值 | 描述 |
---|---|---|
broker.id | 0 | 每个broker都需要有一个标识符,使用broker.id来表示。它的默认值是0,也可以被设置成其他任意整数。这个值在整个Kafka集群里必须是唯一的。这个值可以任意选定,如果出于维护的需要,可以在服务器节点间交换使用这些D。建议把它们设置成与机器名具有相关性的整数,这样在进行维护时,将①号映射到机器名就没那么麻烦了。例如,如果机器名包含唯一性的数字(比如host1.example…com、host2.example.com),那么用这些数字来设置broker.id就再好不过了。 |
log.dirs | /tmp/kafka-logs | Kafka把所有消息都保存在磁盘上,存放这些日志片段的目录是通过log.dis指定的。它是一组用逗号分隔的本地文件系统路径。如果指定了多个路径,那么broker会根据“最少使用”原则,把同一个分区的日志片段保存到同一个路径下。要注意,broker会往拥有最少数目分区的路径新增分区,而不是往拥有最小磁盘空间的路径新增分区。 |
port | 9092 | 如果使用配置样本来启动Kafka,它会监听9092端口。修改port配置参数可以把它设置成其他任意可用的端口。要注意,如果使用1024以下的端口,需要使用oot权限启动Kafka,不过不建议这么做。 |
zookeeper.connect | null | 用于保存broker元数据的Zookeeper地址是通过zookeeper.connect来指定的。localhost:2181表示这个Zookeeper是运行在本地的2181端口上。该配置参数是用冒号分隔的一组hostname:port/path列表,每一部分的含义如下:hostname是Zookeeper服务器的机器名或IP地址;port是Zookeeper的客户端连接端口;/path是可选的Zookeeper路径,作为Kafka集群的chroot环境。如果不指定,默认使用根路径。如果指定的chroot路径不存在,broker会在启动的时候创建它。 |
message.max.bytes | 1000000 | broker通过设置message.max.bytes参数来限制单个消息的大小,默认值是1000000,也就是1MB。如果生产者尝试发送的消息超过这个大小,不仅消息不会被接收,还会收到broker返回的错误信息。跟其他与字节相关的配置参数一样,该参数指的是压缩后的消息大小,也就是说,只要压缩后的消息小于message.max.bytes指定的值,消息的实际大小可以远大于这个值。这个值对性能有显著的影响。值越大,那么负责处理网络连接和请求的线程就需要花越多的时间来处理这些请求。它还会增加磁盘写入块的大小,从而影响IO吞吐量。注意此参数要和consumer的maximum.message.size大小一致,否则会因为生产者生产的消息太大导致消费者无法消费 |
num.io.threads | 8 | 服务器用来执行读写请求的IO线程数,此参数的数量至少要等于服务器上磁盘的数量 |
queued.max.requests | 500 | IO线程可以处理的队列大小,若实际请求数超过此大小,若实际请求数超过此大小,网络线程将停止接收新的请求 |
socket.send.buffer.bytes | 100*1024 | socket的发送缓冲区大小 |
socket.receive.buffer.bytes | 100*1024 | socket的接收缓冲区大小 |
socket.request.max.bytes | 10010241024 | 服务器允许请求的最大值,用来防止内存溢出,其值应该小于JavaHeapSize |
num.partitions | 1 | 默认partition数量,如果topic在创建时没有指定partition数量,默认使用此值。要注意,我们可以增加主题分区的个数,但不能减少分区的个数。所以,如果要让一个主题的分区个数少于num.partitions指定的值,需要手动创建该主题。 |
log.segment.bytes | 102410241024 | Segment文件的大小,超过此值将会自动新建一个segment,此值可以被topic级别的参数覆盖。以上的设置作用在日志片段上,而不是作用在单个消息上。当消息到达broker时,它们被追加到分区的当前日志片段上。当日志片段大小达到Log.segment.bytes指定的上限(默认是1GB)时,当前日志片段就会被关闭,一个新的日志片段被打开。如果一个日志片段被关闭,就开始等待过期。这个参数的值越小,就会越频繁地关闭和分配新文件,从而降低磁盘写入的整体效率。如果主题的消息量不大,那么如何调整这个参数的大小就变得尤为重要。例如,如果一个主题每天只接收100MB的消息,而log.segment.bytes使用默认设置,那么需要10天时间才能填满一个日志片段。因为在日志片段被关闭之前消息是不会过期的,所以如果1ogretention.ms被设为604800000(也就是1周),那么日志片段最多需要17天才会过期。这是因为关闭日志片段需要10天的时间,而根据配置的过期时间,还需要再保留7天时间(要等到日志片段里的最后一个消息过期才能被删除)。 |
log.segment.ms | 另一个可以控制日志片段关闭时间的参数是Log.segment.ms,它指定了多长时间之后日志片段会被关闭。就像log.retention.bytes和log.retention.ms这两个参数一样,log.segment.bytes和Log.retention.ms这两个参数之间也不存在互斥问题。日志片段会在大小或时间达到上限时被关闭,就看哪个条件先得到满足。默认情况下,log.segment.ms没有设定值,所以只根据大小来关闭日志片段。 | |
log.retention.{ms,minutes,hours} | 7days | kafkasegmentlog的保存周期,保存周期超过此时间日志就会被删除。此参数可以被topic级别参数覆盖。数据量大时,建议减小此值。 |
log.retention.bytes | -1 | 每个partition的最大容量,若数据量超过此值,partition数据将会被删除。注意这个参数控制的是每个partition而不是topic。此参数可以被log级别参数覆盖。如果同时指定了log.retention.bytes和log.retention.ms(或者另一个时间参数),只要任意一个条件得到满足,消息就会被删除。例如,假设1og·retention.ms设置为86400000(也就是1天),log.retention.bytes设置为1000000000(也就是1GB),如果消息字节总数在不到一天的时间就超过了1GB,那么多出来的部分就会被删除。相反,如果消息字节总数小于1GB,那么一天之后这些消息也会被删除,尽管分区的数据总量小于1GB。 |
log.retention.check.interval.ms | 5minutes | 删除策略的检查周期 |
auto.create.topics.enable | true | 自动创建topic参数,建议此值设置为false,严格控制topic管理,防止生产者错写topic。 |
default.replication.factor | 1 | 默认副本数量 |
replica.lag.time.max.ms | 10000 | 此窗口时间内没有收到follower的fetch请求,leader会将其从ISR(in-syncreplicas)中移除 |
replica.lag.max.messages | 4000 | 如果replica节点落后leader节点此值大小的消息数量,leader节点就会将其从ISR中移除 |
replica.socket.timeout.ms | 30*1000 | replica向leader发送请求的超时时间 |
replica.socket.receive.buffer.bytes | 64*1024 | replica接收leader数据的缓冲区大小 |
replica.fetch.max.bytes | 1024*1024 | Brokers会为每个分区分配replica.fetch.max.bytes参数指定的内存空间,假设replica.fetch.max.bytes=1M,且有1000个分区,则需要差不多1G的内存 |
replica.fetch.wait.max.ms | 500 | replicas同leader之间通信的最大等待时间,失败了会重试 |
num.replica.fetchers | 1 | leader进行复制的线程数,增大这个数值会增加follower的IO |
fetch.purgatory.purge.interval.requests | 1000 | 获取请求清除的清除间隔(以请求数为单位) |
zookeeper.session.timeout.ms | 6000 | ZooKeepersession超时时间。如果在此时间内server没有向zookeeper发送心跳,zookeeper就会认为此节点已挂掉。此值太低导致节点容易被标记死亡;若太高,.会导致太迟发现节点死亡。 |
zookeeper.connection.timeout.ms | 6000 | 客户端连接zookeeper的超时时间 |
zookeeper.sync.time.ms | 2000 | zookeeper的follower同leader的同步时间 |
controlled.shutdown.enable | true | 允许brokershutdown。如果启用,broker在关闭自己之前会把它上面的所有leaders:转移到其它brokers上,建议启用,增加集群稳定性。 |
auto.leader.rebalance.enable | true | 是否自动平衡broker之间的分配策略 |
leader.imbalance.per.broker.percentage | 10 | leader的不平衡比例,若是超过这个数值,会对分区进行重新的平衡 |
leader.imbalance.check.interval.seconds | 300 | 检查leader是否不平衡的时间间隔 |
offset.metadata.max.bytes | 4096 | 客户端保留offset信息的最大空间大小 |
connections.max.idle.ms | 600000 | 这个参数用来指定在多久之后关闭闲置的连接 |
num.recovery.threads.per.data.dir | 1 | 默认情况下,每个日志目录只使用一个线程。因为这些线程只是在服务器启动和关闭时会用到,所以完全可以设置大量的线程来达到并行操作的目的。特别是对于包含大量分区的服务器来说,一旦发生崩溃,在进行恢复时使用并行操作可能会省下数小时的时间。设置此参数时需要注意,所配置的数字对应的是log.dirs指定的单个日志目录。也就是说,如果num.recovery.threads.per.data.dir被设为8,并且log.dir指定了3个路径,那么总共需要24个线程。 |
unclean.leader.election.enable | true | 如果unclean.leader.election.enable参数的值为false,那么就意味着非ISR中的副本不能够参与选举.如果unclean.leader.election.enable参数的值为true,那么可以从非ISR集合中选举follower副本成为新的leader。 |
delete.topic.enable | false | 启用deletetopic参数,建议设置为true。 |
offsets.topic.num.partitions | 50 | 自动创建的位移主题分区数 |
offsets.topic.retention.minutes | 1440 | 超过此期限的偏移将被标记为删除。当日志清理程序压缩偏移主题时,将进行实际清除 |
offsets.retention.check.interval.ms | 600000 | 偏移管理器检查过时偏移的频率。 |
offsets.topic.replication.factor | 3 | 偏移量提交主题的复制因子。表示kafka的内部topicconsumer_offsets副本数。当该副本所在的broker宕机,consumer_offsets只有一份副本,该分区宕机。使用该分区存储消费分组offset位置的消费者均会收到影响,offset无法提交,从而导致生产者可以发送消息但消费者不可用。所以需要设置该字段的值大于1。 |
offsets.topic.segment.bytes | 104857600 | 偏移量主题的段大小。 |
offsets.load.buffer.size | 5242880 | 每次从offset段文件往缓存加载offsets数据时的读取的数据大小。默认5M |
offsets.commit.required.acks | -1 | 可以接受consumer提交之前所需的ack。通常,不应覆盖默认值(-1) |
offsets.commit.timeout.ms | 5000 | consumer提交的延迟时间,offsets提交将被延迟,直到偏移量topic的所有副本都收到提交或达到此超时。这与producer请求超时类似。 |
2、producer.properties
属性 | 默认值 | 描述 |
---|---|---|
metadata.broker.list | 启动时producer2查询orokers的列表,可以是集群中所有brokersl的一个子集。注意,这个参数只是用来获取topic的元信息用,producer会从元信息中挑选合适的broker并与之建立socket;连接。格式是:host1:port1,host2:port2。 | |
request.required.acks | 0 | #设置发送数据是否需要服务端的反馈,有三个值0,1,-10:producer不会等待broker发送ack1:当leader接收到消息后发送ack-1:当所有的follower都同步消息成功后发送ack |
request.timeout.ms | 10000 | Broker等待ack的超时时间,若等待时间超过此值,会返回客户端错误信息。 |
producer.type | sync | 同步异步模式。asynca表示异步,sync表示同步。如果设置成异步模式,可以允许生产者以patch的形式push数据,这样会极大的提高broker性能,推荐设置为异步。 |
serializer.class | kafka.serializer.DefaultEncoder | 指定序列化处理类 |
key.serializer.class | key的序列化类 | |
partitioner.class | kafka.producer.DefaultPartitioner | 指定分区处理类 |
compression.codec | none | 指定producer消息的压缩格式,可选参数为:none、gzip、snappy |
compressed.topics | null | 启用压缩的topic:名称。若上面参数选择了一个压缩格式,那么压缩仅对本参数指定的topic有效,若本参数为空,则对所有topic有效。 |
message.send.max.retries | 3 | Producer发送失败时重试次数。若网络出现问题,可能会导致不断重试。 |
retry.backoff.ms | 100 | 每次重试的间隔时间 |
topic.metadata.refresh.interval.ms | 600*1000 | 每个使用者和生产者实例将以topic.metadata.refresh.interval.ms参数定义的间隔获取主题元数据。 |
queue.buffering.max.ms | 5000 | 在async模式下,当message缓存超时后,将会批量发送给broker,默认5000ms |
queue.buffering.max.messages | 10000 | 在async模式下,Producer端允许buffer的最大消息量,如果超过这个数值,producer就会阻塞或者丢掉消息 |
queue.enqueue.timeout.ms | -1 | 当消息在producer端沉积的条数达到“queue.buffering.max.messages"后阻塞一定时间后,队列仍然没有enqueue(producer仍然没有发送出任何消息),此时producer可以继续阻塞,或者将消息抛弃-1:无阻塞超时限制,消息不会被抛弃0:立即清空队列,消息被抛弃 |
batch.num.messages | 200 | 采用异步模式时,一个batch:缓存的消息数量。达到这个数量值时producer才会发送消息。 |
send.buffer.bytes | 100*1024 | 发送缓冲大小 |
client.id | 设置单个Kafka生产者或消费者客户的名称 |
3、consumer.propertis
属性 | 默认值 | 描述 |
---|---|---|
group.id | Consumer的组ID,相同goup.id的consumer)属于同一个组。 | |
zookeeper.connect | 对于zookeeper集群的指定,可以是多个hostname1:port1,hostname2:port2,hostname3 | |
consumer.id | 消费者的ID,若是没有设置的话,会自增 | |
socket.timeout.ms | 30*1000 | socket的超时时间,实际的超时时间是:max.fetch.wait+socket.timeout.ms. |
zookeeper.session.timeout.ms | 6000 | zookeeper的心跳超时时间,超过这个时间就认为是dead消费者 |
zookeeper.connection.timeout.ms | 6000 | zookeeper的等待连接时间 |
zookeeper.sync.time.ms | 2000 | zookeeper的follower同leader的同步时间 |
auto.offset.reset | largest | 当zookeeper中没有初始的offset时候的处理方式。smallest:重置为最小值largest:重置为最大值anythingelse:抛出异常 |
socket.receive.buffer.bytes | 64*1024 | socket的接受缓存空间大小 |
fetch.message.max.bytes | 1024*1024 | 从每个分区获取的消息大小限制 |
auto.commit.enable | true | 是否在消费消息后将offset同步到zookeeper,当Consumer失败后就能从zookeeper获取最新的offset |
auto.commit.interval.ms | 60*1000 | 自动提交的时间间隔 |
queued.max.message.chunks | 10 | 用来处理消费消息的块,每个块可以等同于fetch.message.max.bytes中数值 |
rebalance.max.retries | 4 | 当有新的consumer加入到group时,将会reblance,此后将会有partitions的消费端迁移到新的consumer上,如果一个consumer获得了某个partition的消费权限,那么它将会向zk注册"PartitionOwnerregistry"节点信息,但是有可能此时旧consumer尚没有释放此节点,此值用于控制,注册节点的重试次数. |
rebalance.backoff.ms | 2000 | 每次再平衡的时间间隔 |
refresh.leader.backoff.ms | 每次重新选举leader的时间 | |
fetch.min.bytes | 1 | server发送到消费端的最小数据,若是不满足这个数值则会等待,知道满足数值要求 |
fetch.wait.max.ms | 100 | 若是不满足最小大小(fetch.min.bytes)的话,等待消费端请求的最长等待时间 |
consumer.timeout.ms | -1 | 指定时间内没有消息到达就抛出异常,一般不需要改 |
4、server.properties模板
# broker id,多个broker服务器的话,每个broker id必须不同
broker.id=1
# kafka broker所在节点的
hostnamehostname=10.1.1.1.3:9092
# 处理网络请求的线程数
num.network.threads= 8
# 执行磁盘IO的线程数
num.io.threads=8
# server使用的send buffer大小。
socket.send.buffer.bytes=1048576
# server使用的recive buffer大小。
socket.receive.buffer.bytes=1048576
# 接受的最大请求大小(防止OOM)
socket.request.max.bytes=104857600
#-------------added by Kaim ---------------
# 加入队列的最大请求数(超过该值,network thread阻塞)
queued.max.requests=16
# purgatory(炼狱)是个容器,用来存放不能马上答复的网络请求。如果能答复请求则从炼狱删除。这个是fetch炼狱保存的最大请求数。设置的比默认值小是据说因为这里有个BUG,不知道0.10.x中解决没
# BUG说明见:http://blog.csdn.net/stark_summer/article/details/50203133fetch.purgatory.purge.interval.requests=100
# 生产者炼狱保存的最大请求数
producer.purgatory.purge.interval.requests=100
############################# 日志配置#############################
# 可以设置多个日志存放的路径
log.dirs=~/kafka-logs
# 默认每个主题的分区数,生产环境建议值:
8num.partitions= 8
# 启停时做日志恢复每个目录所需的线程数,采用RAID的时候可以增大该值
num.recovery.threads.per.data.dir=1
# 写入磁盘的消息批大小
log.flush.interval.messages=10000
# 强制刷新消息到磁盘的时间阈值
log.flush.interval.ms=10000
# 日志保留的最少时间 由于做压测,防止占用磁盘太多,保留时间为1ms#
log.retention.hours=168log.retention.minutes=5
# 每个日志段大小,超过该值会生成新日志段
log.segment.bytes=1073741824
# 检查日志分段文件的间隔时间,以确定是否文件属性是否到达删除要求。
log.retention.check.interval.ms=300000
# --------------added by kami--------------
# 自动创建主题
auto.create.topics.enable=true
# 当执行一次fetch后,需要一定的空间扫描最近的offset,设置的越大越好,一般使用默认值就可以
log.index.interval.bytes=4096
# 每个log segment的最大尺寸。注意,如果log尺寸达到这个数值,即使尺寸没有超过log.segment.bytes限制,也需要产生新的log segment。log.index.size.max.bytes=10485760
# 检查是否需要fsync的时间间隔
log.flush.scheduler.interval.ms=2000
# 即使文件没有到达log.segment.bytes,只要文件创建时间到达此属性,就会创建新文件。
log.roll.hours=168
# server可以接收的消息最大尺寸。重要的是,consumer和producer有关这个属性的设置必须同步,否则producer发布的消息对consumer来说太大。默认值均为一百万
message.max.bytes=1000000
############################# Zookeeper #############################
# zookeeper server地址,如果有多个则用逗号分隔
zookeeper.connect=zoo1:2181,zoo2:2181,zoo3:2181
# Timeout in ms for connecting to zookeeper
# zk连接的超时时间
zookeeper.connection.timeout.ms=6000
# zk follower的同步延迟时间
zookeeper.sync.time.ms = 2000
############################ replication configuration added by KamiWan##############
# 从leader备份数据的线程数
num.replica.fetchers=4
# 备份时每次fetch的最大值
replica.fetch.max.bytes=1048576
# follwer执行fetcher请求时的最大等待时间
replica.fetch.wait.max.ms=500
# 默认的replication数量,可以根据所需要的可靠性要求来配置
default.replication.factor=2