docker-compose搭建kafka集群及spring-boot连接kafka集群

docker搭建kafka集群(基于centos7)

1 docker安装

# Remove any previously installed Docker packages so they do not conflict
# with the docker-ce version installed below.
yum remove docker \
                  docker-client \
                  docker-client-latest \
                  docker-common \
                  docker-latest \
                  docker-latest-logrotate \
                  docker-logrotate \
                  docker-selinux \
                  docker-engine-selinux \
                  docker-engine

复制代码

1.1 安装docker 源

# Install repo-management helpers and the storage-driver prerequisites.
yum install -y yum-utils device-mapper-persistent-data lvm2

# Register the Aliyun mirror of the docker-ce repository (faster inside China).
yum-config-manager \
    --add-repo \
    http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
# NOTE(review): this installs 18.03.0.ce while step 1.3 below installs
# docker-ce-18.09.9 — the two versions conflict; run only one of the two
# install commands. Confirm which version is intended.
yum install docker-ce-18.03.0.ce
复制代码

1.2 安装docker所需工具

yum install -y yum-utils device-mapper-persistent-data lvm2

复制代码

1.3 安装指定版本docker

yum install -y docker-ce-18.09.9-3.el7
复制代码

1.4 启动docker

systemctl enable docker && systemctl start docker
复制代码

1.5 因国内下载docker镜像存在问题,需要配置下docker镜像源

vi /etc/docker/daemon.json 
## daemon.json 内容如下(daemon.json 初始不存在)
{
  "registry-mirrors": ["https://alzgoonw.mirror.aliyuncs.com","http://hub-mirror.c.163.com"]
}
复制代码

1.6 配置完docker 镜像后记得重启docker

systemctl restart docker
复制代码

2 docker-compose 安装

2.1 下载docker-compose

curl -L "https://github.com/docker/compose/releases/download/1.23.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
复制代码

2.2 添加docker-compose 执行命令

chmod +x /usr/local/bin/docker-compose
复制代码

3 使用docker-compose安装kafka集群

3.1环境配置

3.1.1 kafka目录
mkdir kafka1
mkdir kafka2
mkdir kafka3
复制代码
3.1.2 zookeeper集群目录
mkdir zookeeper1
mkdir zookeeper2
mkdir zookeeper3
复制代码
3.1.3 zookeeper其他目录及配置
mkdir zooConfig
cd zooConfig
mkdir zoo1
mkdir zoo2
mkdir zoo3
复制代码

3.1.4 在zoo1,zoo2,zoo3中分别创建myid文件,并分别写入id数字,如zoo1中的myid中写入1

# Write each node's unique id into its own myid file.
# Fixes two bugs in the original: the ">" redirect was missing (echo just
# printed "1 ./zoo1/myid" to stdout instead of creating a file), and all
# three lines targeted ./zoo1/myid instead of zoo1/zoo2/zoo3.
echo 1 > ./zoo1/myid
echo 2 > ./zoo2/myid
echo 3 > ./zoo3/myid
复制代码

3.1.5 创建zoo配置文件zoo.cfg

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/data
dataLogDir=/datalog
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
autopurge.purgeInterval=1
# Ensemble members: server.N=ip:peerPort:leaderElectionPort.
# N must match the id written to each node's myid file, and the IPs must
# match the ipv4_address values assigned in docker-compose.yml.
# (Stray space after "=" removed — not every parser trims it.)
server.1=172.23.0.11:2888:3888
server.2=172.23.0.12:2888:3888
server.3=172.23.0.13:2888:3888
复制代码

3.2 创建docker容器网络

docker network create --driver bridge --subnet 172.23.0.0/25 --gateway 172.23.0.1 zookeeper_network

3.3 docker-compose.yml

# Topology: 3-node ZooKeeper ensemble + 3 Kafka brokers + kafka-manager,
# all attached to the pre-created bridge network "zookeeper_network"
# (172.23.0.0/25) with fixed container IPs.
# NOTE: replace 172.18.255.9 in KAFKA_ADVERTISED_LISTENERS with your host's IP.
# Port mappings and numeric env values are quoted to avoid YAML implicit
# typing (Compose best practice); behavior is unchanged.
version: '2'

