import ballerina/http;
import ballerina/log;
import ballerina/math;
import ballerina/transactions;
@http:ServiceConfig {
    basePath: "/"
}
service InitiatorService on new http:Listener(8080) {    @http:ResourceConfig {
        methods: ["GET"],
        path: "/"
    }
    resource function init(http:Caller conn, http:Request req) {
        http:Response res = new;
        log:printInfo("Initiating transaction...");
        transaction {
            log:printInfo("Started transaction: " +
                             transactions:getCurrentTransactionId());
            boolean successful = callBusinessService();
            successful = callBusinessService();
            if (successful) {
                res.statusCode = http:STATUS_OK;
            } else {
                res.statusCode = http:STATUS_INTERNAL_SERVER_ERROR;
                abort;
            }
        } committed {
            log:printInfo("Initiated transaction committed");
        } aborted {
            log:printInfo("Initiated transaction aborted");
        }        var result = conn->respond(res);
        if (result is error) {
            log:printError("Could not send response back to client",
                            err = result);
        } else {
            log:printInfo("Sent response back to client");
        }
    }
}function callBusinessService() returns boolean {
    http:Client participantEP = new("http://localhost:8889/stockquote/update");    float price = <int>math:randomInRange(200, 250) + math:random();
    json bizReq = { symbol: "GOOG", price: price };
    http:Request req = new;
    req.setJsonPayload(bizReq);
    var result = participantEP->post("", req);
    log:printInfo("Got response from bizservice");
    if (result is error) {
	log:printInfo(result.toString());
        return false;
    }  else {
        return (result.statusCode == http:STATUS_OK);
    }
}import ballerina/http;
import ballerina/io;
import ballerina/log;
import ballerina/transactions;
@http:ServiceConfig {
    basePath: "/stockquote"
}
service ParticipantService on new http:Listener(8889) {    @http:ResourceConfig {
        path: "/update"
    }
    @transactions:Participant {
        oncommit:printParticipantCommit,
        onabort:printParticipantAbort
    }
    resource function updateStockQuote(http:Caller conn, http:Request req) {
        log:printInfo("Received update stockquote request");
        http:Response res = new;
        log:printInfo("Joined transaction: " +
                       transactions:getCurrentTransactionId());        var updateReq = <@untainted> req.getJsonPayload();
        if (updateReq is json) {
            string msg =
                io:sprintf("Update stock quote request received.
                            symbol:%s, price:%s",
                            updateReq.symbol,
                            updateReq.price);
            log:printInfo(msg);            json jsonRes = { "message": "updating stock" };
            res.statusCode = http:STATUS_OK;
            res.setJsonPayload(jsonRes);
        } else {
            res.statusCode = http:STATUS_INTERNAL_SERVER_ERROR;
            res.setPayload(updateReq.reason());
            log:printError("Payload error occurred!", err = updateReq);
        }        var result = conn->respond(res);
        if (result is error) {
            log:printError("Could not send response back to initiator",
                                 err = result);
        } else {
            log:printInfo("Sent response back to initiator");
        }
    }
}
function printParticipantAbort(string transactionId) {
    log:printInfo("Participated transaction: " + transactionId + " aborted");
}
function printParticipantCommit(string transactionId) {
    log:printInfo("Participated transaction: " + transactionId + " committed");
}

Distributed Transactions

To improve the reliability of microservice-based applications, a series of microservice invocations can be bound into a single unit of work. The underlying mechanisms are that of infection and agreement protocols. This example demonstrates the Ballerina distributed transactions protocol in action. Ballerina transactions are at experimental stage, please use –experimental flag to enable them.

