The example below illustrates the streaming mechanism described on the Open API Streaming page and shows how to:
- Make a streaming connection.
- Add a new subscription.
- Remove the subscription.
Initiate Streaming:
When the "Connect" button is clicked, the application opens a streaming connection to the server. As soon as the connection is established, the contextId is displayed and the "Subscribe" button is enabled. The following code opens the streaming connection.
//Call to initiate a connection
stream = new OpenApiStream(streamingUrl, token);
stream.observeState(stateChangeCallback);
stream.connect();
// Details - when a new OpenApiStream object is initialized, the connection is made via SignalR.
this.connection = signalR(this.connectUrl, { authorization: this.token, context: this.contextId });
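The sample does not show the body of stateChangeCallback. A minimal sketch of what such a callback might do is given here. The callback arguments (state, contextId), the state names 'connected' and 'disconnected', and the ui object are assumptions made for illustration and are not taken from the sample.

```javascript
// Hypothetical helper: builds a callback suitable for stream.observeState().
// The (state, contextId) arguments and the state names are assumptions.
function createStateChangeCallback(ui) {
    return function stateChangeCallback(state, contextId) {
        var connected = state === 'connected';
        // Show the context id only while the connection is up
        ui.contextIdText = connected ? contextId : '';
        // "Subscribe" is only usable on a live connection
        ui.subscribeEnabled = connected;
    };
}

// Usage, with a plain object standing in for the page elements
var ui = { contextIdText: '', subscribeEnabled: false };
var callback = createStateChangeCallback(ui);
callback('connected', 'ctx-123');
```

Keeping the UI state in a plain object makes the callback easy to exercise without a browser; in the real page the two assignments would update the DOM instead.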
Make Subscription:
Once the connection is established, the "Subscribe" button becomes available. Clicking it creates the subscription. This example subscribes to the "POST /openapi/trade/v1/infoprices/subscriptions/active" endpoint. The response contains a snapshot, which is shown in the "Snapshot" div; data subsequently streamed from the server is shown in the "Messages" div. The following code snippet creates the subscription.
// Subscribe call for streaming and updates.
OpenApiSampleLogic.subscribe(this.endpoint.value, JSON.parse(this.arguments.value), onActive, onMessage)
// Details - the code from the example that eventually calls the API to create the subscription
function enable(callback)
{
    if (typeof callback !== 'function') throw new TypeError('callback must be a function');
    this.onSubscribeCallback = callback;
    var payload = {
        Arguments: this.arguments,
        ContextId: this.stream.contextId,
        ReferenceId: this.referenceId
    };
    if (this.refreshRate) payload.RefreshRate = this.refreshRate;
    if (this.tag) payload.Tag = this.tag;
    $.ajax({
        method: "POST",
        url: this.baseUrl + '/active',
        headers: {
            authorization: 'Bearer ' + this.stream.token,
            accept: 'application/json',
            'content-type': 'application/json'
        },
        data: JSON.stringify(payload),
        xhrFields: {
            withCredentials: true
        }
    }).then(onSubscribeSuccess.bind(this), onSubscribeFail.bind(this));
}
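To make the shape of the request body easier to see, the payload construction from enable() can be isolated into a small function. This is only a sketch: buildSubscriptionPayload is a hypothetical helper, and the context id, reference id and argument values used below are illustrative assumptions, not values from the sample.

```javascript
// Hypothetical helper mirroring the payload built inside enable().
function buildSubscriptionPayload(contextId, referenceId, args, refreshRate, tag) {
    var payload = {
        Arguments: args,
        ContextId: contextId,
        ReferenceId: referenceId
    };
    // Optional fields are only sent when a value was supplied
    if (refreshRate) payload.RefreshRate = refreshRate;
    if (tag) payload.Tag = tag;
    return payload;
}

// Illustrative values only; the argument names and ids are assumptions
var payload = buildSubscriptionPayload('ctx-123', 'ref-1', { AssetType: 'FxSpot', Uics: '21,22' }, 1000);
var body = JSON.stringify(payload);
```

The resulting string is what would be passed as the data field of the POST request; the Tag field is omitted because no tag was given.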
Handling Snapshot and stream data:
The following code block shows how to handle the snapshot and the updates that follow it:
function subscribe(subscribeEndpoint, args, snapshotCallback, msgCallback)
{
    // Create a subscription representation
    subscription = new OpenApiSubscription(stream, subscriptionBaseUrl + subscribeEndpoint, args, 1000);
    // Observe messages for the subscription
    subscription.observe(function (message)
    {
        // If the message is an object, we received data
        if (typeof message == 'object')
        {
            // Update the snapshot data
            for (var i = 0, item; item = message[i]; i++)
            {
                // Uic not found in the map: this is an add
                if (!snapshotItemMap[item.Uic])
                {
                    snapshotItemMap[item.Uic] = item;
                    snapshot.push(item);
                }
                // Item is flagged as deleted: this is a delete
                else if (item.__meta_deleted)
                {
                    // Remove the object from the map
                    delete snapshotItemMap[item.Uic];
                    // Remove the object from the snapshot
                    for (var j = 0, l = snapshot.length; j < l; j++)
                    {
                        if (snapshot[j].Uic == item.Uic)
                        {
                            snapshot.splice(j, 1);
                            // Stop here: the array is now shorter than the cached length
                            break;
                        }
                    }
                }
                // Otherwise apply the delta to the stored item
                else
                {
                    applyDelta(snapshotItemMap[item.Uic], item);
                }
            }
            // Send the data back to the UI
            msgCallback(this.subscription.referenceId, this.subscription.messages.length, message, snapshot);
        }
        // Subscription has been reset: create a new one and hook it up
        else if (message == 'reset')
        {
            subscribe(this.subscribeEndpoint, this.args, this.snapshotCallback, this.msgCallback);
        }
        else if (message == 'disabled')
        {
            // Subscription was disabled permanently on the server side, don't try to re-subscribe
            msgCallback(this.subscription.referenceId, this.subscription.messages.length, message);
        }
        // Message is either 'slow' or 'heartbeat'
        else
        {
            msgCallback(this.subscription.referenceId, this.subscription.messages.length, message);
        }
    }.bind({
        subscription: subscription,
        subscribeEndpoint: subscribeEndpoint,
        args: args,
        snapshotCallback: snapshotCallback,
        msgCallback: msgCallback
    }));
    // Enable the subscription and get the initial data snapshot
    subscription.enable(function (snapshotData)
    {
        // "this" is set to the OpenApiSubscription object
        var refId = this.referenceId;
        var numberOfMessages = this.messages.length;
        snapshot = snapshotData;
        // Create a map with Uics as keys to make it easy to update the snapshot when messages arrive
        for (var i = 0, item; item = snapshot[i]; i++)
        {
            snapshotItemMap[item.Uic] = item;
        }
        // Send the data back to the UI code
        snapshotCallback(refId, numberOfMessages, snapshot);
    }.bind(subscription));
}
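The handler above calls applyDelta, which is not shown in this walkthrough. One plausible way to write it, assuming that a delta carries only the fields that changed and that nested objects should be merged rather than replaced, is sketched here. The field names in the usage example (Quote, Bid, Ask) are illustrative assumptions.

```javascript
// Hypothetical sketch of applyDelta(target, delta): copies every field present
// in the delta onto the stored item, merging nested plain objects recursively.
function isPlainObject(value) {
    return value !== null && typeof value === 'object' && !Array.isArray(value);
}

function applyDelta(target, delta) {
    for (var key in delta) {
        if (!Object.prototype.hasOwnProperty.call(delta, key)) continue;
        var value = delta[key];
        if (isPlainObject(value) && isPlainObject(target[key])) {
            // Nested object: merge so untouched fields keep their snapshot values
            applyDelta(target[key], value);
        } else {
            // Primitive, array or previously missing field: overwrite
            target[key] = value;
        }
    }
    return target;
}

// Usage: only Bid changes, Ask keeps its snapshot value
var item = { Uic: 21, Quote: { Bid: 1.1010, Ask: 1.1012 } };
applyDelta(item, { Uic: 21, Quote: { Bid: 1.1011 } });
```

Because the stored item is mutated in place, the entries in snapshot and snapshotItemMap, which reference the same objects, stay in sync without further bookkeeping.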
Delete Subscription:
To delete the active subscription, click the "Unsubscribe" button. Once the subscription is deleted, the server stops streaming data to the client, so the screen no longer updates. The following code deletes the subscription. It is a simple DELETE call to the subscription server, with the contextId and referenceId in the URL as route parameters.
// Delete the subscription
OpenApiSampleLogic.unsubscribe();
// Details - the code from the example that eventually calls the API to delete the subscription
$.ajax({
    method: "DELETE",
    url: this.baseUrl + '/' + this.stream.contextId + '/' + this.referenceId,
    headers: {
        authorization: 'Bearer ' + this.stream.token
    },
    xhrFields: {
        withCredentials: true
    }
});
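The way the route parameters are combined into the DELETE URL can be sketched as a standalone function. buildUnsubscribeUrl is a hypothetical helper, the ids are illustrative, and the use of encodeURIComponent is an added precaution that the sample itself does not apply.

```javascript
// Hypothetical helper: composes the DELETE URL from the base URL and route parameters.
function buildUnsubscribeUrl(baseUrl, contextId, referenceId) {
    // Encode the route parameters so unusual characters cannot break the path
    return baseUrl + '/' + encodeURIComponent(contextId) + '/' + encodeURIComponent(referenceId);
}

// Illustrative ids; the base path matches the subscription endpoint without '/active'
var url = buildUnsubscribeUrl('/openapi/trade/v1/infoprices/subscriptions', 'ctx-123', 'ref 1');
```

With ids that contain only URL-safe characters, the result is identical to the plain string concatenation used in the sample.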