services:

  zoo1:
    image: zookeeper:3.4.14
    restart: always
    container_name: zoo1
    hostname: zoo1
    ports:
    - "2181:2181"
    volumes:
    - "./zooConfig/zoo.cfg:/conf/zoo.cfg"
    - "/disk/docker/zookeeper1/data:/data"
    - "/disk/docker/zookeeper1/datalog:/datalog"
    environment:
      ZOO_MY_ID: "1"
      ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
    networks:
      default:
        ipv4_address: 172.23.0.11

  zoo2:
    image: zookeeper:3.4.14
    restart: always
    container_name: zoo2
    hostname: zoo2
    ports:
    - "2182:2181"
    volumes:
    - "./zooConfig/zoo.cfg:/conf/zoo.cfg"
    - "/disk/docker/zookeeper2/data:/data"
    - "/disk/docker/zookeeper2/datalog:/datalog"
    environment:
      ZOO_MY_ID: "2"
      ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
    networks:
      default:
        ipv4_address: 172.23.0.12

  zoo3:
    image: zookeeper:3.4.14
    restart: always
    container_name: zoo3
    hostname: zoo3
    ports:
    - "2183:2181"
    volumes:
    - "./zooConfig/zoo.cfg:/conf/zoo.cfg"
    - "/disk/docker/zookeeper3/data:/data"
    - "/disk/docker/zookeeper3/datalog:/datalog"
    environment:
      ZOO_MY_ID: "3"
      ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
    networks:
      default:
        ipv4_address: 172.23.0.13

  kafka1:
    image: wurstmeister/kafka:2.12-2.0.1
    restart: always
    container_name: kafka1
    hostname: kafka1
    ports:
    - "9092:9092"
    environment:
      # Address advertised to clients — must be reachable from outside Docker.
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://172.18.255.9:9092
      KAFKA_ADVERTISED_HOST_NAME: kafka1
      KAFKA_HOST_NAME: kafka1
      KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181
      KAFKA_ADVERTISED_PORT: "9092"
      KAFKA_BROKER_ID: "0"
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
    volumes:
    - /etc/localtime:/etc/localtime
    - "/disk/docker/kafka1/logs:/kafka"
    links:
    - zoo1
    - zoo2
    - zoo3
    networks:
      default:
        ipv4_address: 172.23.0.14

  kafka2:
    image: wurstmeister/kafka:2.12-2.0.1
    restart: always
    container_name: kafka2
    hostname: kafka2
    ports:
    - "9093:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://172.18.255.9:9093
      KAFKA_ADVERTISED_HOST_NAME: kafka2
      KAFKA_HOST_NAME: kafka2
      KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181
      KAFKA_ADVERTISED_PORT: "9093"
      KAFKA_BROKER_ID: "1"
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
    volumes:
    - /etc/localtime:/etc/localtime
    - "/disk/docker/kafka2/logs:/kafka"
    links:
    - zoo1
    - zoo2
    - zoo3
    networks:
      default:
        ipv4_address: 172.23.0.15

  kafka3:
    image: wurstmeister/kafka:2.12-2.0.1
    restart: always
    container_name: kafka3
    hostname: kafka3
    ports:
    - "9094:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://172.18.255.9:9094
      KAFKA_ADVERTISED_HOST_NAME: kafka3
      KAFKA_HOST_NAME: kafka3
      KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181
      KAFKA_ADVERTISED_PORT: "9094"
      KAFKA_BROKER_ID: "2"
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
    volumes:
    - /etc/localtime:/etc/localtime
    - "/disk/docker/kafka3/logs:/kafka"
    links:
    - zoo1
    - zoo2
    - zoo3
    networks:
      default:
        ipv4_address: 172.23.0.16

  kafka-manager:
    image: hlebalbau/kafka-manager:1.3.3.22
    restart: always
    container_name: kafka-manager
    hostname: kafka-manager
    ports:
    - "9000:9000"
    links:
    - kafka1
    - kafka2
    - kafka3
    - zoo1
    - zoo2
    - zoo3
    environment:
      ZK_HOSTS: zoo1:2181,zoo2:2181,zoo3:2181
      KAFKA_BROKERS: kafka1:9092,kafka2:9093,kafka3:9094
      APPLICATION_SECRET: letmein
      KAFKA_MANAGER_AUTH_ENABLED: "true"
      KAFKA_MANAGER_USERNAME: "admin"
      KAFKA_MANAGER_PASSWORD: "admin"
      KM_ARGS: -Djava.net.preferIPv4Stack=true
    networks:
      default:
        ipv4_address: 172.23.0.10

networks:
  default:
    external:
      name: zookeeper_network
复制代码

注意:

1.网上不少教程配置了JMX_PORT端口,但配置后启动就会出错,所以我这里没有配置;如必须要配置,见附录1:kafka配置JMX_PORT出错解决方案;

2.将配置"PLAINTEXT://172.18.255.9:9094" 中的ip换成自己的机器的ip

3.4 一定要记得开启防火墙

## Open the kafka-manager web UI port for external access
firewall-cmd --zone=public --add-port=9000/tcp --permanent
## These three ports are used by the kafka cluster nodes to talk to each
## other; they must be open on the server or the nodes cannot reach each other
firewall-cmd --zone=public --add-port=9092/tcp --permanent
firewall-cmd --zone=public --add-port=9093/tcp --permanent
firewall-cmd --zone=public --add-port=9094/tcp --permanent
## Reload the firewall so the new rules take effect
firewall-cmd --reload
## Best to restart docker afterwards as well
systemctl restart docker
复制代码

