Apache Kafka Setup & Testing Instructions for RHEL/CentOS/Alma Linux 7-8

Single Node Setup Instructions

1. Ensure you have the syndeia-cloud-3.5_cassandra_zookeeper_kafka_setup.zip (or latest service pack) downloaded to your home directory from the download/license instructions sent out by our team.  

(info)  Note: the .ZIP will pre-create a separate folder for its contents when extracted so there is no need to pre-create a separate folder for it.  

2. Review Kafka's recommendations, e.g.: (Open|Oracle)JDK/JRE, memory, file system selection, parameters, etc. (see http://kafka.apache.org/documentation/#java through the Monitoring section for more details; keep in mind that most of it was written with cluster deployments in mind).  

3. If using a firewall, ensure the following port is accessible (consult your local network admin if required): TCP port 9092 (the port Kafka listens on for client connections).  
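
The firewall step above can be sketched as follows. This is a minimal example that assumes firewalld is the active firewall (common on RHEL/CentOS/Alma Linux, but not confirmed by this page); the helper name open_kafka_port is illustrative only.

```shell
# open_kafka_port [PORT]: permanently allow inbound TCP on PORT (default 9092)
# and reload firewalld so the new rule takes effect.
# Assumption: firewalld is in use; adapt if you manage iptables/nftables directly.
open_kafka_port() {
    port=${1:-9092}
    sudo firewall-cmd --permanent --add-port="${port}/tcp" &&
    sudo firewall-cmd --reload
}
```

After running open_kafka_port, sudo firewall-cmd --list-ports should include 9092/tcp.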



Download, Install, Configure & Run Apache Kafka

1. Download Kafka 2.13-3.2.1 from http://kafka.apache.org/downloads#3.2.1 (ie: wget https://archive.apache.org/dist/kafka/3.2.1/kafka_2.13-3.2.1.tgz)

2. Use tar to extract the .tgz file to /opt/ , ie:  KAFKA_build_ver=2.13-3.2.1 ; sudo tar -xvzf kafka_${KAFKA_build_ver}.tgz -C /opt/ ; where KAFKA_build_ver = the version you downloaded, ex:  2.13-3.2.1

3. Create/update a symlink to the current version, ie:  sudo ln -nfs /opt/kafka_${KAFKA_build_ver} /opt/kafka-current
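
Steps 1-3 all hinge on the KAFKA_build_ver string. As a small sketch, the archive name and download URL can be derived from it, so only one variable changes when a different release is used. The names KAFKA_ver, KAFKA_tgz and KAFKA_url are introduced here for illustration.

```shell
# Build identifier used throughout this page: <Scala version>-<Kafka version>.
KAFKA_build_ver=2.13-3.2.1
# Strip everything up to the first '-' to get the Kafka version alone.
KAFKA_ver=${KAFKA_build_ver#*-}
# Archive file name and its location on the Apache archive server.
KAFKA_tgz="kafka_${KAFKA_build_ver}.tgz"
KAFKA_url="https://archive.apache.org/dist/kafka/${KAFKA_ver}/${KAFKA_tgz}"
echo "$KAFKA_url"
```

With these set, the download in step 1 becomes wget "$KAFKA_url" and the extraction in step 2 becomes sudo tar -xvzf "$KAFKA_tgz" -C /opt/ .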

4. Create a new group named  kafka-zookeeper, ie:  sudo groupadd --system kafka-zookeeper

(info) Note, if you've already set up Zookeeper, this group should already exist.   

5. Create a new user named kafka in the previously named group, ie:  sudo useradd --system --groups kafka-zookeeper kafka

6. Take ownership of the extracted folder & symlink, ie:  sudo chown -R kafka:kafka-zookeeper /opt/kafka_${KAFKA_build_ver}; sudo chown -h kafka:kafka-zookeeper /opt/kafka-current

7.  Edit /opt/kafka_${KAFKA_build_ver}/config/server.properties and change log.dirs=/tmp/kafka-logs (on ~L60) to log.dirs=/opt/kafka-current/logs
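
If you prefer not to edit the file by hand, the log.dirs change in step 7 can be scripted. The following is a sketch using GNU sed; the function name set_log_dirs is hypothetical, and it assumes the file contains exactly one uncommented log.dirs= line, as the stock server.properties does.

```shell
# set_log_dirs FILE DIR: point the log.dirs setting in FILE at DIR.
set_log_dirs() {
    # Replace the whole log.dirs line; '#' is the sed delimiter because DIR contains '/'.
    sed -i "s#^log\.dirs=.*#log.dirs=$2#" "$1"
    # Print the resulting line so the change can be confirmed.
    grep '^log\.dirs=' "$1"
}
```

Run it from a shell that has write access to the file (for example as the kafka user), ex: set_log_dirs /opt/kafka-current/config/server.properties /opt/kafka-current/logs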

8.  Start the Kafka service, ie:  /opt/kafka_${KAFKA_build_ver}/bin/kafka-server-start.sh /opt/kafka_${KAFKA_build_ver}/config/server.properties .  You should get some console output with no errors.  Hit CTRL-C to halt.  

(info) Note, Apache Kafka doesn't include a native systemd .service file by default.  While systemd will dynamically create one at runtime via its SysV compatibility module, you may wish to create one yourself to exercise better control over the various service parameters.  For your convenience we have created a systemd kafka.service file (included in the syndeia-cloud-3.5_cassandra_zookeeper_kafka_setup.zip download from step 1).  To use this, copy it to /etc/systemd/system, reload the systemd units, enable kafka to start on boot, and start the service, ie:  sudo cp <service_file_download_dir>/kafka.service /etc/systemd/system/. && sudo systemctl daemon-reload && sudo systemctl enable kafka && sudo systemctl start kafka 
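
The kafka.service file supplied in the ZIP is the one to use. For readers who want to see what such a unit typically contains, the sketch below writes an illustrative unit to a scratch path. It is NOT the file shipped in the download: the zookeeper.service unit name, the restart policy, the exit-status handling, and the output path are all assumptions. The user, group, and script paths follow the steps above.

```shell
# Scratch location for the example unit (assumption; not /etc/systemd/system).
UNIT_OUT=${UNIT_OUT:-/tmp/kafka.service.example}

cat > "$UNIT_OUT" <<'EOF'
# Illustrative example only; not the unit file shipped in the setup ZIP.
[Unit]
Description=Apache Kafka broker
# Assumes ZooKeeper runs on this host as zookeeper.service.
After=network.target zookeeper.service

[Service]
Type=simple
User=kafka
Group=kafka-zookeeper
ExecStart=/opt/kafka-current/bin/kafka-server-start.sh /opt/kafka-current/config/server.properties
ExecStop=/opt/kafka-current/bin/kafka-server-stop.sh
# The JVM exits with 143 when stopped by SIGTERM; treat that as a clean stop.
SuccessExitStatus=143
Restart=on-failure

[Install]
WantedBy=multi-user.target
EOF

echo "wrote $UNIT_OUT"
```

Pointing ExecStart at /opt/kafka-current rather than a versioned directory means the unit keeps working after the symlink is moved to a newer release.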

9. If you are using the systemd service file (highly recommended, see note above), you can confirm that the service started by checking that "Active: active (running)" appears in the output of systemctl status kafka

$ sudo systemctl status kafka
● kafka.service
   Loaded: loaded (/etc/systemd/system/kafka.service; disabled; vendor preset: enabled)
   Active: active (running) since Thu 2019-05-23 20:53:46 EDT; 1 weeks 3 days ago
  Process: 12023 ExecStop=/opt/kafka_2.13-3.2.1/bin/kafka-server-stop.sh (code=exited, status=0/SUCCESS)
 Main PID: 12099 (java)
   CGroup: /system.slice/kafka.service
           └─12099 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true -Xloggc:
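
For scripted checks (for example in a post-install script), the same verification can be done without reading the full status output. This is a sketch; the function name check_kafka_service is hypothetical, and it assumes the unit is named kafka as in the note above.

```shell
# check_kafka_service [UNIT]: report the systemd state of UNIT (default: kafka)
# and return success only when that state is "active".
check_kafka_service() {
    # 'systemctl is-active' prints a single word such as active, inactive or failed.
    state=$(systemctl is-active "${1:-kafka}" 2>/dev/null)
    echo "kafka service state: ${state:-unknown}"
    [ "$state" = "active" ]
}
```

Usage, ex: check_kafka_service || sudo journalctl -u kafka -n 50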

10. To examine the log file, you can use less /opt/kafka_2.13-3.2.1/logs/server.log.  To follow the log, you can use tail -f /opt/kafka_2.13-3.2.1/logs/server.log . 

(info) Note, for your convenience you may wish to create a symlink to /opt/kafka_2.13-3.2.1/logs/ from /var/log, ie: sudo ln -nfs /opt/kafka_2.13-3.2.1/logs/ /var/log/kafka

    You should see output similar to the following (abridged) text:

$ less /opt/kafka_2.13-3.2.1/logs/server.log
[...]
[2019-04-05 20:07:30,673] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2019-04-05 20:07:31,000] INFO starting (kafka.server.KafkaServer)
[2019-04-05 20:07:31,001] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2019-04-05 20:07:31,015] INFO [ZooKeeperClient] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)
[2019-04-05 20:07:31,032] INFO Client environment:zookeeper.version=3.6.3-6401e4ad2087061bc6b9f80dec2d69f2e3c8660a, built on 04/08/2021 16:35 GMT (org.apache.zookeeper.ZooKeeper)
[2019-04-05 20:07:31,032] INFO Client environment:host.name=kafka.domain.tld (org.apache.zookeeper.ZooKeeper)
[2019-04-05 20:07:31,032] INFO Client environment:java.version=1.8.0_332 (org.apache.zookeeper.ZooKeeper)
[2019-04-05 20:07:31,032] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
[2019-04-05 20:07:31,032] INFO Client environment:java.home=/usr/lib/jvm/java-8-oracle/jre (org.apache.zookeeper.ZooKeeper)
[2019-04-05 20:07:31,032] INFO Client environment:java.class.path=/opt/kafka_2.13-3.2.1/bin/../libs/aopalliance-repackaged-2.5.0-b32.jar:[...]:/opt/kafka_2.13-3.2.1/bin/../libs/zookeeper-3.4.10.jar (org.apache.zookeeper.ZooKeeper)
[2019-04-05 20:07:31,032] INFO Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)
[2019-04-05 20:07:31,032] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
[2019-04-05 20:07:31,033] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
[2019-04-05 20:07:31,033] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
[2019-04-05 20:07:31,033] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
[2019-04-05 20:07:31,033] INFO Client environment:os.version=4.18.16-x86_64 (org.apache.zookeeper.ZooKeeper)
[2019-04-05 20:07:31,033] INFO Client environment:user.name=kafka (org.apache.zookeeper.ZooKeeper)
[2019-04-05 20:07:31,033] INFO Client environment:user.home=/home/kafka (org.apache.zookeeper.ZooKeeper)
[2019-04-05 20:07:31,033] INFO Client environment:user.dir=/ (org.apache.zookeeper.ZooKeeper)
[2019-04-05 20:07:31,034] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=6000 watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@49ec71f8 (org.apache.zookeeper.ZooKeeper)
[2019-04-05 20:07:31,047] INFO [ZooKeeperClient] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
[2019-04-05 20:07:31,048] INFO Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2019-04-05 20:07:31,053] INFO Socket connection established to localhost/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2019-04-05 20:07:31,059] INFO Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x169eeac4d530002, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2019-04-05 20:07:31,062] INFO [ZooKeeperClient] Connected. (kafka.zookeeper.ZooKeeperClient)
[2019-04-05 20:07:31,283] INFO Cluster ID = k7If0XwzQS6n-iqqQz6ztr (kafka.server.KafkaServer)
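
Rather than paging through the whole log, a quick scan for problem entries can help. The sketch below relies only on the "[timestamp] LEVEL message" layout visible in the sample above; the function name count_log_problems is hypothetical.

```shell
# count_log_problems FILE: print how many ERROR or FATAL entries FILE contains.
count_log_problems() {
    # grep -c exits non-zero when the count is 0, so mask that with '|| true'.
    grep -cE '^\[[^]]*\] (ERROR|FATAL) ' "$1" || true
}
```

Usage, ex: count_log_problems /opt/kafka-current/logs/server.log . A result of 0 after startup is what you want to see.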

11.  Validate correct operation and create/update an archive image to use as a new base image if the node needs to be rebuilt or if you wish to create a cluster. 

(info)  Before making the image you may wish to first stop the service and optionally disable it temporarily to prevent auto-start on boot, ie:  sudo systemctl stop kafka ; sudo systemctl disable kafka   



Multi-Node (Cluster) Setup Instructions (Adding nodes to an existing single-node SC deployment)

11. Deploy another instance of your Kafka base image.
12. Make any appropriate changes for the MAC address (ex: in the VM settings and/or udev, if used).
13. Set up forward & reverse DNS records on your DNS server (consult your IT admin/sysadmin if required) and set the hostname and primary DNS suffix on the machine itself (sudo hostnamectl set-hostname <new_Kafka_node_FQDN> where FQDN = Fully Qualified Domain Name, ex: kafka2.mycompany.com )
14. SSH to the IP (or the FQDN of the new node if DNS has already propagated).

(info) Note: If using Fail2Ban, update the sender line in /etc/fail2ban/jail.local to root@<new_Kafka_node_FQDN>. Restart the fail2ban service (sudo systemctl restart fail2ban)
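
Before continuing, it can be worth confirming that the forward and reverse DNS records from step 13 agree. The following is a sketch using getent, which resolves through the system resolver; the function name check_dns is hypothetical.

```shell
# check_dns FQDN: resolve FQDN to an address, resolve that address back to a
# name, and succeed only if the round trip returns the original FQDN.
check_dns() {
    fqdn=$1
    # Forward lookup: the first column of the first result is the address.
    ip=$(getent hosts "$fqdn" | awk 'NR==1 {print $1}')
    [ -n "$ip" ] || { echo "no forward record for $fqdn"; return 1; }
    # Reverse lookup: the second column is the canonical name for that address.
    rev=$(getent hosts "$ip" | awk 'NR==1 {print $2}')
    echo "$fqdn -> $ip -> ${rev:-<none>}"
    [ "$rev" = "$fqdn" ]
}
```

Usage, ex: check_dns kafka2.mycompany.com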

15A. On the ORIGINAL Zookeeper (ZK) node:  ensure you have already re-bound the Zookeeper port to an external interface that the additional NEW Kafka node can reach.  Note the FQDN of the ORIGINAL Zookeeper node.  

15B. On the NEW Kafka node:  Update the Kafka Config file (/opt/kafka-current/config/server.properties) : ie, change the zookeeper.connect (on ~L125) from localhost / 127.0.0.1 to the name of the ORIGINAL Zookeeper node, ex:  

zookeeper.connect=my-ZK-server.domain.tld:2181

(info) Note, it is possible to list additional failover ZooKeeper servers as comma-separated entries, but that is beyond the scope of this document; see https://kafka.apache.org/32/documentation.html#brokerconfigs_zookeeper.connect for more details.
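
Before restarting Kafka with the new zookeeper.connect value, you can check from the NEW Kafka node that the Zookeeper port is actually reachable (step 15A). This sketch uses bash's built-in /dev/tcp redirection, so no extra tools are required; the function name port_open is hypothetical.

```shell
# port_open HOST PORT: succeed if a TCP connection to HOST:PORT can be opened
# within 3 seconds. Uses bash's /dev/tcp pseudo-device, hence the explicit bash -c.
port_open() {
    timeout 3 bash -c 'exec 3<>"/dev/tcp/$1/$2"' _ "$1" "$2" 2>/dev/null
}
```

Usage, ex: port_open my-ZK-server.domain.tld 2181 && echo reachable || echo "NOT reachable"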

(info) Note, if your topology evolution involves moving Kafka COMPLETELY off the original single-node SC deployment, ex: you are moving to host Kafka on a managed service provider (ex:  Amazon Managed Streaming for Apache Kafka (MSK)), you will want to disable both the local Apache Zookeeper and Kafka services and update kafka_native = "http://localhost:9092" in all SC application.conf files under each service in /opt/syndeia-cloud/current/... on the SC node(s) to point to Kafka on the managed service provider, ex:  

for i in /opt/icx/syndeia-cloud-current/**/conf/application.conf; do sudo -u syndeia-cloud sed -i 's#kafka_native = "http://localhost:9092"#kafka_native = "http://my-Kafka-server.domain.tld:9092"#' "$i"; done

15C. On the ORIGINAL Kafka node:  Migrate/reassign topic partitions:  Follow https://kafka.apache.org/32/documentation.html#basic_ops_cluster_expansion
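
The linked cluster-expansion procedure starts by describing which topics to move in a small JSON file and asking Kafka to propose a reassignment. Below is a sketch of that first step only. The topic name, broker ID list, and file path are placeholders to replace with your own values, and the proposed plan should be reviewed as described in the linked documentation before anything is executed.

```shell
# Placeholders (assumptions): substitute your real topic name and broker IDs.
TOPIC=${TOPIC:-my-topic}
NEW_BROKERS=${NEW_BROKERS:-0,1}
MOVE_FILE=${MOVE_FILE:-/tmp/topics-to-move.json}

# Describe the topics whose partitions should be spread over the new broker list.
cat > "$MOVE_FILE" <<EOF
{"topics": [{"topic": "${TOPIC}"}], "version": 1}
EOF

# Print (not run) the command that asks Kafka to generate a candidate plan.
echo "/opt/kafka-current/bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --topics-to-move-json-file ${MOVE_FILE} --broker-list \"${NEW_BROKERS}\" --generate"
```

The command is only printed so that it can be inspected first; copy it into the shell once the values look right.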

16. Repeat steps 11-15 for each additional cluster node.



Validating Kafka Operation (or Cluster Replication) for 1-node (or multiple nodes)

17. To validate Kafka operation, we create a sample test topic, start a producer, put some messages into the queue, start a consumer script, and validate we see them.  To do this, perform the following steps:  

17.1.  Open two new terminal windows

17.2.  Follow steps 3-5 of "Quickstart" https://kafka.apache.org/32/documentation.html#quickstart .  You should see messages input into the producer show up on the consumer side.    

17.3.  To quit hit ^C in the producer and consumer scripts. 
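
The interactive check above can also be run non-interactively, which is handy after rebuilding a node. This is a sketch built on the same three Quickstart tools (kafka-topics.sh, kafka-console-producer.sh, kafka-console-consumer.sh). The function name kafka_smoke_test, the KAFKA_BIN variable, and the 10-second consumer timeout are choices made here for illustration.

```shell
# kafka_smoke_test [BOOTSTRAP] [TOPIC]: create TOPIC if needed, publish one
# uniquely tagged message, then read the topic back and look for that message.
kafka_smoke_test() {
    bin=${KAFKA_BIN:-/opt/kafka-current/bin}
    bs=${1:-localhost:9092}
    topic=${2:-quickstart-events}
    # Tag the message with the shell PID so stale messages do not count as a pass.
    msg="smoke-test-$$"
    "$bin/kafka-topics.sh" --create --if-not-exists --topic "$topic" --bootstrap-server "$bs" &&
    echo "$msg" | "$bin/kafka-console-producer.sh" --topic "$topic" --bootstrap-server "$bs" &&
    "$bin/kafka-console-consumer.sh" --topic "$topic" --from-beginning --timeout-ms 10000 --bootstrap-server "$bs" |
        grep -qx "$msg"
}
```

Usage, ex: kafka_smoke_test localhost:9092 && echo "Kafka round trip OK" . For a cluster, pointing the function at a different node's host:9092 gives a quick check that each broker accepts client connections.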
