In this tutorial we'll cover subscriptions; their creation and parameters.
This section assumes a connection has been established; see the getting started tutorial as necessary.
Subscriptions are used to register ongoing interest in one or more topics. Once subscribed, a client application will receive a full topic refresh (image) followed by incremental updates (delta messages). Processing a refresh message and all updates will give the client a coherent view of a topic.
A canonical subscription requests a subscription to a single topic.
Here's an example of processing a subscription to the CISQ.Q
record from the activ
data source using native
symbology.
/* global activOneApi */
// You may have to update the userId and password fields below.
const user = "__ACTIV_SESSION_USER_ID__";
const password = "__ACTIV_SESSION_PASSWORD__";
const host = "aop-ny4-replay.activfinancial.com";
(async function() {
try {
const session = await activOneApi.connect({
host,
user,
password,
onLogMessage(logType, message) {
console.log(message);
}
});
// Initiate a subscription request.
session.subscribe({
dataSourceId: activOneApi.DataSourceId.activ,
symbologyId: activOneApi.SymbologyId.native,
symbol: "CSIQ.Q"
},
{
onMessage(message) {
if (message.isRefresh) {
processRefresh(message);
} else {
processUpdate(message);
}
},
onSubscriptionStatus(message) {
if (activOneApi.StatusCode.success != message.statusCode) {
if (message.state == SubscriptionState.error) {
// Recoverable error.
console.error(`Error: ${activOneApi.StatusCode[response.statusCode]}`);
}
else (message.state == SubscriptionState.failure) {
// Non recoverable error.
console.error(`Failure: ${activOneApi.StatusCode[response.statusCode]}`);
}
}
}
});
} catch (e) {
console.error(`Failure: ${activOneApi.StatusCode[e]}`);
}
function processRefresh(message) {
console.log("Refresh received for " + message.symbol);
// Do something with the refresh.
}
function processUpdate(message) {
console.log("Update received for " + message.symbol);
// Do something with the update.
}
})();
Try it out in the playground
The subscribe method of a Session returns a SubscriptionHandle. Once subscribed, a client application will receive a full topic refresh (image) followed by incremental updates (delta messages). Processing a refresh message and all updates will give the client a coherent view of a topic.
A query subscription can be used to request a snapshot of all topics that match a given query.
// With a connected session request a subscription of all MSFT options using navigation.
session.querySubscription({
dataSourceId: activOneApi.DataSourceId.activ,
symbol: "symbol=MSFT. navigate=option"
},
{
onMessage (message) {
console.log(`Message received for ${message.symbol}`);
},
onSubscriptionStatus (message) {
if (activOneApi.StatusCode.success == message.statusCode) {
if (message.state == SubscriptionState.complete) {
// All available topics are subscribed and refreshes have been received. This is only seen for query subscriptions.
}
}
else {
if (message.state == SubscriptionState.error) {
// Recoverable error.
console.error(`Error: ${activOneApi.StatusCode[response.statusCode]}`);
}
else (message.state == SubscriptionState.failure) {
// Non recoverable error.
console.error(`Failure: ${activOneApi.StatusCode[response.statusCode]}`);
}
}
}
});
The results of a subscription are notified via the callback handler provided when the subscription was requested. The interface provides the following callbacks:
The data contained in the message can be accessed through the fieldData property. Details of how to use this can be found in the Field Data tutorial.
A message can be either a refresh or an update, this can be found by checking the isRefresh property. A refresh will contain all of the current state of each field in the fieldData property, while an update will only contain the data for the fields that have changed. Updates will also have the eventType and updateType properties populated, while these will be undefined for refreshes.
Once a message has been used it cannot be stored. Its contents will be deleted by the iteration to the next message, returning true in the callback to close the subscription, or by deleting the subscription handle.
The status callback will be called for changes to the state of the subscription request.
The state member can hold the following values:
Pending
The subscription has been requested and is pending. This is always the first callback after the request is made and when recovering from the error state.
Initial refresh messages are received while in this state. Updates can also be received in this state.
Complete
All available topics are subscribed and refreshes have been received.
Error
An error has occured. The subscription will be automatically retried. See Subscription Errors.
Failure
The subscription has failed and will not recover. See Subscription Failures.
Recoverable errors can occur due to:
Not connected
This will be received for each subscription if the connection to the platform has broken, either unexpectedly or due to a disconnect request.
Source not found
This will be received if there are no active sources for a topic. This would occur for a source failure if there are no redundant alternative sources or when the subscription is for a non-existent or deleted topic.
Service not available
This will be received for a query subscription when required platform services for discovering topics are not available.
Navigation not found
This will be received for a query subscription when the navigation records cannot be found for the source symbol.
Interrupted
This will be returned by the platform when a required service connection is unexpectedly broken during subscription setup.
Timeout
This will be returned by the platform when a required service fails to respond in time during subscription setup.
Unrecoverable failures can occur due to:
Some subscriptions are to map topics, as indicated by the value of topicType property being set to TopicType.map. This will be a single topic that contains several entries identified by the mapKey property. The map key is used to uniquely identify each order within a map subscription. Note that map keys are not necessarily unique between subscriptions to different topics.
The property updateType is used to distinguish between whether the fields of a SubscriptionMessage should be used to add, remove or update an entry in the map. Fields that are common to the entire map are sent with an undefined mapKey and an updateType of common.
An example of a map subscription would be one to an order book. The subscription would be to MSFT.Q-MBO
which is for the entire Microsoft Nasdaq order book, and each order entry would be an entry in the map with a unique mapKey.
Parameters define options that will be used when processing subscription updates. For subscriptions this can be used for limiting the event types that result in updates, and also for configuring conflation.
Parameters are relatively expensive to construct so should be reused where possible for efficiency. Rather than defining parameters for each subscription request they are registered using the registerSubscriptionParameters method. This returns a Handle that can be passed to multiple subscribe method calls. The Handle also manages the lifetime of the parameters. The handle should be deleted when it is no longer required, and must be kept open while it is in use by any open subscription.
Here is an example demonstrating how to request a subscription for trade updates only for the CAJ.Q
record.
/* global activOneApi */
// You may have to update the userId and password fields below.
const user = "__ACTIV_SESSION_USER_ID__";
const password = "__ACTIV_SESSION_PASSWORD__";
const host = "aop-ny4-replay.activfinancial.com";
(async function() {
try {
const session = await activOneApi.connect({
host,
user,
password,
onLogMessage(logType, message) {
console.log(message);
}
});
// Register the subscription parameters.
const parameters = session.registerSubscriptionParameters({
eventTypeIncludeFilter: [
activOneApi.EventType.trade
]
});
// Initiate a subscription request with the parameters.
const subscriptionHandle = session.subscribe({
dataSourceId: activOneApi.DataSourceId.activ,
symbologyId: activOneApi.SymbologyId.native,
symbol: "CAJ.Q"
}, parameters);
// Asynchonously iterate over the messages.
for await (const response of subscriptionHandle) {
// Check that the response is a successful message.
if (activOneApi.StatusCode.success === response.statusCode) {
if (response.message.isRefresh) {
processRefresh(response.message);
} else {
processUpdate(response.message);
}
} else {
// Recoverable error.
console.error(`Error: ${activOneApi.StatusCode[response.statusCode]}`);
}
}
} catch (e) {
console.error(`Failure: ${activOneApi.StatusCode[e]}`);
}
finally {
// Delete the parameters once they are no longer needed. They must stay open for as long
// as there are subscriptions using them.
if (undefined !== parameters)
parameters.delete();
}
function processRefresh(message) {
console.log("Refresh received for " + message.symbol);
// Do something with the refresh.
}
function processUpdate(message) {
console.log("Update received for " + message.symbol + " with event type " + activOneApi.EventType[message.eventType]);
// Do something with the update.
}
})();
Try it out in the playground
Not all services support all of the subscription parameters, if an unsupported parameter is specified the subscription request that uses them will fail as not supported.
There are two types of event filter: the include filter and the exclude filter. These both take an array of event types that specify the event types that should be included/excluded from resulting in updates.
const includeParameters = session.registerSubscriptionParameters({
eventTypeIncludeFilter: [
activOneApi.EventType.trade,
activOneApi.EventType.quote
]
});
const excludeParameters = session.registerSubscriptionParameters({
eventTypeExcludeFilter: [
activOneApi.EventType.tradeNonRegular
]
});
Only one of eventTypeIncludeFilter and eventTypeExcludeFilter can be defined in a single set of subscription parameters.
Conflation configuration can be specified using ConflationParameters which require both the type of conflation and the conflation interval.
The API supports two modes of conflation: quote and total.
Quote conflation will merge consecutive quote updates. Flushing of the cached updates is either triggered by a trade, other non-quote event or a configured time period. The following event types will be conflated for ACTIV data sources:
Total conflation will conflate all event types for all data sources.
If multiple event types are conflated the delivered update will have event type set to EventType.none
.
Conflation is applied in the remote Gateway or local APId. For dedicated client installations the conflated event types for quote conflation can be customizedd.
The supported conflation intervals are configured on the Gateway/APId. The requested conflation interval must be one of these values or ConflationInterval.min. If ConflationInterval.min
is requested then the shortest available conflation interval will be used.
Here is an example requesting total conflation with an interval of 5000ms.
const parameters = session.registerSubscriptionParameters({
conflationParameters: {
conflationType: activOneApi.ConflationType.total,
// Conflation interval can be a number or activOneApi.ConflationInterval.min
conflationInterval: 5000
}
});
The SubscriptionHandle can also be used to process results of the snapshot.
This can be async iterated through as an async collection of SubscriptionResults. A subscription result contains a statusCode property that indicates if the next item in the collection is an error.
The result contains an optional message. For non-query subscriptions this will always be populated with a message if the status of the subscription is StatusCode.success
and otherwise it will be undefined. For a query subscription if the status code is success this will either be a message or undefined in the case that this is a result indicating the subscription is in the SubscriptionState.complete state.
The result also contains an optional statusMessage. For non-query subscriptions this will always be undefined. For a query subscription if the status code is non-success this will always be a status message with details of the errored symbol or the failure. If the status code is success and message is undefined this will contain a status message indicating the subscription is in the SubsciptionState.complete state.