Creating a Development Environment for Spark Structured Streaming, Kafka, and Prometheus
Docker-compose lets us simulate fairly complex setups in a local environment. It is also a fun way to try out hard-to-maintain technologies such as Kafka and Spark.
A few months ago, I created a demo application that uses Spark Structured Streaming, Kafka, and Prometheus within the same Docker-compose file. One can extend this list with an additional Grafana service. The codebase was in Python: I was ingesting live cryptocurrency prices into Kafka and consuming them through Spark Structured Streaming. In this write-up, instead of talking about watermarks and sink types in Spark Structured Streaming, I will only talk about the Docker-compose file and how I set up my development environment with Spark, Kafka, Prometheus, and Zookeeper. For the whole codebase of the demo project, please refer to the GitHub repository.
Service Blocks
In the Docker-compose file, I needed the following services to keep my streaming data producer and consumer live and, at the same time, monitor the ingestion into Kafka:
- Spark standalone cluster: Consisting of one master and one worker node
  - Spark-master
  - Spark-worker
- Zookeeper: A requirement for Kafka (soon it will not be a requirement) to maintain the brokers and topics. For instance, if a broker joins or dies, Zookeeper informs the cluster.
- Kafka: A Message-oriented Middleware (MoM) for dealing with large streams of data. In this case, we have streams of crypto-currency prices.
- Prometheus-JMX-Exporter: An exporter that connects to Java Management Extensions (JMX) and translates the metrics into a format that Prometheus can understand. Since Kafka is a Java application, this is the magic service that enables us to scrape Kafka metrics automatically.
- Prometheus: A time-series database for metrics and a modern alerting tool.
Spark Services
In the most basic setup for the standalone Spark cluster, we need one master and one worker node. You can use Docker-compose volumes for mounting folders. For Spark, perhaps the most common reason for mounting is sharing the connectors (.jar files) or scripts.
For the Spark image, I preferred the ones from Big Data Europe (BDE) on Docker Hub, as they maintain a very stable and extensive set of Spark Hadoop images. This also saved some redundant work, like creating a separate Dockerfile per Spark node.
I needed to take care of the networking within the Docker-compose settings. Hence, I created a bridge network with the custom name "crypto-network". The bridge network lets the standalone containers communicate with each other. For more information about the different network drivers for Docker containers, please refer to the Docker documentation, which is very fun to read. While setting up, I forwarded the Spark Web UIs to host ports other than 8080 to prevent conflicts with the JMX-Exporter. Besides, I made the worker node depend on the master node to fix the order in which the containers are created.
Lastly, following the BDE example, I override SPARK_MASTER through the environment variables. Here I am sharing the Spark component of the demo application.
---
version: "3.2"
services:
  spark-master:
    image: bde2020/spark-master:2.2.2-hadoop2.7
    container_name: spark-master
    networks:
      - crypto-network
    volumes:
      - ./connectors:/connectors
      - ./:/scripts/
    ports:
      - 8082:8080
      - 7077:7077
    environment:
      - INIT_DAEMON_STEP=false
  spark-worker-1:
    image: bde2020/spark-worker:2.2.2-hadoop2.7
    container_name: spark-worker-1
    networks:
      - crypto-network
    depends_on:
      - spark-master
    ports:
      - 8083:8081
    environment:
      - "SPARK_MASTER=spark://spark-master:7077"
networks:
  crypto-network:
    driver: "bridge"
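The host port choices can be sanity-checked with a few lines of Python. This is only a sketch: the mappings are copied by hand from the Docker-compose files in this post, and the helper name host_port_conflicts is made up for illustration. It reports every host port that more than one service tries to claim.

```python
from collections import defaultdict

# "host:container" port mappings copied by hand from the compose files in this post.
PORTS = {
    "kafka": ["9092:9092", "30001:30001"],
    "kafka-jmx-exporter": ["8080:8080"],
    "prometheus": ["9090:9090"],
    "spark-master": ["8082:8080", "7077:7077"],
    "spark-worker-1": ["8083:8081"],
}


def host_port_conflicts(ports_by_service):
    """Return {host_port: [services]} for host ports claimed more than once."""
    claimed = defaultdict(list)
    for service, mappings in ports_by_service.items():
        for mapping in mappings:
            host_port = int(mapping.split(":")[0])
            claimed[host_port].append(service)
    return {port: names for port, names in claimed.items() if len(names) > 1}


if __name__ == "__main__":
    # The mappings used in this post do not collide.
    print(host_port_conflicts(PORTS))
    # Forwarding the Spark master UI to host port 8080 would collide
    # with the JMX-Exporter, which is why 8082 is used instead.
    clashing = dict(PORTS)
    clashing["spark-master"] = ["8080:8080", "7077:7077"]
    print(host_port_conflicts(clashing))
```

Only the host side of each mapping matters here; two containers can both listen on 8080 internally as long as they are forwarded to different host ports.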
You can start the services with:
docker-compose up
Then you can see the Spark master node setup with:
docker exec -it spark-master bash
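Once inside the master container, a job can be submitted to the standalone cluster. The sketch below only builds the spark-submit command line; it assumes that Spark lives under /spark in the image, and the script and connector names are hypothetical placeholders for files mounted through the /scripts and /connectors volumes.

```python
import shlex

SPARK_MASTER_URL = "spark://spark-master:7077"  # taken from the compose file


def spark_submit_command(script, jars=(), spark_home="/spark"):
    """Build a spark-submit command for the standalone master.

    spark_home="/spark" is an assumption about the image layout;
    adjust it if spark-submit lives elsewhere in the container.
    """
    command = [f"{spark_home}/bin/spark-submit", "--master", SPARK_MASTER_URL]
    if jars:
        # --jars expects a single comma-separated list.
        command += ["--jars", ",".join(jars)]
    command.append(script)
    return command


if __name__ == "__main__":
    cmd = spark_submit_command(
        "/scripts/consumer.py",                    # hypothetical script name
        jars=["/connectors/spark-sql-kafka.jar"],  # hypothetical jar name
    )
    # Print a shell-safe command that can be pasted into the container shell.
    print(shlex.join(cmd))
```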
Kafka Services
To run Kafka in a standalone mode, I needed Zookeeper and Kafka itself with some fancy environment variables. Basically, Kafka needs to find the Zookeeper client port and it needs to advertise the correct ports to Spark applications.
To run this setting I used the Confluent images. Here, I am sharing the Kafka related services block. A Confluent image already allows us to set up:
- Kafka topics by using the environment variables:
  - KAFKA_CREATE_TOPICS: Topic names to be created
  - KAFKA_AUTO_CREATE_TOPICS_ENABLE: Whether the broker creates a topic automatically the first time it is used
  - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: Replication factor of the internal offsets topic (1 here, as there is a single broker)
- Connection to Zookeeper using the environment variable KAFKA_ZOOKEEPER_CONNECT
- A custom broker id for a particular node with KAFKA_BROKER_ID
- Advertising the correct ports for the Docker network internal services or external connections:
  - KAFKA_INTER_BROKER_LISTENER_NAME: Listener name for the setup
  - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: Listener setup with mapping
  - KAFKA_ADVERTISED_LISTENERS: Listener setup for internal and external networking. This is a bit tricky: if I consume or produce any message inside the internal Docker network, with the example below I need to connect to kafka:29092. From outside of Docker, I can use a consumer or producer via localhost:9092. For more information, here is an awesome explanation.
---
version: "3.2"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper
    container_name: zookeeper
    networks:
      - crypto-network
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka
    container_name: kafka
    depends_on:
      - zookeeper
    networks:
      - crypto-network
    ports:
      - 9092:9092
      - 30001:30001
    environment:
      KAFKA_CREATE_TOPICS: crypto_raw,crypto_latest_trends,crypto_moving_average
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100
networks:
  crypto-network:
    driver: "bridge"
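To make the internal versus external listener distinction concrete, here is a small Python sketch that parses the KAFKA_ADVERTISED_LISTENERS value from the compose file and picks the address a client should use. The helper names are made up for illustration; the option keys in spark_kafka_options are the standard ones of the Spark Structured Streaming Kafka source, but how the demo project actually wires them is not shown in this post.

```python
# Value of KAFKA_ADVERTISED_LISTENERS from the compose file above.
ADVERTISED = "PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092"


def parse_listeners(value):
    """Map each listener name to its advertised "host:port" address."""
    listeners = {}
    for entry in value.split(","):
        name, address = entry.strip().split("://", 1)
        listeners[name] = address
    return listeners


def bootstrap_servers(inside_docker, advertised=ADVERTISED):
    """Pick the address a client should use depending on where it runs."""
    listeners = parse_listeners(advertised)
    return listeners["PLAINTEXT" if inside_docker else "PLAINTEXT_HOST"]


def spark_kafka_options(topic, inside_docker=True):
    """Options that could be passed to spark.readStream.format("kafka")."""
    return {
        "kafka.bootstrap.servers": bootstrap_servers(inside_docker),
        "subscribe": topic,
    }


if __name__ == "__main__":
    print(bootstrap_servers(inside_docker=True))   # for Spark or the producer container
    print(bootstrap_servers(inside_docker=False))  # for a script on the host machine
    print(spark_kafka_options("crypto_raw"))
```

A Spark job running in the spark-master container would use the internal address, while a quick test script on the laptop would use the external one.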
Prometheus Services
In this project, I wanted to scrape Kafka's metrics automatically. Hence, apart from the Prometheus service itself, I also needed the JMX-Exporter. And I realized that it is the coolest kid in a Docker-compose.
For both Prometheus and the JMX-Exporter, I needed to use custom Dockerfiles, as they require some templates to be aware of each other. I used a separate ./tools/ folder to keep my monitoring related settings. And within ./tools/prometheus-jmx-exporter, I had a confd folder to configure the Docker containers at run-time. The file structure is as follows:
.
  prometheus
    Dockerfile
    prometheus.yml
  prometheus-jmx-exporter
    Dockerfile
    confd
      conf.d
        kafka.yml.toml
        start-jmx-scraper.sh.toml
      templates
        kafka.yml.tmpl
        start-jmx-scraper.sh.tmpl
    entrypoint.sh
Let's start with the Prometheus image as it is more straightforward. We need to use a custom Dockerfile to get the config with custom scraper settings.
The Dockerfile will be:
FROM prom/prometheus:v2.8.1
ADD ./prometheus.yml /etc/prometheus/prometheus.yml
CMD [ "--config.file=/etc/prometheus/prometheus.yml","--web.enable-admin-api" ]
And the prometheus.yml points to the following target, with a scrape interval of 5 seconds. In prometheus.yml, Prometheus targets a host called kafka-jmx-exporter on port 8080. Hence, in the Docker-compose file, the JMX-Exporter service must use the same name as the targeted host.
global:
  scrape_interval: 5s
  evaluation_interval: 5s
scrape_configs:
  - job_name: 'kafka'
    scrape_interval: 5s
    static_configs:
      - targets: ['kafka-jmx-exporter:8080']
To create the JMX-Exporter image, I needed more tweaks. Let's start with the Dockerfile. The image for the JMX-Exporter uses a Java base image. It then downloads the JMX Prometheus .jar from the Maven repository and writes it to /opt/jmx_prometheus_httpserver/jmx_prometheus_httpserver.jar. Next, it downloads Confd, stores it in /usr/local/bin/confd, and gives it execute permissions. Lastly, it copies the entrypoint script into /opt/entrypoint.sh.
FROM java:8
RUN mkdir /opt/jmx_prometheus_httpserver && wget 'https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_httpserver/0.11.0/jmx_prometheus_httpserver-0.11.0-jar-with-dependencies.jar' -O /opt/jmx_prometheus_httpserver/jmx_prometheus_httpserver.jar
ADD https://github.com/kelseyhightower/confd/releases/download/v0.16.0/confd-0.16.0-linux-amd64 /usr/local/bin/confd
COPY confd /etc/confd
RUN chmod +x /usr/local/bin/confd
COPY entrypoint.sh /opt/entrypoint.sh
ENTRYPOINT ["/opt/entrypoint.sh"]
In entrypoint.sh, I only run Confd and then start-jmx-scraper.sh. Hence, after Confd sets up the source and destination files for both the Kafka config and the JMX scraper script as described in the .toml files, we run the downloaded jmx_prometheus_httpserver.jar file. The entrypoint.sh looks like this:
#!/bin/bash
/usr/local/bin/confd -onetime -backend env
/opt/start-jmx-scraper.sh
And the start-jmx-scraper.sh template is as follows. The environment variables in the Docker-compose file define each of the keys (JMX_PORT, JMX_HOST, HTTP_PORT, JMX_EXPORTER_CONFIG_FILE) used in the command:
#!/bin/bash
java \
  -Dcom.sun.management.jmxremote.ssl=false \
  -Djava.rmi.server.hostname={{ getv "/jmx/host" }} \
  -Dcom.sun.management.jmxremote.authenticate=false \
  -Dcom.sun.management.jmxremote.port={{ getv "/jmx/port" }} \
  -jar /opt/jmx_prometheus_httpserver/jmx_prometheus_httpserver.jar \
  {{ getv "/http/port" }} \
  /opt/jmx_prometheus_httpserver/{{ getv "/jmx/exporter/config/file" }}
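With the env backend, Confd looks up a key such as /jmx/host in the environment variable JMX_HOST: the leading slash is dropped, the remaining slashes become underscores, and the name is uppercased. The Python sketch below imitates just that substitution step to show what the template turns into. It is a simplified stand-in, not Confd itself, and the function names are made up for illustration.

```python
import re

# Matches template calls such as {{ getv "/jmx/host" }}.
GETV = re.compile(r'\{\{\s*getv\s+"([^"]+)"\s*\}\}')

# Environment of the JMX-Exporter service as defined in the Docker-compose file.
COMPOSE_ENV = {
    "JMX_PORT": "30001",
    "JMX_HOST": "kafka",
    "HTTP_PORT": "8080",
    "JMX_EXPORTER_CONFIG_FILE": "kafka.yml",
}


def env_name(key):
    """Translate a key like '/jmx/exporter/config/file' to 'JMX_EXPORTER_CONFIG_FILE'."""
    return key.strip("/").replace("/", "_").upper()


def render(template, env):
    """Replace every {{ getv "/some/key" }} with the matching environment value."""
    return GETV.sub(lambda match: env[env_name(match.group(1))], template)


if __name__ == "__main__":
    line = '-Djava.rmi.server.hostname={{ getv "/jmx/host" }}'
    print(render(line, COMPOSE_ENV))
```

A missing variable raises a KeyError in this sketch, which is a convenient way to notice a key that the Docker-compose environment forgot to define.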
With the given custom Docker images for Prometheus automatically scraping Kafka, the full Docker-compose file for the demo project is as follows:
---
version: "3.2"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper
    container_name: zookeeper
    networks:
      - crypto-network
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka
    container_name: kafka
    depends_on:
      - zookeeper
    networks:
      - crypto-network
    ports:
      - 9092:9092
      - 30001:30001
    environment:
      KAFKA_CREATE_TOPICS: crypto_raw,crypto_latest_trends,crypto_moving_average
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100
      KAFKA_JMX_PORT: 30001
      KAFKA_JMX_HOSTNAME: kafka
  kafka-jmx-exporter:
    build: ./tools/prometheus-jmx-exporter
    container_name: jmx-exporter
    ports:
      - 8080:8080
    links:
      - kafka
    networks:
      - crypto-network
    environment:
      JMX_PORT: 30001
      JMX_HOST: kafka
      HTTP_PORT: 8080
      JMX_EXPORTER_CONFIG_FILE: kafka.yml
  prometheus:
    build: ./tools/prometheus
    container_name: prometheus
    networks:
      - crypto-network
    ports:
      - 9090:9090
  spark-master:
    image: bde2020/spark-master:2.2.2-hadoop2.7
    container_name: spark-master
    networks:
      - crypto-network
    volumes:
      - ./connectors:/connectors
      - ./:/scripts/
    ports:
      - 8082:8080
      - 7077:7077
    environment:
      - INIT_DAEMON_STEP=setup_spark
  spark-worker-1:
    image: bde2020/spark-worker:2.2.2-hadoop2.7
    container_name: spark-worker-1
    networks:
      - crypto-network
    depends_on:
      - spark-master
    ports:
      - 8083:8081
    environment:
      - "SPARK_MASTER=spark://spark-master:7077"
  producer:
    build:
      context: .
      dockerfile: ./Dockerfile.producer
    container_name: producer
    depends_on:
      - kafka
    networks:
      - crypto-network
networks:
  crypto-network:
    driver: "bridge"
As the Docker-compose file contains an additional producer service, when we run the following we can check our Kafka topic messages per minute in the Prometheus UI at <IP_LOCAL>:9090:
docker-compose up
Here the output of the Prometheus UI will be as follows:
[Screenshot: Prometheus UI showing the scraped Kafka metrics]
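Besides the UI, the same kind of check could be scripted against the Prometheus HTTP API, whose instant-query endpoint is /api/v1/query. The sketch below only builds the PromQL expression and the request URL. The metric name is hypothetical, since the real names depend on the rules in kafka.yml, which are not shown in this post.

```python
from urllib.parse import urlencode


def per_minute_query(metric, window="1m"):
    """PromQL expression for the per-minute rate of a counter."""
    # rate() yields a per-second average over the window, hence the factor 60.
    return f"rate({metric}[{window}]) * 60"


def query_url(base_url, promql):
    """URL of the Prometheus instant-query endpoint for the given expression."""
    return f"{base_url.rstrip('/')}/api/v1/query?{urlencode({'query': promql})}"


if __name__ == "__main__":
    # Hypothetical metric name: replace it with one exported by your kafka.yml rules.
    promql = per_minute_query("kafka_messages_in_total")
    print(promql)
    print(query_url("http://localhost:9090", promql))
```

The printed URL can be opened in a browser or fetched with any HTTP client once the containers are up.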
Last Words
This was a demo project that I made for studying Watermarks and Windowing functions in Streaming Data Processing. Therefore I needed to create a custom producer for Kafka, and consume those using Spark Structured Streaming. Although the development phase of the project was super fun, I also enjoyed creating this pretty long Docker-compose example.
In case more detail is needed, I am sharing the GitHub repository.
Original Link: https://dev.to/nazliander/creating-a-development-environment-for-spark-structured-streaming-kafka-and-prometheus-29dl