HiveMQ

Use Case: Manufacturing ..


Use Case - SCADA

Nowadays, plants use a wide selection of industrial sensors, each with a unique design and application, to collect and analyze data.

These Supervisory Control and Data Acquisition (SCADA) systems consist of both software and hardware components that enable remote and on-site gathering of data from industrial equipment.

Pentaho Data Integration enables you to collect data from any source in real time, augment data streams in a single interface, and transform raw data into actionable manufacturing insights.

Let's take a brief look at a couple of the challenges involved in implementing a SCADA system:

  • Data Silos: Brownfield factories will have manufacturing equipment and backend systems from a wide variety of vendors that produce data in proprietary formats. These formats often create data silos that hinder deep analysis across the entire factory operation.

  • IT/OT Priorities: A successful modernization project needs to include experts from the operations side (OT) and the enterprise IT side (IT).

The goal is to connect the various functions of the factory across a standardized IIoT bus:

  • Automation area: Factory machines, sensors and gateways. Data needs to be able to flow between the machines and the sensors and gateways. The gateways are typically used to communicate with other areas in the factory architecture.

  • Manufacturing area: Systems used to control the factory equipment such as SCADA and MES systems.

  • Factory area: Systems used to manage the entire factory such as PLM (Product Lifecycle Management) and OEE (Overall Equipment Effectiveness) systems.

  • Cloud: Connectivity to the enterprise IT systems of the organization that allows for deeper integration between the OT and IT systems.

The majority of SCADA and MES systems on the market come with support for MQTT.

Unified Namespace (UNS) is a novel solution that allows you to collect data from various industrial IoT (IIoT) systems, add context to it, and transform it into a format that other systems can understand.
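
As a minimal illustration, a UNS-style hierarchy encodes the enterprise/site/area/line/cell context directly in the MQTT topic, so any consumer can locate a data point without out-of-band lookups. The hierarchy and field names below are illustrative assumptions, not part of this workshop's data model:

# Sketch: build a UNS-style topic and a contextualised payload.
import json
import time

def uns_topic(enterprise, site, area, line, cell, metric):
    """Build an ISA-95 style Unified Namespace topic path."""
    return f"{enterprise}/{site}/{area}/{line}/{cell}/{metric}"

topic = uns_topic("acme", "plant-01", "assembly", "line-3", "robot-7", "temperature")

payload = json.dumps({
    "value": 63.2,             # raw sensor reading
    "unit": "C",               # context added at the edge
    "timestamp": time.time(),
    "source": "plc-17",        # hypothetical originating device
})

print(topic)    # acme/plant-01/assembly/line-3/robot-7/temperature
print(payload)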


Remember to stop the Mosquitto container.

  1. Ensure the Mosquitto Broker has been stopped.

docker stop mosquitto
  2. Copy over the required files.

cd
mkdir -p ~/Streaming/HiveMQ4 && cd "$_"
cp -R ~/Workshop--Data-Integration/Labs/'Module 7 - Workflows'/'Streaming Data'/HiveMQ/* .

Docker Network

As we're running quite a few containers on the same server, let's ensure that the containers for each How-To are isolated from each other.

docker network create -d bridge hivemq

HiveMQ Container

  • This HiveMQ deployment is not secure! It's lacking Authentication and Authorization.

  • Right now any MQTT client can connect to the broker with a full set of permissions.

  • For production usage, add an appropriate security extension and remove the hivemq-allow-all extension (a connection sketch with credentials follows this list).

  • You can download security extensions from the HiveMQ Marketplace (https://www.hivemq.com/extensions/).
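
For reference, once an authentication extension is enforced, MQTT clients would need to supply credentials before connecting. A minimal paho-mqtt sketch, with placeholder credentials:

# Sketch: connecting to a secured broker with paho-mqtt (V1 callback API).
# 'factory-user' / 'changeme' are placeholders for illustration only.
from paho.mqtt import client as mqtt_client

client = mqtt_client.Client(client_id="secure-client",
                            callback_api_version=mqtt_client.CallbackAPIVersion.VERSION1)
client.username_pw_set("factory-user", "changeme")  # required once auth is enforced
client.connect("localhost", 1883)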

  1. Run the HiveMQ Docker container.

docker run --ulimit nofile=500000:500000 --name=hivemq4 -p 9090:8080 -p 9000:9000 -p 1883:1883 --net=hivemq hivemq/hivemq4
Flag             Description
--ulimit         Limits the system resources that individual users can consume
nofile           The maximum number of open files/file descriptors the user can have at one time
--name           Name of the container
-p 9090          Mapped container port. Exposes the HiveMQ Control Center (container port 8080) on external port 9090
-p 9000          Mapped container port. Exposes the HiveMQ WebSocket listener on port 9000
-p 1883          Mapped container port. Exposes the HiveMQ MQTT TCP listener on port 1883
--net            Name of the isolated Docker network: hivemq
hivemq/hivemq4   Docker Hub image

  2. Log into the HiveMQ Control Center.

User       admin
Password   hivemq

Let's publish some sensor data to our HiveMQ broker. Based on the factory area, the data needs to be processed in a timely manner.

  • Integration of the automation and manufacturing areas (levels 0-3) requires reliable data exchange between various machines, PLCs, and sensors that have very little memory or computing power. The raw data is used in positive and negative feedback loops to automatically trigger actuators and alerts.

  • Level 4 aggregated data is usually augmented to provide not only daily, weekly, and monthly OT OEE reports but also OT views across the organisation - for example, the number of cars assembled based on engine and factory-area output, chassis manufacturing, and so on (a minimal aggregation sketch follows).
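
As a rough sketch of the kind of roll-up a Level 4 report starts from, the raw messages can be aggregated per reporting window. The samples and window are assumptions; the field names follow sensor.py (shown later):

# Sketch: aggregate raw sensor messages into a per-window summary.
import json
from statistics import mean

# Three hard-coded samples in the same shape sensor.py publishes.
raw_messages = [
    '{"temperature": 58.8, "position": {"x": -9.4, "y": 6.3, "z": 1.3}}',
    '{"temperature": 80.7, "position": {"x": 7.8, "y": -0.4, "z": -4.4}}',
    '{"temperature": 87.7, "position": {"x": 8.2, "y": -8.8, "z": -6.4}}',
]

readings = [json.loads(m)["temperature"] for m in raw_messages]
print(f"samples={len(readings)} min={min(readings):.1f} "
      f"max={max(readings):.1f} avg={mean(readings):.1f}")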

  1. Start the HiveMQ Broker.

docker start hivemq4
  2. Take a look at:

~/Streaming/HiveMQ4/scripts/sensor.py

cd
cd ~/Streaming/HiveMQ4/scripts
cat sensor.py
# python 3.10
# Note: This script requires the 'paho-mqtt' package to be installed.
# pip3 install paho-mqtt python-etcd for V2
# pip3 install "paho-mqtt<2.0.0" for V1

import random
import time
import json
from paho.mqtt import client as mqtt_client
# username = 'emqx'  not required as HiveMQ has no security
# password = 'public'

# MQTT settings
broker = 'localhost'
port = 1883
topic = "industrial/robot/sensor"
# Generate a Client ID with the subscribe prefix.
client_id = f'python-mqtt-{random.randint(0, 1000)}'

# Connect to MQTT broker
def connect_mqtt():
    def on_connect(client, userdata, flags, rc):
    # For paho-mqtt 2.0.0, you need to add the properties parameter.
    # def on_connect(client, userdata, flags, rc, properties):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d\n", rc)
    
    # Set Connecting Client ID
    # client = mqtt_client.Client(client_id)

    # For paho-mqtt 2.0.0, you need to set callback_api_version.
    client = mqtt_client.Client(client_id=client_id, callback_api_version=mqtt_client.CallbackAPIVersion.VERSION1)

    # client.username_pw_set(username, password)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client

# Publish sensor data
def publish(client):
    while True:
        temperature = random.uniform(20.0, 100.0)  # Simulate temperature sensor data
        position = {'x': random.uniform(-10.0, 10.0), 'y': random.uniform(-10.0, 10.0), 'z': random.uniform(-10.0, 10.0)}  # Simulate position sensor data
        message = json.dumps({'temperature': temperature, 'position': position})
        result = client.publish(topic, message)
        status = result[0]
        if status == 0:
            print(f"Sent `{message}` to topic `{topic}`")
        else:
            print(f"Failed to send message to topic {topic}")
        time.sleep(1)

# Main function
def run():
    client = connect_mqtt()
    client.loop_start()
    publish(client)

# Execute the main function
if __name__ == '__main__':
    run()

The sensor.py script:

  • loads the required libraries

  • sets the HiveMQ Broker connection details

  • connects to the HiveMQ MQTT broker

  • sets the client_id

  • generates random sensor data

  • builds the message - a data dictionary in JSON format

  • publishes the message to the topic every second

Execute sensor.py

  1. Execute sensor.py script.

cd
cd ~/Streaming/HiveMQ4/scripts
python3 sensor.py
/home/pentaho/Streaming/HiveMQ4/scripts/sensor.py:34: DeprecationWarning: Callback API version 1 is deprecated, update to latest version
  client = mqtt_client.Client(client_id=client_id, callback_api_version=mqtt_client.CallbackAPIVersion.VERSION1)
Connected to MQTT Broker!
Sent `{"temperature": 65.6060308859324, "position": {"x": 5.196954639657465, "y": -5.148951217854867, "z": 7.25115088212948}}` to topic `industrial/robot/sensor`
Sent `{"temperature": 35.29565078000626, "position": {"x": -4.7272734097139875, "y": -1.6080980541298366, "z": 1.2319865862071566}}` to topic `industrial/robot/sensor`
Sent `{"temperature": 60.21167757015797, "position": {"x": 8.249639900333282, "y": -8.689339716964462, "z": -0.3634263425774815}}` to topic `industrial/robot/sensor`
...
  2. Check that the sensor data is being successfully published to HiveMQ.

CTRL + C will stop the Python script.


Execute subscribe.py

  1. Take a look at:

~/Streaming/HiveMQ4/scripts/subscribe.py

cd
cd ~/Streaming/HiveMQ4/scripts
cat subscribe.py
# python 3.10

import random

from paho.mqtt import client as mqtt_client

# MQTT settings
broker = 'localhost'
port = 1883
topic = "industrial/robot/sensor"
# Generate a Client ID with the subscribe prefix.
client_id = f'subscribe-{random.randint(0, 100)}'
# username = 'emqx'
# password = 'public'

# Connect to MQTT broker
def connect_mqtt() -> mqtt_client:
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d\n", rc)

    # client = mqtt_client.Client(client_id)
    client = mqtt_client.Client(client_id=client_id, callback_api_version=mqtt_client.CallbackAPIVersion.VERSION1)
    # client.username_pw_set(username, password)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client

# Subscribe sensor data
def subscribe(client: mqtt_client):
    def on_message(client, userdata, msg):
        print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")

    client.subscribe(topic)
    client.on_message = on_message

# Main function
def run():
    client = connect_mqtt()
    subscribe(client)
    client.loop_forever()

if __name__ == '__main__':
    run()
  2. Execute the subscribe.py script.

cd
cd ~/Streaming/HiveMQ4/scripts
python3 subscribe.py
pentaho@pentaho-virtual-machine:~/Streaming/HiveMQ4/scripts$ python3 subscribe.py
/home/pentaho/Streaming/HiveMQ4/scripts/subscribe.py:25: DeprecationWarning: Callback API version 1 is deprecated, update to latest version
  client = mqtt_client.Client(client_id=client_id, callback_api_version=mqtt_client.CallbackAPIVersion.VERSION1)
Connected to MQTT Broker!
Received `{"temperature": 58.80126840794128, "position": {"x": -9.382200219332637, "y": 6.256113402110607, "z": 1.3091624489302767}}` from `industrial/robot/sensor` topic
Received `{"temperature": 80.70263220990923, "position": {"x": 7.8459965093831165, "y": -0.37157016433597967, "z": -4.3821611625288455}}` from `industrial/robot/sensor` topic
Received `{"temperature": 87.65337962266882, "position": {"x": 8.240244717092018, "y": -8.806589253052007, "z": -6.419154857552536}}` from `industrial/robot/sensor` topic
Received `{"temperature": 81.6254980952253, "position": {"x": -5.722946590492448, "y": -9.03693881691259, "z": -4.436963415897499}}` from `industrial/robot/sensor` topic
...
  3. Take a look at the stats.

The data will also be processed within different timeframes:

Level 0: This is the level of sensors, signals, machines, and real-time capabilities. At this level, microseconds and milliseconds are extremely important.

Level 1: On the PLC (programmable logic controller) or control level, sensing and manipulating take place within seconds. Real-time behaviour is also important here for the predictability of production processes.

Level 2: This level handles process management for monitoring and supervising. Here, SCADA (supervisory control and data acquisition) systems and HMI (human-machine interface) give operators a first visualization of what’s going on within the factory.

Level 3: The next level is for MES (manufacturing execution systems). This is where manufacturing operations management takes place. Here, we move off the factory floor into the top floor and measure activity in hours.

Level 4: Finally, at the top, there are ERP (enterprise resource planning) systems for business planning and logistics. Processes on this level are typically calculated in days and months.

To achieve integration, the data flow must work between systems on the same level as well as between the levels.

Pentaho Data Integration

Let's subscribe to the industrial/robot/sensor topic retained in HiveMQ and use Pentaho Data Integration to process the data.

  1. Start Pentaho Data Integration:

cd
cd Pentaho/design-tools/data-integration
./spoon.sh

Again, we'll use sensor.py to generate the data and consume it in Pentaho Data Integration.

  2. Execute the sensor.py script.

cd
cd ~/Streaming/HiveMQ4/scripts
python3 sensor.py
  3. Log into the HiveMQ Control Center.

User       admin
Password   hivemq

  4. Check the inbound connections and traffic.

  5. You can also view the stream in MQTT Explorer.

The PDI client can pull streaming data from an MQTT broker or clients through an MQTT transformation. The parent MQTT Consumer step runs a child transformation that executes according to the message batch size or duration, allowing you to process a continuous stream of records in near real-time. The child transformation must start with the Get records from stream step.

Additionally, from the MQTT Consumer step, you can select a step in the child transformation to stream records back to the parent transformation. This capability allows records processed by an MQTT Consumer step in a parent transformation to be passed downstream to any other steps included within the same parent transformation.
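
Outside of PDI, the same batch-then-process pattern can be sketched with paho-mqtt; the batch size below is an arbitrary assumption, and process_batch stands in for the child transformation:

# Sketch: collect messages until a batch is full, then hand the batch
# to a processing function (the "child transformation" equivalent).
from paho.mqtt import client as mqtt_client

BATCH_SIZE = 10  # assumption; PDI batches by message count or duration
batch = []

def process_batch(records):
    # Stand-in for the child transformation's processing logic.
    print(f"processing {len(records)} records")

def on_message(client, userdata, msg):
    batch.append(msg.payload.decode())
    if len(batch) >= BATCH_SIZE:
        process_batch(batch)
        batch.clear()

client = mqtt_client.Client(client_id="batch-consumer",
                            callback_api_version=mqtt_client.CallbackAPIVersion.VERSION1)
client.on_message = on_message
client.connect("localhost", 1883)
client.subscribe("industrial/robot/sensor")
client.loop_forever()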

  1. Open the following transformation:

~/Streaming/HiveMQ4/tr_hive_consumer.ktr

  2. Double-click on the MQTT Consumer step.

Transformation   ${Internal.Entry.Current.Directory}/tr_process_sensor_data.ktr

This step returns records that were previously generated by another transformation in a job. The records were passed to this step using either the Copy rows to result step or the Transformation Executor step. You can enter the metadata of the fields you are expecting from the previous transformation.

  3. Open the following transformation:

~/Streaming/HiveMQ4/tr_process_sensor_data.ktr

  4. Double-click on the JSON Input step and configure it with the following settings.

  • Pull the 'message' records from the stream

  • Add a timestamp

  • Read the JSON stream

  • Append to a file in the Project/HiveMQ directory

  5. Open the following file:

~/Streaming/HiveMQ4/HiveMQ4/output/robot_sensor.txt

Notice the records are being appended.
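
To sanity-check the output outside PDI, a small script can parse the appended records. This assumes one JSON document per line, which depends on how the output step was configured:

# Sketch: count appended records and report the last temperature.
import json
from pathlib import Path

path = Path.home() / "Streaming/HiveMQ4/HiveMQ4/output/robot_sensor.txt"

temps = []
for line in path.read_text().splitlines():
    try:
        record = json.loads(line)
    except json.JSONDecodeError:
        continue  # skip headers or partially written lines
    temps.append(record.get("temperature"))

print(f"{len(temps)} records, last temperature: {temps[-1] if temps else 'n/a'}")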

Note: Release 2.0.0 of the Paho Python MQTT client includes breaking changes (11th Feb 2024).

Links:

  • HiveMQ documentation: HiveMQ Introduction

  • HiveMQ Control Center: http://localhost:9090

  • Portainer: https://localhost:9443/#!/home