docker搭建kafka集群(基于centos7)
1 docker安装
# Remove the listed Docker packages (if any are installed) before installing docker-ce
yum remove docker \
docker-client \
docker-client-latest \
docker-common \
docker-latest \
docker-latest-logrotate \
docker-logrotate \
docker-selinux \
docker-engine-selinux \
docker-engine
复制代码
1.1 安装docker 源
# Install the prerequisites; yum-utils provides the yum-config-manager command used below
yum install -y yum-utils device-mapper-persistent-data lvm2
# Register the Aliyun docker-ce repository (fetched over https)
yum-config-manager \
    --add-repo \
    https://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
# docker-ce itself is installed in step 1.3 with a pinned version, so no package
# is installed here (the former interactive install of 18.03.0.ce conflicted with it)
复制代码
1.2 安装docker所需工具
yum install -y yum-utils device-mapper-persistent-data lvm2
复制代码
1.3 安装指定版本docker
yum install -y docker-ce-18.09.9-3.el7
复制代码
1.4 启动docker
systemctl enable docker && systemctl start docker
复制代码
1.5 因国内下载docker镜像存在问题,需要配置下docker镜像源
vi /etc/docker/daemon.json
## daemon.json 内容如下(daemon.json 初始不存在)
{
"registry-mirrors": ["https://alzgoonw.mirror.aliyuncs.com","http://hub-mirror.c.163.com"]
}
复制代码
1.6 配置完docker 镜像后记得重启docker
systemctl restart docker
复制代码
2 docker-compose 安装
2.1 下载docker-compose
curl -L "https://github.com/docker/compose/releases/download/1.23.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
复制代码
2.2 添加docker-compose 执行命令
chmod +x /usr/local/bin/docker-compose
复制代码
3 使用docker-compose安装kafka集群
3.1环境配置
3.1.1 kafka目录
# One directory per Kafka broker
mkdir kafka1 kafka2 kafka3
复制代码
3.1.2 zookeeper集群目录
# One directory per ZooKeeper node
mkdir zookeeper1 zookeeper2 zookeeper3
复制代码
3.1.3 zookeeper其他目录及配置
# Create the shared ZooKeeper config directory and enter it
mkdir zooConfig
cd zooConfig
# One sub-directory per ZooKeeper node
mkdir zoo1 zoo2 zoo3
复制代码
3.1.4 在zoo1,zoo2,zoo3中分别创建myid文件,并分别写入对应的id数字,如zoo1中的myid写入1
# Write each node's id into its own myid file (run from inside zooConfig).
# The ">" redirection is required: without it echo only prints its arguments.
echo 1 > ./zoo1/myid
echo 2 > ./zoo2/myid
echo 3 > ./zoo3/myid
复制代码
3.1.5 创建zoo配置文件zoo.cfg
# Length of a single tick, in milliseconds
tickTime=2000
# Number of ticks the initial synchronization phase may take
initLimit=10
# Number of ticks allowed between sending a request
# and receiving an acknowledgement
syncLimit=5
# Directory where the snapshot is stored (avoid /tmp for real storage)
dataDir=/data
dataLogDir=/datalog
# Port on which clients connect
clientPort=2181
# Maximum number of client connections;
# raise it if more clients have to be handled
#maxClientCnxns=60
#
# Read the maintenance section of the administrator guide
# before turning on autopurge:
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# Number of snapshots to retain in dataDir
autopurge.snapRetainCount=3
# Purge task interval in hours; "0" disables the auto purge feature
autopurge.purgeInterval=1
# Ensemble members; the addresses are the static ipv4_address values
# assigned to zoo1, zoo2 and zoo3 in docker-compose.yml
server.1=172.23.0.11:2888:3888
server.2=172.23.0.12:2888:3888
server.3=172.23.0.13:2888:3888
复制代码
3.2 创建docker容器网络
docker network create --driver bridge --subnet 172.23.0.0/25 --gateway 172.23.0.1 zookeeper_network
3.3 docker-compose.yml
---
# Three-node ZooKeeper ensemble, three Kafka brokers and kafka-manager.
# Every container gets a static address on the pre-created external
# network "zookeeper_network" (172.23.0.0/25, see step 3.2).
version: '2'

