Step 1: Add a System User Kafka

Create a new user called ‘kafka’, after which we will add this new user as a sudoer.

root@bd:~# adduser kafka
root@bd:~# usermod -aG sudo kafka

Step 2: Download Apache Kafka

su - kafka
kafka@bd:~$ mkdir -p ~/kafka
kafka@bd:~$ cd kafka
kafka@bd:~/kafka$ wget https://mirrors.estointernet.in/apache/kafka/2.5.0/kafka_2.12-2.5.0.tgz

--2020-07-09: command not found
root@bd:~# Resolving mirrors.estointernet.in (mirrors.estointernet.in)... 43.255.166.254, 2403:8940:3:1::f
-bash: syntax error near unexpected token `('
root@bd:~# Connecting to mirrors.estointernet.in (mirrors.estointernet.in)|43.255.166.254|:443... connected.
-bash: syntax error near unexpected token `('
root@bd:~# HTTP request sent, awaiting response... 200 OK
HTTP: command not found
root@bd:~# Length: 61604633 (59M) [application/octet-stream]
-bash: syntax error near unexpected token `('
root@bd:~# Saving to: ‘kafka_2.12-2.5.0.tgz.1’
Saving: command not found
root@bd:~#
root@bd:~# kafka_2.12-2.5.0.tgz.1 79%[========================================================> ] 46.90M 1.31MB/s e

Apache Kafka gz has been downloaded, now we need to extract it in kafka directory

kafka@bd:~/kafka$ tar -xzvf kafka_2.12-2.5.0.tgz

This is the Directory structure of kafka

Step 3: Configure Apache Kafka

To be able to delete topics, take off ‘#’ from ~/kafka/config/server.properties

kafka@bd:~$ vi kafka/kafka_2.12-2.5.0/config/server.properties 

Append the following line to the last line of the configuration file.

delete.topic.enable = true

Step 3: Start Zookeeper service

kafka@bd:~/kafka$ sudo service zookeeper start
kafka@bd:~/kafka$ sudo service zookeeper status
[sudo] password for kafka:
zookeeper.service - LSB: centralized coordination service Loaded: loaded (/etc/init.d/zookeeper; generated)
Active: active (running) since Wed 2020-07-08 23:12:57 IST; 1h 22min ago Docs: man:systemd-sysv-generator(8) Process: 20703 ExecStop=/etc/init.d/zookeeper stop (code=exited, status=0/SUCCESS) Process: 20855 ExecStart=/etc/init.d/zookeeper start (code=exited, status=0/SUCCESS) Tasks: 20 (limit: 4915) CGroup: /system.slice/zookeeper.service └─20865 /usr/bin/java -cp /etc/zookeeper/conf:/usr/share/java/jline.jar:/usr/share/java/log4j-1.2.jar:/usr/share/java/xercesImpl.jar:/usr/s Jul 08 23:12:57 bd systemd[1]: Starting LSB: centralized coordination service... Jul 08 23:12:57 bd systemd[1]: Started LSB: centralized coordination service. lines 1-12/12 (END)

Step 3: Start Kafka server (broker)

start kafka server kafka-server-start.sh with server.properies file

kafka@bd:~/kafka/kafka_2.12-2.5.0/bin$ ./kafka-server-start.sh ~/kafka/kafka_2.12-2.5.0/config/server.properties

nceId=Some(null), clientId=consumer-console-consumer-6451-1, clientHost=/192.168.0.108, sessionTimeoutMs=10000, rebalanceTimeoutMs=300000, supportedProtocols=List(range), ).groupInstanceId of group console-consumer-6451 loaded with member id consumer-console-consumer-6451-1-2c9a81f8-d685-419c-adc7-704fb1952773 at generation 1. (kafka.coordinator.group.GroupMetadata$)
[2020-07-09 00:42:02,414] INFO [GroupMetadataManager brokerId=0] Finished loading offsets and group metadata from __consumer_offsets-45 in 3 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-07-09 00:42:02,414] INFO [GroupMetadataManager brokerId=0] Finished loading offsets and group metadata from __consumer_offsets-48 in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)

Now, we can check listening ports:

  1. ZooKeeper : 2181
  2. Kafka : 9092
netstat -nlpt | grep 2181
netstat -nlpt | grep 9092

Step 4: Create a Topic

we will create a topic named “Topic1”, with a single partition and only one replica:

kafka@bd:~/kafka/kafka_2.12-2.5.0/bin$ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic Topic1
Created topic Topic1. kafka@bd:~/kafka/kafka_2.12-2.5.0/bin$

replication-factor : describes how many copies of data will be created. We are running with a single instance, so the value would be 1

partitions :  describe the number of brokers you want your data to be split between. We are running with a single broker, so the value would be 1

List Created Topics

kafka@bd:~/kafka/kafka_2.12-2.5.0/bin$ ./kafka-topics.sh --list --zookeeper localhost:2181

Step 5: Send Messages using Apache Kafka

Apache Kafka comes with a command line client that will take input from a file or standard input and send it out as messages to the Kafka cluster. The “producer” is the process that has responsibility for putting data into our Kafka service. By default, Kafka sends each line as a separate message.

Let’s run the producer and then type a few messages into the console to send to the server.

kafka@bd:~/kafka/kafka_2.12-2.5.0/bin$ ./kafka-console-producer.sh --broker-list localhost:9092 --topic Topic1
>HI I am sachin
>Welcome to my website sqlnosql
>hi
>how are you ?

Keep the terminal opened,Open one more Terminal to start Kafka Consumer

Step 6: Use Apache Kafka as a Consumer

kafka@bd:~/kafka/kafka_2.12-2.5.0$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Topic1  --from-beginning
HI I am sachin
Welcome to my website sqlnosql
hi
how are you ?

Together Console Output

Even If you close consumer and producer is listening, If consumer restarts it will start giving output.

The messages will be immediately visible on our consumer terminal.