Nowadays, plants use a wide selection of industrial sensors, each with a unique design and application, to collect and analyze data.
These Supervisory Control and Data Acquisition (SCADA) systems consist of both software and hardware components which enable remote and on-site gathering of data from the industrial equipment.
Pentaho Data Integration enables you to collect data from any source in real time, augment data streams in a single interface, and transform raw data into actionable manufacturing insights.
Let's take a brief look at a couple of the challenges involved in implementing a SCADA system:
Data Silos: Brownfield factories will have manufacturing equipment and backend systems from a wide variety of vendors that produce data in proprietary formats. These formats often create data silos that hinder deeper analysis across the entire factory operation.
IT/OT Priorities: A successful modernization project needs to include experts from the operations side (OT) and the enterprise IT side (IT).
The goal is to connect the various functions of the factory across a standardized IIoT bus:
Automation area: Factory machines, sensors, and gateways. Data needs to flow between the machines, sensors, and gateways; the gateways are typically used to communicate with other areas of the factory architecture.
Manufacturing area: Systems used to control the factory equipment such as SCADA and MES systems.
Factory area: Systems used to manage the entire factory such as PLM (Product Lifecycle Management) and OEE (Overall Equipment Effectiveness) systems.
Cloud: Connectivity to the enterprise IT systems of the organization that allows for deeper integration between the OT and IT systems.
The majority of SCADA and MES systems on the market come with support for MQTT.
Unified Namespace (UNS) is a novel solution that allows you to collect data from various industrial IoT (IIoT) systems, add context to it, and transform it into a format that other systems can understand.
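As a rough sketch of the UNS idea, the snippet below takes a raw sensor payload, adds context, and republishes it under an ISA-95-style topic hierarchy. The hierarchy, field names, and broker details here are illustrative assumptions, not part of this lab:

# Sketch: add context to a raw reading and publish it to a hypothetical UNS topic.
import json
from paho.mqtt import client as mqtt_client

# Assumed hierarchy: enterprise/site/area/line/device
UNS_TOPIC = "acme/plant1/assembly/line4/robot07/telemetry"

def contextualize(raw: str) -> str:
    data = json.loads(raw)
    # Context that downstream systems need to interpret the reading (assumed fields).
    data.update({"site": "plant1", "line": "line4", "unit": "celsius"})
    return json.dumps(data)

client = mqtt_client.Client(client_id='uns-demo',
                            callback_api_version=mqtt_client.CallbackAPIVersion.VERSION1)
client.connect('localhost', 1883)
client.loop_start()
info = client.publish(UNS_TOPIC, contextualize('{"temperature": 42.0}'))
info.wait_for_publish()  # ensure the message is sent before exiting
client.loop_stop()
client.disconnect()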
Remember to stop the mosquitto container: ensure the Mosquitto broker is no longer running.
docker stop mosquitto
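You can verify that no Mosquitto container is still running:

docker ps --filter name=mosquitto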
Copy over the required files.
cd
mkdir -p ~/Streaming/HiveMQ4 && cd "$_"
cp -R ~/Workshop--Data-Integration/Labs/'Module 7 - Workflows'/'Streaming Data'/HiveMQ/* .
Docker Network
As we're running quite a few containers on the same server, let's ensure that the containers for each How-To are isolated from each other.
docker network create -d bridge hivemq
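If you want to inspect the new network:

docker network inspect hivemq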
HiveMQ Container
This HiveMQ deployment is not secure! It lacks authentication and authorization.
Right now any MQTT client can connect to the broker with a full set of permissions.
For production usage, add an appropriate security extension and remove the hivemq-allow-all extension.
You can download security extensions from the HiveMQ Marketplace (https://www.hivemq.com/extensions/).
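This guide assumes the hivemq4 container already exists on the lab VM (it is started below with docker start). If you need to create it yourself, a minimal run command might look like this - the image tag and port mappings are assumptions, so adjust them to your environment:

docker run -d --name hivemq4 --network hivemq -p 1883:1883 -p 8080:8080 hivemq/hivemq4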
Let's publish some sensor data to our HiveMQ broker. Depending on the factory area, the data needs to be processed within different timeframes.
Integration of the automation and manufacturing areas (levels 0-3) requires reliable data exchange between various machines, PLCs, and sensors that have very little memory or computing power. The raw data is used in positive and negative feedback loops to automatically trigger actuators and alerts.
Level 4 aggregated data is usually augmented to provide not only daily, weekly, and monthly OT OEE reports but also OT views across the organisation - for example, the number of cars assembled based on engine output per factory area, chassis manufacturing, and so on.
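For reference, the standard OEE figure behind those Level 4 reports is the product of three ratios; the numbers below are made up purely for illustration:

# OEE = Availability x Performance x Quality (illustrative numbers).
availability = 440 / 480           # run time / planned production time (minutes)
performance = (350 * 1.0) / 440    # (units produced x ideal cycle time) / run time
quality = 330 / 350                # good units / total units
oee = availability * performance * quality
print(f"OEE = {oee:.1%}")          # 68.8%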
Start the HiveMQ Broker.
docker start hivemq4
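To verify the broker started cleanly, you can check its logs:

docker logs hivemq4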
Take a look at:
~/Streaming/HiveMQ4/scripts/sensor.py
cd
cd ~/Streaming/HiveMQ4/scripts
cat sensor.py
# python 3.10
# Note: This script requires the 'paho-mqtt' package to be installed.
# pip3 install paho-mqtt              (2.x API)
# pip3 install "paho-mqtt<2.0.0"      (1.x API)
import random
import time
import json
from paho.mqtt import client as mqtt_client

# username = 'emqx'    # not required, as this HiveMQ deployment has no security
# password = 'public'

# MQTT settings
broker = 'localhost'
port = 1883
topic = "industrial/robot/sensor"

# Generate a Client ID with the python-mqtt prefix.
client_id = f'python-mqtt-{random.randint(0, 1000)}'

# Connect to MQTT broker
def connect_mqtt():
    def on_connect(client, userdata, flags, rc):
        # For paho-mqtt 2.0.0, the callback takes an extra properties parameter:
        # def on_connect(client, userdata, flags, rc, properties):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print(f"Failed to connect, return code {rc}")

    # Set the connecting client ID.
    # client = mqtt_client.Client(client_id)   # paho-mqtt 1.x
    # For paho-mqtt 2.0.0, you need to set callback_api_version.
    client = mqtt_client.Client(client_id=client_id, callback_api_version=mqtt_client.CallbackAPIVersion.VERSION1)
    # client.username_pw_set(username, password)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client

# Publish sensor data
def publish(client):
    while True:
        temperature = random.uniform(20.0, 100.0)  # Simulate temperature sensor data
        position = {'x': random.uniform(-10.0, 10.0),
                    'y': random.uniform(-10.0, 10.0),
                    'z': random.uniform(-10.0, 10.0)}  # Simulate position sensor data
        message = json.dumps({'temperature': temperature, 'position': position})
        result = client.publish(topic, message)
        status = result[0]
        if status == 0:
            print(f"Sent `{message}` to topic `{topic}`")
        else:
            print(f"Failed to send message to topic {topic}")
        time.sleep(1)

# Main function
def run():
    client = connect_mqtt()
    client.loop_start()
    publish(client)

# Execute the main function
if __name__ == '__main__':
    run()
The sensor.py script:
loads the required libraries
sets the HiveMQ broker connection details
sets the client_id
connects to the HiveMQ MQTT broker
generates random sensor data
builds the message - a data dictionary in JSON format
publishes the message to the topic every second
Execute sensor.py
Release 2.0.0 of the Paho Python MQTT client (11th February 2024) includes breaking changes.
Execute sensor.py script.
cd
cd ~/Streaming/HiveMQ4/scripts
python3 sensor.py
pentaho@pentaho-virtual-machine:~/Streaming/HiveMQ4/scripts$ python3 sensor.py
/home/pentaho/Streaming/HiveMQ4/scripts/sensor.py:34: DeprecationWarning: Callback API version 1 is deprecated, update to latest version
  client = mqtt_client.Client(client_id=client_id, callback_api_version=mqtt_client.CallbackAPIVersion.VERSION1)
Sent `{"temperature": 65.6060308859324, "position": {"x": 5.196954639657465, "y": -5.148951217854867, "z": 7.25115088212948}}` to topic `industrial/robot/sensor`
Connected to MQTT Broker!
Sent `{"temperature": 35.29565078000626, "position": {"x": -4.7272734097139875, "y": -1.6080980541298366, "z": 1.2319865862071566}}` to topic `industrial/robot/sensor`
Sent `{"temperature": 60.21167757015797, "position": {"x": 8.249639900333282, "y": -8.689339716964462, "z": -0.3634263425774815}}` to topic `industrial/robot/sensor`
...
Check that the sensor data is being successfully published to HiveMQ.
CTRL + C will stop the Python script.
Execute subscribe.py
Take a look at:
~/Streaming/HiveMQ4/scripts/subscribe.py
cd
cd ~/Streaming/HiveMQ4/scripts
cat subscribe.py
# python 3.10
import random
from paho.mqtt import client as mqtt_client

# MQTT settings
broker = 'localhost'
port = 1883
topic = "industrial/robot/sensor"

# Generate a Client ID with the subscribe prefix.
client_id = f'subscribe-{random.randint(0, 100)}'
# username = 'emqx'
# password = 'public'

# Connect to MQTT broker
def connect_mqtt() -> mqtt_client.Client:
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print(f"Failed to connect, return code {rc}")

    # client = mqtt_client.Client(client_id)   # paho-mqtt 1.x
    client = mqtt_client.Client(client_id=client_id, callback_api_version=mqtt_client.CallbackAPIVersion.VERSION1)
    # client.username_pw_set(username, password)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client

# Subscribe to sensor data
def subscribe(client: mqtt_client.Client):
    def on_message(client, userdata, msg):
        print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")

    client.subscribe(topic)
    client.on_message = on_message

# Main function
def run():
    client = connect_mqtt()
    subscribe(client)
    client.loop_forever()

if __name__ == '__main__':
    run()
Execute the subscribe.py script.
cd
cd ~/Streaming/HiveMQ4/scripts
python3 subscribe.py
pentaho@pentaho-virtual-machine:~/Streaming/HiveMQ4/scripts$ python3 subscribe.py
/home/pentaho/Streaming/HiveMQ4/scripts/subscribe.py:25: DeprecationWarning: Callback API version 1 is deprecated, update to latest version
  client = mqtt_client.Client(client_id=client_id, callback_api_version=mqtt_client.CallbackAPIVersion.VERSION1)
Connected to MQTT Broker!
Received `{"temperature": 58.80126840794128, "position": {"x": -9.382200219332637, "y": 6.256113402110607, "z": 1.3091624489302767}}` from `industrial/robot/sensor` topic
Received `{"temperature": 80.70263220990923, "position": {"x": 7.8459965093831165, "y": -0.37157016433597967, "z": -4.3821611625288455}}` from `industrial/robot/sensor` topic
Received `{"temperature": 87.65337962266882, "position": {"x": 8.240244717092018, "y": -8.806589253052007, "z": -6.419154857552536}}` from `industrial/robot/sensor` topic
Received `{"temperature": 81.6254980952253, "position": {"x": -5.722946590492448, "y": -9.03693881691259, "z": -4.436963415897499}}` from `industrial/robot/sensor` topic
...
The data will also be processed within different timeframes:
Level 0: This is the level of sensors, signals, machine, and real-time capabilities. At this level, microseconds and milliseconds are extremely important.
Level 1: On the PLC (programmable logic controller) or control level, sensing and manipulating take place within seconds. Real-time behaviour is also important here for the predictability of production processes.
Level 2: This level handles process management for monitoring and supervising. Here, SCADA (supervisory control and data acquisition) systems and HMI (human-machine interface) give operators a first visualization of what’s going on within the factory.
Level 3: The next level is for MES (manufacturing execution systems). This is where manufacturing operations management takes place. Here, we move off the factory floor into the top floor and measure activity in hours.
Level 4: Finally, at the top, there are ERP (enterprise resource planning) systems for business planning and logistics. Processes on this level are typically calculated in days and months.
To achieve integration, the data flow must work between systems on the same level as well as between the levels.
Pentaho Data Integration
Let's subscribe to the industrial/robot/sensor topic in HiveMQ and use Pentaho Data Integration to process the data.
Start Pentaho Data Integration:
cd
cd Pentaho/design-tools/data-integration
./spoon.sh
Again, we'll use sensor.py to generate the data and consume it in Pentaho Data Integration.
Execute sensor.py script.
cd
cd ~/Streaming/HiveMQ4/scripts
python3 sensor.py
Log into HiveMQ Control Center.
User: admin
Password: admin
Check the inbound connections and traffic.
You can also view the stream in MQTT Explorer.
The PDI client can pull streaming data from an MQTT broker or clients through an MQTT transformation. The parent MQTT Consumer step runs a child transformation that executes according to the message batch size or duration, allowing you to process a continuous stream of records in near real-time. The child transformation must start with the Get records from stream step.
Additionally, from the MQTT Consumer step, you can select a step in the child transformation to stream records back to the parent transformation. This capability allows records processed by an MQTT Consumer step in a parent transformation to be passed downstream to any other steps included within the same parent transformation.
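To make the batch-window idea concrete outside of PDI, here is a minimal paho-mqtt sketch that processes messages once a batch size or duration is reached. This is an illustration of the concept only, not how the MQTT Consumer step is implemented; the batch size, duration, and process() stand-in are assumptions:

# Sketch: consume MQTT messages in batches of N records or T seconds,
# whichever comes first (the window is only checked when a message arrives).
import json
import time
from paho.mqtt import client as mqtt_client

BATCH_SIZE = 10       # assumed batch size
BATCH_SECONDS = 5.0   # assumed batch duration

batch = []
window_start = time.monotonic()

def process(records):
    # Stand-in for the child transformation's processing logic.
    print(f"Processing {len(records)} records")

def on_message(client, userdata, msg):
    global batch, window_start
    batch.append(json.loads(msg.payload))
    if len(batch) >= BATCH_SIZE or time.monotonic() - window_start >= BATCH_SECONDS:
        process(batch)
        batch = []
        window_start = time.monotonic()

client = mqtt_client.Client(client_id='batch-demo',
                            callback_api_version=mqtt_client.CallbackAPIVersion.VERSION1)
client.on_message = on_message
client.connect('localhost', 1883)
client.subscribe('industrial/robot/sensor')
client.loop_forever()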
The Get records from stream step returns records that were previously generated by another transformation in a job. The records are passed to this step using either the Copy rows to result step or the Transformation Executor step. You can enter the metadata of the fields you are expecting from the previous transformation in a job.
Open the following transformation:
~/Streaming/HiveMQ4/tr_process_sensor_data.ktr
Double-click the JSON Input step and configure it with the following settings.