Nowadays, plants use a wide selection of industrial sensors, each with a unique design and application, to collect and analyze data.
Supervisory Control and Data Acquisition (SCADA) systems consist of both software and hardware components that enable remote and on-site gathering of data from industrial equipment.
Pentaho Data Integration enables you to collect data from any source in real time, augment data streams in a single interface, and transform raw data into actionable manufacturing insights.
Let's take a brief look at a couple of the challenges involved in implementing a SCADA system:
• Data Silos: Brownfield factories will have manufacturing equipment and backend systems from a wide variety of vendors that produce data in proprietary formats. These formats often create data silos that hinder deep analysis across the entire factory operation.
• IT/OT Priorities: A successful modernization project needs to include experts from the operations side (OT) and the enterprise IT side (IT).
The goal is to connect the various functions of the factory across a standardized IIoT bus:
• Automation area: Factory machines, sensors and gateways. Data needs to flow between the machines, sensors, and gateways. The gateways are typically used to communicate with the other areas of the factory architecture.
• Manufacturing area: Systems used to control the factory equipment such as SCADA and MES systems.
• Factory area: Systems used to manage the entire factory such as PLM (Product Lifecycle Management) and OEE (Overall Equipment Effectiveness) systems.
• Cloud: Connectivity to the enterprise IT systems of the organization that allows for deeper integration between the OT and IT systems.
The majority of SCADA and MES systems on the market come with support for MQTT.
Unified Namespace (UNS) is a novel solution that allows you to collect data from various industrial IoT (IIoT) systems, add context to it, and transform it into a format that other systems can understand.
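As an illustration of the idea, a UNS typically organises topics into an ISA-95-style hierarchy (enterprise/site/area/line/device) and republishes payloads enriched with context. A minimal sketch follows; the hierarchy, asset name, and units are hypothetical, purely to show the pattern:

import json
import time

from paho.mqtt import client as mqtt_client

# Hypothetical ISA-95-style UNS topic: enterprise/site/area/line/device
UNS_TOPIC = "acme/berlin/assembly/line1/robot01/sensor"

def republish_with_context(client: mqtt_client.Client, raw_payload: str):
    data = json.loads(raw_payload)
    # Add context so any consumer can interpret the reading without
    # knowing which machine or vendor produced it (assumed values)
    data['asset'] = 'robot01'      # assumed asset identifier
    data['unit'] = 'celsius'       # assumed engineering unit
    data['timestamp'] = time.time()
    client.publish(UNS_TOPIC, json.dumps(data))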
Let's publish some sensor data to our HiveMQ broker. Depending on the factory area, the data needs to be processed in a timely manner:
• Integration of the automation and manufacturing areas (levels 0-3) requires reliable data exchange between various machines, PLCs, and sensors that have very little memory or computing power. The raw data is used in positive and negative feedback loops to automatically trigger actuators and alerts.
• Level 4 aggregated data is usually augmented to provide not only daily, weekly, and monthly OT OEE reports but also OT metrics across the organisation - for example, the number of cars assembled based on engine output per factory area, chassis manufacturing, and so on (see the sketch below).
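To make the contrast concrete, here is a minimal, illustrative sketch (not part of the workshop scripts) of rolling per-second sensor readings up into the kind of hourly aggregate a level 4 report might consume:

import json
from collections import defaultdict
from datetime import datetime

# Bucket raw readings by hour; the payload format matches sensor.py,
# everything else here is illustrative
hourly = defaultdict(list)

def accumulate(raw_payload: str, received_at: datetime):
    data = json.loads(raw_payload)
    bucket = received_at.strftime('%Y-%m-%d %H:00')
    hourly[bucket].append(data['temperature'])

def report():
    for bucket, temps in sorted(hourly.items()):
        avg = sum(temps) / len(temps)
        print(f"{bucket}: avg temperature {avg:.1f} over {len(temps)} readings")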
Start the HiveMQ Broker.
docker start hivemq4
Take a look at:
~/Streaming/HiveMQ4/scripts/sensor.py
cd ~/Streaming/HiveMQ4/scripts
cat sensor.py
# python 3.10
# Note: This script requires the 'paho-mqtt' package to be installed.
# pip3 install paho-mqtt python-etcd for V2
# pip3 install "paho-mqtt<2.0.0" for V1
import random
import time
import json

from paho.mqtt import client as mqtt_client

# username = 'emqx' not required as HiveMQ has no security
# password = 'public'

# MQTT settings
broker = 'localhost'
port = 1883
topic = "industrial/robot/sensor"
# Generate a Client ID with the publish prefix.
client_id = f'python-mqtt-{random.randint(0, 1000)}'

# Connect to MQTT broker
def connect_mqtt():
    def on_connect(client, userdata, flags, rc):
        # For paho-mqtt 2.0.0, you need to add the properties parameter.
        # def on_connect(client, userdata, flags, rc, properties):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d" % rc)

    # Set Connecting Client ID
    # client = mqtt_client.Client(client_id)
    # For paho-mqtt 2.0.0, you need to set callback_api_version.
    client = mqtt_client.Client(client_id=client_id,
                                callback_api_version=mqtt_client.CallbackAPIVersion.VERSION1)
    # client.username_pw_set(username, password)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client

# Publish sensor data
def publish(client):
    while True:
        # Simulate temperature and position sensor data
        temperature = random.uniform(20.0, 100.0)
        position = {'x': random.uniform(-10.0, 10.0),
                    'y': random.uniform(-10.0, 10.0),
                    'z': random.uniform(-10.0, 10.0)}
        # Build the message - data dictionary in JSON format
        message = json.dumps({'temperature': temperature, 'position': position})
        result = client.publish(topic, message)
        status = result[0]
        if status == 0:
            print(f"Sent `{message}` to topic `{topic}`")
        else:
            print(f"Failed to send message to topic {topic}")
        time.sleep(1)

# Main function
def run():
    client = connect_mqtt()
    client.loop_start()
    publish(client)

# Execute the main function
if __name__ == '__main__':
    run()
The sensor.py script:
• loads required libraries
• sets HiveMQ Broker connection details
• connects to HiveMQ MQTT broker
• sets client_id
• generates random sensor data
• builds message - data dictionary in JSON format
• publishes the message to the topic every second
Execute sensor.py
Release 2.0.0 of the Paho Python MQTT client includes breaking changes (11th Feb 2024).
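With the 2.x client, the callback API version becomes a required constructor argument and on_connect gains extra parameters. A minimal sketch of the V2 style, for reference only (the workshop scripts use VERSION1):

import random

from paho.mqtt import client as mqtt_client

client_id = f'python-mqtt-{random.randint(0, 1000)}'

# paho-mqtt 2.x: the callback API version is the first constructor argument
client = mqtt_client.Client(mqtt_client.CallbackAPIVersion.VERSION2, client_id)

# paho-mqtt 2.x: on_connect gains reason_code and properties parameters
def on_connect(client, userdata, flags, reason_code, properties):
    if reason_code == 0:
        print("Connected to MQTT Broker!")
    else:
        print(f"Failed to connect, reason code {reason_code}")

client.on_connect = on_connect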
Execute sensor.py script.
cd ~/Streaming/HiveMQ4/scripts
python3 sensor.py
pentaho@pentaho-virtual-machine:~/Workshop--Data-Integration/Labs/Module 3 - Data Sources/Streaming Data/02 HiveMQ$ python3 subscribe.py
/home/pentaho/Workshop--Data-Integration/Labs/Module 3 - Data Sources/Streaming Data/02 HiveMQ/subscribe.py:25: DeprecationWarning: Callback API version 1 is deprecated, update to latest version
client = mqtt_client.Client(client_id=client_id, callback_api_version=mqtt_client.CallbackAPIVersion.VERSION1)
Connected to MQTT Broker!
Received `{"temperature": 58.80126840794128, "position": {"x": -9.382200219332637, "y": 6.256113402110607, "z": 1.3091624489302767}}` from `industrial/robot/sensor` topic
Received `{"temperature": 80.70263220990923, "position": {"x": 7.8459965093831165, "y": -0.37157016433597967, "z": -4.3821611625288455}}` from `industrial/robot/sensor` topic
Received `{"temperature": 87.65337962266882, "position": {"x": 8.240244717092018, "y": -8.806589253052007, "z": -6.419154857552536}}` from `industrial/robot/sensor` topic
Received `{"temperature": 81.6254980952253, "position": {"x": -5.722946590492448, "y": -9.03693881691259, "z": -4.436963415897499}}` from `industrial/robot/sensor` topic
Check that the sensor data is being successfully published to HiveMQ.
CTRL + Z will suspend the Python script (use CTRL + C to terminate it).
Execute subscribe.py
Take a look at:
~/Streaming/HiveMQ4/scripts/subscribe.py
cd ~/Streaming/HiveMQ4/scripts
cat subscribe.py
# python 3.10
import random

from paho.mqtt import client as mqtt_client

# MQTT settings
broker = 'localhost'
port = 1883
topic = "industrial/robot/sensor"
# Generate a Client ID with the subscribe prefix.
client_id = f'subscribe-{random.randint(0, 100)}'
# username = 'emqx'
# password = 'public'

# Connect to MQTT broker
def connect_mqtt() -> mqtt_client.Client:
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d" % rc)

    # client = mqtt_client.Client(client_id)
    client = mqtt_client.Client(client_id=client_id,
                                callback_api_version=mqtt_client.CallbackAPIVersion.VERSION1)
    # client.username_pw_set(username, password)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client

# Subscribe to sensor data
def subscribe(client: mqtt_client.Client):
    def on_message(client, userdata, msg):
        print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")

    client.subscribe(topic)
    client.on_message = on_message

# Main function
def run():
    client = connect_mqtt()
    subscribe(client)
    client.loop_forever()

if __name__ == '__main__':
    run()
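Execute the subscribe.py script (the terminal output shown earlier comes from this command):

cd ~/Streaming/HiveMQ4/scripts
python3 subscribe.py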
The data will also be processed within different timeframes:
Level 0: This is the level of sensors, signals, machines, and real-time capabilities. At this level, microseconds and milliseconds are extremely important.
Level 1: On the PLC (programmable logic controller) or control level, sensing and manipulating take place within seconds. Real-time behaviour is also important here for the predictability of production processes.
Level 2: This level handles process management for monitoring and supervising. Here, SCADA (supervisory control and data acquisition) systems and HMI (human-machine interface) give operators a first visualization of what’s going on within the factory.
Level 3: The next level is for MES (manufacturing execution systems). This is where manufacturing operations management takes place. Here, we move off the factory floor and up to the top floor, and measure activity in hours.
Level 4: Finally, at the top, there are ERP (enterprise resource planning) systems for business planning and logistics. Processes on this level are typically calculated in days and months.
To achieve integration, the data flow must work between systems on the same level as well as between the levels.
Pentaho Data Integration
Let's subscribe to the industrial/robot/sensor topic retained in HiveMQ and use Pentaho Data Integration to process the data.
Again, we'll use sensor.py to generate the data and consume it in Pentaho Data Integration.
Execute sensor.py script.
cd ~/Streaming/HiveMQ4/scripts
python3 sensor.py
Log into HiveMQ Control Center.
User: admin
Password: admin
Check the inbound connections and traffic.
You can also view the stream in MQTT Explorer.
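If the Mosquitto command-line clients happen to be installed (an assumption; they are not part of the workshop setup), you can also watch the topic directly from a terminal:

mosquitto_sub -h localhost -p 1883 -t "industrial/robot/sensor" -v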
The PDI client can pull streaming data from an MQTT broker or clients through an MQTT transformation. The parent MQTT Consumer step runs a child transformation that executes according to the message batch size or duration, allowing you to process a continuous stream of records in near real-time. The child transformation must start with the Get records from stream step.
Additionally, from the MQTT Consumer step, you can select a step in the child transformation to stream records back to the parent transformation. This capability allows records processed by an MQTT Consumer step in a parent transformation to be passed downstream to any other steps included within the same parent transformation.
The Get records from stream step returns records that were previously generated by another transformation in a job. The records were passed to this step using either the Copy rows to result step or the Transformation Executor step. You can enter the metadata of the fields you are expecting from the previous transformation in a job.
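To make the batching semantics concrete, here is a minimal Python sketch of the same idea: collect messages until either the batch size or the duration limit is reached, then hand the batch to the processing step. This illustrates the concept only; it is not how PDI itself is implemented, and get_message/process_batch are hypothetical callables:

import time

def consume_in_batches(get_message, process_batch, batch_size=50, duration_ms=1000):
    # Collect messages until either limit is hit, then hand the batch on -
    # analogous to the MQTT Consumer step triggering its child transformation
    batch = []
    deadline = time.monotonic() + duration_ms / 1000
    while True:
        msg = get_message(timeout=0.1)  # hypothetical fetch; returns None on timeout
        if msg is not None:
            batch.append(msg)
        if len(batch) >= batch_size or time.monotonic() >= deadline:
            if batch:
                process_batch(batch)    # plays the role of the child transformation
            batch = []
            deadline = time.monotonic() + duration_ms / 1000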
Open the following transformation:
~/Streaming/HiveMQ4/tr_process_sensor_data.ktr
Double-click on the JSON Input step and configure with the following settings.
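The JSON Input step maps incoming JSON attributes to PDI fields using JSONPath expressions. The exact settings come from the workshop transformation; for the sensor.py payload, a plausible mapping (the field names here are illustrative) would be:

Name           Path
temperature    $.temperature
pos_x          $.position.x
pos_y          $.position.y
pos_z          $.position.z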