Streaming Data Through a MATLAB System Object Using Apache Kafka
By Peter Webb, MathWorks
As the use of Apache® Kafka® and similar event streaming platforms continues to grow, more engineering teams are looking for ways to implement streaming analytics in MATLAB®. Applications that ingest streaming data from a Kafka topic, process it in MATLAB, and publish it to another topic can be used in a wide variety of use cases and workflows—including the filtering, transformation, and analysis of data from sensors, IoT devices, or other sources.
When working in MATLAB, System objects and the Streaming Data Framework for MATLAB Production Server™ simplify the development of applications that read from and write to Kafka. System objects are optimized for iterative computations that process streams of data in segments, so they work well with the Streaming Data Framework, which delivers Kafka data to MATLAB applications in segments. Just as important, System objects maintain state, making it possible to smooth or eliminate artificial transients that can arise at the frame boundaries of a segmented signal or data stream.
This article describes a workflow for developing MATLAB applications that process streaming data from Kafka using System objects, as well as deploying those applications to MATLAB Production Server. The workflow consists of three phases:
- Develop and test the data processing algorithm.
- Embed the algorithm in a streaming analytic function and validate it with a local test server.
- Package the algorithm and deploy it to MATLAB Production Server.
To better understand the key steps in each of these phases, it helps to see them in the context of a sample application. The sample described here is based on a signal processing example from DSP System Toolbox™, which uses the dsp.LowpassFilter System object™ to apply a low-pass filter to a noisy sine wave (Figure 1). The sample code for this article—which is available for download—adapts the example to work with Kafka, reading a sine wave signal consisting of 4 million samples from a Kafka topic as 1,000 segments of 4,000 samples each. It is important to note that this System object and Streaming Data Framework workflow is not limited to signal processing applications: it applies generally to dynamic systems with time-varying inputs.
Requirements
To run the example code for all three phases of the workflow, you will need:
- MATLAB (R2022b or later)
- DSP System Toolbox
- Streaming Data Framework for MATLAB Production Server
- MATLAB Compiler™ and MATLAB Compiler SDK™
- MATLAB Production Server
To run the example code for just the first phase—which involves developing an algorithm that processes streaming data from Kafka in MATLAB—you need only MATLAB, DSP System Toolbox, and Streaming Data Framework for MATLAB Production Server. Additionally, if you want to test against a live Kafka stream, you’ll need a Kafka cluster. Alternatively, you can use the Streaming Data Framework’s TestStream object to develop and test without a Kafka cluster. The example code assumes you have access to a Kafka cluster.
Developing and Testing the Algorithm
The first phase in the workflow is focused solely on developing and testing a MATLAB algorithm that consumes data from Kafka. We deliberately chose an extremely simple algorithm so the example could focus on how to connect System objects to Kafka streams.
In the example file filterNoise.m, the following lines of code create a dsp.LowpassFilter System object and a stream object for reading from the Kafka topic:
FIRLowPass = dsp.LowpassFilter('PassbandFrequency',5000,'StopbandFrequency',8000);

% Set parameters for connection to Kafka server
host = "mpskafka2936glnxa64.mathworks.com";
port = 9092;
topic = "NoisySineWave";
inKS = kafkaStream(host, port, topic, Rows=frameSize, Order="IngestTime", BodyFormat="Binary");
filterNoise.m
Note that the code above is for a Kafka topic named NoisySineWave. You’ll need to create your own topic to test your algorithm. You can use the code in signalToStream.m to populate a Kafka topic with noisy sine wave data.
Later in filterNoise.m, there is a for loop that reads a frame from the topic, processes the frame with the System object, and displays the results in a spectrum analyzer. The readtimetable function converts each event in the Kafka event stream into a row of the timetable. The original DSP System Toolbox example code generates a frame of the sine wave data on each iteration of the loop. In this code, you replace that one line with the call to readtimetable. That’s an important pattern to remember: to integrate with streaming data, change the code that reads from files or generates synthetic data to call readtimetable instead.
for f = 1:frameCount
    ...
    % Read one entire frame from the Kafka topic
    tt = readtimetable(inKS);

    % Process the frame with the low pass filter.
    y = FIRLowPass(tt.x);

    % Display the results with the spectrum analyzer.
    SpecAna(tt.x,y)
end
filterNoise.m
System objects use internal state to store past behavior, so each time the FIRLowPass object is invoked, it processes the current frame’s data while taking into account all of the frames already processed. As a result, frame-boundary transients are minimized.
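You can verify this behavior for dsp.LowpassFilter directly in MATLAB, independent of Kafka: because the object carries its filter state across calls, filtering a signal frame by frame yields the same output as filtering it in one call. A minimal sketch:

```matlab
% Frame-based filtering with a stateful System object matches
% filtering the entire signal in a single call.
FIRLowPass = dsp.LowpassFilter('PassbandFrequency',5000,'StopbandFrequency',8000);
x = randn(8000,1);

yFrames = [FIRLowPass(x(1:4000)); FIRLowPass(x(4001:8000))];  % two frames
reset(FIRLowPass);            % clear the internal state before reprocessing
yWhole = FIRLowPass(x);       % one call over the whole signal

max(abs(yFrames - yWhole))    % essentially zero
```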
If you run this code, you will see that over time the spectrum analyzer gradually shows the same result as the image in Figure 1, which matches the result from the original DSP System Toolbox example code. The key difference is in the implementation; the code adapted for this example reads data from a Kafka topic instead of a variable in the MATLAB workspace.
Embedding the Algorithm in a Streaming Analytic Function
Once you have tested your algorithm in MATLAB, the next step is to implement it as a streaming analytic function in preparation for deployment in a production environment. During development and testing, the MATLAB code proactively called a function (readtimetable) to consume data from a Kafka topic, but in production the streaming analytic function is called whenever data is available to be consumed. You can think of this in terms of pull and push paradigms: in the previous phase, data is pulled from Kafka by the MATLAB function, while in this phase, data is pushed from Kafka into the MATLAB function.
Another important aspect of streaming analytic functions to keep in mind is that they do not directly display their output. Rather, they write results to a log file, database, or some other destination—in this case, a Kafka topic.
Streaming analytic functions may be stateless or stateful. Stateful functions require some program data to be preserved between data frames; stateless ones do not. Since MATLAB System objects use state data to smooth the transition between data frames, they require stateful streaming analytic functions. Transforming a MATLAB System object–based algorithm into a streaming analytic function is a five-step process:
- Create a streaming analytic function with two input and two output parameters.
- Extract the state required by your algorithm from the state input variable.
- Extract the time series data from the input timetable.
- Call your algorithm.
- Return the computed results and the state needed by the next data frame.
The Streaming Data Framework helps you test and deploy streaming analytic functions.
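Before looking at the example’s own implementation, the five steps above can be summarized in a generic function skeleton. All of the names here (myAnalytic, state.obj, tt.x) are placeholders for illustration, not part of the framework’s API:

```matlab
% Step 1: a streaming analytic function takes a timetable and a state
% structure, and returns a result timetable and the updated state.
function [result, state] = myAnalytic(tt, state)
    % Step 2: extract the algorithm's state (created by an init function).
    obj = state.obj;

    % Step 3: extract the time series data from the input timetable.
    x = tt.x;

    % Step 4: call your algorithm on this frame's data.
    y = obj(x);

    % Step 5: return the results and the state needed by the next frame.
    result = timetable(tt.Properties.RowTimes, y);
    state.obj = obj;
end
```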
To see how this all works in the example application, start by looking at testFilterStream.m. After setting up the input and output streams, the next key step in this file is the creation of an EventStreamProcessor object that will apply the streaming analytic function filterStream to the Kafka event stream.
% Use an EventStreamProcessor to apply the streaming analytic to the input
% Kafka topic and write the result to the output Kafka topic.
esp = eventStreamProcessor(inKS,@filterStream,@initFilterStream,OutputStream=outKS);
testFilterStream.m
In the example code, you can see that the first three arguments passed to eventStreamProcessor are the input event stream and two functions: filterStream and initFilterStream.
The second function, initFilterStream, is called just once to initialize the persistent state of the streaming analytic function. In the example, this function, defined in initFilterStream.m, simply creates the dsp.LowpassFilter System object that will be used to process the Kafka signal data. Note that this is exactly the same code used to create the low-pass filter in filterNoise.m.
% Create a dsp.LowpassFilter, which is a MATLAB System object.
state.FIRLowPass = dsp.LowpassFilter('PassbandFrequency',5000,'StopbandFrequency',8000);
initFilterStream.m
The first function, the streaming analytic filterStream, is called once per data frame. Since filterStream is stateful, the EventStreamProcessor object maintains state between calls to the function. When the function is called, it is passed the current frame’s data as well as the current state (here, the state is the dsp.LowpassFilter System object). After the streaming analytic function processes the data (in this case by applying the low-pass filter), it returns the filtered signal as the first output argument and the updated state as the second output argument. EventStreamProcessor writes the filtered signal to the output topic and preserves the state change for the next function iteration.
function [result,state] = filterStream(signal,state)
% filterStream Pass the input signal through a low pass filter. Capture the
% resulting filtered signal in a timetable with the same timestamps as the
% input signal.

    % Retrieve the DSP LowPassFilter object from inter-frame persistent
    % state.
    FIRLowPass = state.FIRLowPass;

    % Apply the low pass filter to the signal.
    y = FIRLowPass(signal.x);

    % Create a timetable from the filtered signal and the input signal
    % timestamps.
    ts = signal.Properties.RowTimes;
    result = timetable(ts,y);

    % Preserve the low pass filter in inter-frame persistent state. This
    % ensures the state is loaded by whichever MPS worker processes the
    % next frame of the signal.
    state.FIRLowPass = FIRLowPass;
end
filterStream.m
Returning to testFilterStream.m, you can see that after the EventStreamProcessor is set up, the code tests the streaming analytic function by processing 10 frames of data using the execute function of the EventStreamProcessor object. It then reads both the input signal and the newly generated output signal from their respective Kafka topics and displays the results via a spectrum analyzer.
% Test the streaming analytic by processing 10 frames. This produces 10
% frames in the output topic.
N = 10;
execute(esp,N);
testFilterStream.m
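The verification step that follows in testFilterStream.m is not shown above. A rough sketch of reading back and comparing the two topics might look like this; the spectrumAnalyzer configuration, the sample rate, and the output variable name are assumptions for illustration:

```matlab
% Hypothetical sketch: read one frame back from the input and output
% topics and compare their spectra (assumes matching frame sizes).
ttIn  = readtimetable(inKS);    % original noisy sine wave
ttOut = readtimetable(outKS);   % low-pass filtered signal

SpecAna = spectrumAnalyzer(SampleRate=44100);   % assumed sample rate
SpecAna([ttIn.x, ttOut.y])      % two channels: noisy vs. filtered
```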
By running tests like the one implemented in testFilterStream.m, you can evaluate how your streaming analytic function will perform before deploying it to MATLAB Production Server. You can also test your streaming analytic function with the development version of MATLAB Production Server, which can be used as a local test server for debugging your code before it is deployed to an enterprise system.
Packaging and Deploying the Algorithm to MATLAB Production Server
The third phase of the workflow—packaging and deployment—is well-documented and relatively straightforward.
The example file deployFilterStream.m shows the main steps:
- Create input and output streams.
- Set up the EventStreamProcessor object.
- Call the package method to launch the Production Server Compiler app.
% Input and output streams
inKS = kafkaStream("mpskafka2936glnxa64.mathworks.com", 9092, "NoisySineWave", Rows=frameSize);
outKS = kafkaStream("mpskafka2936glnxa64.mathworks.com", 9092, "LowPassSineWave");

esp = eventStreamProcessor(inKS,@filterStream,@initFilterStream,OutputStream=outKS);
prj = package(esp,StateStore="LocalRedis");
deployFilterStream.m
The package function prepopulates the Production Server Compiler app with information for your streaming analytic function (Figure 2). When you click Package, MATLAB Compiler generates a deployable archive (a CTF file) for deployment to MATLAB Production Server. Next, follow the steps in Deploy Streaming Analytic Function to MATLAB Production Server to deploy your archive and start the Kafka Connector executable, which pulls data from your Kafka host and pushes it to your deployed archive.
Conclusion
With the Streaming Data Framework, you can easily modify your MATLAB System object–based algorithms to access data streaming in Kafka topics. Then, to scale your solution to high volumes of data, you can deploy a streaming analytic function to MATLAB Production Server.
Deployment can be a time-consuming process involving IT teams and their schedules as well as governance constraints. To minimize unnecessary deployments, you want to deploy applications that have been thoroughly tested and debugged. The ability to debug and test locally before deployment is a key advantage of the workflow outlined here for building MATLAB applications that use System objects and the Streaming Data Framework for MATLAB Production Server to seamlessly access Kafka data.
Published 2023