services:
  zoo1:
    image: zookeeper:3.4.14
    restart: always
    container_name: zoo1
    hostname: zoo1
    ports:
      - "2181:2181"
    volumes:
      - "./zooConfig/zoo.cfg:/conf/zoo.cfg"
      - "/disk/docker/zookeeper1/data:/data"
      - "/disk/docker/zookeeper1/datalog:/datalog"
    environment:
      ZOO_MY_ID: "1"
      # NOTE(review): zoo.cfg is bind-mounted above and already lists the
      # servers; confirm whether the image still honours ZOO_SERVERS.
      ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
    networks:
      default:
        ipv4_address: 172.23.0.11

  zoo2:
    image: zookeeper:3.4.14
    restart: always
    container_name: zoo2
    hostname: zoo2
    ports:
      - "2182:2181"
    volumes:
      - "./zooConfig/zoo.cfg:/conf/zoo.cfg"
      - "/disk/docker/zookeeper2/data:/data"
      - "/disk/docker/zookeeper2/datalog:/datalog"
    environment:
      ZOO_MY_ID: "2"
      ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
    networks:
      default:
        ipv4_address: 172.23.0.12

  zoo3:
    image: zookeeper:3.4.14
    restart: always
    container_name: zoo3
    hostname: zoo3
    ports:
      - "2183:2181"
    volumes:
      - "./zooConfig/zoo.cfg:/conf/zoo.cfg"
      - "/disk/docker/zookeeper3/data:/data"
      - "/disk/docker/zookeeper3/datalog:/datalog"
    environment:
      ZOO_MY_ID: "3"
      ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
    networks:
      default:
        ipv4_address: 172.23.0.13

  kafka1:
    image: wurstmeister/kafka:2.12-2.0.1
    restart: always
    container_name: kafka1
    hostname: kafka1
    ports:
      - "9092:9092"
    environment:
      # Replace 172.18.255.9 with the IP address of your own host
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://172.18.255.9:9092
      KAFKA_ADVERTISED_HOST_NAME: kafka1
      KAFKA_HOST_NAME: kafka1
      KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181
      KAFKA_ADVERTISED_PORT: "9092"
      KAFKA_BROKER_ID: "0"
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
    volumes:
      - /etc/localtime:/etc/localtime
      - "/disk/docker/kafka1/logs:/kafka"
    links:
      - zoo1
      - zoo2
      - zoo3
    networks:
      default:
        ipv4_address: 172.23.0.14

  kafka2:
    image: wurstmeister/kafka:2.12-2.0.1
    restart: always
    container_name: kafka2
    hostname: kafka2
    ports:
      - "9093:9092"
    environment:
      # Replace 172.18.255.9 with the IP address of your own host
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://172.18.255.9:9093
      KAFKA_ADVERTISED_HOST_NAME: kafka2
      KAFKA_HOST_NAME: kafka2
      KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181
      KAFKA_ADVERTISED_PORT: "9093"
      KAFKA_BROKER_ID: "1"
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
    volumes:
      - /etc/localtime:/etc/localtime
      - "/disk/docker/kafka2/logs:/kafka"
    links:
      - zoo1
      - zoo2
      - zoo3
    networks:
      default:
        ipv4_address: 172.23.0.15

  kafka3:
    image: wurstmeister/kafka:2.12-2.0.1
    restart: always
    container_name: kafka3
    hostname: kafka3
    ports:
      - "9094:9092"
    environment:
      # Replace 172.18.255.9 with the IP address of your own host
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://172.18.255.9:9094
      KAFKA_ADVERTISED_HOST_NAME: kafka3
      KAFKA_HOST_NAME: kafka3
      KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181
      KAFKA_ADVERTISED_PORT: "9094"
      KAFKA_BROKER_ID: "2"
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
    volumes:
      - /etc/localtime:/etc/localtime
      - "/disk/docker/kafka3/logs:/kafka"
    links:
      - zoo1
      - zoo2
      - zoo3
    networks:
      default:
        ipv4_address: 172.23.0.16

  kafka-manager:
    image: hlebalbau/kafka-manager:1.3.3.22
    restart: always
    container_name: kafka-manager
    hostname: kafka-manager
    ports:
      - "9000:9000"
    links:
      - kafka1
      - kafka2
      - kafka3
      - zoo1
      - zoo2
      - zoo3
    environment:
      ZK_HOSTS: zoo1:2181,zoo2:2181,zoo3:2181
      KAFKA_BROKERS: kafka1:9092,kafka2:9093,kafka3:9094
      # Demo credentials: change the secret and the login before exposing port 9000
      APPLICATION_SECRET: letmein
      KAFKA_MANAGER_AUTH_ENABLED: "true"
      KAFKA_MANAGER_USERNAME: "admin"
      KAFKA_MANAGER_PASSWORD: "admin"
      KM_ARGS: -Djava.net.preferIPv4Stack=true
    networks:
      default:
        ipv4_address: 172.23.0.10

# Use the network created beforehand with "docker network create" as the default
networks:
  default:
    external:
      name: zookeeper_network
