
Tuesday, September 3, 2024

What if I use the public IP?

 1. Advertised listeners is the most important setting of Kafka: if you advertise the public IP, every client is told to reconnect over the public IP, and connections break whenever that IP changes




But what if I put localhost? It works on my machine!

 1. Advertised listeners is the most important setting of Kafka: advertising localhost tells every client to reconnect to localhost, which only works when the client runs on the broker machine itself




Understanding communications between clients and Kafka

1. Advertised listeners is the most important setting of Kafka
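
How the connection actually works: a client first reaches any broker from its bootstrap list, and that broker answers with the cluster metadata, which contains each broker's advertised.listeners address. All subsequent produce and consume connections go to those advertised addresses, not to the bootstrap address. A minimal sketch of the setting, reusing the kafka1 hostname from this tutorial (the commented-out alternatives are illustrations, not recommendations):

# server.properties - the address handed back to every client in the metadata
advertised.listeners=PLAINTEXT://kafka1:9092        # private DNS name, reachable by clients inside the network
# advertised.listeners=PLAINTEXT://localhost:9092   # clients are told to reconnect to localhost - only works on the broker itself
# advertised.listeners=PLAINTEXT://<public-ip>:9092 # reachable from outside, but breaks when the public IP changes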


Monday, September 2, 2024

Can I connect from the outside? e.g. my computer?

1. Zookeeper:

    - If you used the private IPs (like in this tutorial): YES - demo

    - If you used Elastic (public) IPs: YES


2. Kafka:

    - If you used the private IPs (like in this tutorial): NO - demo

    - If you used Elastic (public) IPs: YES
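
A quick way to test this from your own computer, assuming the security groups open the ports (the IPs below are placeholders to substitute):

# from your own computer - raw port checks against the public IPs
nc -vz <zookeeper-public-ip> 2181   # Zookeeper answers whoever dials it, so this is enough
nc -vz <kafka-public-ip> 9092       # the port may answer, but a Kafka client is then redirected
                                    # to advertised.listeners - private addresses fail from outside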




Hands On: Testing the cluster

1. Creating topics with a replication factor of 3

2. Publishing data to the topics

3. Reading data from the topics

4. Observing the Kafka filesystem

5. Deleting a topic

#!/bin/bash

# we can create topics with replication-factor 3 now!
bin/kafka-topics.sh --zookeeper zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka --create --topic second_topic --replication-factor 3 --partitions 3

# we can publish data to Kafka using the bootstrap server list!
bin/kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9092,kafka3:9092 --topic second_topic

# we can read data using any broker too!
bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic second_topic --from-beginning

# create one more topic so we can demo deletion
bin/kafka-topics.sh --zookeeper zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka --create --topic third_topic --replication-factor 3 --partitions 3

# let's list topics
bin/kafka-topics.sh --zookeeper zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka --list

# publish some data
bin/kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9092,kafka3:9092 --topic third_topic

# let's delete that topic
bin/kafka-topics.sh --zookeeper zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka --delete --topic third_topic

# it should be deleted shortly:
bin/kafka-topics.sh --zookeeper zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka --list
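
Step 4 of the list above isn't covered by the script; a minimal sketch for it, assuming the same hosts and the /data/kafka log directory configured further down in these notes:

# describe the topic to see partition leaders, replicas and ISRs
bin/kafka-topics.sh --zookeeper zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka --describe --topic second_topic
# observe the Kafka filesystem - one directory per partition under log.dirs
ls -la /data/kafka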


Hands On: Cluster Setup

1. Apply the same operations to the other machines

2. Edit configurations to make sure the broker ids are different

3. Launch Cluster, observe the logs

4. Verify in Zookeeper that all three brokers are registered

#!/bin/bash

# Make sure you have done the steps from section 5 on all machines
# we repeat the steps from section 6

# Add file limits configs - allow to open 100,000 file descriptors
echo "* hard nofile 100000
* soft nofile 100000" | sudo tee --append /etc/security/limits.conf
# reboot for the file limit to be taken into account
sudo reboot
# once the machine is back up:
sudo service zookeeper start
sudo chown -R ubuntu:ubuntu /data/kafka

# edit the config
rm config/server.properties
# MAKE SURE TO USE ANOTHER BROKER ID
nano config/server.properties
# launch kafka - make sure things look okay
bin/kafka-server-start.sh config/server.properties

# Install Kafka boot scripts
sudo nano /etc/init.d/kafka
sudo chmod +x /etc/init.d/kafka
sudo chown root:root /etc/init.d/kafka
# you can safely ignore the warning
sudo update-rc.d kafka defaults

# start kafka
sudo service kafka start
# verify it's working
nc -vz localhost 9092
# look at the logs
cat /home/ubuntu/kafka/logs/server.log
# fix the auto-created __consumer_offsets topic by setting min.insync.replicas=1
bin/kafka-topics.sh --zookeeper zookeeper1:2181/kafka --config min.insync.replicas=1 --topic __consumer_offsets --alter

# read the topic on broker 1 by connecting to broker 2!
bin/kafka-console-consumer.sh --bootstrap-server kafka2:9092 --topic first_topic --from-beginning


# DO THE SAME FOR BROKER 3

# Afterwards, you should see three brokers here
bin/zookeeper-shell.sh localhost:2181
ls /kafka/brokers/ids
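# expected output if all three brokers registered (assuming broker ids 1, 2, 3):
# [1, 2, 3]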

# you can also check the zoonavigator UI





Saturday, August 31, 2024

Hands On: One Machine Setup

1. Increase the file handle limits

2. Launch Kafka on one machine

3. Setup Kafka as a Service


#!/bin/bash

# Add file limits configs - allow to open 100,000 file descriptors
echo "* hard nofile 100000
* soft nofile 100000" | sudo tee --append /etc/security/limits.conf

# reboot for the file limit to be taken into account
sudo reboot
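# once back up, a fresh ssh session should report the raised limit
ulimit -n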
sudo service zookeeper start
sudo chown -R ubuntu:ubuntu /data/kafka

# edit kafka configuration
rm config/server.properties
nano config/server.properties

# launch kafka
bin/kafka-server-start.sh config/server.properties

# Install Kafka boot scripts
sudo nano /etc/init.d/kafka
sudo chmod +x /etc/init.d/kafka
sudo chown root:root /etc/init.d/kafka
# you can safely ignore the warning
sudo update-rc.d kafka defaults

# start kafka
sudo service kafka start
# verify it's working
nc -vz localhost 9092
# look at the server logs
cat /home/ubuntu/kafka/logs/server.log


# create a topic
bin/kafka-topics.sh --zookeeper zookeeper1:2181/kafka --create --topic first_topic --replication-factor 1 --partitions 3
# produce data to the topic
bin/kafka-console-producer.sh --broker-list kafka1:9092 --topic first_topic
hi
hello
# stop the producer with Ctrl-C
# read that data
bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic first_topic --from-beginning
# list kafka topics
bin/kafka-topics.sh --zookeeper zookeeper1:2181/kafka --list
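
A describe against the same chroot is a quick way to verify the topic layout:

# verify partitions and replication factor
bin/kafka-topics.sh --zookeeper zookeeper1:2181/kafka --describe --topic first_topic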



############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
# set this to a hostname or IP that your clients can resolve and reach
advertised.listeners=PLAINTEXT://kafka1:9092

# Switch to enable topic deletion or not, default value is false
delete.topic.enable=true

############################# Log Basics #############################

# A comma-separated list of directories under which to store log files
log.dirs=/data/kafka

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=8
# we will have 3 brokers so the default replication factor should be 2 or 3
default.replication.factor=3
# minimum number of in-sync replicas needed to accept a write (with acks=all) - protects against data loss
min.insync.replicas=1

############################# Log Retention Policy #############################

# The minimum age of a log file to be eligible for deletion due to age
# this will delete data after a week
log.retention.hours=168

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma-separated list of host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000


############################## Other ##################################
# I recommend you set this to false in production.
# We'll keep it as true for the course
auto.create.topics.enable=true
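
To see what a broker actually registered (and therefore what clients will be told), read its znode back from Zookeeper; a small sketch assuming broker.id=1 and the /kafka chroot from zookeeper.connect above:

# the broker's registration znode lists its advertised endpoints
bin/zookeeper-shell.sh zookeeper1:2181
get /kafka/brokers/ids/1
# the returned JSON should include something like "endpoints":["PLAINTEXT://kafka1:9092"]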


#!/bin/bash
#/etc/init.d/kafka
DAEMON_PATH=/home/ubuntu/kafka/bin
DAEMON_NAME=kafka
# Check that networking is up.
#[ ${NETWORKING} = "no" ] && exit 0

PATH=$PATH:$DAEMON_PATH

# See how we were called.
case "$1" in
  start)
    # Start daemon.
    pid=$(ps ax | grep -i 'kafka.Kafka' | grep -v grep | awk '{print $1}')
    if [ -n "$pid" ]; then
      echo "Kafka is already running"
    else
      echo "Starting $DAEMON_NAME"
      $DAEMON_PATH/kafka-server-start.sh -daemon /home/ubuntu/kafka/config/server.properties
    fi
    ;;
  stop)
    echo "Shutting down $DAEMON_NAME"
    $DAEMON_PATH/kafka-server-stop.sh
    ;;
  restart)
    $0 stop
    sleep 2
    $0 start
    ;;
  status)
    pid=$(ps ax | grep -i 'kafka.Kafka' | grep -v grep | awk '{print $1}')
    if [ -n "$pid" ]; then
      echo "Kafka is Running as PID: $pid"
    else
      echo "Kafka is not Running"
    fi
    ;;
  *)
    echo "Usage: $0 {start|stop|restart|status}"
    exit 1
    ;;
esac

exit 0
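
Once installed with the chmod/chown/update-rc.d steps above, the broker is managed like any other service:

sudo service kafka status
sudo service kafka restart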