RabbitMQ

Allows for messages to be queued, received, and delivered asynchronously ..


RabbitMQ is a popular open-source message broker software that enables complex routing, message queuing, and message distribution. It facilitates a scalable and flexible way for applications to communicate with each other, decoupling producers of messages from consumers.

RabbitMQ supports multiple messaging protocols, most notably AMQP (Advanced Message Queuing Protocol). It is designed to handle high volumes of messages and allows for messages to be queued, received, and delivered asynchronously, improving the performance and reliability of applications.
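To get a feel for what asynchronous delivery looks like from an application's point of view, here is a minimal pika consumer sketch (illustrative only .. the workshop consumes messages through Pentaho Data Integration later, and the 'demo' queue name is hypothetical):

import pika

# Called by pika each time the broker pushes a message to this consumer.
def on_message(channel, method, properties, body):
    print(f"Received: {body.decode()}")

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='demo')

# Register the callback; auto_ack tells the broker not to wait for explicit acks.
channel.basic_consume(queue='demo', on_message_callback=on_message, auto_ack=True)
channel.start_consuming()  # blocks, dispatching messages as they arrive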

Installation

One of the main RabbitMQ features is the ability to route messages to specific queues based on a routing key.
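For illustration, routing through a direct exchange with pika might look like the following sketch (the 'sensors' exchange and 'kitchen' queue are hypothetical, not part of the workshop files):

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# A direct exchange routes each message to the queue(s) whose binding key
# exactly matches the message's routing key.
channel.exchange_declare(exchange='sensors', exchange_type='direct')

for room in ('tv_room', 'kitchen'):
    channel.queue_declare(queue=room)
    channel.queue_bind(queue=room, exchange='sensors', routing_key=room)

# This message lands only in the tv_room queue.
channel.basic_publish(exchange='sensors', routing_key='tv_room',
                      body='{"temperature": 21.5}')
connection.close()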

In this workshop, let's assume we have a house with a sensor in each room, constantly monitoring:

  • temperature

  • pressure

  • humidity

  1. Deploy the 'rabbitmq' container:

cd
cd ~/RabbitMQ
docker compose up -d
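If you want to see what the deployment amounts to, the same broker can be started as a single container (a sketch, assuming the standard rabbitmq image with the management plugin; the workshop compose file may differ):

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

Port 5672 serves AMQP 0-9-1 traffic and port 15672 serves the management UI used in the next step.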
  2. Log into RabbitMQ at http://localhost:15672.

Username: guest

Password: guest

Sensor

Let's publish some sensor data to our RabbitMQ broker.

  1. Take a look at:

    ~/Workshop--Data-Integration/Labs/Module 3 - Data Sources/Streaming Data/03 RabbitMQ/sensor_tv_room.py

import pika
import random
import time
import json

# RabbitMQ settings
rabbitmq_host = 'localhost'
port = 5672
queue_name = 'tv_room'

# Generate random sensor data
def generate_sensor_data():
    return {
        'temperature': random.uniform(20.0, 100.0),
        'pressure': random.uniform(800.0, 1200.0),
        'humidity': random.uniform(30.0, 80.0)
    }

# Publish sensor data to RabbitMQ
def publish_sensor_data():
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=rabbitmq_host, port=port))
    channel = connection.channel()
    channel.queue_declare(queue=queue_name)

    try:
        while True:
            sensor_data = generate_sensor_data()
            message = json.dumps(sensor_data)
            channel.basic_publish(exchange='', routing_key=queue_name, body=message)
            print(f"Sent sensor data: {message}")
            time.sleep(1)
    except KeyboardInterrupt:
        # Stop publishing on Ctrl+C
        pass
    finally:
        # The bare loop never returns, so close the connection on the way out.
        connection.close()

if __name__ == '__main__':
    publish_sensor_data()

The sensor_tv_room.py script:

  • loads the required libraries

  • sets the RabbitMQ broker connection details

  • generates random sensor data

  • connects to the RabbitMQ broker

  • builds the message - a data dictionary serialized as JSON

  • publishes the message to the tv_room queue every second


Execute sensor_tv_room.py

Ensure you have installed the Python AMQP client library (pika): pip3 install pika

  1. Execute the sensor_tv_room.py script.

cd
cd ~/Workshop--Data-Integration/Labs/'Module 3 - Data Sources'/'Streaming Data'/'03 RabbitMQ'
python3 sensor_tv_room.py
pentaho@pentaho-virtual-machine:~/Workshop--Data-Integration/Labs/Module 3 - Data Sources/Streaming Data/03 RabbitMQ$ python3 sensor_tv_room.py
Sent sensor data: {"temperature": 85.6651714188232, "pressure": 1016.9116665986137, "humidity": 70.52673568794194}
Sent sensor data: {"temperature": 58.979394104149, "pressure": 970.7208711731189, "humidity": 67.3330124529212}
Sent sensor data: {"temperature": 80.72999233834662, "pressure": 1085.8291135914146, "humidity": 42.93245165447303}
Sent sensor data: {"temperature": 78.77851120347607, "pressure": 930.8217146864357, "humidity": 73.69984450730081}
Sent sensor data: {"temperature": 66.34107198977061, "pressure": 916.521247736472, "humidity": 36.34932167085602}

Check that the sensor data is being successfully published to RabbitMQ.

  1. Click on Queues & Streams.

  2. To retrieve a message, ensure Ack Mode is set to Automatic ack (a command-line alternative is sketched below).
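As an alternative to the UI, a single message can be pulled from the queue with a short pika script (a sketch, not part of the workshop files; auto_ack mirrors the Automatic ack mode above):

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', port=5672))
channel = connection.channel()
channel.queue_declare(queue='tv_room')

# basic_get pulls one message; it returns (None, None, None) if the queue is empty.
method, properties, body = channel.basic_get(queue='tv_room', auto_ack=True)
print(body.decode() if method else 'Queue is empty')
connection.close()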

Similar to HiveMQ, let's subscribe to the tv_room queue in RabbitMQ and use Pentaho Data Integration to process the data.

  1. Start Pentaho Data Integration:

cd
cd Pentaho/design-tools/data-integration
sh spoon.sh

Again .. let's publish some sensor data ..

  2. Execute the sensor_tv_room.py script.

cd
cd ~/Workshop--Data-Integration/Labs/'Module 3 - Data Sources'/'Streaming Data'/'03 RabbitMQ'
python3 sensor_tv_room.py
  3. Log into the RabbitMQ Control Center at http://localhost:15672.

User: guest

Password: guest

  4. Click on Queues & Streams.

The Advanced Message Queuing Protocol (AMQP) Consumer step receives streaming data from an AMQP message producer through an AMQP 0-9-1 compatible broker. You can configure this step to use an existing AMQP message queue or create a new one.

You can also set up the AMQP Consumer step to continuously ingest streaming data from an AMQP broker to collect messages about monitored events, track user consumption of data streams, or monitor alerts. The parent AMQP Consumer step runs a child (sub-transformation) that executes according to the message batch size or duration, letting you process a continuous stream of records in near real-time.

The child transformation must start with the Get records from stream step. Additionally, you can select a step in the child transformation to stream records back to the parent transformation, which passes the records downstream to any other steps included within the same parent transformation.

  5. Open the following transformation:

~/Workshop--Data-Integration/Labs/Module 3 - Data Sources/Streaming Data/03 RabbitMQ/tr_amqp_consumer.ktr

  6. Double-click on the AMQP Consumer step.


Setup

The options and settings in the AMQP Consumer Setup tab make it possible for you to create a new AMQP message queue the first time you run the AMQP Consumer step in a transformation. The new AMQP message queue will default to the following properties:

  • Durable

  • Non auto-delete

  • Non-exclusive

When you use the AMQP Consumer to create a new queue, the broker bindings are initialized the first time you run the AMQP Consumer step in a transformation. Once you initialize the bindings, you can then start the AMQP 0-9-1 message producer before running the AMQP Consumer step. As a recommended best practice, always run the AMQP Consumer step first, before you start producing or publishing any messages through the AMQP producer.
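In AMQP 0-9-1 terms, those defaults map onto the queue-declaration flags shown in this pika sketch (for illustration only; the step itself declares the queue for you):

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# Roughly what the AMQP Consumer step declares on its first run:
channel.queue_declare(
    queue='tv_room',
    durable=True,        # Durable: survives a broker restart
    auto_delete=False,   # Non auto-delete: kept when the last consumer disconnects
    exclusive=False      # Non-exclusive: usable from more than one connection
)
connection.close()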

Option

Description

Connection

Specify the URI address of the AMQP broker to which this step connects to ingest messages into PDI. For more information, see the RabbitMQ URI specification: https://www.rabbitmq.com/uri-spec.html

Queue name

Specify the name of a new AMQP message queue from which this step will ingest messages.

The new queue will be created automatically the first time you run the transformation.

The new queue and its exchange attributes will default to the following properties:

  • Durable

  • Non auto-delete

  • Non-exclusive

Note: If you specify a queue name that already exists on the broker, but the existing queue has parameter settings that differ from these, or if the specified queue's Exchange type differs, the transformation will abort.

Exchange name

Specify either a new exchange name or an existing exchange name from which to bind the queue.

If the exchange name does not already exist, it will default to the following properties:

  • Durable

  • Non auto-delete

Leave the Exchange name blank to use the DEFAULT Exchange type and set the Exchange type to DIRECT. The AMQP Producer step will require a matching blank entry in its Setup tab for the Exchange name.

Routing Keys

When using either DIRECT or TOPIC as the Exchange type, specify the appropriate routing key (or multiple keys) in the Routing Keys table. Routing keys are entered as string names.
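For TOPIC exchanges, routing keys in AMQP can act as binding patterns: '*' matches exactly one dot-separated word and '#' matches zero or more. A pika sketch of the idea (the 'house' exchange and 'all_temperature' queue are hypothetical, not part of the workshop):

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# Topic exchange: collect temperature readings from every room in one queue.
channel.exchange_declare(exchange='house', exchange_type='topic')
channel.queue_declare(queue='all_temperature')
channel.queue_bind(queue='all_temperature', exchange='house',
                   routing_key='*.temperature')  # matches tv_room.temperature, etc.
connection.close()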

This step returns records that were previously generated by another transformation in a job. The records were passed to this step using either the Copy rows to result step or the Transformation Executor step. You can enter the metadata of the fields you are expecting from the previous transformation in a job.

  1. Open the following transformation:

~/Workshop--Data-Integration/Labs/Module 3 - Data Sources/Streaming Data/03 RabbitMQ/tr_process_sensor_data.ktr

  2. Double-click the JSON Input step and review its settings. The transformation:

  • pulls the 'message' records from the stream

  • adds a timestamp

  • reads the JSON stream

  • appends the records to a file in the Project/RabbitMQ directory

A rough Python equivalent of this processing is sketched below.
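This sketch is illustrative only (the actual work is done by PDI steps; the process_message helper and sample values are made up, while the field and file names follow the workshop):

import json
import time

def process_message(message: str, out_path: str = 'tv_room_sensor.txt'):
    record = json.loads(message)                               # read the JSON stream
    record['timestamp'] = time.strftime('%Y-%m-%d %H:%M:%S')   # add a timestamp
    with open(out_path, 'a') as f:                             # append, never overwrite
        f.write(json.dumps(record) + '\n')

process_message('{"temperature": 21.5, "pressure": 1010.2, "humidity": 55.1}')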

  3. Open the following file:

~/Project/RabbitMQ/tv_room_sensor.txt

Notice that the records are appended with each incoming message.