import ballerina/http;
import ballerina/log;
import ballerina/math;
import ballerina/transactions;
@http:ServiceConfig {
    basePath: "/"
}
service InitiatorService on new http:Listener(8080) {

This is the initiator of the distributed transaction.

    @http:ResourceConfig {
        methods: ["GET"],
        path: "/"
    }
    resource function init(http:Caller conn, http:Request req) {
        http:Response res = new;
        log:printInfo("Initiating transaction...");
        transaction {

When the transaction statement starts, a distributed transaction context is created.

            log:printInfo("Started transaction: " +
                             transactions:getCurrentTransactionId());

Print the current transaction ID

            boolean successful = callBusinessService();

When a participant is called, the transaction context is propagated, and that participant gets infected and joins the distributed transaction.

            successful = callBusinessService();
            if (successful) {
                res.statusCode = http:STATUS_OK;
            } else {
                res.statusCode = http:STATUS_INTERNAL_SERVER_ERROR;
                abort;
            }

call the same service again.

        } committed {
            log:printInfo("Initiated transaction committed");
        } aborted {
            log:printInfo("Initiated transaction aborted");
        }

As soon as the transaction block ends, the 2-phase commit coordination protocol will run. All participants are prepared and depending on the joint outcome, either a notify commit or notify abort will be sent to the participants.

        var result = conn->respond(res);
        if (result is error) {
            log:printError("Could not send response back to client",
                            err = result);
        } else {
            log:printInfo("Sent response back to client");
        }
    }
}
function callBusinessService() returns boolean {
    http:Client participantEP = new("http://localhost:8889/stockquote/update");
    float price = <int>math:randomInRange(200, 250) + math:random();
    json bizReq = { symbol: "GOOG", price: price };
    http:Request req = new;
    req.setJsonPayload(bizReq);
    var result = participantEP->post("", req);
    log:printInfo("Got response from bizservice");
    if (result is error) {
	log:printInfo(result.toString());
        return false;
    }  else {
        return (result.statusCode == http:STATUS_OK);
    }
}
# To start the `initiator` service, navigate to the directory that contains the
# `.bal` file and use the `ballerina run` command.
$ ballerina run --experimental initiator.bal
[ballerina/http] started HTTP/WS listener 10.100.7.118:64337
[ballerina/http] started HTTP/WS listener 0.0.0.0:8080
import ballerina/http;
import ballerina/io;
import ballerina/log;
import ballerina/transactions;
@http:ServiceConfig {
    basePath: "/stockquote"
}
service ParticipantService on new http:Listener(8889) {

This service is a participant in the distributed transaction. It will get infected when it receives a transaction context from the participant. The transaction context, in the HTTP case, will be passed in as custom HTTP headers.

    @http:ResourceConfig {
        path: "/update"
    }
    @transactions:Participant {
        oncommit:printParticipantCommit,
        onabort:printParticipantAbort
    }
    resource function updateStockQuote(http:Caller conn, http:Request req) {
        log:printInfo("Received update stockquote request");
        http:Response res = new;

At the beginning of the transaction statement, since a transaction context has been received, this service will register with the initiator as a participant.

        log:printInfo("Joined transaction: " +
                       transactions:getCurrentTransactionId());

Print the current transaction ID

        var updateReq = <@untainted> req.getJsonPayload();
        if (updateReq is json) {
            string msg =
                io:sprintf("Update stock quote request received.
                            symbol:%s, price:%s",
                            updateReq.symbol,
                            updateReq.price);
            log:printInfo(msg);
            json jsonRes = { "message": "updating stock" };
            res.statusCode = http:STATUS_OK;
            res.setJsonPayload(jsonRes);
        } else {
            res.statusCode = http:STATUS_INTERNAL_SERVER_ERROR;
            res.setPayload(updateReq.reason());
            log:printError("Payload error occurred!", err = updateReq);
        }
        var result = conn->respond(res);
        if (result is error) {
            log:printError("Could not send response back to initiator",
                                 err = result);
        } else {
            log:printInfo("Sent response back to initiator");
        }
    }
}
function printParticipantAbort(string transactionId) {
    log:printInfo("Participated transaction: " + transactionId + " aborted");
}

The participant function that will get called when the distributed transaction is aborted

function printParticipantCommit(string transactionId) {
    log:printInfo("Participated transaction: " + transactionId + " committed");
}

The participant function that will get called when the distributed transaction is committed

