Python MQTT and Decision Tree Implementation

Introduction

This Python script uses the scikit-learn machine learning library to create and train a decision tree classifier. The trained model is then saved for future use (e.g. for classifying incoming data). The model can be trained and tested on the Iris dataset or on any other dataset that consists of extracted features and the class labels associated with them.

MQTT is the communication protocol of choice in this example, as in many others on our platform. MQTT is simple and reliable to use inside the Sandbox environment. Users can easily adapt this example to send data into Jupyter for machine learning over any other communication method available on our product.

Prerequisites

Jupyter Notebook installed on Sandbox. This can be done via the Software-center. To find out how to install services on Sandbox, please refer to this documentation: Services

/Example3/Decision-tree-flow: This flow is part of the @senlab/node-red-example3-senlab package and is used to transfer the Iris dataset, or any other dataset, from the Node-RED container to the Jupyter service. This prerequisite is optional, as users can choose their own way to stream data into Jupyter.
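If you prefer not to use the Node-RED flow, the publishing side can also be sketched in Python. This sketch assumes the message format that the on_message callback expects: one sample per message, written as comma-separated feature values followed by the class label, and an empty message to mark the end of the dataset. The helper names format_sample and publish_dataset and the sample rows are hypothetical; the client argument can be any object with a publish(topic, payload) method, such as a connected paho-mqtt client.

```python
from typing import Iterable, Protocol, Sequence


class Publisher(Protocol):
    """Anything with a paho-style publish(topic, payload) method."""

    def publish(self, topic: str, payload: str): ...


def format_sample(features: Sequence[float], label: str) -> str:
    """Encode one sample as 'f1,f2,...,label', the format on_message splits on commas."""
    return ",".join([*(str(f) for f in features), label])


def publish_dataset(client: Publisher, topic: str,
                    rows: Iterable[tuple[Sequence[float], str]]) -> int:
    """Publish every (features, label) row, then an empty end-of-data message.

    Hypothetical helper, not part of the senlab package.
    Returns the number of samples sent.
    """
    count = 0
    for features, label in rows:
        client.publish(topic, format_sample(features, label))
        count += 1
    # An empty payload tells the subscriber that the training set is complete.
    client.publish(topic, "")
    return count


# A few Iris-style rows, for illustration only.
sample_rows = [
    ([5.1, 3.5, 1.4, 0.2], "setosa"),
    ([7.0, 3.2, 4.7, 1.4], "versicolor"),
    ([6.3, 3.3, 6.0, 2.5], "virginica"),
]
```

With paho-mqtt you would connect a client to the broker, start its network loop (for example with loop_start()) and call publish_dataset(client, "/decision-tree", sample_rows). The Jupyter script should already be subscribed at that point, because the broker does not keep these non-retained messages for subscribers that connect later.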

To learn more about Verdaccio and how to install packages, please refer to this documentation: Verdaccio

Python Script Dependencies

The script requires several external libraries:

import paho.mqtt.client as mqtt
import time
from sklearn import tree
from joblib import dump

  • paho.mqtt.client: A client library for the MQTT protocol.

  • time: Used for managing delays in the script.

  • sklearn.tree: Contains the decision tree models.

  • joblib: Used for saving the trained model.

All of these libraries come preinstalled in the Jupyter container. To use other Python libraries, install them with the pip install command from inside the container. To find out how to manage containers and services on Sandbox, check this documentation:

MQTT Configuration and Callbacks

MQTT Client Setup

The training set for the decision tree, and later the samples that need to be classified, are sent to the Python script over the MQTT protocol. For this, an MQTT client needs to be configured and its callbacks defined.

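# Create a client that uses the paho-mqtt 2.x callback API.
# The callback functions assigned below must already be defined
# at this point in the script.
mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
mqttc.on_connect = on_connect
mqttc.on_message = on_message
mqttc.on_disconnect = on_disconnect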

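mqtt.Client(mqtt.CallbackAPIVersion.VERSION2): Initializes an MQTT client that uses the version 2 callback API of paho-mqtt 2.x, which determines the signatures of the callbacks defined below. on_connect, on_message, on_disconnect: Callback functions that handle the corresponding MQTT events.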

Connection to MQTT Broker

mqttc.connect("emqx", 1883, 60)

connect: Connects to the MQTT broker at hostname "emqx" on port 1883, with a keepalive interval of 60 seconds.

This example assumes that EMQX is used as the broker on Sandbox. Mosquitto is another option, and the port stays the same. For communication between containers on the same host, service names can be used instead of DNS names or IP addresses.
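Because only the host name changes between brokers, one option is to read the broker address from the environment instead of hard-coding it. The variable names MQTT_HOST and MQTT_PORT below are hypothetical, not something Sandbox defines; the defaults match the values used in this example.

```python
import os


def broker_address(env=None) -> tuple[str, int]:
    """Return (host, port) for the MQTT broker.

    MQTT_HOST and MQTT_PORT are hypothetical variable names chosen for this
    sketch; they fall back to the EMQX service name and the default MQTT port.
    """
    env = os.environ if env is None else env
    host = env.get("MQTT_HOST", "emqx")
    port = int(env.get("MQTT_PORT", "1883"))
    return host, port


# Usage with the client configured earlier (not executed here):
#   host, port = broker_address()
#   mqttc.connect(host, port, 60)
```

Switching to a Mosquitto container would then only require setting MQTT_HOST to that service's name.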

Callback Functions

on_connect

def on_connect(client, userdata, flags, reason_code, properties):
    print(f"connected with the result code {reason_code}")
    client.subscribe("/decision-tree")
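  • Called when the broker responds to the connection request; reason_code reports whether the connection succeeded.

  • Subscribes to the topic "/decision-tree". Change this line so that it subscribes to the same topic the publisher uses. Because on_connect runs on every (re)connection, subscribing here also renews the subscription after a reconnect.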
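The on_connect callback above subscribes regardless of the outcome. With the version 2 callback API, reason_code is a paho ReasonCode object whose is_failure attribute tells whether the broker refused the connection, so a slightly more defensive variant could look like the sketch below. The TOPIC constant is introduced here for illustration; the topic value is the one used in this example.

```python
TOPIC = "/decision-tree"  # must match the topic the publisher uses


def on_connect(client, userdata, flags, reason_code, properties):
    """Subscribe only when the broker accepted the connection.

    Subscribing here (rather than once after connect()) means the
    subscription is renewed automatically after a reconnect.
    """
    if reason_code.is_failure:
        print(f"failed to connect: {reason_code}")
        return
    print(f"connected with the result code {reason_code}")
    client.subscribe(TOPIC)
```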

on_message

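# State shared between the MQTT callbacks and the main script
dataset_array = []      # one list of float feature values per sample
targetset_array = []    # class label of each sample
should_continue = True  # set to False when the end-of-data message arrives

def on_message(client, userdata, msg):
    global dataset_array, targetset_array, should_continue
    decoded_payload = msg.payload.decode('utf-8').strip()
    # An empty message marks the end of the training set
    if decoded_payload == "":
        should_continue = False
        print("end of dataset received")
        time.sleep(0.5)
        return
    # Payload format: feature1,feature2,...,featureN,label
    features = decoded_payload.split(",")
    # scikit-learn expects numeric features, so convert the strings to floats
    dataset_array.append([float(value) for value in features[:-1]])
    targetset_array.append(features[-1])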
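  • Called whenever a message is received on a subscribed topic.

  • Each message carries one sample as comma-separated values: the feature values followed by the class label. The features are appended to dataset_array and the label to targetset_array.

  • An empty message signals the end of the training set and sets should_continue to False, which ends the wait loop in the main script.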
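The parsing done inside on_message can be pulled out into a small pure function, which makes it easy to test without a broker. This sketch assumes the same payload format (comma-separated numeric features followed by the label, with an empty payload meaning end of data); the name parse_sample is hypothetical.

```python
from typing import Optional


def parse_sample(payload: bytes) -> Optional[tuple[list[float], str]]:
    """Turn b'5.1,3.5,1.4,0.2,setosa' into ([5.1, 3.5, 1.4, 0.2], 'setosa').

    Returns None for an empty payload, which marks the end of the dataset.
    Raises ValueError if a feature value is not a number.
    """
    text = payload.decode("utf-8").strip()
    if text == "":
        return None
    *features, label = text.split(",")
    return [float(v) for v in features], label.strip()
```

Inside on_message, sample = parse_sample(msg.payload) could then replace the decoding and splitting lines, with a None result setting should_continue to False.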

on_disconnect

def on_disconnect(client, userdata, disconnect_flags, reason_code, properties):
    # Signature required by the paho-mqtt 2.x (CallbackAPIVersion.VERSION2) callback API
    print(f"client has disconnected with reason code {reason_code}")
  • Called when the client disconnects from the broker; here it only logs the event.

Decision Tree Model Creation and Training

x = dataset_array
y = targetset_array

clf = tree.DecisionTreeClassifier()
clf = clf.fit(x, y)
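  • Prepares the dataset (x) and the targets (y) collected by on_message. In the complete script, this code runs only after the MQTT event loop has finished, that is, once the whole training set has arrived.

  • Initializes the decision tree classifier and fits it to the data.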
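The introduction mentions that the model can be trained and tested on the Iris dataset. One way to check the classifier before saving it is to hold out part of the data and measure accuracy. The sketch below uses scikit-learn's bundled copy of Iris instead of data received over MQTT; the 25% test split and the random_state values are arbitrary choices for illustration.

```python
from sklearn import tree
from sklearn.datasets import load_iris
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

# Bundled Iris data stands in for dataset_array / targetset_array here.
x, y = load_iris(return_X_y=True)

# Hold out 25% of the samples; stratify keeps the class balance in both parts.
x_train, x_test, y_train, y_test = train_test_split(
    x, y, test_size=0.25, random_state=0, stratify=y
)

clf = tree.DecisionTreeClassifier(random_state=0)
clf.fit(x_train, y_train)

accuracy = accuracy_score(y_test, clf.predict(x_test))
print(f"test accuracy: {accuracy:.2f}")
```

Fitting on only the training part means the reported accuracy reflects samples the tree has not seen; for the final model you may still prefer to fit on all collected data.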

Visualization and Saving Model

tree.plot_tree(clf)
dump(clf, 'iris_model.joblib')
  • plot_tree: Visualizes the decision tree (this requires matplotlib).

  • dump: Saves the trained model to a file named "iris_model.joblib".
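Once iris_model.joblib exists, a separate script or notebook cell can load it and classify new samples, which is the future use mentioned in the introduction. So that the sketch runs on its own, it first trains a tiny model on a few hand-written Iris-like rows; in practice you would go straight to load(). The temporary file location and the sample values are illustrative assumptions.

```python
import os
import tempfile

from joblib import dump, load
from sklearn import tree

# Tiny stand-in training set so the example is self-contained.
x = [[5.1, 3.5, 1.4, 0.2], [4.9, 3.0, 1.4, 0.2],
     [7.0, 3.2, 4.7, 1.4], [6.4, 3.2, 4.5, 1.5],
     [6.3, 3.3, 6.0, 2.5], [5.8, 2.7, 5.1, 1.9]]
y = ["setosa", "setosa", "versicolor", "versicolor", "virginica", "virginica"]

model_path = os.path.join(tempfile.mkdtemp(), "iris_model.joblib")
dump(tree.DecisionTreeClassifier(random_state=0).fit(x, y), model_path)

# Later (e.g. in another process): load the model and classify new samples.
clf = load(model_path)
new_samples = [[5.0, 3.4, 1.5, 0.2], [6.0, 3.0, 5.5, 2.1]]
predictions = clf.predict(new_samples)
print(predictions.tolist())
```

New samples must have the same number and order of features as the training data, since the tree splits on feature positions.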

MQTT Event Loop

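mqttc.loop_start()
# Block until on_message receives the empty end-of-data message
while should_continue:
    time.sleep(0.1)  # sleep instead of "pass" to avoid a CPU-bound busy wait
mqttc.disconnect()
mqttc.loop_stop()
  • loop_start: Starts a background thread that runs the MQTT network loop, so the callbacks fire while the main thread waits for should_continue to become False.

  • disconnect and loop_stop: Disconnect cleanly from the MQTT broker, then stop the background network thread.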
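The while loop above keeps polling a global flag. A common alternative is a threading.Event that on_message sets when the end-of-data message arrives, so the main thread blocks in wait() without polling and can give up after a timeout. This is a swapped-in technique, not the original script's approach; the names data_complete and wait_for_dataset and the default timeout are hypothetical.

```python
import threading

# Set by on_message when the empty end-of-data message arrives.
data_complete = threading.Event()
dataset_array, targetset_array = [], []


def on_message(client, userdata, msg):
    text = msg.payload.decode("utf-8").strip()
    if text == "":
        data_complete.set()  # wakes up the thread blocked in wait()
        return
    *features, label = text.split(",")
    dataset_array.append([float(v) for v in features])
    targetset_array.append(label)


def wait_for_dataset(client, timeout=300.0) -> bool:
    """Run the network loop in the background until the dataset is complete.

    Returns False if the timeout (in seconds) expired before the end marker.
    """
    client.loop_start()
    try:
        return data_complete.wait(timeout)
    finally:
        client.disconnect()
        client.loop_stop()
```

The training step would then run only when wait_for_dataset(mqttc) returns True, and the script can report a timeout instead of hanging forever if the end-of-data message never arrives.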

This script is useful in applications where real-time data collection is combined with on-the-fly machine learning model training, such as in IoT (Internet of Things) environments. The use of MQTT makes it suitable for distributed systems where devices send data to a central server that performs the analysis.