The example below illustrates the streaming mechanism described on the Open API Streaming page and shows how to:
- Make a streaming connection.
- Add a new subscription.
- Remove the subscription.
Initiate Streaming:
When the "Connect" button is clicked, the application opens a streaming connection to the server. As soon as the connection is established, the contextId is displayed and the "Subscribe" button is enabled. The following code makes the streaming connection.
// Call to initiate a connection
stream = new OpenApiStream(streamingUrl, token);
stream.observeState(stateChangeCallback);
stream.connect();

// Details: when a new OpenApiStream object is initialized, the connection is made via SignalR.
this.connection = signalR(this.connectUrl, { authorization: this.token, context: this.contextId });
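The link between the connection state and the "Subscribe" button can be sketched as a small state-change callback. This is only an illustration: the state name 'connected', the single-argument callback signature, and the helpers uiForState and makeStateChangeCallback are assumptions, since the values that OpenApiStream actually passes to the callback are not shown in this example.

```javascript
// Hypothetical state name; the values OpenApiStream really reports are not shown here.
var CONNECTED = 'connected';

// Returns the UI changes to apply for a given connection state.
function uiForState(state, contextId) {
    var connected = state === CONNECTED;
    return {
        // "Subscribe" is only usable once the stream is connected
        subscribeEnabled: connected,
        // The contextId is only displayed once the stream is connected
        contextIdText: connected ? contextId : ''
    };
}

// Builds a callback in the shape assumed for stream.observeState().
// "render" is a hypothetical function that applies the result to the page.
function makeStateChangeCallback(stream, render) {
    return function (state) {
        render(uiForState(state, stream.contextId));
    };
}
```

Keeping the state-to-UI mapping in a plain function makes it possible to check the behaviour without a live connection.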
Make Subscription:
Once the connection is established, the "Subscribe" button becomes available. Clicking it creates a subscription. This example subscribes to the "POST /openapi/trade/v1/infoprices/subscriptions/active" endpoint. The response contains a snapshot, which is shown in the "Snapshot" div; data subsequently streamed from the server is shown in the "Messages" div. The code snippet below creates the subscription.
// Subscribe call for streaming and updates.
OpenApiSampleLogic.subscribe(this.endpoint.value, JSON.parse(this.arguments.value), onActive, onMessage);

// Details: the code from the example that eventually calls the API to create the subscription.
function enable(callback) {
    if (typeof callback !== 'function') throw new TypeError('callback must be a function');
    this.onSubscribeCallback = callback;
    var payload = {
        Arguments: this.arguments,
        ContextId: this.stream.contextId,
        ReferenceId: this.referenceId
    };
    // RefreshRate and Tag are optional and only sent when set
    if (this.refreshRate) payload.RefreshRate = this.refreshRate;
    if (this.tag) payload.Tag = this.tag;
    $.ajax({
        method: "POST",
        url: this.baseUrl + '/active',
        headers: {
            authorization: 'Bearer ' + this.stream.token,
            accept: 'application/json',
            'content-type': 'application/json'
        },
        data: JSON.stringify(payload),
        xhrFields: { withCredentials: true }
    }).then(onSubscribeSuccess.bind(this), onSubscribeFail.bind(this));
}
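To make the shape of the request body easier to see, the payload construction from enable() can be pulled out into a standalone function. This is a minimal sketch: buildSubscriptionPayload is a hypothetical helper, not part of the sample, and the argument values, context id, and reference id are made up for illustration.

```javascript
// Builds the request body for a subscription request.
// refreshRate and tag are optional and only included when provided,
// mirroring the checks in enable().
function buildSubscriptionPayload(args, contextId, referenceId, refreshRate, tag) {
    var payload = {
        Arguments: args,
        ContextId: contextId,
        ReferenceId: referenceId
    };
    if (refreshRate) payload.RefreshRate = refreshRate;
    if (tag) payload.Tag = tag;
    return payload;
}

// Illustrative values only; no tag is passed, so the Tag field is omitted.
var body = JSON.stringify(buildSubscriptionPayload(
    { AssetType: 'FxSpot', Uics: '21,22' },
    'ctx-123',
    'ref-1',
    1000
));
```

The ContextId ties the subscription to the streaming connection made earlier, and the ReferenceId identifies this subscription when it is later deleted.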
Handling Snapshot and Stream Data:
The code block below shows how the snapshot and subsequent updates are handled:
function subscribe(subscribeEndpoint, args, snapshotCallback, msgCallback) {
    // Create a subscription representation
    subscription = new OpenApiSubscription(stream, subscriptionBaseUrl + subscribeEndpoint, args, 1000);

    // Observe messages for the subscription
    subscription.observe(function (message) {
        // If the message is an object, we received data
        if (typeof message == 'object') {
            // Update the snapshot data
            for (var i = 0, item; item = message[i]; i++) {
                // Uic not found in map: this is an add
                if (!snapshotItemMap[item.Uic]) {
                    snapshotItemMap[item.Uic] = item;
                    snapshot.push(item);
                }
                // This is a delete
                else if (item.__meta_deleted) {
                    // Remove object from map
                    delete snapshotItemMap[item.Uic];
                    // Remove object from snapshot
                    for (var j = 0, l = snapshot.length; j < l; j++) {
                        if (snapshot[j].Uic == item.Uic) {
                            snapshot.splice(j, 1);
                            // Stop here: the array is now shorter than the cached length l
                            break;
                        }
                    }
                }
                // Apply delta
                else {
                    applyDelta(snapshotItemMap[item.Uic], item);
                }
            }
            // Send the data back to the UI
            msgCallback(this.subscription.referenceId, this.subscription.messages.length, message, snapshot);
        }
        // Subscription has been reset: create a new one and hook it up
        else if (message == 'reset') {
            subscribe(this.subscribeEndpoint, this.args, this.snapshotCallback, this.msgCallback);
        }
        else if (message == 'disabled') {
            // Subscription was disabled permanently on the server side; don't try to re-subscribe
            msgCallback(this.subscription.referenceId, this.subscription.messages.length, message);
        }
        // Message is either 'slow' or 'heartbeat'
        else {
            msgCallback(this.subscription.referenceId, this.subscription.messages.length, message);
        }
    }.bind({
        subscription: subscription,
        subscribeEndpoint: subscribeEndpoint,
        args: args,
        snapshotCallback: snapshotCallback,
        msgCallback: msgCallback
    }));

    // Enable the subscription and get the initial data snapshot
    subscription.enable(function (snapshotData) {
        // "this" is set to the OpenApiSubscription object
        var refId = this.referenceId;
        var numberOfMessages = this.messages.length;
        snapshot = snapshotData;
        // Create a map with Uics as keys so the snapshot is easy to update when messages arrive
        for (var i = 0, item; item = snapshot[i]; i++) {
            snapshotItemMap[item.Uic] = item;
        }
        // Send data back to the UI code
        snapshotCallback(refId, numberOfMessages, snapshot);
    }.bind(subscription));
}
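The handler above calls applyDelta, whose body is not shown in this example. The following is one possible sketch, assuming that an update carries only the fields that changed and that nested objects should be merged rather than replaced. The sample's real implementation may differ, and the Quote, Bid, and Ask field names are used purely for illustration.

```javascript
// Hypothetical applyDelta: copies changed fields from delta onto target,
// recursing into nested plain objects so that untouched fields are kept.
function applyDelta(target, delta) {
    for (var key in delta) {
        if (!Object.prototype.hasOwnProperty.call(delta, key)) continue;
        var value = delta[key];
        var valueIsObject = value !== null && typeof value === 'object' && !Array.isArray(value);
        var existing = target[key];
        var existingIsObject = existing !== null && typeof existing === 'object' && !Array.isArray(existing);
        if (valueIsObject && existingIsObject) {
            // Merge nested objects field by field
            applyDelta(existing, value);
        } else {
            // Primitives, arrays, and new fields replace the old value
            target[key] = value;
        }
    }
    return target;
}

// Illustrative snapshot item and update
var item = { Uic: 21, Quote: { Bid: 1.1010, Ask: 1.1012 } };
applyDelta(item, { Uic: 21, Quote: { Bid: 1.1011 } });
// item.Quote.Bid is now 1.1011; item.Quote.Ask is unchanged at 1.1012
```

Because the snapshot array and snapshotItemMap hold references to the same item objects, updating an item in place through the map also updates what the snapshot array contains.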
Delete Subscription:
To delete the active subscription, click the "Unsubscribe" button. Once the subscription is deleted, the server stops streaming data to the client, so the screen no longer updates. The code below deletes the subscription. It is a simple DELETE call to the subscription endpoint, with contextId and referenceId in the URL as route parameters.
// Delete subscription
OpenApiSampleLogic.unsubscribe();

// Details: the code from the example that eventually calls the API to delete the subscription.
$.ajax({
    method: "DELETE",
    url: this.baseUrl + '/' + this.stream.contextId + '/' + this.referenceId,
    headers: {
        authorization: 'Bearer ' + this.stream.token
    },
    xhrFields: { withCredentials: true }
});
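The route used by the DELETE call can be illustrated with a small helper that assembles the URL from its parts. This is a sketch only: buildUnsubscribeUrl is a hypothetical function, the base URL and ids are made-up values, and the use of encodeURIComponent is a defensive addition that the sample code does not include.

```javascript
// Builds the URL for deleting a subscription:
// <baseUrl>/<contextId>/<referenceId>
function buildUnsubscribeUrl(baseUrl, contextId, referenceId) {
    // Encoding is an added precaution in case an id contains reserved characters
    return baseUrl + '/' + encodeURIComponent(contextId) + '/' + encodeURIComponent(referenceId);
}

// Illustrative values only
var unsubscribeUrl = buildUnsubscribeUrl(
    'https://example.test/openapi/trade/v1/infoprices/subscriptions',
    'ctx-123',
    'ref-1'
);
```

The contextId and referenceId must match the values sent in the original subscription request, since together they identify which subscription the server should stop.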