Module : streams

Module overview

The Streams module provides stream processing capabilities to Ballerina.

Note: Ballerina streaming capabilities are shipped as an experimental feature in the latest release. Use the --experimental flag when compiling Ballerina files that contain streaming constructs.

The following topics explain the high-level concepts of Ballerina streaming.

Stream

A stream is a logical series of events ordered in time. Its schema is defined by a record type. A record definition contains a unique name and a set of uniquely identifiable attributes, each with a specific type. All the events of a specific stream have the same schema (i.e., the same attributes in the same order).

Syntax

The syntax for defining a new stream is as follows.

type <record name> record {
    <attribute type> <attribute name>;
    <attribute type> <attribute name>;
    <attribute type> <attribute name>;
    ...
};

stream<record name> <stream name> = new;

The following parameters are configured in a stream definition.

Parameter        Description
stream name      The name of the created stream.
record name      The name of the record that constrains the stream.
attribute name   The uniquely identifiable attribute name. The schema of a record is defined by its attributes.
attribute type   The type of each attribute defined in the record.

Example
type Employee record {
    string name;
    int age;
    string status;
};

stream<Employee> employeeStream = new;

The code given above creates a stream named employeeStream that is constrained by the Employee type, which has the attributes name (string), age (int), and status (string).

Forever Statement

The forever statement block can include one or more streaming queries that define stream processing and complex event processing rules. The forever block lets streaming queries run continuously until the Ballerina program exits. Each streaming query within the forever block executes as an independent, isolated processing unit.

Sample query

This query keeps only the sensor events with a temperature greater than 30 degrees Celsius. For every batch of 100 such events, it groups the events by type and counts the number of events of each type. It then publishes every type that has more than one event to the highTemperatureSensorStream stream.

    forever {
        from sensorTemperatureStream
            where sensorTemperatureStream.temperature > 30
            window lengthBatch (100)
        select sensorTemperatureStream.type, count() as totalCount
            group by sensorTemperatureStream.type
            having totalCount > 1
        =>  (HighTemperature [] values) {
                foreach var value in values {
                    highTemperatureSensorStream.publish(value);
                }
            }
    }
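To make the data flow of this query concrete, here is a minimal model written in Python rather than Ballerina. It is not the streams module API: events are assumed to be plain dicts with hypothetical keys type and temperature, the batch size is a parameter so that the example stays small, and each step of the query (where, window lengthBatch, group by, count(), having) is reimplemented by hand.

```python
from collections import Counter
from typing import Any, Dict, List

Event = Dict[str, Any]


def high_temperature_types(events: List[Event], batch_size: int = 100,
                           threshold: float = 30) -> List[Event]:
    """Model of the sample query; not the Ballerina implementation."""
    results: List[Event] = []
    batch: List[Event] = []
    for event in events:
        # where sensorTemperatureStream.temperature > 30: drop cooler events
        if event["temperature"] <= threshold:
            continue
        # window lengthBatch(100): collect filtered events until the batch is full
        batch.append(event)
        if len(batch) < batch_size:
            continue
        # group by type, count() as totalCount
        counts = Counter(e["type"] for e in batch)
        # having totalCount > 1
        results.extend({"type": t, "totalCount": c}
                       for t, c in counts.items() if c > 1)
        batch = []
    return results
```

For example, with batch_size=3, the events A (35), A (40), B (20), and B (31) form one batch of three filtered events (A, A, B), and only type A is reported, with a totalCount of 2.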

Query

Each streaming query can consume one or more streams, process the events continuously in a streaming manner, and generate the output simultaneously. A query enables you to perform complex event processing and stream processing operations by processing incoming events one by one in the order they arrive.

Syntax

Each query contains an input section (from) and an output section (=>). Some also contain a projection section (select). The following is a simple query with all three sections.

from <input stream>
select <attribute name>, <attribute name>, ...
=> (<array type> <parameter name>) {
      ...
      ...
}
Example

This query consumes events from the tempStream stream (that is already defined) and sends the room temperature and the room number to the roomTempStream stream as the output.

type temperature record {
  int deviceID;
  int roomNo;
  float value;
};

type roomTemperature record {
  int roomNo;
  float value;
};

stream<temperature> tempStream = new;
stream<roomTemperature> roomTempStream = new;

public function initQuery() {
    forever {
        from tempStream
        select tempStream.roomNo, tempStream.value
        => (roomTemperature[] temperatures) {
            foreach var value in temperatures {
                roomTempStream.publish(value);
            }
        }
    }
}

Records

SnapshottableStreamEvent

This record represents a stream event that can be persisted.

Objects

AbstractOperatorProcessor

Abstract processor encapsulating operator functions.

AbstractPatternProcessor

Abstract processor encapsulating pattern processor functions.

Aggregator

Abstract object, which should be implemented in order to create a new aggregator.

AndOperatorProcessor

Processor to perform AND stream operations.

Average

Aggregator to calculate average in streams.

CompoundPatternProcessor

Processor to perform compound stream operations.

Count

Aggregator to count events in streams.

DelayWindow

This window delays incoming events by a given amount of time. For example:

from inputStream window delay(4000)
select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count
group by inputStream.name
=> (TeacherOutput [] teachers) {
    foreach var t in teachers {
        outputStream.publish(t);
    }
}

The delay window takes exactly one parameter (delayTime).

DistinctCount

Aggregator to get the distinct counts of values in streams.

ExternalTimeBatchWindow

This is a batch (tumbling) time window based on external time. It holds the events that arrived during each window time period, as measured by the timestamp field, and is updated once per window time. For example:

from inputStream window externalTimeBatch(inputStream.timestamp, 1000, 500, 1200, true)
select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count
group by inputStream.name
=> (TeacherOutput [] teachers) {
    foreach var t in teachers {
        outputStream.publish(t);
    }
}

The externalTimeBatch window takes two to five parameters (timestamp field, windowTime, startTime, timeout, replaceTimestampWithBatchEndTime).

ExternalTimeWindow

This is a sliding time window based on external time. It holds the events that arrived during the last window time period, as measured by the external timestamp, and is updated on every monotonically increasing timestamp. For example:

from inputStream window externalTime(inputStream.timestamp, 4000)
select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count
group by inputStream.name
=> (TeacherOutput [] teachers) {
    foreach var t in teachers {
        outputStream.publish(t);
    }
}

The externalTime window takes exactly two parameters (timestamp field, windowTime).

Filter

The Filter object represents the where clause in a streaming query. It takes two parameters for initialization. nextProcessorPointer is the function pointer of the next processor, which is invoked once filtering is complete. conditionFunc is a function pointer that returns true if the given where clause evaluates to true.
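The role of the two initialization parameters can be sketched as a small Python model. This is not the Ballerina Filter object: the class name, the process method, the representation of events as dicts, and the choice to skip the next processor when nothing passes are assumptions made for illustration.

```python
from typing import Any, Callable, Dict, List

Event = Dict[str, Any]


class Filter:
    """Model of a filter processor in a processor chain."""

    def __init__(self, next_processor: Callable[[List[Event]], None],
                 condition_func: Callable[[Event], bool]) -> None:
        self.next_processor = next_processor   # plays the role of nextProcessorPointer
        self.condition_func = condition_func   # plays the role of conditionFunc

    def process(self, events: List[Event]) -> None:
        # Keep only the events for which the where condition evaluates to true
        passed = [e for e in events if self.condition_func(e)]
        # In this sketch, the next processor is invoked only when something passed
        if passed:
            self.next_processor(passed)
```

For example, Filter(received.extend, lambda e: e["temperature"] > 30) forwards only {"temperature": 35} out of the events {"temperature": 25} and {"temperature": 35} to the list received.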

FollowedByProcessor

Processor to perform FollowedBy stream operations.

HoppingWindow

The hopping window releases a batch of events at every hopping time interval. Each batch contains the events that arrived within the window time period. When the hopping time interval and the window time period are equal, the hopping window acts as a TimeBatch window. For example:

from inputStream window hopping(5000, 4000)
select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count
group by inputStream.name
=> (TeacherOutput [] teachers) {
    foreach var t in teachers {
        outputStream.publish(t);
    }
}

The hopping window takes exactly two parameters (windowTime, hoppingTime).
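The following Python model (not Ballerina code) illustrates how windowTime and hoppingTime interact. It is not the HoppingWindow implementation. It assumes that events arrive as (timestamp in milliseconds, value) pairs, that emissions happen at multiples of hoppingTime starting from time 0, and that each emitted batch covers the half-open interval (t - windowTime, t].

```python
from typing import Any, List, Tuple

TimedEvent = Tuple[int, Any]


def hopping_batches(events: List[TimedEvent], window_time: int,
                    hopping_time: int, end_time: int) -> List[Tuple[int, List[Any]]]:
    """Model of a hopping window over (timestamp_ms, value) events."""
    batches = []
    t = hopping_time
    while t <= end_time:
        # Every hopping_time ms, release the events seen in the last window_time ms
        batch = [value for ts, value in events if t - window_time < ts <= t]
        batches.append((t, batch))
        t += hopping_time
    return batches
```

With window_time=5000 and hopping_time=2000, consecutive batches overlap. Events "a", "b", and "c" at 1000, 3000, and 4500 ms yield ["a"] at 2000, ["a", "b"] at 4000, and ["b", "c"] at 6000. When window_time equals hopping_time, the batches become disjoint.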

IntSort

This object implements a merge sort algorithm to sort timestamp values for state persistence.

LengthBatchWindow

This is a batch (tumbling) length window. It holds up to the given number of events and is updated each time that many events have arrived. For example:

from inputStream window lengthBatch(5)
select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count
group by inputStream.name
=> (TeacherOutput [] teachers) {
    foreach var t in teachers {
        outputStream.publish(t);
    }
}

The lengthBatch window takes exactly one parameter (windowBatchLength).
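The tumbling behavior can be modeled in a few lines of Python. This is an illustrative sketch, not the LengthBatchWindow implementation. It ignores event types such as streams:EXPIRED and streams:RESET that the real window may emit.

```python
from typing import Any, List


def length_batches(events: List[Any], window_batch_length: int) -> List[List[Any]]:
    """Model of a tumbling length window: each full batch is emitted, then cleared."""
    batches: List[List[Any]] = []
    current: List[Any] = []
    for event in events:
        current.append(event)
        if len(current) == window_batch_length:
            batches.append(current)
            current = []   # start a new, non-overlapping batch
    # Events in an incomplete final batch are still held and are not emitted
    return batches
```

For example, length_batches([1, 2, 3, 4, 5, 6, 7], 3) emits [1, 2, 3] and [4, 5, 6], while 7 waits for the next batch to fill.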

LengthWindow

The LengthWindow is a sliding length window. It holds the last windowLength events and is updated on every event arrival and expiry. For example:

from inputStream window length(5)
select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count
group by inputStream.name
=> (TeacherOutput [] teachers) {
    foreach var t in teachers {
        outputStream.publish(t);
    }
}

The length window takes exactly one parameter (windowLength).
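A sliding length window can be modeled as a bounded queue. The following Python sketch is not Ballerina code and not the LengthWindow implementation. When a new event overflows the window, it first produces an EXPIRED entry for the oldest event and then a CURRENT entry for the new one. The tuple representation and this emission order are assumptions.

```python
from collections import deque
from typing import Any, Deque, List, Tuple


def length_window(events: List[Any], window_length: int) -> List[Tuple[str, Any]]:
    """Model of a sliding length window that keeps the last window_length events."""
    window: Deque[Any] = deque()
    output: List[Tuple[str, Any]] = []
    for event in events:
        if len(window) == window_length:
            # The oldest event leaves the window, so downstream state must be undone
            output.append(("EXPIRED", window.popleft()))
        window.append(event)
        output.append(("CURRENT", event))
    return output
```

With a window length of 2, the events 1, 2, 3 produce CURRENT 1, CURRENT 2, EXPIRED 1, CURRENT 3.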

LinkedList

The LinkedList object represents a linked list data structure.

Max

Aggregator to find the maximum value in a stream.

MaxForever

An aggregator that keeps the maximum value received so far. It is similar to the Max aggregator, but it keeps the maximum value it has ever received, forever.
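The difference between Max and MaxForever can be modeled in Python as follows. This is a sketch, not the module's aggregator objects. The class names mirror the documented ones, but the process signature, the string event types, and the choice to ignore every non-CURRENT event in MaxForever are assumptions.

```python
from typing import List, Optional

CURRENT, EXPIRED, RESET = "CURRENT", "EXPIRED", "RESET"


class Max:
    """Maximum over the values currently held in the window."""

    def __init__(self) -> None:
        self.values: List[float] = []

    def process(self, value: float, event_type: str) -> Optional[float]:
        if event_type == CURRENT:
            self.values.append(value)
        elif event_type == EXPIRED:
            self.values.remove(value)      # the expired value no longer counts
        elif event_type == RESET:
            self.values.clear()
        return max(self.values) if self.values else None


class MaxForever:
    """Maximum over every value ever received."""

    def __init__(self) -> None:
        self.best: Optional[float] = None

    def process(self, value: float, event_type: str) -> Optional[float]:
        # In this sketch, only CURRENT events update the state; expiry never lowers it
        if event_type == CURRENT and (self.best is None or value > self.best):
            self.best = value
        return self.best
```

Feeding both objects the sequence 5 (CURRENT), 9 (CURRENT), 9 (EXPIRED) yields 5, 9, 5 from Max but 5, 9, 9 from MaxForever.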

MergeSort

This object implements the merge sort algorithm to sort the provided value arrays. fieldFuncs is an array of function pointers that return the field values to sort by, taken from each stream event's data map. sortTypes is an array whose elements are either streams:ASCENDING or streams:DESCENDING.
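The following Python sketch shows how a merge sort can honor several sort keys, each with its own direction, in the spirit of fieldFuncs and sortTypes. It is not the Ballerina MergeSort object. The string values chosen for ASCENDING and DESCENDING are placeholders, not the actual values of streams:ASCENDING and streams:DESCENDING.

```python
from typing import Any, Callable, Dict, List

Event = Dict[str, Any]
ASCENDING = "ascending"    # placeholder for streams:ASCENDING
DESCENDING = "descending"  # placeholder for streams:DESCENDING


def compare(a: Event, b: Event, field_funcs: List[Callable[[Event], Any]],
            sort_types: List[str]) -> int:
    # The first field whose values differ decides the order
    for field_func, sort_type in zip(field_funcs, sort_types):
        x, y = field_func(a), field_func(b)
        if x != y:
            result = -1 if x < y else 1
            return result if sort_type == ASCENDING else -result
    return 0


def merge_sort(events: List[Event], field_funcs: List[Callable[[Event], Any]],
               sort_types: List[str]) -> List[Event]:
    if len(events) <= 1:
        return list(events)
    mid = len(events) // 2
    left = merge_sort(events[:mid], field_funcs, sort_types)
    right = merge_sort(events[mid:], field_funcs, sort_types)
    merged: List[Event] = []
    i = j = 0
    while i < len(left) and j < len(right):
        # "<= 0" takes from the left half on ties, which keeps the sort stable
        if compare(left[i], right[j], field_funcs, sort_types) <= 0:
            merged.append(left[i])
            i += 1
        else:
            merged.append(right[j])
            j += 1
    merged.extend(left[i:])
    merged.extend(right[j:])
    return merged
```

Sorting Bob (30), Amy (25), and Cal (30) by age ascending and then by name descending gives Amy, Cal, Bob.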

Min

Aggregator to find the minimum value in a stream.

MinForever

An aggregator that keeps the minimum value received so far. It is similar to the Min aggregator, but it keeps the minimum value it has ever received, forever.

Node

The Node object represents a node in the linked list data structure.

NotOperatorProcessor

Processor to perform NOT stream operations.

OperandProcessor

Processor to perform operand processor operations.

OrOperatorProcessor

Processor to perform OR stream operations.

OrderBy

The OrderBy object represents the desugared code of the order by clause of a streaming query. It takes three parameters for initialization. nextProcessPointer is the process method of the next processor. fieldFuncs is an array of function pointers that return the field values to be sorted. sortTypes is an array of strings specifying the sort order (ascending or descending). Internally, this processor uses a MergeSort object to sort.

OutputProcess

The OutputProcess object is responsible for sending the output (only the events of type streams:CURRENT) to the destination stream. It takes a function pointer, outputFunc, which contains the logic to process the output.

Scheduler

The Scheduler object is responsible for generating streams:TIMER events at the given timestamp. Once the event is generated, the timer event is passed to the provided processFunc function pointer. The function pointer is the process function of the target processor, to which the timer event should be sent.

Select

The Select object represents the select clause. Everything within the select clause, such as aggregator function invocations, is also handled in this processor. Grouping of events (specified by the group by clause) is also performed here. aggregatorArr is an array of the aggregators used in the select clause. groupbyFuncArray is an array of function pointers that return the values being grouped. selectFunc is a function pointer to a lambda function that creates the data field of the output stream event. scopeName is used as a breadcrumb to identify the select clause when there are multiple forever blocks.
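The grouping part of this processor can be modeled in Python as below. The sketch is illustrative only, not the Select object. It assumes three things: events with equal group-by values share one aggregator state; composite keys are joined with a comma, which stands in for the DELIMITER constant whose value is not given here; and the DEFAULT key is used when there is no group by clause. Only a running count() aggregator is modeled.

```python
from typing import Any, Callable, Dict, List

Event = Dict[str, Any]
DEFAULT = "DEFAULT"   # placeholder for streams:DEFAULT
DELIMITER = ","       # placeholder for streams:DELIMITER


def select_with_group_by(events: List[Event],
                         group_by_funcs: List[Callable[[Event], Any]],
                         select_func: Callable[[Event, int], Event]) -> List[Event]:
    counts: Dict[str, int] = {}      # one count() state per group key
    output: List[Event] = []
    for event in events:
        if group_by_funcs:
            key = DELIMITER.join(str(f(event)) for f in group_by_funcs)
        else:
            key = DEFAULT            # no group by clause: every event shares one group
        counts[key] = counts.get(key, 0) + 1
        # select_func builds the output data from the event and its group's aggregate
        output.append(select_func(event, counts[key]))
    return output
```

Here group_by_funcs and select_func play the roles of groupbyFuncArray and selectFunc. Grouping the events a, b, a by name yields counts 1, 1, 2. Without a group by clause, the counts are 1, 2, 3.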

Snapshotable

Abstract Snapshotable to be referenced by all snapshotable objects.

SortWindow

The sort window holds a given number of events and emits expired events in the order defined by the given fields. For example:

from inputStream window sort(10, inputStream.age, "ascending", inputStream.name, "descending")
select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count
group by inputStream.name
=> (TeacherOutput [] teachers) {
    foreach var t in teachers {
        outputStream.publish(t);
    }
}

The sort window takes an odd number of parameters, at least three (windowLength, stream field, order1, stream field, order2, ...).

StateMachine

StateMachine which performs stream pattern processing.

StdDev

The aggregator object to calculate standard deviation.

StreamEvent

The StreamEvent object is a wrapper around the actual data received by the input stream. When a record is received by an input stream, it is converted to a map of anydata values, and that map is set as the data field of a new StreamEvent object. StreamEvent is used only internally, to transmit event data from one processor to another. The timestamp is set when the record is converted to a map. If the record is first received by the input stream, the eventType is set to streams:CURRENT. Besides streams:CURRENT, there are three other types of stream events: streams:EXPIRED, streams:RESET, and streams:TIMER. An expired event is used to remove the state of its respective current event. A reset event is used to completely wipe the state, and a timer event is used to trigger the process method of a particular processor in a timely manner.
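The way the four event types drive state can be illustrated with a small Python model of a sum aggregator. This is not the module's StreamEvent or Sum implementation. The dataclass fields mirror the eventType, timestamp, and data fields described above. The string constants, the process signature, and the hypothetical field name age are assumptions.

```python
from dataclasses import dataclass, field
from typing import Any, Dict

CURRENT, EXPIRED, RESET, TIMER = "CURRENT", "EXPIRED", "RESET", "TIMER"


@dataclass
class StreamEvent:
    event_type: str
    timestamp: int
    data: Dict[str, Any] = field(default_factory=dict)


class SumAggregator:
    """Keeps a running sum of one field, driven by the event type."""

    def __init__(self, field_name: str) -> None:
        self.field_name = field_name
        self.total = 0

    def process(self, event: StreamEvent) -> Any:
        if event.event_type == CURRENT:
            self.total += event.data[self.field_name]   # a new event adds its value
        elif event.event_type == EXPIRED:
            self.total -= event.data[self.field_name]   # undo the matching CURRENT event
        elif event.event_type == RESET:
            self.total = 0                              # wipe the state completely
        # TIMER events only trigger processing; they leave the sum unchanged here
        return self.total
```

Feeding CURRENT 30, CURRENT 20, EXPIRED 30, TIMER, and RESET produces the running sums 30, 50, 20, 20, and 0.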

StreamJoinProcessor

The StreamJoinProcessor object is responsible for performing SQL-like joins between two or more streams. The onConditionFunc is the lambda function that represents the join condition (the on clause of the join). The join happens only if the condition is satisfied. nextProcessor is the process function of the next processor, which can be a Select processor, Aggregator processor, Having processor, etc. The lhsStream is the left-hand side stream of the join, and its attached window is lhsWindow. The rhsStream is the right-hand side stream of the join, and its attached window is rhsWindow. The unidirectionalStream defines the stream whose incoming events trigger the join. Usually it is lhsStream; in rare cases, it can be rhsStream. The joinType is the type of the join and can be any value defined by streams:JoinType.
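A unidirectional inner join can be modeled in Python as below. The sketch is illustrative, not the StreamJoinProcessor implementation. It assumes a length window of size rhs_length on the right-hand side, arrivals tagged "lhs" or "rhs" in time order, and an inner join only. It does not cover the other values of streams:JoinType.

```python
from collections import deque
from typing import Any, Callable, Deque, Dict, List, Tuple

Event = Dict[str, Any]


def unidirectional_join(arrivals: List[Tuple[str, Event]], rhs_length: int,
                        on_condition: Callable[[Event, Event], bool]) -> List[Tuple[Event, Event]]:
    rhs_window: Deque[Event] = deque(maxlen=rhs_length)  # oldest events drop out automatically
    output: List[Tuple[Event, Event]] = []
    for side, event in arrivals:
        if side == "lhs":
            # Triggering (unidirectional) stream: join against the current RHS window
            output.extend((event, rhs) for rhs in rhs_window if on_condition(event, rhs))
        else:
            # Non-triggering stream: its arrivals only update the window
            rhs_window.append(event)
    return output
```

Because the join is triggered only from the left-hand side, a right-hand arrival never produces output by itself. It only changes which events later left-hand arrivals can match.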

Sum

Aggregator to perform summation of values in a stream.

TableJoinProcessor

The TableJoinProcessor object handles joining streams with in-memory tables in Ballerina. nextProcessor is the process function of the next processor, which can be a Select processor, Aggregator processor, Having processor, etc. The streamName is the stream of the join, and its attached window is windowInstance. The tableName is the name of the table with which the stream is joined. The joinType is the type of the join and can be any value defined by streams:JoinType.

TimeAccumulatingWindow

The TimeAccumulatingWindow holds incoming events. If no event is received for the given time period, it releases the collected events at the point when that time period has elapsed since the last event was received. For example:

from inputStream window timeAccum(4000)
select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count
group by inputStream.name
=> (TeacherOutput [] teachers) {
    foreach var t in teachers {
        outputStream.publish(t);
    }
}

The time accumulating window takes exactly one parameter (timePeriod).
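The release rule can be modeled in Python with explicit timestamps. This sketch is not Ballerina code and not the TimeAccumulatingWindow implementation, which relies on timer events instead. It assumes that events are (timestamp in milliseconds, value) pairs sorted by timestamp, that a batch is released once the silence strictly exceeds time_period, and that the release time is reported as the last arrival plus time_period.

```python
from typing import Any, List, Optional, Tuple

TimedEvent = Tuple[int, Any]


def time_accumulating_batches(events: List[TimedEvent], time_period: int,
                              now: int) -> List[Tuple[int, List[Any]]]:
    """Model of a time accumulating window over (timestamp_ms, value) events."""
    batches: List[Tuple[int, List[Any]]] = []
    current: List[Any] = []
    last_ts: Optional[int] = None
    for ts, value in events:
        # A silence longer than time_period since the last event releases the batch
        if last_ts is not None and ts - last_ts > time_period:
            batches.append((last_ts + time_period, current))
            current = []
        current.append(value)
        last_ts = ts
    # Release what is left if the silence has already lasted long enough by `now`
    if current and last_ts is not None and now - last_ts > time_period:
        batches.append((last_ts + time_period, current))
    return batches
```

With a time period of 4000 ms, events at 0, 1000, and 6000 ms are released as ["a", "b"] at 5000 and, if the clock has passed 10000, ["c"] at 10000.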

TimeBatchWindow

This is a batch (tumbling) time window. It holds the events that arrived during each window time period and is updated once per window time. For example:

from inputStream window timeBatch(5000)
select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count
group by inputStream.name
=> (TeacherOutput [] teachers) {
    foreach var t in teachers {
        outputStream.publish(t);
    }
}

The timeBatch window takes exactly one parameter (windowBatchTime).
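The tumbling time window can be modeled in Python by bucketing timestamps. This is an illustrative sketch, not the TimeBatchWindow implementation. It assumes that events are (timestamp in milliseconds, value) pairs and that batches are aligned to multiples of windowBatchTime starting from time 0; the real window may align its batches differently.

```python
from typing import Any, Dict, List, Tuple

TimedEvent = Tuple[int, Any]


def time_batches(events: List[TimedEvent], window_batch_time: int) -> List[Tuple[int, List[Any]]]:
    """Model of a tumbling time window: buckets [k*T, (k+1)*T) for T = window_batch_time."""
    buckets: Dict[int, List[Any]] = {}
    for ts, value in events:
        start = (ts // window_batch_time) * window_batch_time
        buckets.setdefault(start, []).append(value)
    # Non-overlapping batches, reported in time order with their start time
    return [(start, buckets[start]) for start in sorted(buckets)]
```

With a 5000 ms window, events at 100, 4900, 5000, and 12000 ms fall into the batches starting at 0, 5000, and 10000.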

TimeLengthWindow

This is a sliding time window that, at any given time, holds the last windowLength events that arrived during the last windowTime period, and is updated on every event arrival and expiry. For example:

from inputStream window timeLength(4000, 10)
select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count
group by inputStream.name
=> (TeacherOutput [] teachers) {
    foreach var t in teachers {
        outputStream.publish(t);
    }
}

The timeLength window takes exactly two parameters (windowTime, windowLength).

TimeOrderWindow

The TimeOrderWindow orders the events to be expired by the given timestamp field, comparing that timestamp value with the engine's system time. For example:

from inputStream window timeOrder(inputStream.timestamp, 4000, true)
select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count
group by inputStream.name
=> (TeacherOutput [] teachers) {
    foreach var t in teachers {
        outputStream.publish(t);
    }
}

The timeOrder window takes exactly three parameters (timestamp field, windowTime, dropOlderEvents).

TimeWindow

The TimeWindow is a sliding time window. It holds the events that arrived during the last windowTime period and is updated on every event arrival and expiry. For example:

from inputStream window time(5000)
select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count
group by inputStream.name
=> (TeacherOutput [] teachers) {
    foreach var t in teachers {
        outputStream.publish(t);
    }
}

The time window takes exactly one parameter (windowTime).
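The sliding time window can be modeled in Python with explicit timestamps. This sketch is not the TimeWindow implementation. It assumes that events are (timestamp in milliseconds, value) pairs sorted by timestamp, and it checks for expiry only when a later event arrives, whereas the real window can also be driven by streams:TIMER events from the Scheduler.

```python
from collections import deque
from typing import Any, Deque, List, Tuple

TimedEvent = Tuple[int, Any]


def time_window(events: List[TimedEvent], window_time: int) -> List[Tuple[str, Any]]:
    """Model of a sliding time window emitting CURRENT and EXPIRED entries."""
    window: Deque[TimedEvent] = deque()
    output: List[Tuple[str, Any]] = []
    for ts, value in events:
        # Expire every held event that arrived window_time ms or more before this one
        while window and ts - window[0][0] >= window_time:
            output.append(("EXPIRED", window.popleft()[1]))
        window.append((ts, value))
        output.append(("CURRENT", value))
    return output
```

With a 5000 ms window, the event at 0 ms expires when the event at 5000 ms arrives, and the event at 3000 ms expires when the event at 9000 ms arrives.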

UniqueLengthWindow

This is a length window that keeps only unique events, based on the given stream field. For example:

from inputStream window uniqueLength(inputStream.timestamp, 4000)
select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count
group by inputStream.name
=> (TeacherOutput [] teachers) {
    foreach var t in teachers {
        outputStream.publish(t);
    }
}

The uniqueLength window takes exactly two parameters (stream field, windowLength).

Window

The Window abstract object is the base object for implementing windows in Ballerina streams. Its process function contains the logic for processing events as they are received. The getCandidateEvents function returns the events in the window so that they can be used for joining. Window object names cannot be used directly in queries. Instead, a streaming query must use the function that returns the specific window, without the streams module prefix. For example, to use a LengthWindow in a streaming query, use length (the streams:length function), as shown below.

from inputStream window length(5)
select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count
group by inputStream.name
=> (TeacherOutput [] teachers) {
    foreach var t in teachers {
        outputStream.publish(t);
    }
}

Functions

avg

Returns an Average aggregator object. The aggregator name used in a streaming query should be the same as this function's name.

buildStreamEvent

Creates an array of streams:StreamEvent objects for a record t received by the stream named streamName.

count

Returns a Count aggregator object. The aggregator name used in a streaming query should be the same as this function's name.

createAndOperatorProcessor

Creates and returns an AndOperatorProcessor instance.

createCompoundPatternProcessor

Creates and returns a CompoundPatternProcessor instance.

createFilter

Creates a Filter object and returns it.

createFollowedByProcessor

Creates and returns a FollowedByProcessor instance.

createNotOperatorProcessor

Creates and returns a NotOperatorProcessor instance.

createOperandProcessor

Creates and returns an OperandProcessor instance.

createOrOperatorProcessor

Creates and returns an OrOperatorProcessor instance.

createOrderBy

Creates an OrderBy object and returns it.

createOutputProcess

Creates and returns an OutputProcess object.

createResetStreamEvent

Creates a RESET event from a given event.

createSelect

Creates and returns a Select object, which represents a select clause.

createStateMachine

Creates and returns a StateMachine object.

createStreamJoinProcessor

Creates a StreamJoinProcessor and returns it.

createTableJoinProcessor

Creates a TableJoinProcessor and returns it.

delay

The delay function creates a DelayWindow object and returns it.

distinctCount

Returns a DistinctCount aggregator object. The aggregator name used in a streaming query should be the same as this function's name.

externalTime

The externalTime function creates an ExternalTimeWindow object and returns it.

externalTimeBatch

The externalTimeBatch function creates an ExternalTimeBatchWindow object and returns it.

getStreamEvent

Gets the stream event from an any? value. Use this function only when anyEvent is known to be a streams:StreamEvent.

hopping

The hopping function creates a HoppingWindow object and returns it.

initPersistence

Function to initialize and start snapshotting.

length

The length function creates a LengthWindow object and returns it.

lengthBatch

The lengthBatch function creates a LengthBatchWindow object and returns it.

max

Returns a Max aggregator object. The aggregator name used in a streaming query should be the same as this function's name.

maxForever

Returns a MaxForever aggregator object. The aggregator name used in a streaming query should be the same as this function's name.

min

Returns a Min aggregator object. The aggregator name used in a streaming query should be the same as this function's name.

minForever

Returns a MinForever aggregator object. The aggregator name used in a streaming query should be the same as this function's name.

registerSnapshotable

Function to register Snapshotables.

removeState

Function to clear an existing state.

restoreState

Function to restore state of a given object.

sort

The sort function creates a SortWindow object and returns it.

stdDev

Returns a StdDev aggregator object. The aggregator name used in a streaming query should be the same as this function's name.

sum

Returns a Sum aggregator object. The aggregator name used in a streaming query should be the same as this function's name.

time

The time function creates a TimeWindow object and returns it.

timeAccum

The timeAccum function creates a TimeAccumulatingWindow object and returns it.

timeBatch

The timeBatch function creates a TimeBatchWindow object and returns it.

timeLength

The timeLength function creates a TimeLengthWindow object and returns it.

timeOrder

The timeOrder function creates a TimeOrderWindow object and returns it.

toSnapshottableEvent

Converts a single streams:StreamEvent object to a streams:SnapshottableStreamEvent object.

toSnapshottableEvents

Converts a given array of streams:StreamEvent objects to an array of streams:SnapshottableStreamEvent.

toStreamEvent

Converts a single streams:SnapshottableStreamEvent object to a streams:StreamEvent object.

toStreamEvents

Converts a given array of snapshotable events to an array of streams:StreamEvent objects.

uniqueLength

The uniqueLength function creates a UniqueLengthWindow object and returns it.

Constants

OUTPUT
RESET

The reset event type.

EXPIRED

The expired event type.

CURRENT

The current event type.

TIMER

The timer event type.

DEFAULT

The default key to group by if the group by clause is not used in the query.

DELIMITER
DELIMITER_REGEX
ASCENDING
DESCENDING