Local test environment to test data posting to the Refinitiv Contribution Channel (RCC) directly

I have followed the steps from https://developers.refinitiv.com/en/api-catalog/refinitiv-real-time-opnsrc/rt-sdk-java/tutorials/ema-consumer/ema-consumer-posting-data-to-contribution-channel and now I can post data via consumer directly to Refinitiv Contribution Channel (RCC). So no questions about that. However, I need to write some integration tests for our posting consumer, but I cannot figure out how I can emulate a connection behind a tunnel. There are a lot of test examples how a consumer can connect directly to a provider and post some data. For example:

./gradlew runiProvider340 - starts interactive provider and accepts posting message from a consumer

and when I run

./gradlew runConsumer341 - messages are posted to the provider.

All is good, except the fact that this method to test is not working if I use tunnelled connection. EMA Java SDK provides a test for the tunnel posting consumer ex440_System_TunStrm, but how I am supposed to test it? against what? My believe that my tests are supposed to be run offline, tests also are run in parallel which is another reason not to open a real/true connection to Refiniv servers on each test run. Is the there a way to test let's say ex440_System_TunStrm consumer or the one mentioned in the article offline as part of local integration tests?

Best Answer

  • nariman.ab
    Answer ✓

    @Jirapongse
    After some tries/errors and debug I was able to adapt com.refinitiv.eta.valueadd.examples.provider to accept a tunnelled connection. Now It can also accepts posts and sends Acks in response. The only file I have to modify was

    com.refinitiv.eta.valueadd.examples.provider.TunnelStreamHandler

    I have attached it for the reference - TunnelStreamHandler.txt
    It works. However, there 2 things which are still not clear.
    1. Upon receiving the first login request (line 119) Original code checks that authentication type equals to

    ClassesOfService.AuthenticationTypes.OMM_LOGIN 

    But this is not the case actual authentication type sent by Consumer equals to

    ClassesOfService.AuthenticationTypes.NOT_REQUIRED

    So I have to modify the initial check to

    if (event.containerType() == DataTypes.MSG &&
    (event.tunnelStream().classOfService().authentication().type() == ClassesOfService.AuthenticationTypes.OMM_LOGIN
    || event.tunnelStream().classOfService().authentication().type() == ClassesOfService.AuthenticationTypes.NOT_REQUIRED
    ) && _msg.domainType() == DomainTypes.LOGIN && _msg.msgClass() == MsgClasses.REQUEST)

    2. Upon above change I was able to get almost proper Refresh Message im my Consumer app. I am saying "almost" because serviceId was absent. I tried several ways to add/encode missing serviceId into the LoginRequest but I could not figure out the way to do it. It appears that it is somehow removed during encoding phase. My current 'live' Consumer which works with 'true' online Refinitiv endpoints relies on this check to mark connection as valid. I mean 'true' Refinitiv endpoints have serviceId in that Refresh messages while the test is not.

    @Override
    public void onRefreshMsg(RefreshMsg refreshMsg, OmmConsumerEvent event) {
    log.info("Refresh message {}", refreshMsg);
    if (refreshMsg.domainType() == EmaRdm.MMT_LOGIN
    && refreshMsg.state().streamState() == OmmState.StreamState.OPEN
    && refreshMsg.state().dataState() == OmmState.DataState.OK) {
    // Login accepted, app can post data now
    loginStreamID = refreshMsg.streamId();
    log.info("System refresh, loginStreamID: {}", loginStreamID);
    if (refreshMsg.hasServiceId()
    || refreshMsg.toString().contains("CONTRIBUTION_TUNNEL")
    ) {

    In other words, onRefreshMsg method is called twice, the first call contains no serviceId in refreshMsg while the second call contains serviceId and I use it as the trigger to mark the connection 'active', right? However, now for an emulated connection for the test via

    com.refinitiv.eta.valueadd.examples.provider.TunnelStreamHandler

    I have to add this not very reliable check

    || refreshMsg.toString().contains("CONTRIBUTION_TUNNEL")

    as well. Is there a better way to know that the cannel I request has actually accepted the connection/tunnel? Modifying Consumer code to comply with the emulated provider is not a good way to proceed. How can I inject serviceId during LoginRequest reply (second Message Refresh Consumer callback)?



Answers

  • Hi @nariman.ab,

    Tunnel is a stream with end to end line of sight authentication and guaranteed delivery. I am not sure how you can unit test it without a connection.

    Either ways, you can get some hints of how unit tests are done in the RTSDK here and here.


  • @nariman.ab

    You can run the ETA VA Provider example (com.refinitiv.eta.valueadd.examples.provider). It supports tunnel streams. However, you need to modify it to handle post messages.

  • @Jirapongse

    Thank you for the hint. I had also to change Cannel type from RSSL_ENCRYPTED to RSSL_SOCKET otherwise I got the following error on Provider side

    ConnectionType:encrypted
    portNo: 14002
    interfaceName: null
    serviceName: DIRECT_FEED
    serviceId: 1
    enableRTT: false
    protocolList: rssl.rwf, tr_json2, rssl.json.v2
    keyfile: null
    keypasswd: null

    Server bound on port 14002

    Connection down: Channel java.nio.channels.SocketChannel[connected local=/127.0.0.1:14002 remote=/127.0.0.1:61780]
    Error text: Error initializing channel: errorId=-1 text=no cipher suites in common

    I guess I have to configure trust store, certificates and all that to make it work. so I just decided to set the channel type to RSSL_SOCKET in order not to deal with certificates, right?

    And now I have the progress!

    Provider:

    ConnectionType:socket
    portNo: 14002
    interfaceName: null
    serviceName: DIRECT_FEED
    serviceId: 1
    enableRTT: false
    protocolList: rssl.rwf, tr_json2, rssl.json.v2

    Server bound on port 14002

    Connection up!
    Server sun.nio.ch.ServerSocketChannelImpl[/[0:0:0:0:0:0:0:0]:14002]: New client on Channel java.nio.channels.SocketChannel[connected local=/127.0.0.1:14002 remote=/127.0.0.1:61632]

    Received Login Request for Username: xxx

    Received Source Directory Request

    Received Login Close for StreamId 1
    Closing login stream id '1' with user name: xxx

    Connection down: Channel java.nio.channels.SocketChannel[connected local=/127.0.0.1:14002 remote=/127.0.0.1:61632]
    Error text: SocketChannel.read returned -1 (end-of-stream)

    Closing source directory stream id '2' with service name: DIRECT_FEED

    Consumer:

    I 230324 092657.596 [main] OmmConsumerImpl - loggerMsg
    ClientName: ChannelCallbackClient
    Severity: Info
    Text: Received ChannelUp event on channel channel_local-a
    Instance Name consumer3_1
    Component Version etaj3.6.8.L1.all.rrg
    loggerMsgEnd


    I 230324 092657.660 [main] ConsumerClient - Connecting...
    I 230324 092657.662 [main] ConsumerClient - Requesting tunnel stream CONTRIBUTION_TUNNEL
    I 230324 092657.664 [main] Publisher - waiting for queue processing end
    I 230324 092657.664 [main] Publisher - queue processing is empty
    I 230324 092657.664 [pool-4-thread-1] ConsumerClient - Refresh message RefreshMsg
    streamId="1"
    domain="Login Domain"
    solicited
    RefreshComplete
    state="Open / Ok / None / 'Login accepted by host localhost'"
    itemGroup="00 00"
    name="xxx"
    nameType="1"
    Attrib dataType="ElementList"
    ElementList
    ElementEntry name="ApplicationId" dataType="Ascii" value="256"
    ElementEntry name="ApplicationName" dataType="Ascii" value="ETA Provider"
    ElementEntry name="Position" dataType="Ascii" value="192.168.178.60/xxx-xxxxx"
    ElementEntry name="SingleOpen" dataType="UInt" value="0"
    ElementEntry name="SupportOMMPost" dataType="UInt" value="1"
    ElementEntry name="SupportBatchRequests" dataType="UInt" value="1"
    ElementListEnd
    AttribEnd
    RefreshMsgEnd

    I 230324 092657.664 [pool-4-thread-1] ConsumerClient - System refresh, loginStreamID: 1
    Exception in thread "main" Exception Type='OmmInvalidUsageException', Text='Attempt to get servieId while it is not set.', Error Code='-4048'
    at com.refinitiv.ema.access.TunnelStreamRequestImpl.ommIUExcept(TunnelStreamRequestImpl.java:221)
    at com.refinitiv.ema.access.TunnelStreamRequestImpl.serviceId(TunnelStreamRequestImpl.java:162)
    at com.refinitiv.ema.access.TunnelItem.submit(ItemCallbackClient.java:579)
    at com.refinitiv.ema.access.TunnelItem.open(ItemCallbackClient.java:448)
    at com.refinitiv.ema.access.ItemCallbackClient.registerClient(ItemCallbackClient.java:2294)
    at com.refinitiv.ema.access.OmmBaseImpl.registerClient(OmmBaseImpl.java:566)
    at com.refinitiv.ema.access.OmmConsumerImpl.registerClient(OmmConsumerImpl.java:267)

    So the next step is to figure out what servieId it complains about...

  • Yes, you need certificate files for establishing encrypted connections. It could be a self-signed certificate.

    You can enable tracing in the EMA so you see the messages sent between the applications by setting the XmlTraceToStdout configuration to 1 in the consumer.

    For example:

            <Consumer>
                <!-- Name is mandatory    -->
                <Name value="Consumer_1"/>
                <!-- Channel is optional: defaulted to "RSSL_SOCKET + localhost + 14002" -->
                <!-- Channel or ChannelSet may be specified -->
                <Channel value="Channel_1"/>
                <!-- Dictionary is optional: defaulted to "ChannelDictionary" -->
                <Dictionary value="Dictionary_1"/>
                <XmlTraceToStdout value="1"/>
            </Consumer>
  • @Jirapongse

    I think I'v got the answer on my second question. I just have not to use LoginRefresh (delete its complete mapping) and map/encode directly to RefreshMsg.


    However, the first question from the above post is still open.

  • @nariman.ab

    Thank you for sharing the code.

    The Provider example code may not be able to handle all use cases for tunnel streams. It is typically used to test the Consumer example so it needs to be modified to support RCC applications.