Getting Started with Kafka

This article assumes a macOS environment. For the code, see kafka.

Installation

brew install kafka

brew services start zookeeper # or "zkServer start"

brew services start kafka # or "kafka-server-start /usr/local/etc/kafka/server.properties"

brew services list

For more on brew services, run "brew services -h".

Basics

topic

/usr/local/bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

/usr/local/bin/kafka-topics --list --zookeeper localhost:2181

/usr/local/bin/kafka-topics --delete --zookeeper localhost:2181 --topic test

producer

/usr/local/bin/kafka-console-producer --broker-list localhost:9092 --topic test

consumer

/usr/local/bin/kafka-console-consumer --zookeeper localhost:2181 --from-beginning --topic test

Cluster

broker

cp /usr/local/etc/kafka/server.properties /usr/local/etc/kafka/server-1.properties

# on MacOS: sed -i "" 's/broker.id=0/broker.id=1/g' /usr/local/etc/kafka/server-1.properties
sed -i 's/broker.id=0/broker.id=1/g' /usr/local/etc/kafka/server-1.properties

# on MacOS: sed -i "" 's/kafka-logs/kafka-logs-1/g' /usr/local/etc/kafka/server-1.properties
sed -i 's/kafka-logs/kafka-logs-1/g' /usr/local/etc/kafka/server-1.properties

echo "port=9093" >> /usr/local/etc/kafka/server-1.properties

/usr/local/bin/kafka-server-start /usr/local/etc/kafka/server-1.properties &

cp /usr/local/etc/kafka/server.properties /usr/local/etc/kafka/server-2.properties

# on MacOS: sed -i "" 's/broker.id=0/broker.id=2/g' /usr/local/etc/kafka/server-2.properties
sed -i 's/broker.id=0/broker.id=2/g' /usr/local/etc/kafka/server-2.properties

# on MacOS: sed -i "" 's/kafka-logs/kafka-logs-2/g' /usr/local/etc/kafka/server-2.properties
sed -i 's/kafka-logs/kafka-logs-2/g' /usr/local/etc/kafka/server-2.properties

echo "port=9094" >> /usr/local/etc/kafka/server-2.properties

/usr/local/bin/kafka-server-start /usr/local/etc/kafka/server-2.properties &
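The two broker setups above differ only in the broker id, the log directory suffix, and the port. As a rough sketch, the copy-and-edit steps could be wrapped in one function. The name make_broker_config and the rule "port = 9092 + id" are assumptions made here for illustration (they match the 9093/9094 values above), not something Kafka provides.

```shell
# make_broker_config BASE ID
# Hypothetical helper: derives server-ID.properties next to BASE by
# repeating the manual steps above (new broker.id, new log dir, new port).
make_broker_config() {
    base="$1"
    id="$2"
    out="${base%.properties}-${id}.properties"
    # Assumption: ports are allocated as 9092 + id (9093, 9094, ...).
    port=$((9092 + id))
    # sed writes to a new file here, so no -i flag is needed and the
    # GNU vs. macOS difference in `sed -i` does not matter.
    sed -e "s/broker\.id=0/broker.id=${id}/g" \
        -e "s/kafka-logs/kafka-logs-${id}/g" \
        "$base" > "$out"
    echo "port=${port}" >> "$out"
    # Print the path of the generated file so callers can pass it on.
    echo "$out"
}
```

Calling `make_broker_config /usr/local/etc/kafka/server.properties 1` should write server-1.properties and print its path, which can then be handed to kafka-server-start as shown above.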

topic

/usr/local/bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic test-replicated

/usr/local/bin/kafka-topics --list --zookeeper localhost:2181

/usr/local/bin/kafka-topics --describe --zookeeper localhost:2181 --topic test-replicated
Topic:test-replicated   PartitionCount:1        ReplicationFactor:3     Configs:
Topic: test-replicated Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2

producer

/usr/local/bin/kafka-console-producer --broker-list localhost:9092 --topic test-replicated

consumer

/usr/local/bin/kafka-console-consumer --zookeeper localhost:2181 --from-beginning --topic test-replicated

Testing

/usr/local/bin/kafka-topics --describe --zookeeper localhost:2181 --topic test-replicated
Topic:test-replicated   PartitionCount:1        ReplicationFactor:3     Configs:
Topic: test-replicated Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2

# kill the leader broker (broker 1)
ps | grep server-1.properties | grep -v grep | awk '{print $1}' | xargs kill -9

/usr/local/bin/kafka-topics --describe --zookeeper localhost:2181 --topic test-replicated
Topic:test-replicated   PartitionCount:1        ReplicationFactor:3     Configs:
Topic: test-replicated Partition: 0 Leader: 0 Replicas: 1,0,2 Isr: 0,2

/usr/local/bin/kafka-console-producer --broker-list localhost:9092 --topic test-replicated

/usr/local/bin/kafka-console-consumer --zookeeper localhost:2181 --from-beginning --topic test-replicated
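To compare the --describe output before and after killing the leader without reading it by eye, the partition lines can be reduced to just the fields that change. The following is a minimal sketch; partition_state is a hypothetical helper name, and it assumes the "Partition: ... Leader: ... Isr: ..." layout shown in the output above.

```shell
# partition_state: reads `kafka-topics --describe` output on stdin and
# prints "partition leader isr" for every per-partition line.
# Hypothetical helper; assumes the output layout shown in this article.
partition_state() {
    awk '
        # The summary line contains "PartitionCount:", which does not
        # match "Partition:", so only per-partition lines are handled.
        /Partition:/ {
            part = leader = isr = ""
            for (i = 1; i < NF; i++) {
                if ($i == "Partition:") part = $(i + 1)
                else if ($i == "Leader:") leader = $(i + 1)
                else if ($i == "Isr:") isr = $(i + 1)
            }
            print part, leader, isr
        }
    '
}
```

Piping the describe command into it, for example `/usr/local/bin/kafka-topics --describe --zookeeper localhost:2181 --topic test-replicated | partition_state`, would print `0 1 1,0,2` for the first output above and `0 0 0,2` for the output after broker 1 is killed.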

Concepts

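[Figure: KafkaArchitecture.png]

The figure above is taken from "Kafka设计解析(一)- Kafka背景及架构介绍" (Kafka Design Analysis, Part 1: Kafka Background and Architecture).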

Monitoring

Download the kafka-manager source code.

brew install sbt

# cd kafka-manager-1.3.3.17
sbt clean dist

# cd target/universal
unzip kafka-manager-1.3.3.17.zip

A prebuilt kafka-manager can be downloaded from here.

# cd kafka-manager-1.3.3.17
echo 'kafka-manager.zkhosts="localhost:2181"' >> ./conf/application.conf

nohup ./bin/kafka-manager -Dconfig.file=./conf/application.conf >/dev/null 2>&1 &

For more on nohup, see "nohup 命令" (the nohup command).

kafka-introduction-01.png

kafka-introduction-02.png

kafka-introduction-03.png

PHP

phpbrew ext install rdkafka

For details on phpbrew, see "PHP开发 之 开发环境" (PHP Development: Development Environment).

vim consumer.php

<?php

// Low-level consumer from the php-rdkafka extension.
try {
    $rk = new \RdKafka\Consumer();
    $rk->addBrokers("localhost");

    $topic = $rk->newTopic("test-replicated");
    // Read partition 0 from the earliest available offset.
    $topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);

    while (true) {
        // Wait up to 10 seconds (10000 ms) for the next message on partition 0.
        $msg = $topic->consume(0, 10000);
        if ($msg && $msg->payload) {
            echo $msg->payload, "\n";
        }
    }
} catch (Exception $e) {
    echo $e->getMessage();
}

  • Test
/usr/local/bin/kafka-console-producer --broker-list localhost:9092 --topic test-replicated