复制代码
注意:
1.网上不少教程配置了JMX_PORT端口,但这样启动就会出错,所以我这里没有配置;如必须要配置,见附录1"kafka配置JMX_PORT出错解决方案";
2.将配置"PLAINTEXT://172.18.255.9:9094" 中的ip换成自己的机器的ip
3.4 一定要记得开放防火墙端口
## Open the required ports permanently in the public zone:
##   9000           - kafka-manager, for access from outside
##   9092/9093/9094 - ports the Kafka cluster nodes use to talk to each other;
##                    they must be open on the server or the nodes cannot communicate
for port in 9000 9092 9093 9094; do
    firewall-cmd --zone=public --add-port="${port}/tcp" --permanent
done
## Reload the firewall
firewall-cmd --reload
## Restarting docker afterwards is recommended
systemctl restart docker
复制代码
3.5 启动kafka集群
3.5.1 启动集群
docker-compose -f docker-compose.yml up -d
3.5.2 停止集群
docker-compose -f docker-compose.yml stop
3.6 kafka集群检查
3.6.1 查看zookeeper集群是否正常
docker exec -it zoo1 bash
bin/zkServer.sh status
复制代码
3.6.2 创建topic
docker exec -it kafka1 bash
kafka-topics.sh --create --zookeeper zoo1:2181 --replication-factor 1 --partitions 3 --topic test001
kafka-topics.sh --list --zookeeper zoo1:2181
kafka-topics.sh --list --zookeeper zoo2:2181
kafka-topics.sh --list --zookeeper zoo3:2181
复制代码
3.6.3 生产消息
kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9093,kafka3:9094 --topic test001
下面两条警告日志不影响使用,实际是连通状态的
3.6.4 消费消息
kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9093,kafka3:9094 --topic test001 --from-beginning
3.7 kafka-manager访问
3.7.1 浏览器打开访问
http://你的虚拟机ip:9000/
3.7.2 配置你创建的集群
3.7.3 下面就可以查看你的集群了
3 spring-boot连接kafka集群
3.1 application.yml配置
---
spring:
  application:
    name: kafka-service
  kafka:
    # Kafka broker addresses; several may be listed, separated by commas
    bootstrap-servers: 192.168.207.82:9092,192.168.207.82:9093,192.168.207.82:9094
    producer:
      retries: 0
      # Batch size in bytes (not a number of messages)
      batch-size: 16384
      # Total memory, in bytes, the producer may use for buffering
      buffer-memory: 33554432
      # Serializers for the message key and the message body
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      # Default consumer group id
      group-id: consumer-tutorial
      auto-commit-interval: 100
      auto-offset-reset: earliest
      enable-auto-commit: true
      # Deserializers for the message key and the message body
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    # Number of threads in the listener container, used to raise concurrency
    listener:
      concurrency: 3
复制代码
3.2 启动类
/**
 * Spring Boot entry point of the Kafka demo service.
 * DataSource auto-configuration is excluded.
 */
@EnableEurekaClient
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
public class KafkaApplication {

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments handed over to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(KafkaApplication.class, args);
    }
}
复制代码
3.3 用了监听启动观察kafka生产及消费情况,小伙伴们可以单测手动调试
3.3.1 KafkaProducer
/**
 * Scheduled demo producer: publishes one message to each of the topics
 * "test001" and "test002" and logs the outcome of every send.
 */
@Component
@EnableScheduling
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Scheduled task; the cron expression triggers it every second.
     */
    @Scheduled(cron = "0/1 * * * * ?")
    public void send() {
        // Send the generated UUID itself so the logged text matches the payload.
        // It goes to "test001", the topic created in step 3.6.2 and consumed by
        // KafkaConsumer (previously a fixed text was sent to the unconsumed "test003").
        String message = UUID.randomUUID().toString();
        ListenableFuture<?> future = kafkaTemplate.send("test001", message);

        String message2 = "第二种类的订阅消息发送";
        ListenableFuture<?> future2 = kafkaTemplate.send("test002", message2);

        // Report success or failure of each send asynchronously
        future.addCallback(
                o -> System.out.println("send-消息发送成功:" + message),
                throwable -> System.out.println("消息发送失败:" + message));
        future2.addCallback(
                o -> System.out.println("send-消息发送成功:" + message2),
                throwable -> System.out.println("消息发送失败:" + message2));
    }
}
复制代码
3.3.2 KafkaConsumer
/**
 * Demo consumer that prints every message received on the topics
 * "test001" and "test002".
 */
@Component
public class KafkaConsumer {

    /**
     * Prints a message received from topic "test001".
     *
     * @param payload the message text
     */
    @KafkaListener(topics = "test001")
    public void receive(String payload) {
        System.out.println("test001--消费消息:" + payload);
    }

    /**
     * Prints a message received from topic "test002".
     *
     * @param payload the message text
     */
    @KafkaListener(topics = "test002")
    public void receive2(String payload) {
        System.out.println("test002--消费消息:" + payload);
    }
}
复制代码
以上spring-boot项目就自己搭建吧,这里不赘述了
踩坑不易,欢迎大家评阅
近期评论