3.5 启动kafka集群

3.5.1 启动集群

docker-compose -f docker-compose.yml up -d

image.png

3.5.2 停止集群

docker-compose -f docker-compose.yml stop

3.6 kafka集群检查

3.6.1 查看zookeeper集群是否正常
# Enter a zookeeper container and check its role (leader/follower) to verify
# the ensemble formed correctly.
docker exec -it zoo1 bash
bin/zkServer.sh status
复制代码
3.6.2 创建topic
# Enter a broker container, then manage topics from inside it.
docker exec -it kafka1 bash
# NOTE(review): --replication-factor 1 keeps a single copy on a 3-broker
# cluster; use 3 if fault tolerance is wanted — confirm intent.
kafka-topics.sh --create --zookeeper zoo1:2181 --replication-factor 1 --partitions 3 --topic test001
# Listing via each zookeeper node verifies the ensemble shares the metadata.
kafka-topics.sh --list --zookeeper zoo1:2181
kafka-topics.sh --list --zookeeper zoo2:2181
kafka-topics.sh --list --zookeeper zoo3:2181
复制代码
3.6.3 生产消息

kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9093,kafka3:9094 --topic test001

image.png
下面两条警告日志不影响使用,实际是连通状态的

3.6.4 消费消息

kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9093,kafka3:9094 --topic test001 --from-beginning

image.png

3.7 kafka-manager访问

3.7.1 浏览器打开访问

http://你的虚拟机ip:9000/

image.png

3.7.2 配置你创建的集群

image.png

3.7.3 下面就可以查看你的集群了

image.png

3 spring-boot连接kafka集群

3.1 application.yml配置

spring:
  application:
    name: kafka-service
  kafka:
    # Kafka broker addresses; multiple brokers may be listed.
    bootstrap-servers: 192.168.207.82:9092,192.168.207.82:9093,192.168.207.82:9094
    producer:
      retries: 0
      # Maximum size in bytes of one batch of messages.
      batch-size: 16384
      # Total producer buffer memory in bytes.
      buffer-memory: 33554432
      # Serializers for message keys and values.
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      # Default consumer group id.
      group-id: consumer-tutorial
      auto-commit-interval: 100
      auto-offset-reset: earliest
      enable-auto-commit: true
      # Deserializers for message keys and values.
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    # Number of threads in the listener container (raises consumer concurrency).
    listener:
      concurrency: 3
复制代码

3.2 启动类

/**
 * Spring Boot entry point for the kafka-service application.
 * DataSource auto-configuration is excluded because this demo uses no database.
 */
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
@EnableEurekaClient
public class KafkaApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaApplication.class, args);
    }
}

复制代码

3.3 用了监听启动观察kafka生产及消费情况,小伙伴们可以单测手动调试

3.3.1 KafkaProducer
/**
 * Scheduled demo producer: publishes one message per second to the two
 * topics the demo consumer subscribes to (test001 and test002), logging
 * success or failure of each send via the future's callback.
 */
@Component
@EnableScheduling
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Fires every second (cron second field "00/1").
     * Sends a random UUID to topic test001 and a fixed text to test002.
     */
    @Scheduled(cron = "00/1 * * * * ?")
    public void send(){
        String message = UUID.randomUUID().toString();
        // Fix: the original sent a hard-coded "kafka message test" to topic
        // "test003", which nothing consumes, while the callback logged the
        // unused UUID. Send the UUID to test001, which KafkaConsumer listens on.
        ListenableFuture<?> future = kafkaTemplate.send("test001", message);
        String message2 = "第二种类的订阅消息发送";
        ListenableFuture<?> future2 = kafkaTemplate.send("test002", message2);
        future.addCallback(o -> System.out.println("send-消息发送成功:" + message), throwable -> System.out.println("消息发送失败:" + message));
        future2.addCallback(o -> System.out.println("send-消息发送成功:" + message2), throwable -> System.out.println("消息发送失败:" + message2));
    }
}
复制代码
3.3.2 KafkaConsumer
/**
 * Demo consumer: logs every message received on topics test001 and test002.
 */
@Component
public class KafkaConsumer {

    /** Handles messages from topic test001. */
    @KafkaListener(topics = {"test001"})
    public void receive(String message){
        System.out.println("test001--消费消息:" + message);
    }

    /** Handles messages from topic test002. */
    @KafkaListener(topics = {"test002"})
    public void receive2(String message){
        System.out.println("test002--消费消息:" + message);
    }
}
复制代码

image.png
以上spring-boot项目就自己搭建吧,这里不赘述了

踩坑不易,欢迎大家评阅