# To start the `participant` service, navigate to the directory that contains the
# `.bal` file and use the `ballerina run` command.
#Run this command to start the `participant` service.
$ ballerina run  --experimental participant.bal
[ballerina/http] started HTTP/WS listener 10.100.1.182:54774
[ballerina/http] started HTTP/WS listener localhost:8889
#Run this curl command to invoke the services. 
$ curl -v localhost:8080
Outputs similar to the following should be available from the initiator and participant.
Output from initiator:
[ballerina/http] started HTTP/WS listener 10.100.7.118:64337
[ballerina/http] started HTTP/WS listener 0.0.0.0:8080
2019-09-04 12:04:58,404 INFO  [ballerina/log] - Initiating transaction...
2019-09-04 12:04:58,419 INFO  [ballerina/log] - Created transaction: 2c9584d2-e5d0-4dad-bfc1-946cc4c00f6a
2019-09-04 12:04:58,423 INFO  [ballerina/log] - Started transaction: 2c9584d2-e5d0-4dad-bfc1-946cc4c00f6a:$anon$.$0
2019-09-04 12:04:58,833 INFO  [ballerina/log] - Registered remote participant: f677d80b-710f-4e9c-9d46-c30782086581:$anon$.$0 for transaction: 2c9584d2-e5d0-4dad-bfc1-946cc4c00f6a
2019-09-04 12:04:58,876 INFO  [ballerina/log] - Got response from bizservice
2019-09-04 12:04:58,877 INFO  [ballerina/log] - Running 2-phase commit for transaction: 2c9584d2-e5d0-4dad-bfc1-946cc4c00f6a:$anon$.$0
2019-09-04 12:04:58,899 INFO  [ballerina/log] - Preparing remote participant: f677d80b-710f-4e9c-9d46-c30782086581:$anon$.$0
2019-09-04 12:04:58,927 INFO  [ballerina/log] - Remote participant: f677d80b-710f-4e9c-9d46-c30782086581:$anon$.$0 prepared
2019-09-04 12:04:58,928 INFO  [ballerina/log] - Notify(commit) remote participant: http://10.100.7.118:64553/balcoordinator/participant/2pc/$anon$.$0
2019-09-04 12:04:58,941 INFO  [ballerina/log] - Remote participant: f677d80b-710f-4e9c-9d46-c30782086581:$anon$.$0 committed
2019-09-04 12:04:58,942 INFO  [ballerina/log] - Initiated transaction committed
2019-09-04 12:04:58,945 INFO  [ballerina/log] - Sent response back to client
Output from participant:
[ballerina/http] started HTTP/WS listener 10.100.7.118:64553
[ballerina/http] started HTTP/WS listener 0.0.0.0:8889
2019-09-04 12:04:58,683 INFO  [ballerina/log] - Registering for transaction: 2c9584d2-e5d0-4dad-bfc1-946cc4c00f6a:$anon$.$0 with coordinator: http://10.100.7.118:64337/balcoordinator/initiator/$anon$.$0/register
2019-09-04 12:04:58,847 INFO  [ballerina/log] - Registered with coordinator for transaction: 2c9584d2-e5d0-4dad-bfc1-946cc4c00f6a
2019-09-04 12:04:58,848 INFO  [ballerina/log] - participant registered: 2c9584d2-e5d0-4dad-bfc1-946cc4c00f6a
2019-09-04 12:04:58,851 INFO  [ballerina/log] - Received update stockquote request
2019-09-04 12:04:58,852 INFO  [ballerina/log] - Joined transaction: 2c9584d2-e5d0-4dad-bfc1-946cc4c00f6a:$anon$.$0
2019-09-04 12:04:58,858 INFO  [ballerina/log] - Update stock quote request received.
                            symbol:GOOG, price:249.7214191144464
2019-09-04 12:04:58,872 INFO  [ballerina/log] - Sent response back to initiator
2019-09-04 12:04:58,917 INFO  [ballerina/log] - Prepare received for transaction: 2c9584d2-e5d0-4dad-bfc1-946cc4c00f6a:$anon$.$0
2019-09-04 12:04:58,918 INFO  [ballerina/log] - Prepared transaction: 2c9584d2-e5d0-4dad-bfc1-946cc4c00f6a
2019-09-04 12:04:58,935 INFO  [ballerina/log] - Notify(commit) received for transaction: 2c9584d2-e5d0-4dad-bfc1-946cc4c00f6a:$anon$.$0
2019-09-04 12:04:58,936 INFO  [ballerina/log] - Participated transaction: 2c9584d2-e5d0-4dad-bfc1-946cc4c00f6a:$anon$.$0 committed