If you are dealing with streaming analysis of your data, there are some tools which can offer high-performing and easy-to-interpret results. First, we have Kafka, a distributed streaming platform which allows its users to send and receive live messages containing data (you can read more about it here). We will use it as our streaming environment. Then, if we want to visualize our results in real time, we need a tool which can capture our data and predictions: that tool is Grafana, and among its data sources it can be connected to InfluxDB, an open source time series database. So, through this article we will build an ML algorithm which can extract information and make predictions on our data in real time, through the following steps:
- Loading our data and preparing them to be processed
- Building the algorithm and saving it to the Spark environment
- Writing a script which reads our data and sends them to a Kafka topic as it receives them, so that we simulate streaming collection
- Writing a script which collects data from the first Kafka topic, processes them through the algorithm we trained and saved, and then sends the results to another Kafka topic
- Finally, writing a third script which reads from the second topic and sends the messages directly to InfluxDB, so that we can plot our results on Grafana.
So let’s start. The task I want to simulate is the following: imagine we are provided with some data about the temperature of a working machine. We know that this machine works in different regimes, which correspond to different temperature levels and fluctuations. Hence, we first want to identify those work regimes (which are unknown), then train a classification algorithm which can make predictions on real-time data and, based on the current temperature, predict the corresponding working regime. As you might have guessed, we are facing an unsupervised task which we are converting into a supervised one.
First, I’m going to create a random dataset containing my temperature variable. To code these lines (as well as those that follow), I’ve used Apache Zeppelin with the %pyspark interpreter.
%pyspark
import pandas as pd
import numpy as np

df = pd.DataFrame({'Temperature': np.random.randint(40, 70, 50000)})
Then, I converted it into a Spark DataFrame, so that we can connect it to our Spark environment through a Spark Context I’ve initialized:
%pyspark
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext("local[*]", "Example")
sqlContext = SQLContext(sc)
spark_df = sqlContext.createDataFrame(df)
Then, in order for our dataset to be correctly processed by our algorithm, we have to vectorize it. The idea is that you want to convert the columns you will use as features to a vector of values, stored in a new column called, of course, ‘Feature’. Note that here we will use only Temperature as a feature, however the format needs to be a vector anyway:
%pyspark
from pyspark.ml.feature import VectorAssembler

vectorAssembler = VectorAssembler(inputCols=['Temperature'], outputCol='Feature')
vspark_df = vectorAssembler.transform(spark_df)
vspark_df.show()
As you can see, the format of our variable changed to a vector type. Now our feature is ready to be processed.
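If you want to double-check this, a quick schema inspection (not in the original notebook, just a sanity check) confirms that ‘Feature’ is a vector column:

%pyspark
# sanity check: 'Feature' should show up as a vector column next to 'Temperature'
vspark_df.printSchema()
vspark_df.select('Feature').show(3, truncate=False)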
The first thing we want to do, since we are dealing with unlabelled data, is to cluster them with an unsupervised algorithm. For this purpose, I will use K-means (you can read my article here if you want to know more about this algorithm).
Hence, I will start by identifying the number of centroids I want to set, using the Elbow method:
%pyspark
import matplotlib.pyplot as plt
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

cost = np.zeros(20)
for k in range(2, 20):
    kmeans = KMeans().setK(k).setSeed(1).setFeaturesCol("Feature")
    model = kmeans.fit(vspark_df.sample(False, 0.1, seed=42))
    cost[k] = model.computeCost(vspark_df)

fig, ax = plt.subplots(1, 1, figsize=(8, 6))
ax.plot(range(2, 20), cost[2:20])
ax.set_xlabel('k')
ax.set_ylabel('cost')
It seems that, after the sixth centroid, the within-cluster variance is not decreasing significantly. Hence, we can set K=6. Let’s apply our K-means to our dataset:
%pyspark
from pyspark.ml.clustering import KMeans
from pyspark.sql.functions import col

k = 6
kmeans = KMeans().setK(k).setSeed(1).setFeaturesCol("Feature")
model = kmeans.fit(vspark_df)

df_transformed = model.transform(vspark_df).select('Temperature', 'Feature', col('prediction').alias('label'))
df_transformed.show(2)
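Before moving on, it can also be useful to print the centroids K-means has found, since they tell us roughly which temperature level each regime corresponds to. This is a quick check on top of the original notebook:

%pyspark
# print the centroid of each cluster: one temperature value per work regime
for i, center in enumerate(model.clusterCenters()):
    print("Regime %d -> centroid temperature: %.2f" % (i, center[0]))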
Coming back to df_transformed: we now have a third column, which is our target, our label. Hence, we can now train a classification algorithm in a supervised way. More specifically, we will train a neural network with 2 hidden layers (with an input size of 1, since we have only one feature, and an output size of 6, since we have 6 clusters).
%pyspark
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Split the data into train and test
splits = df_transformed.randomSplit([0.6, 0.4], 1234)
train = splits[0]
test = splits[1]

layers = [1, 6, 8, 6]

# create the trainer and set its parameters
trainer = MultilayerPerceptronClassifier(featuresCol='Feature', labelCol='label', maxIter=200,
                                         layers=layers, blockSize=128, seed=1234)

# train the model
model_NN = trainer.fit(train)

# compute accuracy on the test set
result = model_NN.transform(test)
predictionAndLabels = result.select("prediction", "label")
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print("Test set accuracy = " + str(evaluator.evaluate(predictionAndLabels)))

Output: Test set accuracy = 0.802813400934
The accuracy of our NN is around 80%, so we are satisfied and we can save our model like so:
%pyspark
from pyspark.ml.classification import MultilayerPerceptronClassificationModel

model_NN.save("file:///home/cloudera/Desktop/Neural_Network")
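As a quick check (not part of the original flow), you can reload the saved model and verify that the layer sizes match what we trained:

%pyspark
from pyspark.ml.classification import MultilayerPerceptronClassificationModel

# reload the model from disk and inspect its architecture
reloaded_NN = MultilayerPerceptronClassificationModel.load("file:///home/cloudera/Desktop/Neural_Network")
print(reloaded_NN.layers)  # expected: [1, 6, 8, 6]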
To save our dataset too, we first need to parse it into a JSON format (key-value pairs):
%pyspark
from pyspark.sql import functions as F

json_file = df_transformed.select(F.to_json(F.struct([df_transformed[x] for x in df_transformed.columns])).alias("value"))
json_file.show()
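The snippet above only displays the JSON rows; for the producer script below, they also need to end up in mydata.json. Here is a minimal sketch of how you could write them out, assuming a local output path (Spark writes a directory of part files, which you can then rename or concatenate into mydata.json):

%pyspark
# write the single 'value' column as plain text, one JSON document per line
# (the output path is an assumption; adjust it to wherever the producer script expects mydata.json)
json_file.coalesce(1).write.mode("overwrite").text("file:///home/cloudera/Desktop/mydata_json")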
Nice. Now let’s move on to our PySpark scripts. First, I’m writing the program which will collect data from mydata.json and send them, simulating a stream, to Kafka under the topic ‘example_1’.
#! /usr/bin/env python2.7
import json
from pprint import pprint
from time import sleep
from json import dumps
from kafka import SimpleProducer, KafkaClient

kafka = KafkaClient('quickstart.cloudera:9092')
producer = SimpleProducer(kafka)

with open('mydata.json', 'r') as f:
    for line in f:
        data = json.loads(line)
        for val in data.values():
            producer.send_messages('example_1', val.encode('utf-8'))
            sleep(5)  # it will send a line to Kafka every 5 seconds
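Note that SimpleProducer and KafkaClient are deprecated in recent versions of kafka-python; if you are on a newer version, an equivalent producer would look roughly like this (a sketch, assuming the same broker and topic):

#! /usr/bin/env python
import json
from time import sleep
from kafka import KafkaProducer

# same idea as above, but using the non-deprecated KafkaProducer API
producer = KafkaProducer(bootstrap_servers='quickstart.cloudera:9092')

with open('mydata.json', 'r') as f:
    for line in f:
        data = json.loads(line)
        for val in data.values():
            producer.send('example_1', val.encode('utf-8'))
            producer.flush()
            sleep(5)  # one message every 5 seconds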
Now, I’m writing a script which reads data from the Kafka topic ‘example_1’. To do that, I’m using the Spark Structured Streaming approach, which is very useful if you want your streaming flow to be structured rather than unstructured (the typical format in which streaming data are collected through a Spark Streaming Context). Here is the full guide to Spark Structured Streaming with Kafka integration.
Then, once the data are collected, I’m using my stored ML model to predict the work regime from the Temperature feature, collecting the output in a new column called ‘prediction’.
Finally, I will write my final dataframe to Kafka again, but in a different topic called ‘example_2’ (indeed, ‘example_1’ is already taken by the queue of my initial dataframe’s rows).
#! /usr/bin/env python2.7
# initializing my Spark Session
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[2]")\
    .appName("Influx").getOrCreate()

from pyspark.sql.functions import col, from_json, to_json, struct
from pyspark.sql.types import *

# initializing the schema of my json file
schema = StructType([
    StructField("Temperature", IntegerType()),
    StructField("label", IntegerType())])

# reading structured streaming data from kafka topic 'example_1'
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "quickstart.cloudera:9092") \
    .option("subscribe", "example_1") \
    .option("failOnDataLoss", False) \
    .load()

# extracting our values from the df through the json schema
df_2 = df.selectExpr("CAST(value AS STRING)")\
    .select(from_json(col("value"), schema).alias("tmp"))\
    .select("tmp.*")

# vectorize our feature
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler(inputCols=['Temperature'], outputCol='Feature')
vspark_df = vectorAssembler.transform(df_2.dropna())

# loading our NN
from pyspark.ml.classification import MultilayerPerceptronClassificationModel
NN_model = MultilayerPerceptronClassificationModel.load("file:///home/cloudera/Desktop/Neural_Network")

# predicting the work regime of our data
NN_predictions = NN_model.transform(vspark_df)
final_df = NN_predictions.select("prediction", "Temperature", "Feature")

# writing our final df to kafka topic 'example_2'
ds = final_df.select(to_json(struct("prediction", "Temperature")).alias("value"))\
    .writeStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "quickstart.cloudera:9092")\
    .option("topic", "example_2")\
    .option("checkpointLocation", "file:///home/cloudera/Desktop/checkpoint")\
    .start()

ds.awaitTermination()
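If you want to inspect the predictions before wiring the output back into Kafka, you can temporarily replace the Kafka sink with Structured Streaming’s console sink. This is just a debugging variant of the last step above, not part of the final pipeline:

# debug sink: print each micro-batch of predictions to stdout instead of publishing to Kafka
debug_query = final_df.select("prediction", "Temperature") \
    .writeStream \
    .format("console") \
    .outputMode("append") \
    .option("truncate", False) \
    .start()

debug_query.awaitTermination()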
Finally, I will create a script which reads from Kafka topic ‘example_2’ and sends its content straight to InfluxDB.
#! /usr/bin/env python2.7
import json
from influxdb import InfluxDBClient
from kafka import KafkaConsumer

client = InfluxDBClient(host='localhost', port=8087, database='example')
client.create_database('example')

consumer = KafkaConsumer('example_2', bootstrap_servers=['quickstart.cloudera:9092'])

for message in consumer:
    json_body = [
        {
            "measurement": "new_table",  # this table will be automatically created
            "tags": {},
            "fields": json.loads(message.value)
        }
    ]
    client.write_points(json_body)
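To verify that the points are actually landing in InfluxDB, you can query a few of them back with the same client (a quick check, not required by the pipeline):

#! /usr/bin/env python2.7
from influxdb import InfluxDBClient

# connect with the same settings as the consumer script and read back the latest points
client = InfluxDBClient(host='localhost', port=8087, database='example')
result = client.query('SELECT * FROM "new_table" ORDER BY time DESC LIMIT 5')
print(list(result.get_points()))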
Now, in three different terminals, we run our scripts:
# this is a Spark job, hence we run it through spark-submit
[root@quickstart] ./bin/spark-submit \
    --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0,org.apache.kafka:kafka-clients:0.10.2.1,org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.0 \
    reading_from_kafka.py

# then we run our two other scripts:
./writing_to_kafka.py
./writing_to_influx.py
Now let’s jump to Grafana. After connecting our dashboard to InfluxDB (you can easily see how to do that on the Grafana website), we can create our graph, which shows both the actual temperature and the work regime predicted by our NN.
The green line is our temperature, while the yellow one is the corresponding work regime.
You can add many variants to your graphs: it is possible to aggregate, group or compute operations on your data by adding as many queries as you want. Furthermore, even though here I’ve been working with time series, there are plenty of graph types Grafana offers, depending on the task. By integrating these three tools, you can reach the goal of not only building real-time models, but also presenting them efficiently.