Lab: RabbitMQ
Allows messages to be queued, received, and delivered asynchronously.
RabbitMQ is a popular open-source message broker software that enables complex routing, message queuing, and message distribution. It facilitates a scalable and flexible way for applications to communicate with each other, decoupling producers of messages from consumers.
RabbitMQ supports multiple messaging protocols, most notably AMQP (Advanced Message Queuing Protocol). It is designed to handle high volumes of messages and allows for messages to be queued, received, and delivered asynchronously, improving the performance and reliability of applications.
One of the main RabbitMQ features is the ability to route messages to specific queues based on a routing key.
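For illustration only (hypothetical exchange name; the lab script below uses the simpler default exchange), routing through a direct exchange with pika looks roughly like this:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# A direct exchange delivers a message to every queue whose binding key
# exactly matches the message's routing key
channel.exchange_declare(exchange='sensors', exchange_type='direct')
channel.queue_declare(queue='tv_room')
channel.queue_bind(queue='tv_room', exchange='sensors', routing_key='tv_room')

channel.basic_publish(exchange='sensors', routing_key='tv_room', body='hello')
connection.close()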
In this lab, let's assume we have a house with a sensor in each room constantly monitoring:
• temperature
• pressure
• humidity
Deploy the 'rabbitmq' container:
cd
cd ~/RabbitMQ
docker compose up -d
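You can confirm the broker container is up with:

docker ps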
Log in to the RabbitMQ Management Console (by default at http://localhost:15672).
Username: guest
Password: guest
Let's publish some sensor data to our RabbitMQ broker.
Take a look at:
~/Workshop--Data-Integration/Labs/Module 3 - Data Sources/Streaming Data/03 RabbitMQ/sensor_tv_room.py
import pika
import random
import time
import json

# RabbitMQ settings
rabbitmq_host = 'localhost'
port = 5672
queue_name = 'tv_room'

# Generate random sensor data
def generate_sensor_data():
    return {
        'temperature': random.uniform(20.0, 100.0),
        'pressure': random.uniform(800.0, 1200.0),
        'humidity': random.uniform(30.0, 80.0)
    }

# Publish sensor data to RabbitMQ
def publish_sensor_data():
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=rabbitmq_host, port=port))
    channel = connection.channel()
    channel.queue_declare(queue=queue_name)
    try:
        while True:
            sensor_data = generate_sensor_data()
            message = json.dumps(sensor_data)
            # Publish to the default exchange; the routing key is the queue name
            channel.basic_publish(exchange='', routing_key=queue_name, body=message)
            print(f"Sent sensor data: {message}")
            time.sleep(1)
    except KeyboardInterrupt:
        # Close the connection cleanly on Ctrl+C
        connection.close()

if __name__ == '__main__':
    publish_sensor_data()
The sensor_tv_room.py script:
• loads the required libraries
• sets the RabbitMQ broker connection details
• generates random sensor data
• connects to the RabbitMQ broker
• builds the message - the data dictionary serialized as JSON
• publishes a message to the tv_room queue every second
Ensure you have installed the pika Python AMQP client library.
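If it is not already installed, pika can typically be installed with pip:

pip3 install pika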
Execute the sensor_tv_room.py script.
cd
cd ~/Workshop--Data-Integration/Labs/'Module 3 - Data Sources'/'Streaming Data'/'03 RabbitMQ'
python3 sensor_tv_room.py
pentaho@pentaho-virtual-machine:~/Workshop--Data-Integration/Labs/Module 3 - Data Sources/Streaming Data/03 RabbitMQ$ python3 sensor_tv_room.py
Sent sensor data: {"temperature": 85.6651714188232, "pressure": 1016.9116665986137, "humidity": 70.52673568794194}
Sent sensor data: {"temperature": 58.979394104149, "pressure": 970.7208711731189, "humidity": 67.3330124529212}
Sent sensor data: {"temperature": 80.72999233834662, "pressure": 1085.8291135914146, "humidity": 42.93245165447303}
Sent sensor data: {"temperature": 78.77851120347607, "pressure": 930.8217146864357, "humidity": 73.69984450730081}
Sent sensor data: {"temperature": 66.34107198977061, "pressure": 916.521247736472, "humidity": 36.34932167085602}
Check that the sensor data is being successfully published to RabbitMQ.
Click on Queues & Streams
To get a message, ensure Ack Mode is set to Automatic ack.
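As an optional check outside the management console, a minimal pika consumer sketch (assuming the broker and tv_room queue from this lab) can print incoming messages:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', port=5672))
channel = connection.channel()
channel.queue_declare(queue='tv_room')

def callback(ch, method, properties, body):
    print(f"Received: {body.decode()}")

# auto_ack=True mirrors the 'Automatic ack' mode in the management console
channel.basic_consume(queue='tv_room', on_message_callback=callback, auto_ack=True)
channel.start_consuming()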
Similar to HiveMQ, let's subscribe to the tv_room queue in RabbitMQ and use Pentaho Data Integration to process the data.
Start Pentaho Data Integration:
cd
cd Pentaho/design-tools/data-integration
sh spoon.sh
Again, let's publish some sensor data.
Execute the sensor_tv_room.py script.
cd
cd ~/Workshop--Data-Integration/Labs/'Module 3 - Data Sources'/'Streaming Data'/'03 RabbitMQ'
python3 sensor_tv_room.py
Log in to the RabbitMQ Management Console.
Username: guest
Password: guest
Click on Queues & Streams.
The Advanced Message Queuing Protocol (AMQP) Consumer step receives streaming data from an AMQP message producer through an AMQP 0-9-1 compatible broker. You can configure this step to use an existing AMQP message queue or create a new one.
You can also set up the AMQP Consumer step to continuously ingest streaming data from an AMQP broker to collect messages about monitored events, track user consumption of data streams, or monitor alerts. The parent AMQP Consumer step runs a child transformation (sub-transformation) that executes according to the message batch size or duration, letting you process a continuous stream of records in near real-time.
The child transformation must start with the Get records from stream step. Additionally, you can select a step in the child transformation to stream records back to the parent transformation, which passes the records downstream to any other steps included within the same parent transformation.
Open the following transformation:
~/Workshop--Data-Integration/Labs/Module 3 - Data Sources/Streaming Data/03 RabbitMQ/tr_amqp_consumer.ktr
Double-click the AMQP Consumer step.
Setup
The options and settings in the AMQP Consumer Setup tab make it possible for you to create a new AMQP message queue the first time you run the AMQP Consumer step in a transformation. The new AMQP message queue will default to the following properties:
Durable
Non auto-delete
Non-exclusive

When you use the AMQP Consumer to create a new queue, the broker bindings are initialized the first time you run the AMQP Consumer step in a transformation. Once you initialize the bindings, you can then start the AMQP 0-9-1 message producer before running the AMQP Consumer step. As a recommended best practice, always run the AMQP Consumer step first, before you start producing or publishing any messages through the AMQP producer.
Connection
Specify the URI address of the AMQP broker to which this step connects to ingest messages into PDI. For more information, see: https://www.rabbitmq.com/uri-spec.html
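For the broker running locally in this lab, the URI would typically be:

amqp://guest:guest@localhost:5672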
Queue name
Specify the name of a new AMQP message queue from which this step will ingest messages.
The new queue will be created automatically the first time you run the transformation.
The new queue and its exchange attributes will default to the following properties:
Durable
Non auto-delete
Non-exclusive
Note: If you specify a queue name that already exists on the broker, but the existing queue has parameter settings that differ from these, or if the specified queue has an Exchange type (below) that is different, the transformation will abort.
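To inspect an existing queue's attributes before running the transformation, one option (assuming the compose service is named rabbitmq) is rabbitmqctl inside the container:

docker compose exec rabbitmq rabbitmqctl list_queues name durable auto_delete messages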
Exchange name
Specify either a new exchange name or an existing exchange name from which to bind the queue.
If the exchange name does not already exist, it will default to the following properties:
Durable
Non auto-delete
Leave the Exchange name blank to use the default exchange and set the Exchange type (below) to DIRECT. The AMQP Producer step will require a matching blank Exchange name entry in its Setup tab.
When using either DIRECT or TOPIC as the Exchange type, specify the appropriate routing key (or multiple keys) in the Routing Keys table. Routing keys are input as string names.
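As a sketch of TOPIC-style routing (hypothetical exchange name and key pattern, not used in this lab), a queue can be bound with wildcard keys, where '*' matches exactly one dot-separated word and '#' matches zero or more:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# The queue receives any message whose routing key starts with 'house.tv_room.'
channel.exchange_declare(exchange='house', exchange_type='topic')
channel.queue_declare(queue='tv_room')
channel.queue_bind(queue='tv_room', exchange='house', routing_key='house.tv_room.#')
connection.close()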
This step returns records that were previously generated by another transformation in a job. The records were passed to this step using either the Copy rows to result step or the Transformation Executor step. You can enter the metadata of the fields you expect from the previous transformation in the job.
Open the following transformation:
~/Workshop--Data-Integration/Labs/Module 3 - Data Sources/Streaming Data/03 RabbitMQ/tr_process_sensor_data.ktr
Double-click the JSON Input step and configure it with the following settings. The child transformation:
• pulls the 'message' records from the stream
• adds a timestamp
• parses the JSON stream
• appends the records to a file in the Project/RabbitMQ directory
Open the following file:
~/Project/RabbitMQ/tv_room_sensor.txt
Notice that the records are appended.
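To watch new records arrive as messages are consumed, you can tail the file:

tail -f ~/Project/RabbitMQ/tv_room_sensor.txt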