import ballerina/io;
import ballerina/runtime;

# Represents an employee whose values are published to a stream.
#
# + id - Integer identifier of the employee
# + name - Name of the employee
type Employee record {
    int id;
    string name;
};

# Demonstrates publishing to and subscribing to streams constrained by the
# `Employee`, `float`, and `anydata` types.
# Streams are experimental, so this file is run with the `--experimental` flag.
public function main() {
    // Pause (in milliseconds) applied after each batch of published values
    // so that the subscriber functions get a chance to run.
    int pauseMillis = 1000;

    // --- Stream constrained by the `Employee` type ---
    Employee e1 = { id: 1, name: "Jane" };
    Employee e2 = { id: 2, name: "Anne" };
    Employee e3 = { id: 3, name: "John" };
    Employee[] employees = [e1, e2, e3];

    stream<Employee> employeeStream = new;
    employeeStream.subscribe(printEmployeeName);
    foreach Employee employee in employees {
        employeeStream.publish(employee);
    }
    runtime:sleep(pauseMillis);

    // --- Stream constrained by the `float` type ---
    float[] temperatures = [28.0, 30.1, 29.5];

    stream<float> temperatureStream = new;
    temperatureStream.subscribe(printTemperature);
    foreach float temperature in temperatures {
        temperatureStream.publish(temperature);
    }
    runtime:sleep(pauseMillis);

    // --- Stream accepting any `anydata` value ---
    // `e1` is reused here to show that a record can be published as well.
    anydata[] updates = ["Hello Ballerina!", 1.0, e1];

    stream<anydata> updateStream = new;
    updateStream.subscribe(printEvent);
    foreach anydata update in updates {
        updateStream.publish(update);
    }
    runtime:sleep(pauseMillis);
}
# Subscriber function for streams of `Employee` values: prints the name of
# the employee carried by the received event.
#
# + employee - The `Employee` value received from the stream
function printEmployeeName(Employee employee) {
    string employeeName = employee.name;
    io:println("Employee event received for Employee Name: ", employeeName);
}
# Subscriber function for streams of `float` values: prints the received
# temperature after converting it to its string form.
#
# + temperature - The `float` value received from the stream
function printTemperature(float temperature) {
    string reading = temperature.toString();
    io:println("Temperature event received: ", reading);
}
# Subscriber function for streams of `anydata` values: prints whatever value
# was received, prefixed with a fixed label.
#
# + event - The `anydata` value received from the stream
function printEvent(anydata event) {
    anydata received = event;
    io:println("Event received: ", received);
}

Streams

The stream type represents a stream of values, in which the constraint type of the stream defines the type of values accepted by the stream. Functions allowed on a stream include publishing values to the stream and subscribing to the stream to receive the values that are published to it. Ballerina streams are an experimental feature; therefore, the --experimental flag must be set when compiling Ballerina files that contain streaming constructs.

import ballerina/io;
import ballerina/runtime;
# Record describing an employee; used below as the constraint type of `employeeStream`.
#
# + id - Integer identifier of the employee
# + name - Name of the employee
type Employee record {
    int id;
    string name;
};
public function main() {
    stream<Employee> employeeStream = new;

Defines a stream, which is constrained by the Employee type.

    employeeStream.subscribe(printEmployeeName);

Subscribes to the employeeStream using a function that accepts Employee values.

    Employee e1 = { id: 1, name: "Jane" };
    Employee e2 = { id: 2, name: "Anne" };
    Employee e3 = { id: 3, name: "John" };

Creates the Employee values that are published to the stream.

    employeeStream.publish(e1);
    employeeStream.publish(e2);
    employeeStream.publish(e3);
    runtime:sleep(1000);

Publishes the Employee values to the stream and then pauses to allow receipt by subscribers. The printEmployeeName() function should be invoked for each published value.

    stream<float> temperatureStream = new;

Defines a stream, which is constrained by the float type.

    temperatureStream.subscribe(printTemperature);

Subscribes to the temperatureStream using a function that accepts float values.

    temperatureStream.publish(28.0);
    temperatureStream.publish(30.1);
    temperatureStream.publish(29.5);

Publishes float values to the stream indicating the temperature.

    runtime:sleep(1000);

Waits for the subscriber to receive the values. The printTemperature() function should be invoked for each published value.

    stream<anydata> updateStream = new;

Defines a stream, which accepts values of the anydata type.

    updateStream.subscribe(printEvent);

Subscribes to the stream using a function that accepts values of the anydata type.

    updateStream.publish("Hello Ballerina!");
    updateStream.publish(1.0);
    updateStream.publish(e1);

Publishes values to the stream.

    runtime:sleep(1000);
}

Waits for the subscriber to receive the values. The printEvent() function should be invoked for each published value.

# Prints the `name` field of the given `Employee` value after a fixed prefix.
#
# + employee - The `Employee` value whose name is printed
function printEmployeeName(Employee employee) {
    io:println("Employee event received for Employee Name: ",
        employee.name);
}

This function accepts Employee values and is used to subscribe to a stream.

# Prints the given `float` value, converted with `toString()`, after a fixed prefix.
#
# + temperature - The `float` value to print
function printTemperature(float temperature) {
    io:println("Temperature event received: ",
        temperature.toString());
}

This function accepts float values and is used to subscribe to a stream.

# Prints the given `anydata` value after a fixed prefix.
#
# + event - The `anydata` value to print
function printEvent(anydata event) {
    io:println("Event received: ", event);
}

This function accepts values of the anydata type and is used to subscribe to a stream.

# To run this sample, navigate to the directory that contains the
# `.bal` file, and execute the `ballerina run` command with the `--experimental` flag.
$ ballerina run --experimental streams.bal
Employee event received for Employee Name: Jane
Employee event received for Employee Name: Anne
Employee event received for Employee Name: John
Temperature event received: 28.0
Temperature event received: 30.1
Temperature event received: 29.5
Event received: Hello Ballerina!
Event received: 1.0
Event received: id=1 name=Jane