Apache Kafka Setup & Testing Instructions for RHEL/CentOS 7

Single Node Setup Instructions

1. Ensure you have the syndeia-cloud-3.3.${build_number}_cassandra_zookeeper_kafka_setup.zip downloaded to your home directory (or home directory's Downloads folder) from the download/license instructions sent out by our team.  

(info)  Note: the .ZIP creates its own folder for its contents when extracted, so there is no need to create one beforehand.

2. Review Kafka's recommendations, eg: (Open|Oracle)JDK/JRE, memory, filesystem (FS) selection, tuning parameters, etc. (see http://kafka.apache.org/documentation/#java up to the Monitoring section for details, but keep in mind that most of it was written with a multi-node cluster deployment in mind).
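To see which Java runtime is currently installed before comparing it against those recommendations, you could use something like the sketch below. It assumes `java` is on the PATH and that `java -version` prints its usual first line (ex: `java version "1.8.0_131"` or `openjdk version "1.8.0_212"`); `java_major` and `installed_java_major` are hypothetical helper names, not part of Kafka.

```shell
#!/usr/bin/env bash
# Hypothetical helpers for checking the installed JDK/JRE before installing Kafka.

# Map a Java version string to its major version:
#   "1.8.0_131" -> 8 (legacy 1.x scheme), "11.0.2" -> 11
java_major() {
  local v=$1
  case $v in
    1.*) v=${v#1.}; echo "${v%%.*}" ;;   # drop the leading "1." and keep the next field
    *)   echo "${v%%[.+_-]*}" ;;         # keep everything before the first separator
  esac
}

# Print the major version of the java found on the PATH (java -version writes to stderr).
installed_java_major() {
  local ver
  ver=$(java -version 2>&1 | sed -n 's/.*version "\([^"]*\)".*/\1/p' | head -n 1)
  [ -n "$ver" ] || { echo "could not determine Java version" >&2; return 1; }
  java_major "$ver"
}
```

Usage: running installed_java_major prints just the major number (ex: 8), which you can then compare with the JDK guidance in the Kafka documentation.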

3. If using a firewall, ensure the following port is accessible (consult your local network admin if required): TCP port 9092 (the port Kafka listens on for client connections).

(info) Note: If required by your IT department, perform any other standard configuration or server hardening (eg: enabling & configuring the local firewall).
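On RHEL/CentOS 7 the default local firewall is firewalld. Assuming that is what you use, and that the default zone is the one facing your Kafka clients, a minimal sketch for opening the client port could look like this (`open_kafka_port` is a hypothetical helper name):

```shell
#!/usr/bin/env bash
# Sketch: open the Kafka client port with firewalld.
# Assumes firewalld is running and the default zone applies.
open_kafka_port() {
  local port=${1:-9092}
  sudo firewall-cmd --permanent --add-port="${port}/tcp" || return 1  # persist across reboots
  sudo firewall-cmd --reload || return 1                              # load the permanent config now
  sudo firewall-cmd --list-ports                                      # show open ports to confirm
}
```

Usage: open_kafka_port (or open_kafka_port 9093 if you changed the listener port). If your site uses a different firewall or zone layout, follow your network admin's procedure instead.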



Download, Install, Configure & Run Apache Kafka

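1. Download Kafka 2.11-1.1.x (ie: Kafka 1.1.x built for Scala 2.11) from http://kafka.apache.org/downloads#1.1.0, ie: wget https://archive.apache.org/dist/kafka/1.1.1/kafka_2.11-1.1.1.tgz (the examples below use 2.11-1.1.1; the sample output further down was captured on 2.11-1.1.0)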

2. Extract the downloaded .tgz file to /opt/, where KAFKA_build_ver is the version you downloaded (ex: 2.11-1.1.1), ie:  KAFKA_build_ver=2.11-1.1.1 ; sudo tar -xvzf kafka_${KAFKA_build_ver}.tgz -C /opt/

3. Create/update a symlink to the current version, ie:  sudo ln -nfs /opt/kafka_${KAFKA_build_ver} /opt/kafka-current
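Steps 1-3 can be scripted roughly as follows. This is a sketch that assumes the archive.apache.org URL layout used in step 1 (<kafka version>/kafka_<scala version>-<kafka version>.tgz) and the /opt/kafka-current symlink name used above; `kafka_url` and `install_kafka` are hypothetical helper names.

```shell
#!/usr/bin/env bash
# Build the archive.apache.org download URL from a KAFKA_build_ver such as 2.11-1.1.1
kafka_url() {
  local ver=$1
  local kafka_ver=${ver#*-}   # drop the Scala prefix ("2.11-") -> "1.1.1"
  echo "https://archive.apache.org/dist/kafka/${kafka_ver}/kafka_${ver}.tgz"
}

# Download, extract to /opt/ and point /opt/kafka-current at the new version (steps 1-3)
install_kafka() {
  local ver=$1
  local tgz="kafka_${ver}.tgz"
  wget -O "$tgz" "$(kafka_url "$ver")" || return 1
  sudo tar -xzf "$tgz" -C /opt/ || return 1
  sudo ln -nfs "/opt/kafka_${ver}" /opt/kafka-current
}
```

Usage: KAFKA_build_ver=2.11-1.1.1 ; install_kafka "$KAFKA_build_ver". Re-running it with a newer version simply repoints the symlink, which is what makes /opt/kafka-current convenient for upgrades.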

4. Create a new group named  kafka-zookeeper, ie:  sudo groupadd --system kafka-zookeeper

(info) Note: if you've already set up ZooKeeper, this group should already exist.

5. Create a new system user named kafka and add it to the kafka-zookeeper group, ie:  sudo useradd --system --groups kafka-zookeeper kafka

6. Take ownership of the extracted folder & symlink, ie:  sudo chown -R kafka:kafka-zookeeper /opt/kafka_${KAFKA_build_ver}; sudo chown -h kafka:kafka-zookeeper /opt/kafka-current
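Because the kafka-zookeeper group may already exist (see the note in step 4), steps 4-6 can be made safe to re-run by checking before creating anything. A sketch, assuming the group, user and paths used above; `create_kafka_account` is a hypothetical helper name.

```shell
#!/usr/bin/env bash
# Create the kafka-zookeeper group and kafka user only if missing, then fix ownership.
create_kafka_account() {
  local ver=$1   # KAFKA_build_ver, ex: 2.11-1.1.1
  # getent exits non-zero when the group does not exist
  getent group kafka-zookeeper >/dev/null || sudo groupadd --system kafka-zookeeper || return 1
  # id exits non-zero when the user does not exist
  id -u kafka >/dev/null 2>&1 || sudo useradd --system --groups kafka-zookeeper kafka || return 1
  sudo chown -R kafka:kafka-zookeeper "/opt/kafka_${ver}" || return 1
  sudo chown -h kafka:kafka-zookeeper /opt/kafka-current   # -h: change the symlink itself
}
```

Usage: create_kafka_account "$KAFKA_build_ver"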

7.  Edit /opt/kafka_${KAFKA_build_ver}/config/server.properties and change log.dirs=/tmp/kafka-logs (around line 60) to log.dirs=/opt/kafka-current/logs (the default under /tmp is not a safe place for persistent data)
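The step 7 edit can also be made non-interactively with sed. A sketch, assuming the stock server.properties still contains a single uncommented log.dirs= line; it keeps a .bak copy of the original file. `set_log_dirs` is a hypothetical helper name.

```shell
#!/usr/bin/env bash
# Point Kafka's log.dirs at a new directory, keeping <file>.bak as a backup
set_log_dirs() {
  local props=$1 dir=${2:-/opt/kafka-current/logs}
  sudo sed -i.bak "s|^log\.dirs=.*|log.dirs=${dir}|" "$props" || return 1
  grep '^log\.dirs=' "$props"   # print the resulting line so it can be checked
}
```

Usage: set_log_dirs /opt/kafka_${KAFKA_build_ver}/config/server.properties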

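8.  Start Kafka as the kafka user to test it, ie:  sudo -u kafka /opt/kafka_${KAFKA_build_ver}/bin/kafka-server-start.sh /opt/kafka_${KAFKA_build_ver}/config/server.properties  (run this way, the script stays in the foreground and prints its log to the terminal; press ^C to stop it)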

(info) Note: Apache Kafka doesn't include a native systemd .service file by default, and systemd's SysV compatibility module can only generate a unit at runtime from a SysV init script, which the Kafka tarball doesn't ship.  Creating a unit yourself also gives you better control over the various service parameters.  For your convenience we have created a systemd kafka.service file (included in the syndeia-cloud-3.3.${build_number}_cassandra_zookeeper_kafka_setup.zip download).  To use it, first stop any instance you started manually in step 8 (^C), then copy the file to /etc/systemd/system, reload the systemd units, enable kafka to start on boot and start the service, ie:  sudo cp <service_file_download_dir>/kafka.service /etc/systemd/system/. && sudo systemctl daemon-reload && sudo systemctl enable kafka && sudo systemctl start kafka
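If you prefer to write your own unit rather than use the kafka.service from the .ZIP, a minimal sketch is shown below. It is not the file we ship: it assumes the /opt/kafka-current symlink, the kafka user and kafka-zookeeper group created above, and a ZooKeeper unit named zookeeper.service (adjust or drop that ordering to match your setup). `write_kafka_unit` is a hypothetical helper name.

```shell
#!/usr/bin/env bash
# Write a minimal kafka.service unit (default: /etc/systemd/system/kafka.service)
write_kafka_unit() {
  local unit=${1:-/etc/systemd/system/kafka.service}
  local home=${2:-/opt/kafka-current}
  sudo tee "$unit" >/dev/null <<EOF
[Unit]
Description=Apache Kafka broker
# Assumption: ZooKeeper runs as zookeeper.service on this host
After=network.target zookeeper.service

[Service]
Type=simple
User=kafka
Group=kafka-zookeeper
ExecStart=${home}/bin/kafka-server-start.sh ${home}/config/server.properties
ExecStop=${home}/bin/kafka-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target
EOF
}
```

Usage: write_kafka_unit && sudo systemctl daemon-reload && sudo systemctl enable kafka && sudo systemctl start kafka. Type=simple fits here because kafka-server-start.sh stays in the foreground unless given -daemon.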

9. If you are using the systemd service file, a successful start returns you to the command prompt, and you can confirm that the service is running by checking that "Active: active (running)" appears in the output of systemctl status kafka:

$ sudo systemctl status kafka
● kafka.service
   Loaded: loaded (/etc/systemd/system/kafka.service; disabled; vendor preset: enabled)
   Active: active (running) since Thu 2019-05-23 20:53:46 EDT; 1 weeks 3 days ago
  Process: 12023 ExecStop=/opt/kafka_2.11-1.1.0/bin/kafka-server-stop.sh (code=exited, status=0/SUCCESS)
 Main PID: 12099 (java)
   CGroup: /system.slice/kafka.service
           └─12099 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true -Xloggc:

10. To examine the log file, you can use less /opt/kafka-current/logs/server.log.  To follow the log, you can use tail -f /opt/kafka-current/logs/server.log

(info) Note: for your convenience you may wish to create a symlink to /opt/kafka-current/logs/ from /var/log, ie: sudo ln -nfs /opt/kafka-current/logs/ /var/log/kafka

You should see output similar to the following (abridged) text:

$ less /opt/kafka_2.11-1.1.0/logs/server.log
[...]
[2019-04-05 20:07:30,673] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2019-04-05 20:07:31,000] INFO starting (kafka.server.KafkaServer)
[2019-04-05 20:07:31,001] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2019-04-05 20:07:31,015] INFO [ZooKeeperClient] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)
[2019-04-05 20:07:31,032] INFO Client environment:zookeeper.version=3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT (org.apache.zookeeper.ZooKeeper)
[2019-04-05 20:07:31,032] INFO Client environment:host.name=kafka.domain.tld (org.apache.zookeeper.ZooKeeper)
[2019-04-05 20:07:31,032] INFO Client environment:java.version=1.8.0_131 (org.apache.zookeeper.ZooKeeper)
[2019-04-05 20:07:31,032] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
[2019-04-05 20:07:31,032] INFO Client environment:java.home=/usr/lib/jvm/java-8-oracle/jre (org.apache.zookeeper.ZooKeeper)
[2019-04-05 20:07:31,032] INFO Client environment:java.class.path=/opt/kafka_2.11-1.1.0/bin/../libs/aopalliance-repackaged-2.5.0-b32.jar:/opt/kafka_2.11-1.1.0/bin/../libs/argparse4j-0.7.0.jar:[...]:/opt/kafka_2.11-1.1.0/bin/../libs/zookeeper-3.4.10.jar (org.apache.zookeeper.ZooKeeper)
[2019-04-05 20:07:31,032] INFO Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)
[2019-04-05 20:07:31,032] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
[2019-04-05 20:07:31,033] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
[2019-04-05 20:07:31,033] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
[2019-04-05 20:07:31,033] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
[2019-04-05 20:07:31,033] INFO Client environment:os.version=4.18.16-x86_64 (org.apache.zookeeper.ZooKeeper)
[2019-04-05 20:07:31,033] INFO Client environment:user.name=kafka (org.apache.zookeeper.ZooKeeper)
[2019-04-05 20:07:31,033] INFO Client environment:user.home=/home/kafka (org.apache.zookeeper.ZooKeeper)
[2019-04-05 20:07:31,033] INFO Client environment:user.dir=/ (org.apache.zookeeper.ZooKeeper)
[2019-04-05 20:07:31,034] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=6000 watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@49ec71f8 (org.apache.zookeeper.ZooKeeper)
[2019-04-05 20:07:31,047] INFO [ZooKeeperClient] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
[2019-04-05 20:07:31,048] INFO Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2019-04-05 20:07:31,053] INFO Socket connection established to localhost/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2019-04-05 20:07:31,059] INFO Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x169eeac4d530002, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2019-04-05 20:07:31,062] INFO [ZooKeeperClient] Connected. (kafka.zookeeper.ZooKeeperClient)
[2019-04-05 20:07:31,283] INFO Cluster ID = k7If0XwzQS6n-iqqQz6ztr (kafka.server.KafkaServer)
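Steps 9 and 10 can be combined into a quick scripted check. This sketch assumes the systemd unit is named kafka and that a completed startup logs a line ending in "started (kafka.server.KafkaServer)" (1.x brokers typically print "[KafkaServer id=0] started (kafka.server.KafkaServer)"; verify against your own server.log, since the abridged sample above stops before that point). `kafka_is_up` is a hypothetical helper name.

```shell
#!/usr/bin/env bash
# Report whether the kafka unit is active and the most recent startup completed.
kafka_is_up() {
  local log=${1:-/opt/kafka-current/logs/server.log}
  if ! systemctl is-active --quiet kafka; then
    echo "kafka.service is not active" >&2
    return 1
  fi
  # Assumption: startup begins with "starting (...KafkaServer)" and ends with
  # "started (...KafkaServer)"; look only at the latest of those two lines.
  if ! grep -E '(starting|started) \(kafka\.server\.KafkaServer\)' "$log" \
       | tail -n 1 | grep -qF 'started (kafka.server.KafkaServer)'; then
    echo "latest startup not completed (or no startup lines) in $log" >&2
    return 1
  fi
  echo "Kafka is up"
}
```

Usage: kafka_is_up (or kafka_is_up /var/log/kafka/server.log if you created the symlink from the note above).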

11.  Validate correct operation and create/update an archive image to use as a new base image if the node needs to be rebuilt or if you wish to create a cluster. 

(info)  Before making the image you may wish to first stop the service and optionally disable it temporarily to prevent auto-start on boot, ie:  sudo systemctl stop kafka && sudo systemctl disable kafka



Multi-Node (Cluster) Setup Instructions (Adding nodes to an existing single-node)

12. Deploy another instance of your Kafka base image.
13. Make any appropriate changes for the MAC address (ex: in the VM settings and/or udev, if used).
14. Set up forward & reverse DNS records on your DNS server (consult your IT admin/sysadmin if required) and set the hostname and primary DNS suffix on the machine itself (sudo hostnamectl set-hostname <new_Kafka_node_FQDN> where FQDN = Fully Qualified Domain Name, ex: kafka2.mycompany.com )
15. SSH to the IP (or the FQDN of the new node if DNS has already propagated).

(info) Note: If using Fail2Ban, update the sender line in /etc/fail2ban/jail.local to root@<new_Kafka_node_FQDN>, then restart the fail2ban service (sudo systemctl restart fail2ban)

16. In /opt/kafka-current/config/server.properties, set broker.id to a value not used by any other broker in the cluster and make sure zookeeper.connect points at your ZooKeeper server(s).  Then start Kafka (re-enabling it first if you disabled it before imaging, ie: sudo systemctl enable kafka && sudo systemctl start kafka) and follow steps 3-5 of "Quickstart" http://kafka.apache.org/quickstart

Repeat steps 12 ~ 16 for each additional cluster node.
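On each cloned node, settings inherited from the base image that must differ per host (such as Kafka's broker.id, which must be distinct for every broker in a cluster, or the Fail2Ban sender address from the note above) can be changed with a small helper. This is a sketch: `set_kv` is a hypothetical name, it assumes the key already exists as an uncommented key=value (or key = value) line, and values containing | or & would need escaping for sed. Kafka also records the broker ID in a meta.properties file inside each log.dirs directory; if the image was taken after the broker had run, the clone inherits that file and the broker may refuse to start with a different broker.id until it is removed or updated (only do this on a node holding no data you need).

```shell
#!/usr/bin/env bash
# Replace an existing "key=value" (or "key = value") line in a properties/INI-style file.
# Returns non-zero, without editing, if the key is not present.
set_kv() {
  local file=$1 key=$2 value=$3 k
  k=$(printf '%s' "$key" | sed 's/[.]/\\./g')   # escape dots so they match literally
  if ! grep -q "^${k}[[:space:]]*=" "$file"; then
    echo "set_kv: ${key} not found in ${file}" >&2
    return 1
  fi
  sudo sed -i "s|^${k}[[:space:]]*=.*|${key}=${value}|" "$file"
}
```

Usage, on a hypothetical second node: set_kv /opt/kafka-current/config/server.properties broker.id 2 and set_kv /etc/fail2ban/jail.local sender root@kafka2.mycompany.com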



Validating Kafka Operation (or Cluster Replication) for 1-node (or multiple nodes)

17. To validate Kafka operation, we create a sample test topic, start a producer, put some messages into the topic, start a consumer, and confirm that the messages arrive.  To do this, perform the following steps:

17.1.  Open two new terminal windows

17.2.  Follow steps 3-5 of "Quickstart" http://kafka.apache.org/0110/documentation.html#quickstart, running the producer in one window and the consumer in the other.  You should see the messages typed into the producer show up on the consumer side.

17.3.  To quit, press ^C (Ctrl+C) in both the producer and consumer windows.
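For a scripted version of the same check (handy when validating several nodes), the Quickstart's topic, producer and consumer commands can be chained non-interactively. A sketch, assuming Kafka 1.1's command-line tools under /opt/kafka-current, ZooKeeper on localhost:2181 and a broker on localhost:9092; `kafka_smoke_test` and the topic name smoke-test are hypothetical. On a cluster, pass a replication factor no larger than the number of brokers.

```shell
#!/usr/bin/env bash
# Non-interactive take on Quickstart steps 3-5: create a topic, produce one
# message, then read the topic back and look for that exact message.
KAFKA_HOME=${KAFKA_HOME:-/opt/kafka-current}
ZK=${ZK:-localhost:2181}
BROKER=${BROKER:-localhost:9092}

kafka_smoke_test() {
  local topic=${1:-smoke-test} rf=${2:-1}
  local msg="smoke-$(date +%s)-$$"   # unique payload so older messages don't count
  local out

  "$KAFKA_HOME/bin/kafka-topics.sh" --create --if-not-exists --zookeeper "$ZK" \
      --replication-factor "$rf" --partitions 1 --topic "$topic" || return 1

  echo "$msg" | "$KAFKA_HOME/bin/kafka-console-producer.sh" \
      --broker-list "$BROKER" --topic "$topic" >/dev/null || return 1

  # Read from the beginning; the consumer gives up after 10 s without new messages
  out=$("$KAFKA_HOME/bin/kafka-console-consumer.sh" --bootstrap-server "$BROKER" \
      --topic "$topic" --from-beginning --timeout-ms 10000 2>/dev/null)

  if grep -qxF "$msg" <<<"$out"; then
    echo "OK: message round-tripped through topic $topic"
  else
    echo "FAILED: message not seen on topic $topic" >&2
    return 1
  fi
}
```

Usage: kafka_smoke_test on a single node, or kafka_smoke_test smoke-test 3 on a three-broker cluster to also exercise replication.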

