Pentaho Data Integration

Mosquitto

Use Case: Predictive maintenance


Last updated 1 month ago

Use Case - Logistics

Eclipse Mosquitto is an open source (EPL/EDL licensed) message broker that implements the MQTT protocol versions 5.0, 3.1.1 and 3.1.

Mosquitto is lightweight and is suitable for use on all devices from low power single board computers to full servers.

As a logistics company, one of our requirements is to track and monitor our delivery trucks for:

  • Predictive maintenance

  • Delivery route optimization

The setup is as follows:

  • Each truck is fitted with a sensor that streams data to a Topic over GSM (Global System for Mobile Communications), using MQTT.

  • The endpoint is an MQTT broker: Mosquitto.

  • Pentaho Data Integration subscribes to that Topic.

The following steps deploy an insecure Mosquitto container on localhost:1883.

  1. Copy over the required files.

cd
mkdir -p ~/Streaming/Mosquitto && cd "$_"
cp -R ~/Workshop--Data-Integration/Labs/'Module 7 - Workflows'/'Streaming Data'/MQTT/* .

  2. Deploy the 'mosquitto' container.

cd
cd ~/Streaming/Mosquitto/scripts
docker compose up -d

  3. Check that the container has been deployed successfully. Log into Portainer (https://localhost:9443).

Username: admin
Password: Portainer123


MQTT Explorer

MQTT Explorer is a comprehensive MQTT client that provides a structured overview of your MQTT topics.

MQTT Explorer has already been installed and configured.

  1. If MQTT Explorer is not installed, install it:

cd
snap install mqtt-explorer

  2. Click on the MQTT Explorer button in the taskbar.

  3. Enter the following connection details:

Name: mosquitto
Protocol: mqtt://
Host: localhost
Port: 1883

  4. Click Save, then Connect.

  5. You can test publishing a message to the broker.

The message will appear at the bottom.

As there isn't a real truck, we'll need to generate the IoT sensor data.

  1. Start Pentaho Data Integration:

cd
cd ~/Pentaho/design-tools/data-integration
./spoon.sh

The MQTT Producer step allows you to publish messages in near-real-time to an MQTT broker. Within a transformation, the MQTT Producer step publishes a stream of records to one MQTT topic.

  1. Open the following transformation:

~/Streaming/Mosquitto/solution/tr_mqtt_producer.ktr

  • Generates a row for a single vehicle (vehicle_id 111) every 5 seconds

  • Adds a timestamp

  • Removes some fields

  • Uses JavaScript to generate the sensor data

  • Uses a Dummy step to collect the data streams

  • Concatenates the fields into a 'message' payload

  • MQTT Producer: connects to the broker and publishes the message payload
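The flow described in the list above can be sketched outside PDI. This is only an illustration of how a row of simulated sensor data might be concatenated into a single 'message' string; the sensor names, value range, and ";" delimiter are assumptions and are not taken from tr_mqtt_producer.ktr.

```python
import random
from datetime import datetime, timezone

# Field order used to build the payload (assumed for illustration).
FIELDS = ["vehicle_id", "timestamp", "sensor_type", "sensor_value"]


def make_sensor_row(vehicle_id, rng, now=None):
    """Build one row of simulated sensor data for a vehicle."""
    now = now or datetime.now(timezone.utc)
    return {
        "vehicle_id": vehicle_id,
        "timestamp": now.isoformat(timespec="seconds"),
        # Hypothetical sensor names and value range.
        "sensor_type": rng.choice(["engine_temp", "tyre_pressure"]),
        "sensor_value": round(rng.uniform(0.0, 100.0), 1),
    }


def to_message(row, fields=FIELDS, delimiter=";"):
    """Concatenate the selected fields into a single 'message' string."""
    return delimiter.join(str(row[name]) for name in fields)


if __name__ == "__main__":
    rng = random.Random(42)
    row = make_sensor_row(111, rng)
    # This string plays the role of the 'message' field handed to the producer.
    print(to_message(row))
```

In the workshop transformation the equivalent work is done by PDI steps; the sketch only shows the shape of the data that reaches the MQTT Producer step.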

  1. Double-click the MQTT Producer step and configure it with the following settings.

Setup

In the Setup tab, define the connection to the MQTT server, the topics for publishing, Quality of Service level, and the Message field.

Option
Description

Connection

Specify the address of the MQTT server to which this step will connect for sending or receiving messages.

Client ID

Specify a unique ID for the MQTT client. The MQTT server uses this Client ID to recognize each distinct client and that client's current state.

Topic

Specify the Topic name using one of the following methods:

  • Select Specify topic to enter a specific Topic name. Then, in the Topic name field, enter the name of the MQTT topic to which you want to publish streaming data (messages). Each MQTT Producer step will start a single thread for publishing.

  • Select Get data from field to specify a Topic name based on a field from another step that is generating rows in the same transformation stream. Then, using the dropdown list, select the name of the field you want to use. You can use this option to set the Topic name dynamically. Each individual message still has only one topic, but each row coming into the MQTT Producer step generates a new message with a potentially different topic.
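The difference between the two Topic options can be sketched as a small routing function. This is an illustration only: the function, the row layout, and the field names "topic" and "message" are hypothetical, not part of PDI.

```python
def route_rows(rows, fixed_topic=None, topic_field=None, message_field="message"):
    """Return one (topic, payload) pair per incoming row.

    Exactly one of fixed_topic ("Specify topic") or topic_field
    ("Get data from field") must be given.
    """
    if (fixed_topic is None) == (topic_field is None):
        raise ValueError("set exactly one of fixed_topic or topic_field")
    pairs = []
    for row in rows:
        # With a topic field, every row may be published to a different topic.
        topic = fixed_topic if topic_field is None else row[topic_field]
        pairs.append((topic, row[message_field]))
    return pairs


if __name__ == "__main__":
    rows = [
        {"topic": "SensorData/111", "message": "111;engine_temp;91.5"},
        {"topic": "SensorData/222", "message": "222;tyre_pressure;30.1"},
    ]
    print(route_rows(rows, fixed_topic="SensorData"))
    print(route_rows(rows, topic_field="topic"))
```

Either way, each row produces exactly one message with exactly one topic.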

Quality of Service (QoS)

Quality of Service (QoS) is a level of guarantee for message delivery. Select one of the following options.

  • At most once (0), which is the default value

  • At least once (1)

  • Exactly once (2)

Message Field

Select the individual record contained in a topic that you want to send.

Security

Option
Description

Username

Specify the user name required to access the MQTT server.

Password

Specify the password associated with the Username.

Use secure protocol

Select this option if you want to define SSL properties for the connection.

SSL Properties

  • ssl.contextProvider: Specify the underlying JSSE provider.

  • ssl.enabledCipherSuites: Specify which ciphers are enabled. Values are dependent on the provider.

  • ssl.keyManager: Specify the algorithm that will be used to create a KeyManagerFactory object instead of using the default algorithm available in the platform.

  • ssl.keyStore: Specify the name of the file that contains the KeyStore object that you want the KeyManager to use.

  • ssl.keyStorePassword: Specify the password for the KeyStore object that you want the KeyManager to use.

  • ssl.keyStoreProvider: Specify the identifying name or string for the key store provider.

  • ssl.keyStoreType: Specify the identifying name or string for the type of key store.

  • ssl.protocol: Specify the type of SSL protocol to use.

  • ssl.trustManager: Specify the algorithm that will be used to create a TrustManagerFactory object, instead of using the default algorithm available in the platform.

  • ssl.trustStore: Specify the name of the file that contains the KeyStore object that you want the TrustManager to use.

  • ssl.trustStorePassword: Specify the password for the TrustStore object that you want the TrustManager to use.

  • ssl.trustStoreProvider: Specify the identifier or string for the trust store provider.

  • ssl.trustStoreType: Specify the type of KeyStore object that you want the TrustManager to use.

Options

Parameter
Description

Keep Alive Interval

Specify the maximum interval, in seconds, that is permitted to elapse between the point at which the PDI client finishes transmitting one Control Packet and the point at which it starts sending the next.

Max Inflight

Specify the maximum number of messages that can be in process at any given time.

Connection Timeout

Specify the time, in seconds, to disconnect if a message is not received.

Clean Session

Specify if the broker will store or purge messages for a session. Select one of the following.

  • True: The broker will not store any information for the client. All information from a previous persistent session will be purged.

  • False: The broker will store all subscriptions for the client. When the QoS (Quality of Service) parameter is set to either 1 or 2, all missed messages will be stored.

Storage Level

Indicates if messages are stored in memory or on disk.

  • The default (blank) is memory.

  • For disk, enter a valid path.

Server URIs

Specify the MQTT server’s uniform resource identifier (URI).

MQTT Version

Specify the MQTT protocol version that this step is connecting to.

Automatic Reconnect

Enables the client to attempt an automatic re-connect to the server if it becomes disconnected. Select True or False:

  • True: Yes, attempt to reconnect to the server.

  • False: No, do not attempt to reconnect to the server.


MQTT Explorer

  1. Launch MQTT Explorer.

  2. Create a connection & Connect to MQTT Broker.


RUN Transformation

  1. RUN the transformation.

  2. Click on Topic: SensorData in MQTT Explorer.

  3. Expand History.

The PDI client can pull streaming data from an MQTT broker or clients through an MQTT transformation. The parent MQTT Consumer step runs a child transformation that executes according to the message batch size or duration, allowing you to process a continuous stream of records in near real-time. The child transformation must start with the Get records from stream step.

Additionally, from the MQTT Consumer step, you can select a step in the child transformation to stream records back to the parent transformation. This capability allows records processed by an MQTT Consumer step in a parent transformation to be passed downstream to any other steps included within the same parent transformation.

  1. Open the following transformation:

~/Streaming/Mosquitto/solution/tr_mqtt_consumer.ktr

  2. Double-click the MQTT Consumer step and configure it with the following settings.

  3. Enter the following information for the Step Name and Transformation fields.

Option
Description

Step name

Specifies the unique name of the step on the canvas. The Step name is set to MQTT Consumer by default.

Transformation

Specify the child transformation to execute by performing any of the following actions:

  • Entering its path

  • Clicking Browse to select an existing child transformation

  • Clicking New to create and save a new child transformation.

If you select a transformation that has the same root path as the current transformation, the variable ${Internal.Entry.Current.Directory} is automatically inserted in place of the common root path.

For example, if the current transformation's path is: /home/admin/transformation.ktr and you select a transformation in the directory /home/admin/path/sub.ktr, then the path is automatically converted to: ${Internal.Entry.Current.Directory}/path/sub.ktr
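The path substitution in this example can be reproduced with a few lines of string handling. The sketch below only mirrors the documented behavior for the example paths; it is not PDI's own implementation, and the function name is hypothetical.

```python
import posixpath

VARIABLE = "${Internal.Entry.Current.Directory}"


def relativize(current_ktr, selected_ktr):
    """Replace the common root path with the directory variable.

    If the selected transformation lives under the directory of the
    current transformation, that directory is replaced by VARIABLE;
    otherwise the selected path is returned unchanged.
    """
    current_dir = posixpath.dirname(current_ktr)
    prefix = current_dir.rstrip("/") + "/"
    if selected_ktr.startswith(prefix):
        return VARIABLE + "/" + selected_ktr[len(prefix):]
    return selected_ktr


if __name__ == "__main__":
    print(relativize("/home/admin/transformation.ktr", "/home/admin/path/sub.ktr"))
```

Run against the paths from the example, this prints the same converted path shown above.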

If you are working with a repository, you must specify the name of the transformation. If you are not working with a repository, you must specify the XML file name of the transformation.

Transformations previously specified by reference are automatically converted to be specified by the transformation name in the Pentaho Repository.

Transformation:

${Internal.Entry.Current.Directory}/tr_process_sensor_data.ktr

Batch

Use this tab to designate how many messages to consume before processing. You can specify message count and/or a specific amount of time.

The number of messages consumed before processing is determined by either the Duration (ms) or the Number of records option. Messages are passed on for processing when either the specified duration elapses or the specified number of records has been received, whichever happens first. For example, if Duration (ms) is set to 1000 milliseconds and Number of records is 1000, messages are consumed for processing whenever a 1000 millisecond interval is reached or 1000 records have been received. If you set either option to zero, PDI ignores that option.
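The batching rule can be illustrated with a small simulation. This is a simplified sketch, not the step's actual implementation: arrival times are supplied as data rather than read from a clock, a batch is only closed when the next record arrives, and the timing window is assumed to start when the first record of a batch arrives.

```python
def batch_records(events, duration_ms=1000, max_records=1000):
    """Group (arrival_ms, record) events into batches.

    A batch is closed when it holds max_records records or when
    duration_ms has elapsed since it was opened, whichever comes
    first. Setting an option to 0 disables that trigger.
    """
    if duration_ms <= 0 and max_records <= 0:
        raise ValueError("duration_ms or max_records must be greater than 0")
    batches, current, opened_at = [], [], None
    for arrival_ms, record in events:
        # Time trigger: close the open batch once its window has elapsed.
        if current and duration_ms > 0 and arrival_ms - opened_at >= duration_ms:
            batches.append(current)
            current = []
        if not current:
            opened_at = arrival_ms
        current.append(record)
        # Count trigger: close the batch once it is full.
        if max_records > 0 and len(current) >= max_records:
            batches.append(current)
            current = []
    if current:
        batches.append(current)  # flush whatever is left at the end
    return batches


if __name__ == "__main__":
    events = [(0, "a"), (100, "b"), (200, "c"), (1500, "d")]
    print(batch_records(events, duration_ms=1000, max_records=2))
    print(batch_records(events, duration_ms=1000, max_records=0))
    print(batch_records(events, duration_ms=0, max_records=3))
```

Each inner list corresponds to one execution of the child transformation.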

You can also specify the maximum number of batches used to collect records at the same time.

Option
Description

Duration (ms)

Specify a time in milliseconds. This value is the amount of time the step will spend collecting records prior to the execution of the transformation.

If this option is set to 0, then Number of records triggers consumption. Either the Duration or the Number of records option must contain a value greater than 0 to run the transformation.

Number of records

Specify a number. After every ‘X’ number of records, the specified transformation will be executed and these ‘X’ records will be passed to the transformation.

If this option is set to 0, then Duration triggers consumption. Either the Duration or the Number of records option must contain a value greater than 0 to run the transformation.

Maximum concurrent batches

Specify the maximum number of batches used to collect records at the same time. The default value is 1, which indicates a single batch is used for collecting records.

This option should only be used when your consumer step cannot keep pace with the speed at which the data is streaming. Your computing environment must have adequate CPU and memory for this implementation. An error will occur if your environment cannot handle the maximum number of concurrent batches specified.

Message prefetch limit

Specify a limit for how many incoming messages this step will queue for processing, as they are received from the broker. Setting this value forces the broker to manage the backpressure of messages exceeding the specified limit. The default number of messages to queue is 100000.

Results fields

Use this tab to select the step from the child transformation that will stream records back to the parent transformation. This capability allows records processed by an MQTT Consumer step in the parent transformation to be passed downstream to any other steps included within the same parent transformation.

Option
Description

Return fields from:

Select the name of the step (from the child transformation) that will stream fields back to the parent transformation. The data values in these returned fields will be available to any subsequent downstream steps in the parent transformation.

The Get records from stream step returns records that were previously generated by another transformation in a job. The records were passed to this step using either the Copy rows to result step or the Transformation Executor step. You can enter the metadata of the fields you are expecting from the previous transformation in a job.

  1. Open the following transformation:

~/Streaming/Mosquitto/solution/tr_process_sensor_data.ktr

  • Pulls the 'message' records from the stream

  • Adds a timestamp

  • Resolves the sensor_type

  • Filters each record according to its alert level (minor or major)

  • Groups the records to count the alerts

  • Looks up the driver

  • Appends the records to a file in the Project/MQTT directory
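The filter and group stages in the list above can be sketched in a few lines. The record layout is an assumption made for illustration (an "alert" field holding "minor" or "major", plus vehicle_id and sensor_type); the actual field names in tr_process_sensor_data.ktr may differ.

```python
from collections import Counter


def split_alerts(records):
    """Split records into (major, minor) lists by their alert level."""
    major = [r for r in records if r["alert"] == "major"]
    minor = [r for r in records if r["alert"] == "minor"]
    return major, minor


def count_alerts(records):
    """Count alerts per (vehicle_id, sensor_type) pair."""
    return Counter((r["vehicle_id"], r["sensor_type"]) for r in records)


if __name__ == "__main__":
    # Hypothetical records, shaped as they might look after sensor_type is resolved.
    records = [
        {"vehicle_id": 111, "sensor_type": "engine_temp", "alert": "major"},
        {"vehicle_id": 111, "sensor_type": "tyre_pressure", "alert": "minor"},
        {"vehicle_id": 111, "sensor_type": "engine_temp", "alert": "major"},
    ]
    major, minor = split_alerts(records)
    print(count_alerts(major))
    print(count_alerts(minor))
```

In the transformation, each branch would then be enriched with the driver lookup and appended to its own output file.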

  1. Open the following files:

~/Streaming/Mosquitto/solution/output/major_alert.txt

~/Streaming/Mosquitto/solution/output/minor_alert.txt

Notice that the records are appended.
