kafkaStream

Create connection to event stream in Kafka topic

    This object requires Streaming Data Framework for MATLAB® Production Server™.

    Description

    The kafkaStream function creates a KafkaStream object that connects to a Kafka® topic. Use the object to read event streams from that topic and to write event streams to it.

    An event consists of three parts:

    • Key — Identifies event source

    • Timestamp — Indicates time at which event occurred

    • Body — Contains event data specified as an unordered set of (name, value) pairs

    After creating a KafkaStream object, use the readtimetable function to read the events into a timetable or the writetimetable function to write a timetable to the stream.

    readtimetable converts events into rows of a timetable. The names in the event body become the timetable column names, the value associated with each name becomes the column value in the event row, and the event timestamp becomes the row timestamp. writetimetable converts rows of a timetable into events in a stream. The object Properties and Name-Value Arguments let you specify how events are converted to and from timetables.
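
    As a minimal sketch of this round trip, the following reads one window of events and writes the resulting timetable to a second topic. The server address and the CoolingFan topic come from the examples on this page. The output topic name CoolingFanCopy is hypothetical and is assumed to exist already, and the sketch assumes that writetimetable takes the stream object followed by the timetable.

```matlab
% Connect to the topic to read from (server address as in the examples on this page).
inStream = kafkaStream("kafka.host.com",9092,"CoolingFan");

% Read one event window; each event becomes one timetable row.
tt = readtimetable(inStream);

% Connect to a second topic (hypothetical name) and write the rows back as events.
outStream = kafkaStream("kafka.host.com",9092,"CoolingFanCopy");
writetimetable(outStream,tt);
```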

    Creation

    Description

    ks = kafkaStream(host,port,topic) creates an object connected to a Kafka topic.

    ks = kafkaStream(host,port,topic,propname1,propval1,...,propnameN,propvalN) specifies Kafka provider properties when creating an object connected to a Kafka topic.

    Specify Kafka provider properties and their corresponding values using one or more propname,propval argument pairs. Use single or double quotes around propname. You can specify several properties and their values in any order as propname1,propval1,...,propnameN,propvalN.

    ks = kafkaStream(___,Name=Value) uses any of the earlier syntaxes and additionally specifies event stream options when creating an object connected to a Kafka topic.

    Specify event stream options using one or more Name=Value arguments. Name can also be a property name, with Value as the corresponding value. You can specify several name-value arguments in any order as Name1=Value1,...,NameN=ValueN.

    Input Arguments

    host: Hostname of the Kafka server, specified as a character vector or string scalar.

    Example: '144.213.5.7' or 'localhost'

    Data Types: char | string

    port: Port number of the Kafka server, specified as an integer in the range [0, 65535].

    Example: 9092

    topic: Kafka topic name, specified as a character vector or string scalar.

    Example: "CoolingFan"

    Data Types: char | string

    propname: Name of Kafka provider property, specified as a character vector or string scalar. Use single or double quotes around propname. Kafka property names always contain at least one dot character, for example, retention.ms. For a list of Kafka properties, see the Kafka documentation: https://kafka.apache.org/documentation/#configuration.

    The value of the property, propval, must follow the property name. Specify the property name and its corresponding value as a comma-separated pair.

    Example: kafkaStream(host,port,topic,"security.protocol","SASL_SSL") sets the Kafka configuration property security.protocol to SASL_SSL.

    propval: Value of Kafka provider property. For a list of Kafka properties and their values, see the Kafka documentation: https://kafka.apache.org/documentation/#configuration.

    The value of the property must follow the property name, propname. Specify the property name and its corresponding value as a comma-separated pair. You can specify propval as any supported MATLAB data type, but it must be possible to convert that value to a string.

    Example: kafkaStream(host,port,topic,"sasl.mechanism","SCRAM-SHA-512") sets the value of the Kafka configuration property sasl.mechanism to SCRAM-SHA-512.
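
    The two examples above can be combined in a single call. The following sketch passes both provider properties as propname,propval pairs and then adds one event stream option. The server address and topic are taken from the examples on this page, and the Rows value of 100 is an arbitrary illustration.

```matlab
% Provider properties are quoted propname,propval pairs that follow topic.
% Event stream options come last, as Name=Value arguments.
ks = kafkaStream("kafka.host.com",9092,"CoolingFan", ...
    "security.protocol","SASL_SSL", ...
    "sasl.mechanism","SCRAM-SHA-512", ...
    Rows=100);
```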

    Name-Value Arguments

    Specify optional pairs of arguments as Name1=Value1,...,NameN=ValueN, where Name is the argument name and Value is the corresponding value. Name-value arguments must appear after other arguments, but the order of the pairs does not matter.

    Event Window Size

    Duration: Timestamp span in the event window, specified as a duration scalar. Duration determines which events the readtimetable function returns based on their timestamps. It specifies the difference between the last and first timestamps of events in the event window.

    readtimetable does not return until it processes all events in the window, so windows with large durations can block other processes from continuing. To configure a timeout period to prevent blocking, use the ReadLimit property.

    You can specify either the Duration property or the Rows property, but not both.

    Example: Duration=minutes(1) specifies that each call to readtimetable returns a timetable that has one minute's worth of events, where the timestamp of the last event is no more than one minute later than the timestamp of the first event.

    Data Types: duration
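
    A short sketch of a time-based window, assuming the server and topic from the examples on this page. It reads one window and computes the span between the earliest and latest row times, which should not exceed the configured duration.

```matlab
% Request one minute's worth of events per call to readtimetable.
ks = kafkaStream("kafka.host.com",9092,"CoolingFan",Duration=minutes(1));
tt = readtimetable(ks);

if ~isempty(tt)
    % Row times of a timetable are available through its Properties.
    rowTimes = tt.Properties.RowTimes;
    span = max(rowTimes) - min(rowTimes);   % expected to be at most one minute
    disp(span)
end
```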

    Rows: Number of events in the event window, specified as a positive integer. Rows specifies the number of rows that a call to the readtimetable function returns. If fewer than the specified number of rows are available for reading, then readtimetable times out and returns an empty timetable.

    readtimetable does not return until it processes all events in the window, so windows with large row values can block other processes from continuing. To configure a timeout period to prevent blocking, use the ReadLimit property.

    You can specify either the Duration property or the Rows property, but not both.

    Example: Rows=500 specifies that each call to readtimetable returns a timetable with 500 rows.

    Data Types: single | double | int8 | int16 | int32 | int64 | uint8 | uint16 | uint32 | uint64
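
    Because readtimetable returns an empty timetable when it times out, an empty result can serve as a loop exit condition. The following is a sketch of that pattern, not a prescribed workflow; the server and topic are those used in the examples on this page, and the running count stands in for real processing.

```matlab
% Read the topic in windows of 500 rows until a read times out.
ks = kafkaStream("kafka.host.com",9092,"CoolingFan",Rows=500);

total = 0;
tt = readtimetable(ks);
while ~isempty(tt)
    total = total + height(tt);   % placeholder for real processing of tt
    tt = readtimetable(ks);       % fetch the next window
end
fprintf("Read %d events\n",total);
```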

    Schema

    Rules for converting event data to MATLAB data types, specified as a JSON string in event schema format. You can specify an event schema more easily using the ImportOptions property.

    Rules for converting MATLAB data types to event data, specified as a JSON string in event schema format. You can specify an event schema more easily using the ExportOptions property.

    PublishSchema: Flag to indicate whether the export schema is written to the output stream, specified as a logical scalar.

    The schema is embedded in each event, which can significantly increase the size of the event. If downstream applications do not require the schema, set this flag to false to reduce the number of bytes in your stream.

    Data Types: logical

    Properties

    Group: Kafka consumer group ID, specified as a character vector or string scalar.

    Multiple Kafka consumers can belong to the same consumer group. In that case, Kafka shares data between the consumers in the group so that no two consumers in the same group ever receive the same messages. By default, every kafkaStream object has a unique consumer group ID, which allows multiple consumers to read from the same topic independently.

    Data Types: char | string
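
    The following sketch contrasts a shared consumer group with the default behavior. It assumes that Group can be set at creation with Name=Value syntax, as the Creation section describes for property names. The group name fanMonitors is hypothetical.

```matlab
% Two consumers in the same group: Kafka divides the topic's messages
% between them, so neither receives a message the other has received.
ks1 = kafkaStream("kafka.host.com",9092,"CoolingFan",Group="fanMonitors");
ks2 = kafkaStream("kafka.host.com",9092,"CoolingFan",Group="fanMonitors");

% No Group specified: the object gets its own unique group ID and reads
% the topic independently of ks1 and ks2.
ks3 = kafkaStream("kafka.host.com",9092,"CoolingFan");
```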

    KeyVariable: Name of the key variable in the event stream, specified as a string scalar or character vector.

    Data Types: string | char

    Topic: Kafka topic name, specified as a string scalar or character vector.

    Example: "CoolingFan"

    Data Types: string | char

    Order: Strategy to order events in the stream, specified as one of these values:

    • "EventTime" — Order events based on the time that they occur. Ensures event-time chronological order even when events arrive out of order at the Kafka server.

    • "IngestTime" — Order events based on the time that they appear in the stream.

    You cannot set the value of this property after object creation.

    Data Types: string | char

    TimestampResolution: Unit of event timestamp, specified as one of these values:

    • "Milliseconds"

    • "Seconds"

    • "Minutes"

    • "Hours"

    • "Days"

    The event timestamp is interpreted as the number of these units before or after the Unix epoch.

    Data Types: string | char
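
    To illustrate what the unit means, this sketch converts the same instant from a raw millisecond count and from a raw second count using the standard MATLAB datetime function. The raw values are made up for illustration, and the sketch does not show how the stream object performs the conversion internally.

```matlab
% The same instant expressed in two units relative to the Unix epoch.
rawMs  = 1604102400000;   % milliseconds after 1-Jan-1970 00:00:00 UTC
rawSec = 1604102400;      % seconds after 1-Jan-1970 00:00:00 UTC

% posixtime expects seconds, so scale the millisecond count first.
tMs  = datetime(rawMs/1000,ConvertFrom="posixtime",TimeZone="UTC");
tSec = datetime(rawSec,ConvertFrom="posixtime",TimeZone="UTC");

% Both conversions yield 31-Oct-2020 00:00:00 UTC.
sameInstant = (tMs == tSec);
disp(sameInstant)
```

    Reading a millisecond count as seconds, or the reverse, shifts every timestamp by a factor of 1000, so the unit must match what the event producer wrote.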

    WindowSize: Event window size, specified by a fixed amount of time (using the Duration argument) or a fixed number of messages (using the Rows argument). This property is read-only.

    Data Types: duration | single | double | int8 | int16 | int32 | int64 | uint8 | uint16 | uint32 | uint64

    Import and Export Options

    ImportOptions: Rules for transforming stream events into MATLAB data, specified as an ImportOptions object. This object controls the import of stream events into MATLAB.

    ExportOptions: Rules for transforming MATLAB data into stream events, specified as an ExportOptions object. This object controls the export of MATLAB data into streams.

    Connection and Request Timeouts

    ConnectionTimeout: Number of seconds that a client waits for the initial response from the Kafka host, specified as a positive integer.

    Data Types: single | double | int8 | int16 | int32 | int64 | uint8 | uint16 | uint32 | uint64

    RequestTimeout: Number of seconds to wait before terminating a request, specified as a positive integer. The wait time includes connecting to the Kafka host as well as data transfer between the Kafka host and the client.

    Data Types: single | double | int8 | int16 | int32 | int64 | uint8 | uint16 | uint32 | uint64

    ReadLimit: Strategy to wait for a response from the stream, specified as one of these values:

    • "Size" — Client prioritizes filling the event window. Using this strategy, the client might wait longer than the RequestTimeout time period as long as it is still receiving the expected number of messages. The default number of messages is 50. If the client receives no messages within the RequestTimeout time period, it no longer waits.

    • "Time" — Client strictly adheres to the RequestTimeout limit, even if it has not received the expected number of messages. RequestTimeout specifies the amount of time the stream object waits between receiving events. If the stream is actively receiving data, it does not time out during that operation.
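
    A sketch of a time-bounded read, assuming that ReadLimit and RequestTimeout can be set at creation with Name=Value syntax, as the Creation section describes for property names. The 30-second timeout and the window of 500 rows are arbitrary illustrative values.

```matlab
% Ask for 500 rows per read, but give up once the request timeout elapses.
ks = kafkaStream("kafka.host.com",9092,"CoolingFan", ...
    Rows=500,RequestTimeout=30,ReadLimit="Time");

tt = readtimetable(ks);
if isempty(tt)
    disp("Read timed out before the event window was filled.")
end
```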

    Event Key and Body Encoding

    BodyEncoding: Character encoding format used to interpret the bits in the event body, specified as one of the following:

    • utf8 — UTF-8 encoding format

    • utf16 — UTF-16 encoding format

    • base64 — Base64 encoding format

    • uint8 — Eight-bit unsigned binary bytes

    This property determines the size and encoding of the bytes used in the event body, which are in the format specified by BodyFormat.

    BodyFormat: Format of bytes in event body, specified as one of the following:

    • JSON — JSON string

    • Array — MATLAB array

    • Text — String data

    • Binary — Binary data

    Depending on the encoding specified by BodyEncoding, bytes can be larger than eight bits.

    KeyByteOrder: Order for storing bytes in the event key, specified as one of the following:

    • LittleEndian — Least significant byte is stored first

    • BigEndian — Most significant byte is stored first

    • MatchHost — Bytes are stored in the same order as is used by the host computer on which the streaming data framework is running

    • NotApplicable — Not an integer key

    This property is applicable only for integer keys and not applicable to floating point or text keys.
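
    The difference between the LittleEndian and BigEndian options can be seen at the byte level with standard MATLAB functions. This sketch is only an illustration of byte order for a four-byte integer; the key value 258 is made up, and the code does not involve the stream object.

```matlab
% A four-byte integer key: 258 is 0x00000102 in hexadecimal.
key = int32(258);

% typecast exposes the bytes in the byte order of the host computer.
hostBytes = typecast(key,'uint8');

% The third output of computer is 'L' (little-endian) or 'B' (big-endian).
[~,~,hostEndian] = computer;
if hostEndian == 'L'
    littleEndian = hostBytes;
else
    littleEndian = fliplr(hostBytes);
end
bigEndian = fliplr(littleEndian);

disp(littleEndian)   % bytes 2 1 0 0: least significant byte first
disp(bigEndian)      % bytes 0 0 1 2: most significant byte first
```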

    KeyEncoding: Character encoding format used to interpret the bits in an event key, specified as one of the following:

    • utf8 — UTF-8 encoding format

    • utf16 — UTF-16 encoding format

    • base64 — Base64 encoding format

    • uint8 — Eight-bit unsigned binary bytes

    If KeyEncoding is utf8 or utf16, then the KeyType property must be text. If KeyEncoding is base64 or uint8, then KeyType must be one of the numeric types.

    KeyType: Data type used to interpret the bytes in an event key, specified as one of these values:

    • uint8 — One-byte unsigned integer

    • int8 — One-byte signed integer

    • uint16 — Two-byte unsigned integer

    • int16 — Two-byte signed integer

    • uint32 — Four-byte unsigned integer

    • int32 — Four-byte signed integer

    • uint64 — Eight-byte unsigned integer

    • int64 — Eight-byte signed integer

    • single — Single-precision IEEE 754 floating point number

    • double — Double-precision IEEE 754 floating point number

    • text — String

    If KeyType is text, then the KeyEncoding property must be either utf8 or utf16. If KeyType is any of the numeric types, then KeyEncoding must be either base64 or uint8.
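
    The pairing rules can be sketched as two configurations, one for text keys and one for integer keys. The sketch assumes that KeyType, KeyEncoding, and KeyByteOrder can be set at creation with Name=Value syntax, as the Creation section describes for property names. The particular combinations are illustrative choices.

```matlab
% Text keys pair with a character encoding (utf8 or utf16).
ksText = kafkaStream("kafka.host.com",9092,"CoolingFan", ...
    KeyType="text",KeyEncoding="utf8");

% Numeric keys pair with base64 or uint8. Byte order applies to integer keys.
ksInt = kafkaStream("kafka.host.com",9092,"CoolingFan", ...
    KeyType="int32",KeyEncoding="uint8",KeyByteOrder="LittleEndian");
```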

    Object Functions

    readtimetable            Read timetable from event stream
    writetimetable           Write timetable to event stream
    seek                     Set read position in event stream
    preview                  Preview subset of events from event stream
    identifyingName          Event stream name
    detectImportOptions      Create import options based on event stream content
    detectExportOptions      Create export options based on event stream content
    readevents               Read raw events from Kafka stream without schema processing applied
    flush                    Reset read window boundaries
    stop                     Stop processing event streams from Kafka topic
    loggederror              Error information for Kafka stream operation
    createTopic              Create topic in Kafka cluster
    deleteTopic              Remove topic from Kafka cluster
    categoryList             Kafka stream provider property list
    getProviderProperties    Kafka stream configuration property data
    isProperty               Determine if Kafka stream provider property is set

    Examples

    Assume that you have a Kafka server running at the network address kafka.host.com:9092 that has a topic CoolingFan.

    Assume that the Kafka host is configured to use SSL. To configure SSL communication between the Kafka host and the client, provide SSL configuration settings when creating an object for reading and writing to the Kafka topic.

    ks = kafkaStream("kafka.host.com",9092,"CoolingFan", ...
                    "security.protocol","SASL_SSL", ...
                    "ssl.truststore.type","PEM", ...
                    "ssl.truststore.location","prodserver.pem")
    ks = 
    
      KafkaStream with properties:
    
                      Topic: "CoolingFan"
                      Group: "da576775-49c9-4de3-9955-2bdd9f963aa0"
                      Order: EventTime
                       Host: "kafka.host.com"
                       Port: 9092
          ConnectionTimeout: 30
             RequestTimeout: 61
              ImportOptions: "Import to MATLAB types"
              ExportOptions: "Source: function eventSchema"
              PublishSchema: "true"
                 WindowSize: 50
                KeyVariable: "key"
                KeyEncoding: "utf16"
                    KeyType: "text"
               KeyByteOrder: "BigEndian"
               BodyEncoding: "utf8"
                 BodyFormat: "JSON"
                  ReadLimit: "Size"
        TimestampResolution: "Milliseconds"

    Confirm which properties are set.

    props = getProviderProperties(ks);
    unique({props.name}')
    ans =
    
      7×1 cell array
    
        {'auto.offset.reset'      }
        {'retention.ms'           }
        {'sasl.jaas.config'       }
        {'sasl.username'          }
        {'security.protocol'      }
        {'ssl.truststore.location'}
        {'ssl.truststore.type'    }

    Assume that you have a Kafka server running at the network address kafka.host.com:9092 that has a topic CoolingFan.

    Create an object connected to the CoolingFan topic and request only 10 messages instead of the default.

    ks = kafkaStream("kafka.host.com",9092,"CoolingFan",Rows=10)
    ks = 
    
      KafkaStream with properties:
    
                      Topic: "CoolingFan"
                      Group: "da576775-49c9-4de3-9955-2bdd9f963aa0"
                      Order: EventTime
                       Host: "kafka.host.com"
                       Port: 9092
          ConnectionTimeout: 30
             RequestTimeout: 61
              ImportOptions: "Import to MATLAB types"
              ExportOptions: "Source: function eventSchema"
              PublishSchema: "true"
                 WindowSize: 10
                KeyVariable: "key"
                KeyEncoding: "utf16"
                    KeyType: "text"
               KeyByteOrder: "BigEndian"
               BodyEncoding: "utf8"
                 BodyFormat: "JSON"
                  ReadLimit: "Size"
        TimestampResolution: "Milliseconds"

    Use the object to read 10 messages from the event stream into a timetable.

    tt = readtimetable(ks)
    tt =
    
      10×11 timetable
    
             timestamp          vMotor    wMotor    Tmass     
        ____________________    ______    ______    ______    
    
        31-Oct-2020 00:00:00    1.0909         0        25            
        31-Oct-2020 00:00:00    1.1506     100.5     25.17            
        31-Oct-2020 00:00:00    1.1739     190.9    25.223             
        31-Oct-2020 00:00:00    1.1454    330.61     25.15             
        31-Oct-2020 00:00:00    1.1346    382.77    25.122           
        31-Oct-2020 00:00:00    1.1287    420.88    25.106             
        31-Oct-2020 00:00:00    1.1253    454.55    25.096             
        31-Oct-2020 00:00:00    1.1232     478.1     25.09            
        31-Oct-2020 00:00:00    1.1217    500.16    25.086    ...        

    Version History

    Introduced in R2022b