Unable to receive dynamic messages
Dear team
Client is developing API to consume data from our ERT in cloud service with Real-Time-SDK-2.0.0.L1.java. When subscribe for update messages, they receive the following message when uploading RICs and enable to subscribe for all US equity RICs. This is not happening if they only subscribe for 1 RIC.
StatusMsg
streamId="17"
domain="MarketPrice Domain"
state="Closed / Ok / None / 'Stream closed for batch'"
serviceName="ELEKTRON_DD"
StatusMsgEnd
Could you please advise:
1. How to subscribe ERT in could using Chain RIC like 0#NASCONS.0?
2. Which demo client can refer to when subscribe multiple RICs in our SDK?
3. How to unsubscribe cerntain RICs after API started?
Please find client configure and code segment below, and let me know if you need any further information.
Thank you.
configuration.xml
<?xml version="1.0" encoding="UTF-8"?>
<system>
<category name="USConfig" description="US登录">
<item name="source" value="0" description="行情源0:云行情,1:专线"></item>
<item name="username" value="xxxxxxx" description="账号"></item>
<item name="password" value="xxxxxx" description="密码"></item>
<item name="clientId" value="xxxxx" description="客户端ID"></item>
<item name="location" value="eu-west-1a" description="源站点"></item>
<item name="keyfile" value="xxxx/test_keystore.jks" description="证书"></item>
<item name="keypasswd" value="xxxxx" description="证书密码"></item>
<item name="USCode" value="0#UNIVERSE.NB" description="美股列表"></item>
<item name="refinitivHost" value="apac-1-t2.streaming-pricing-api.refinitiv.com" description="主机"></item>
</category>
<category name="Disruptor" description="Disruptor">
<item name="queue" value="65536" description="Logon"></item>
<item name="isMutilProducter" value="true" description="Logout"></item>
<item name="consumer" value="4" description="Heartbeat"></item>
</category>
<category name="staticData" description="静态数据相关配置">
<item name="exRightUrl" value="/data/" description="除权信息本地保存的路径"></item>
</category>
</system>
EmaConfig.xml
<Dictionary>
<Name value="Dictionary_1"/>
<!-- dictionaryType is optional: defaulted to ChannelDictionary" -->
<!-- possible values: ChannelDictionary, FileDictionary -->
<!-- if dictionaryType is set to ChannelDictionary, file names are ignored -->
<DictionaryType value="DictionaryType::FileDictionary"/>
<RdmFieldDictionaryFileName value="/xxxx/xxx/RDMFieldDictionary"/>
<EnumTypeDefFileName value="/xxxxx/xxx/enumtype.def"/>
</Dictionary>
ConsumerDemo.java
package com.xx.dc.service;
import com.refinitiv.ema.access.Map;
import com.refinitiv.ema.access.*;
import com.refinitiv.ema.rdm.EmaRdm;
import lombok.Data;
import org.dom4j.Document;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class ConsumerDemo {
static class Configuration {
private static java.util.Map items = new HashMap();
private static void loadConfig(String path) {
try {
InputStream is = new FileInputStream(path);
if (is != null) {
SAXReader reader = new SAXReader();
Document document = reader.read(is);
Element systemElement = document.getRootElement();
List catList = systemElement.elements("category");
for (Iterator catIter = catList.iterator(); catIter.hasNext(); ) {
Element catElement = (Element) catIter.next();
String catName = catElement.attributeValue("name");
if (catName == null|| catName.isEmpty()) {
continue;
}
List itemList = catElement.elements("item");
for (Iterator itemIter = itemList.iterator(); itemIter.hasNext(); ) {
Element itemElement = (Element) itemIter.next();
String itemName = itemElement.attributeValue("name");
String value = itemElement.attributeValue("value");
if (itemName != null && !itemName.isEmpty()) {
items.put(catName + "." + itemName, value);
}
}
}
} else {
System.out.println("file is not found!");
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
public static String getString(String name) {
String value = (String) items.get(name);
return (value == null) ? "" : value;
}
public static String getString(String name, String defaultValue) {
String value = (String) items.get(name);
if (value != null && value.length() > 0)
return value;
else
return defaultValue;
}
public static int getInt(String name) {
String value = getString(name);
try {
return Integer.parseInt(value);
} catch (NumberFormatException ex) {
return 0;
}
}
public static int getInt(String name, int defaultValue) {
String value = getString(name);
try {
return Integer.parseInt(value);
} catch (NumberFormatException ex) {
}
return defaultValue;
}
public static boolean getBoolean(String name) {
String value = getString(name);
return Boolean.valueOf(value).booleanValue();
}
public static double getDouble(String name, double defaultValue) {
String value = getString(name);
try {
return Double.parseDouble(value);
} catch (NumberFormatException ex) {
}
return defaultValue;
}
public static int parseInt(String name) {
String value = (String) items.get(name);
return (value == null || "".equals(value.trim())) ? -1 : Integer.parseInt(value);
}
public static java.util.Map getItems() {
return items;
}
}
class LoginBean {
public LoginBean() {
this.setUserName(Configuration.getString("USConfig.username"));
this.setPassword(Configuration.getString("USConfig.password"));
this.setClientId(Configuration.getString("USConfig.clientId"));
this.setLocation(Configuration.getString("USConfig.location"));
this.setProxyHostName(Configuration.getString("USConfig.proxyHostName"));
this.setProxyPort(Configuration.getString("USConfig.proxyPort"));
this.setProxyUserName(Configuration.getString("USConfig.proxyUserName"));
this.setProxyPassword(Configuration.getString("USConfig.proxyPassword"));
this.setHost(Configuration.getString("USConfig.refinitivHost"));
this.setPort("14002");
}
private String userName;
private String password;
private String clientId;
private String proxyHostName;
private String proxyPort = "-1";
private String proxyUserName;
private String proxyPassword;
private String proxyDomain;
private String proxyKrb5Configfile;
private String host;
private String port;
private String location = "us-east";
}
class StaticDataCallback implements OmmConsumerClient, ServiceEndpointDiscoveryClient {
public final Set<Integer> fids = new HashSet<>();
public static final int nextPageFId = 815;
{
fids.add(800);
fids.add(801);
fids.add(802);
fids.add(803);
fids.add(804);
fids.add(805);
fids.add(806);
fids.add(807);
fids.add(808);
fids.add(809);
/*fids.add(809);
fids.add(809);
fids.add(809);
fids.add(809);*/
fids.add(810);
fids.add(811);
fids.add(812);
fids.add(813);
fids.add(815);
}
public void onRefreshMsg(RefreshMsg refreshMsg, OmmConsumerEvent consumerEvent) {
System.out.println(refreshMsg.name());
if (0 != refreshMsg.state().statusCode()) {
// LogFactory.getOptionLogger().logInfo(refreshMsg.toString());
}
if (DataType.DataTypes.FIELD_LIST == refreshMsg.payload().dataType()) {
for (FieldEntry fieldEntry : refreshMsg.payload().fieldList()) {
if (!fids.contains(fieldEntry.fieldId())) {
continue;
}
String value = fieldEntry.load().toString();
if (nextPageFId == fieldEntry.fieldId()) {
if (US_EMPTY.equalsIgnoreCase(value)) {
staticDataListNextPage = US_EMPTY;
} else {
staticDataListNextPage = fieldEntry.ascii().toString();
}
break;
}
if (US_EMPTY.equalsIgnoreCase(value)) {
continue;
}
String code = fieldEntry.ascii().toString();
//if (!USDataCenter.getInstance().containsKey(code)) {
ricCodes.putIfAbsent(code,Boolean.FALSE);
//}
}
}
lock.lock();
try {
condition.signalAll();
} finally {
lock.unlock();
}
}
public void onUpdateMsg(UpdateMsg updateMsg, OmmConsumerEvent consumerEvent) {
System.out.println(updateMsg.name());
}
public void onStatusMsg(StatusMsg statusMsg, OmmConsumerEvent consumerEvent) {
System.out.println(statusMsg.name());
}
public void onGenericMsg(GenericMsg genericMsg, OmmConsumerEvent consumerEvent) {
}
public void onAckMsg(AckMsg ackMsg, OmmConsumerEvent consumerEvent) {
}
public void onAllMsg(Msg msg, OmmConsumerEvent consumerEvent) {
}
public void onSuccess(ServiceEndpointDiscoveryResp serviceEndpointResp, ServiceEndpointDiscoveryEvent event) {
}
public void onError(String errorText, ServiceEndpointDiscoveryEvent event) {
}
}
class DynamicDataCallback implements OmmConsumerClient, ServiceEndpointDiscoveryClient {
public void onRefreshMsg(RefreshMsg refreshMsg, OmmConsumerEvent consumerEvent) {
System.out.println("refreshMsg:" + refreshMsg.name());
}
public void onUpdateMsg(UpdateMsg updateMsg, OmmConsumerEvent consumerEvent) {
System.out.println("updateMsg:" + updateMsg.name());
}
public void onStatusMsg(StatusMsg statusMsg, OmmConsumerEvent consumerEvent) {
System.out.println("statusMsg:" + statusMsg.toString());
}
public void onGenericMsg(GenericMsg genericMsg, OmmConsumerEvent consumerEvent) {
}
public void onAckMsg(AckMsg ackMsg, OmmConsumerEvent consumerEvent) {
}
public void onAllMsg(Msg msg, OmmConsumerEvent consumerEvent) {
}
public void onSuccess(ServiceEndpointDiscoveryResp serviceEndpointResp, ServiceEndpointDiscoveryEvent event) {
}
public void onError(String errorText, ServiceEndpointDiscoveryEvent event) {
}
}
private List<Long> staticHandles = new ArrayList<>();
StaticDataCallback staticDataCallback = new StaticDataCallback();
DynamicDataCallback dynamicDataCallback = new DynamicDataCallback();
ServiceEndpointDiscovery serviceDiscovery = EmaFactory.createServiceEndpointDiscovery();
private Map configDb = EmaFactory.createMap();
OmmConsumerConfig ommConsumerConfig;
private String staticDataListNextPage;
LoginBean loginBean;
private OmmConsumer consumer;
private ReentrantLock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
private ConcurrentHashMap<String,Boolean> ricCodes = new ConcurrentHashMap<>();
public static final String US_EMPTY = "(blank data)";
private String confPath;
private String emaConfPath;
private static String DEFAULT_USER_CONFIG_FILE_NAME = "D:\\work\\gitRepo\\sis-gw\\DataCollectionUSAggregator\\conf\\DataCollectionUS\\configuration.xml";
private static String DEFAULT_EMA_CONFIG_FILE_NAME = "D:\\work\\gitRepo\\sis-gw\\DataCollectionUSAggregator\\conf\\DataCollectionUS\\configuration.xml";
public ConsumerDemo(String userConfPath, String emaConfPath) {
if (userConfPath == null || userConfPath.isEmpty())
userConfPath = DEFAULT_USER_CONFIG_FILE_NAME;
if (emaConfPath == null || emaConfPath.isEmpty())
emaConfPath = DEFAULT_EMA_CONFIG_FILE_NAME;
this.confPath = userConfPath;
this.emaConfPath = emaConfPath;
Configuration.loadConfig(userConfPath);
this.staticDataListNextPage = Configuration.getString("USConfig.USCode");
this.ommConsumerConfig = EmaFactory.createOmmConsumerConfig(emaConfPath);
this.loginBean = new LoginBean();
}
public static void main(String[] args) {
ConsumerDemo demo = createDemo(args);
demo.createConsumer();
if (demo.consumer == null) {
System.out.println("consumer is null.");
return;
}
while (true) {
if (US_EMPTY.equalsIgnoreCase(demo.staticDataListNextPage)) {
break;
}
demo.lock.lock();
try {
demo.staticHandles.add(demo.consumer
.registerClient(EmaFactory.createReqMsg().serviceName("ELEKTRON_DD")
.name(demo.staticDataListNextPage), demo.staticDataCallback));
demo.condition.await();
} catch (Exception e) {
e.printStackTrace();
} finally {
demo.lock.unlock();
}
}
try {
if (!demo.staticHandles.isEmpty()) {
demo.unregister(demo.staticHandles);
}
} catch (Exception e) {
e.printStackTrace();
}
int stocksSize = demo.ricCodes.size();
if (stocksSize <= 0) {
System.out.println("code size is 0!");
return;
}
ElementList batch = EmaFactory.createElementList();
OmmArray array = EmaFactory.createOmmArray();
// subscribe all
int num = stocksSize;
Iterator<String> codeIterator = demo.ricCodes.keySet().iterator();
while (codeIterator.hasNext()) {
try {
array.add(EmaFactory.createOmmArrayEntry().ascii(codeIterator.next()));
num--;
if (num % 1000 == 0) {
batch.add(EmaFactory.createElementEntry().array(EmaRdm.ENAME_BATCH_ITEM_LIST, array));
demo.consumer.registerClient(EmaFactory.createReqMsg().serviceName("ELEKTRON_DD").payload(batch), demo.dynamicDataCallback);
array.clear();
batch.clear();
}
} catch (IllegalAccessError e) {
e.printStackTrace();
}
}
if (!array.isEmpty()) {
batch.add(EmaFactory.createElementEntry().array(EmaRdm.ENAME_BATCH_ITEM_LIST, array));
demo.consumer.registerClient(EmaFactory.createReqMsg().serviceName("ELEKTRON_DD").payload(batch), demo.dynamicDataCallback);
}
try {
Thread.sleep(1000000000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
static ConsumerDemo createDemo(String[] args)
{
try
{
int argsCount = 0;
String userConfPath = null;
String emaConfPath = null;
while (argsCount < args.length)
{
if ("-userConfPath".equals(args[argsCount]))
{
userConfPath = argsCount < (args.length-1) ? args[++argsCount] : null;
++argsCount;
}
else if ("-emaConfPath".equals(args[argsCount]))
{
emaConfPath = argsCount < (args.length-1) ? args[++argsCount] : null;
++argsCount;
}
}
return new ConsumerDemo(userConfPath, emaConfPath);
}
catch (Exception e) {
e.printStackTrace();
}
return null;
}
public boolean createConsumer() {
boolean success = false;
try {
if (consumer == null) {
createProgramaticConfig(configDb);
if ((loginBean.getProxyHostName() == null) || (loginBean.getProxyHostName().isEmpty()) || (loginBean.getProxyPort() == "-1"))
{
consumer = EmaFactory.createOmmConsumer(ommConsumerConfig.consumerName("Consumer_1")
.username(loginBean.getUserName()).password(loginBean.getPassword())
.clientId(loginBean.getClientId())
.config(configDb));
}
else
{
consumer = EmaFactory.createOmmConsumer(ommConsumerConfig.consumerName("Consumer_1")
.username(loginBean.getUserName()).password(loginBean.getPassword())
.clientId(loginBean.getClientId()).config(configDb).tunnelingProxyHostName(loginBean.getProxyHostName())
.tunnelingProxyPort(loginBean.getProxyPort())
.tunnelingCredentialUserName(loginBean.getProxyUserName())
.tunnelingCredentialPasswd(loginBean.getProxyPassword())
.tunnelingCredentialDomain(null)
.tunnelingCredentialKRB5ConfigFile(null));
}
}
success = true;
} catch (Exception e) {
e.printStackTrace();
}
return success;
}
private void createProgramaticConfig(Map configDb)
{
Map elementMap = EmaFactory.createMap();
ElementList elementList = EmaFactory.createElementList();
ElementList innerElementList = EmaFactory.createElementList();
innerElementList.add(EmaFactory.createElementEntry().ascii("Channel", "Channel_1"));
elementMap.add(EmaFactory.createMapEntry().keyAscii("Consumer_1", MapEntry.MapAction.ADD, innerElementList));
innerElementList.clear();
elementList.add(EmaFactory.createElementEntry().map("ConsumerList", elementMap));
elementMap.clear();
configDb.add(EmaFactory.createMapEntry().keyAscii("ConsumerGroup", MapEntry.MapAction.ADD, elementList));
elementList.clear();
innerElementList.add(EmaFactory.createElementEntry().ascii("ChannelType", "ChannelType::RSSL_ENCRYPTED"));
innerElementList.add(EmaFactory.createElementEntry().ascii("Host", loginBean.getHost()));
innerElementList.add(EmaFactory.createElementEntry().ascii("Port", loginBean.getPort()));
innerElementList.add(EmaFactory.createElementEntry().intValue("EnableSessionManagement", 1));
elementMap.add(EmaFactory.createMapEntry().keyAscii("Channel_1", MapEntry.MapAction.ADD, innerElementList));
innerElementList.clear();
elementList.add(EmaFactory.createElementEntry().map("ChannelList", elementMap));
elementMap.clear();
configDb.add(EmaFactory.createMapEntry().keyAscii("ChannelGroup", MapEntry.MapAction.ADD, elementList));
}
public void unregister(List<Long> handles) {
for (Long handle : handles) {
if (consumer != null) {
consumer.unregister(handle);
}
}
}
}
Best Answer
-
Hello @Xiaorong.Xu
You and the client can download the "Message API Java HTML Documentation Set (zip file)" which includes the Reference Guide document and all other EMA Java documents on the EMA Java Document page.
The EMA API and the server have their internal heartbeat message which they use to monitor the connection. The application cannot access this heartbeat message but the application can detect the disconnection in various ways. Please see more detail in the following posts:
1
Answers
-
Hello @Xiaorong.Xu
Please be informed that the Status message "Closed / Ok / None / 'Stream closed for batch'" is expected behavior when the application sends the Batch request message to Refinitiv Real-Time. Please see more explanation in this Why `Stream closed for batch` happens? post.
Question 1: How to subscribe ERT in could use Chain RIC like 0#NASCONS.0?
Answer:
Is the 0#NASCONS.0 valid? I have tried it and it returns '**The record could not be found' which means the item does not exist or invalid name. However, the client can find the explanation of how to subscribe Chain (and its sub-chain) data in the following resources:
Question 2: Which demo client can refer to when subscribing multiple RICs in our SDK?
Answer: The client can refer to EMA Java ex370_MP_Batch example. The other useful resource is EMA Java Batch and View features article.
Question 3: How to unsubscribe certain RICs after API started?
Answer: The application can call OmmConsumer.unregister(<item handle>) function to close the item request. Please see an example code in EMA Java ex300_MP_Close example.
1 -
Thank you very much Wasin. Do we have any document on what possible error message client could receive if like batch registering failed or disconnected and so on? So client can be prepared if any outage.
In addition is there any guide on how to design auto reconnection on client API please?
Thank you
0 -
Hello @Xiaorong.Xu
Unfortunately, we do not have those kinds of documents (list of error messages and auto reconnection).
Please be informed that most error messages are generated from the infrastructure side, from the Exchange, or from the Feed. The application should check the item or login stream health from the StatusMsg.OmmState object via the onStatusMsg() callback function.
- OmmState.DataState: represents item data state.
- OmmState.StreamState: represents item stream state.
- OmmState.StatusCode: represents status code.
- OmmState.statusText(): get status text information
The client can find more detail about OmmState, DataState, StreamState, and StatusCode objects in EMA Java Reference Guide document (<RTSDK Java package>\Java\Ema\Docs\refman folder).
1 -
Thank you Wasin. The latest SDK Real-Time-SDK-2.0.0.L1.java.zip does not contain the document mentioned. Could you please provide the link where we can download it?
Could you please also confirm if we have any heartbeat message in the feed that client can monitor and trigger alert with? Thank you.
0 -
@wasin.waeosri Thank you very much! Problem resolved!
0
Categories
- All Categories
- 6 AHS
- 39 Alpha
- 161 App Studio
- 4 Block Chain
- 4 Bot Platform
- 16 Connected Risk APIs
- 47 Data Fusion
- 30 Data Model Discovery
- 608 Datastream
- 1.3K DSS
- 577 Eikon COM
- 4.9K Eikon Data APIs
- 7 Electronic Trading
- Generic FIX
- 7 Local Bank Node API
- Trading API
- 2.7K Elektron
- 1.3K EMA
- 236 ETA
- 519 WebSocket API
- 33 FX Venues
- 10 FX Market Data
- 1 FX Post Trade
- 1 FX Trading - Matching
- 12 FX Trading – RFQ Maker
- 5 Intelligent Tagging
- 2 Legal One
- 20 Messenger Bot
- 2 Messenger Side by Side
- 9 ONESOURCE
- 7 Indirect Tax
- 59 Open Calais
- 264 Open PermID
- 39 Entity Search
- 2 Org ID
- PAM
- PAM - Logging
- 8.4K Private Comments
- 6 Product Insight
- Project Tracking
- ProView
- ProView Internal
- 20 RDMS
- 1.4K Refinitiv Data Platform
- 367 Refinitiv Data Platform Libraries
- 3 Refinitiv Due Diligence
- LSEG Due Diligence Portal API
- 3 Refinitiv Due Dilligence Centre
- Rose's Space
- 1.1K Screening
- 18 Qual-ID API
- 13 Screening Deployed
- 23 Screening Online
- 10 World-Check Customer Risk Screener
- 990 World-Check One
- 44 World-Check One Zero Footprint
- 45 Side by Side Integration API
- Test Space
- 3 Thomson One Smart
- 1.2K TR Internal
- Global Hackathon 2015
- 2 Specialists Who Code
- 10 TR Knowledge Graph
- 150 Transactions
- 142 REDI API
- 1.7K TREP APIs
- 4 CAT
- 21 DACS Station
- 117 Open DACS
- 1.1K RFA
- 103 UPA
- 172 TREP Infrastructure
- 224 TRKD
- 886 TRTH
- 5 Velocity Analytics
- 5 Wealth Management Web Services
- 59 Workspace SDK
- 9 Element Framework
- 5 Grid
- 13 World-Check Data File
- Yield Book Analytics
- 46 中文论坛