Local test environment to test data posting to the Refinitiv Contribution Channel (RCC) directly
I have followed the steps from https://developers.refinitiv.com/en/api-catalog/refinitiv-real-time-opnsrc/rt-sdk-java/tutorials/ema-consumer/ema-consumer-posting-data-to-contribution-channel and can now post data from a consumer directly to the Refinitiv Contribution Channel (RCC), so I have no questions about that. However, I need to write integration tests for our posting consumer, and I cannot figure out how to emulate a connection behind a tunnel. There are many test examples showing how a consumer can connect directly to a provider and post data. For example:
./gradlew runiProvider340 - starts interactive provider and accepts posting message from a consumer
and when I run
./gradlew runConsumer341 - messages are posted to the provider.
All is good, except that this way of testing does not work if I use a tunnelled connection. The EMA Java SDK provides an example of a tunnel posting consumer, ex440_System_TunStrm, but how am I supposed to test it? Against what? I believe my tests should run offline; they also run in parallel, which is another reason not to open a real connection to Refinitiv servers on each test run. Is there a way to test, say, the ex440_System_TunStrm consumer, or the one mentioned in the article, offline as part of local integration tests?
Best Answer
-
@Jirapongse
After some trial and error and debugging, I was able to adapt com.refinitiv.eta.valueadd.examples.provider to accept a tunnelled connection. Now it can also accept posts and send Acks in response. The only file I had to modify was com.refinitiv.eta.valueadd.examples.provider.TunnelStreamHandler
I have attached it for the reference - TunnelStreamHandler.txt
It works. However, there are 2 things which are still not clear.
1. Upon receiving the first login request (line 119), the original code checks that the authentication type equals ClassesOfService.AuthenticationTypes.OMM_LOGIN
But this is not the case: the actual authentication type sent by the Consumer equals
ClassesOfService.AuthenticationTypes.NOT_REQUIRED
So I had to modify the initial check to
if (event.containerType() == DataTypes.MSG &&
    (event.tunnelStream().classOfService().authentication().type() == ClassesOfService.AuthenticationTypes.OMM_LOGIN
        || event.tunnelStream().classOfService().authentication().type() == ClassesOfService.AuthenticationTypes.NOT_REQUIRED
    ) && _msg.domainType() == DomainTypes.LOGIN && _msg.msgClass() == MsgClasses.REQUEST)
2. After the above change I was able to get an almost proper Refresh Message in my Consumer app. I say "almost" because the serviceId was absent. I tried several ways to add/encode the missing serviceId into the LoginRequest, but I could not figure out how to do it. It appears that it is somehow removed during the encoding phase. My current 'live' Consumer, which works with 'true' online Refinitiv endpoints, relies on this check to mark the connection as valid. I mean that the 'true' Refinitiv endpoints have a serviceId in those Refresh messages, while the test provider does not.
@Override
public void onRefreshMsg(RefreshMsg refreshMsg, OmmConsumerEvent event) {
    log.info("Refresh message {}", refreshMsg);
    if (refreshMsg.domainType() == EmaRdm.MMT_LOGIN
            && refreshMsg.state().streamState() == OmmState.StreamState.OPEN
            && refreshMsg.state().dataState() == OmmState.DataState.OK) {
        // Login accepted, app can post data now
        loginStreamID = refreshMsg.streamId();
        log.info("System refresh, loginStreamID: {}", loginStreamID);
        if (refreshMsg.hasServiceId()
                || refreshMsg.toString().contains("CONTRIBUTION_TUNNEL")
        ) {
In other words, the onRefreshMsg method is called twice: the first call contains no serviceId in refreshMsg, while the second call contains a serviceId, and I use it as the trigger to mark the connection 'active', right? However, now for an emulated connection for the test via
com.refinitiv.eta.valueadd.examples.provider.TunnelStreamHandler
I have to add this not very reliable check
|| refreshMsg.toString().contains("CONTRIBUTION_TUNNEL")
as well. Is there a better way to know that the channel I requested has actually accepted the connection/tunnel? Modifying the Consumer code to comply with the emulated provider is not a good way to proceed. How can I inject a serviceId into the LoginRequest reply (the second Refresh Message callback in the Consumer)?
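One possible alternative to the string check is to identify the login refresh by its handle instead of by its content. The sketch below is only a model of that idea in plain Java, not EMA code: TunnelLoginTracker and its methods are hypothetical names, and it assumes the consumer stores the handle returned when the login request is registered on the tunnel stream and compares it with the handle reported in the callback event. The login-domain and state checks from onRefreshMsg are passed in as already evaluated booleans.

```java
public class TunnelLoginTracker {
    // Hypothetical: handle obtained when the login request is registered on the tunnel stream.
    private long subStreamLoginHandle = -1L;
    private boolean ready = false;

    /** Remember the handle of the login sub-stream opened inside the tunnel. */
    public void loginRegistered(long handle) {
        this.subStreamLoginHandle = handle;
    }

    /**
     * Intended to be called from the refresh callback.
     * Marks the connection ready only when the refresh belongs to the stored
     * login sub-stream and reports an open, healthy login.
     */
    public boolean onLoginRefresh(long eventHandle, boolean loginDomain,
                                  boolean streamOpen, boolean dataOk) {
        if (eventHandle == subStreamLoginHandle && loginDomain && streamOpen && dataOk) {
            ready = true;
        }
        return ready;
    }

    public boolean isReady() {
        return ready;
    }

    public static void main(String[] args) {
        TunnelLoginTracker tracker = new TunnelLoginTracker();
        tracker.loginRegistered(42L);
        // Refresh for another stream (for example the initial, non-tunnel login).
        System.out.println(tracker.onLoginRefresh(7L, true, true, true));
        // Refresh for the login sub-stream opened on the tunnel.
        System.out.println(tracker.onLoginRefresh(42L, true, true, true));
    }
}
```

With this approach the consumer would not depend on whether the provider includes a serviceId in the refresh, so the same check could work against both the live endpoint and the emulated provider, provided the handle assumption holds.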
0
Answers
-
Hi @nariman.ab,
Tunnel is a stream with end to end line of sight authentication and guaranteed delivery. I am not sure how you can unit test it without a connection.
Either way, you can get some hints of how unit tests are done in the RTSDK here and here.
0 -
You can run the ETA VA Provider example (com.refinitiv.eta.valueadd.examples.provider). It supports tunnel streams. However, you need to modify it to handle post messages.
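Since the tests in question run in parallel, each test would probably need its own local provider port rather than a fixed one. A minimal sketch of asking the operating system for an unused port, using only the Java standard library, is shown here. FreePortFinder is a hypothetical helper name, and how the chosen port is passed to the provider and the consumer is left open.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;

public class FreePortFinder {
    /**
     * Binds a server socket to port 0 so the OS picks an unused TCP port,
     * then closes the socket and returns the port number.
     */
    public static int findFreePort() {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException("Could not allocate a free port", e);
        }
    }

    public static void main(String[] args) {
        int port = findFreePort();
        System.out.println("A local test provider could listen on port " + port);
    }
}
```

The port is released before it is returned, so another process could in principle take it before the provider binds to it. For local integration tests that small race is often acceptable.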
2 -
Thank you for the hint. I also had to change the channel type from RSSL_ENCRYPTED to RSSL_SOCKET; otherwise I got the following error on the Provider side
ConnectionType:encrypted
portNo: 14002
interfaceName: null
serviceName: DIRECT_FEED
serviceId: 1
enableRTT: false
protocolList: rssl.rwf, tr_json2, rssl.json.v2
keyfile: null
keypasswd: null
Server bound on port 14002
Connection down: Channel java.nio.channels.SocketChannel[connected local=/127.0.0.1:14002 remote=/127.0.0.1:61780]
Error text: Error initializing channel: errorId=-1 text=no cipher suites in common
I guess I have to configure a trust store, certificates and all that to make it work, so I just decided to set the channel type to RSSL_SOCKET in order not to deal with certificates, right?
And now I have the progress!
Provider:
ConnectionType:socket
portNo: 14002
interfaceName: null
serviceName: DIRECT_FEED
serviceId: 1
enableRTT: false
protocolList: rssl.rwf, tr_json2, rssl.json.v2
Server bound on port 14002
Connection up!
Server sun.nio.ch.ServerSocketChannelImpl[/[0:0:0:0:0:0:0:0]:14002]: New client on Channel java.nio.channels.SocketChannel[connected local=/127.0.0.1:14002 remote=/127.0.0.1:61632]
Received Login Request for Username: xxx
Received Source Directory Request
Received Login Close for StreamId 1
Closing login stream id '1' with user name: xxx
Connection down: Channel java.nio.channels.SocketChannel[connected local=/127.0.0.1:14002 remote=/127.0.0.1:61632]
Error text: SocketChannel.read returned -1 (end-of-stream)
Closing source directory stream id '2' with service name: DIRECT_FEED
Consumer:
I 230324 092657.596 [main] OmmConsumerImpl - loggerMsg
ClientName: ChannelCallbackClient
Severity: Info
Text: Received ChannelUp event on channel channel_local-a
Instance Name consumer3_1
Component Version etaj3.6.8.L1.all.rrg
loggerMsgEnd
I 230324 092657.660 [main] ConsumerClient - Connecting...
I 230324 092657.662 [main] ConsumerClient - Requesting tunnel stream CONTRIBUTION_TUNNEL
I 230324 092657.664 [main] Publisher - waiting for queue processing end
I 230324 092657.664 [main] Publisher - queue processing is empty
I 230324 092657.664 [pool-4-thread-1] ConsumerClient - Refresh message RefreshMsg
streamId="1"
domain="Login Domain"
solicited
RefreshComplete
state="Open / Ok / None / 'Login accepted by host localhost'"
itemGroup="00 00"
name="xxx"
nameType="1"
Attrib dataType="ElementList"
ElementList
ElementEntry name="ApplicationId" dataType="Ascii" value="256"
ElementEntry name="ApplicationName" dataType="Ascii" value="ETA Provider"
ElementEntry name="Position" dataType="Ascii" value="192.168.178.60/xxx-xxxxx"
ElementEntry name="SingleOpen" dataType="UInt" value="0"
ElementEntry name="SupportOMMPost" dataType="UInt" value="1"
ElementEntry name="SupportBatchRequests" dataType="UInt" value="1"
ElementListEnd
AttribEnd
RefreshMsgEnd
I 230324 092657.664 [pool-4-thread-1] ConsumerClient - System refresh, loginStreamID: 1
Exception in thread "main" Exception Type='OmmInvalidUsageException', Text='Attempt to get servieId while it is not set.', Error Code='-4048'
at com.refinitiv.ema.access.TunnelStreamRequestImpl.ommIUExcept(TunnelStreamRequestImpl.java:221)
at com.refinitiv.ema.access.TunnelStreamRequestImpl.serviceId(TunnelStreamRequestImpl.java:162)
at com.refinitiv.ema.access.TunnelItem.submit(ItemCallbackClient.java:579)
at com.refinitiv.ema.access.TunnelItem.open(ItemCallbackClient.java:448)
at com.refinitiv.ema.access.ItemCallbackClient.registerClient(ItemCallbackClient.java:2294)
at com.refinitiv.ema.access.OmmBaseImpl.registerClient(OmmBaseImpl.java:566)
at com.refinitiv.ema.access.OmmConsumerImpl.registerClient(OmmConsumerImpl.java:267)
So the next step is to figure out which serviceId it is complaining about...
0 -
Yes, you need certificate files for establishing encrypted connections. It could be a self-signed certificate.
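The "no cipher suites in common" error is often seen when the server side of a TLS connection has no usable key material, which would be consistent with the "keyfile: null" line in the provider log earlier in this thread. As a rough pre-flight check, a test could verify that the key store it is about to hand to the provider contains at least one key entry. The sketch below uses only the standard java.security.KeyStore API; KeyStoreCheck and its methods are hypothetical helper names, and loading a real key store file is left out.

```java
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.util.Collections;

public class KeyStoreCheck {
    /** Returns true if the key store holds at least one key entry (private or secret key). */
    public static boolean hasKeyEntry(KeyStore keyStore) {
        try {
            for (String alias : Collections.list(keyStore.aliases())) {
                if (keyStore.isKeyEntry(alias)) {
                    return true;
                }
            }
            return false;
        } catch (KeyStoreException e) {
            throw new IllegalStateException("Key store has not been loaded", e);
        }
    }

    /** Creates an empty in-memory key store, used here only for demonstration. */
    public static KeyStore emptyKeyStore() {
        try {
            KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
            keyStore.load(null, null); // null stream initializes an empty store
            return keyStore;
        } catch (GeneralSecurityException | IOException e) {
            throw new IllegalStateException("Could not create key store", e);
        }
    }

    public static void main(String[] args) {
        // An empty store has no key entry, so a TLS server using it could not present a certificate.
        System.out.println(hasKeyEntry(emptyKeyStore()));
    }
}
```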
You can enable tracing in the EMA so you see the messages sent between the applications by setting the XmlTraceToStdout configuration to 1 in the consumer.
For example:
<Consumer>
<!-- Name is mandatory -->
<Name value="Consumer_1"/>
<!-- Channel is optional: defaulted to "RSSL_SOCKET + localhost + 14002" -->
<!-- Channel or ChannelSet may be specified -->
<Channel value="Channel_1"/>
<!-- Dictionary is optional: defaulted to "ChannelDictionary" -->
<Dictionary value="Dictionary_1"/>
<XmlTraceToStdout value="1"/>
</Consumer>
1 -
I think I've got the answer to my second question. I simply must not use LoginRefresh (I deleted its mapping completely) and instead map/encode directly to a RefreshMsg.
However, the first question from the above post is still open.
0 -
@nariman.ab
Thank you for sharing the code.
The Provider example code may not be able to handle all use cases for tunnel streams. It is typically used to test the Consumer example so it needs to be modified to support RCC applications.
0