Invalid attempt to submit PostMsg on tunnel stream.

Hello, I have run into a problem with EMA in Java. I created an OmmConsumer and opened a tunnel stream as shown in the tutorials.

The goal is to contribute a price and to get other stock prices. Both attempts fail.

Here is the code that opens the tunnel stream:

consumer = EmaFactory.createOmmConsumer(
    consumerConfig.consumerName("Consumer_1")
        .username("***")
        .password("***")
);

ClassOfService cos = EmaFactory.createClassOfService()
    .authentication(EmaFactory.createCosAuthentication()
        .type(CosAuthentication.CosAuthenticationType.OMM_LOGIN))
    .dataIntegrity(EmaFactory.createCosDataIntegrity()
        .type(CosDataIntegrity.CosDataIntegrityType.RELIABLE))
    .flowControl(EmaFactory.createCosFlowControl()
        .type(CosFlowControl.CosFlowControlType.BIDIRECTIONAL)
        .recvWindowSize(1200))
    .guarantee(EmaFactory.createCosGuarantee()
        .type(CosGuarantee.CosGuaranteeType.NONE));

TunnelStreamRequest tsr = EmaFactory.createTunnelStreamRequest()
    .classOfService(cos)
    .domainType(EmaRdm.MMT_SYSTEM)
    .name("TUNNEL1")
    .serviceName("DDS_TRCE");

val tunnelStreamConsumerClient = OmmConsumerClientImpl.builder()
    .consumerName("tunnel-stream")
    .statusMsgConsumer(this::handleConnectionStatus)
    .log(log)
    .build();

this.tunnelStreamHandle = consumer.registerClient(tsr, tunnelStreamConsumerClient);

That part completes successfully. Here are the responses from the trace:

<REFRESH domainType="LOGIN" streamId="1" containerType="NO_DATA" flags="0x68 (HAS_MSG_KEY|SOLICITED|REFRESH_COMPLETE)" groupId="0" State: Open/Ok/None - text: "" dataSize="0">
    <key flags="0x26 (HAS_NAME|HAS_NAME_TYPE|HAS_ATTRIB)" name="GE-A-10001662-3-4170" nameType="1" attribContainerType="ELEMENT_LIST">
        <attrib>
            <elementList flags="0x08 (HAS_STANDARD_DATA)">
                <elementEntry name="ApplicationId" dataType="ASCII_STRING" data="256"/>
                <elementEntry name="ApplicationName" dataType="ASCII_STRING" data="ema"/>
                <elementEntry name="Position" dataType="ASCII_STRING" data="192.168.109.1/***-windows-pc"/>
                <elementEntry name="SingleOpen" dataType="UINT" data="0"/>
                <elementEntry name="SupportOMMPost" dataType="UINT" data="1"/>
            </elementList>
        </attrib>
    </key>
    <dataBody>
    </dataBody>
</REFRESH>
<!-- Incoming Reactor message -->
<!-- java.nio.channels.SocketChannel[connected local=/192.168.31.74:50321 remote=contrib1-amers1.platform.refinitiv.com/3.218.233.177:443] -->
<!-- Thu Oct 01 12:18:25 UTC 2020 -->
<!-- rwfMajorVer="14" rwfMinorVer="1" -->
<REFRESH domainType="SOURCE" streamId="2" containerType="MAP" flags="0x68 (HAS_MSG_KEY|SOLICITED|REFRESH_COMPLETE)" groupId="0" State: Open/Ok/None - text: "" dataSize="195">
    <key flags="0x08 (HAS_FILTER)" filter="11"/>
    <dataBody>
        <map flags="0x00" countHint="0" keyPrimitiveType="UINT" containerType="FILTER_LIST" >
            <mapEntry flags="0x00" action="ADD" key="10" >
                <filterList containerType="ELEMENT_LIST" countHint="0" flags="0x00">
                    <filterEntry id="1" action="SET" flags="0x00" containerType="ELEMENT_LIST">
                        <elementList flags="0x08 (HAS_STANDARD_DATA)">
                            <elementEntry name="Name" dataType="ASCII_STRING" data="DDS_TRCE"/>
                            <elementEntry name="Vendor" dataType="ASCII_STRING" data="Refinitiv"/>
                            <elementEntry name="Capabilities" dataType="ARRAY">
                                <array itemLength="0" primitiveType="UINT">
                                    <arrayEntry data="127"/>
                                    <arrayEntry data="6"/>
                                </array>
                            </elementEntry>
                            <elementEntry name="QoS" dataType="ARRAY">
                                <array itemLength="0" primitiveType="QOS">
                                    <arrayEntry Qos: Realtime/TickByTick/Static - timeInfo: 0 - rateInfo: 0/>
                                </array>
                            </elementEntry>
                            <elementEntry name="SupportsOutOfBandSnapshots" dataType="UINT" data="0"/>
                        </elementList>
                    </filterEntry>
                    <filterEntry id="2" action="SET" flags="0x00" containerType="ELEMENT_LIST">
                        <elementList flags="0x08 (HAS_STANDARD_DATA)">
                            <elementEntry name="ServiceState" dataType="UINT" data="1"/>
                            <elementEntry name="Status" dataType="STATE" State: Open/Ok/None - text: ""/>
                        </elementList>
                    </filterEntry>
                    <filterEntry id="4" action="SET" flags="0x00" containerType="ELEMENT_LIST">
                        <elementList flags="0x08 (HAS_STANDARD_DATA)">
                            <elementEntry name="OpenLimit" dataType="UINT" data="3"/>
                            <elementEntry name="OpenWindow" dataType="UINT" data="1"/>
                        </elementList>
                    </filterEntry>
                </filterList>
            </mapEntry>
        </map>
    </dataBody>
</REFRESH>
2020-10-01T15:18:27.359 | DEBUG | pool-3-thread-1 | RefinitivConnection-main: Consumer: tunnel-stream StatusMsg: StatusMsg
    streamId="5"
    domain="System Domain"
    privateStream
    state="Open / Ok / None / 'Login accepted by host 169c5654fd6c via ip-10-62-11-92'"
    name="TUNNEL1"
    serviceId="10"
    serviceName="DDS_TRCE"
StatusMsgEnd

So the tunnel stream is opened. Here is my first attempt to contribute:

FieldList nestedFieldList = EmaFactory.createFieldList();
nestedFieldList.add(EmaFactory.createFieldEntry().floatValue(393, 1));

UpdateMsg nestedUpdateMsg = EmaFactory.createUpdateMsg()
    .streamId(1)
    .name("DDS_TRCE")
    .payload(nestedFieldList);

consumer.submit(EmaFactory.createPostMsg()
    .streamId(1)
    .postId(1)
    .domainType(EmaRdm.MMT_MARKET_PRICE)
    .solicitAck(true)
    .complete(true)
    .payload(nestedUpdateMsg), tunnelStreamHandle);

The response to the failed PostMsg:

2020-10-01T15:43:13.173 | ERROR | pool-3-thread-1 | c.t.ema.access.OmmConsumerImpl: loggerMsg
    ClientName: TunnelItem
    Severity: Error
    Text:    Invalid attempt to submit PostMsg on tunnel stream.
loggerMsgEnd

 | MDC: 

<!-- Outgoing Reactor message -->
<!-- java.nio.channels.SocketChannel[connected local=/192.168.31.74:50631 remote=contrib1-amers1.platform.refinitiv.com/3.218.233.177:443] -->
<!-- Thu Oct 01 12:43:13 UTC 2020 -->
<!-- rwfMajorVer="14" rwfMinorVer="1" -->
<CLOSE domainType="SYSTEM" streamId="3" containerType="NO_DATA" flags="0x00" dataSize="0">
    <dataBody>
    </dataBody>
</CLOSE>

2020-10-01T15:43:13.175 | DEBUG | pool-3-thread-1 | RefinitivConnection-main: Consumer: tunnel-stream StatusMsg: StatusMsg
    streamId="5"
    domain="System Domain"
    privateStream
    state="Closed, Recoverable / Suspect / None / 'TunnelStream.readMsg() Exception: loggerMsg
    ClientName: TunnelItem
    Severity: Error
    Text:    Invalid attempt to submit PostMsg on tunnel stream.
loggerMsgEnd

'"
    name="TUNNEL1"
    serviceId="10"
    serviceName="DDS_TRCE"
StatusMsgEnd
 event: com.thomsonreuters.ema.access.OmmEventImpl@1461a361 | MDC:

So the question is, how to do it right?

Here is the second problem: an attempt to get prices over the same tunnel stream after it is successfully authorized:

consumer.registerClient(
    EmaFactory.createReqMsg().name("IBM.N").serviceId(1),
    OmmConsumerClientImpl.builder().consumerName("request").log(log).build(),
    1,
    tunnelStreamHandle);

Response:

<!-- Outgoing Reactor message -->
<!-- java.nio.channels.SocketChannel[connected local=/192.168.31.74:50690 remote=contrib1-amers1.platform.refinitiv.com/3.213.90.178:443] -->
<!-- Thu Oct 01 12:47:19 UTC 2020 -->
<!-- rwfMajorVer="14" rwfMinorVer="1" -->
<GENERIC domainType="SYSTEM" streamId="3" containerType="MSG" flags="0x19 (HAS_EXTENDED_HEADER|HAS_SEQ_NUM|MESSAGE_COMPLETE)" seqNum="2" dataSize="20">
    <extendedHeader data="0100"/>
    <dataBody>
        <REQUEST domainType="MARKET_PRICE" streamId="6" containerType="NO_DATA" flags="0x04 (STREAMING)" dataSize="0">
            <key flags="0x03 (HAS_SERVICE_ID|HAS_NAME)" serviceId="1" name="IBM.N"/>
            <dataBody>
            </dataBody>
        </REQUEST>
    </dataBody>
</GENERIC>


<!-- Incoming Reactor message -->
<!-- java.nio.channels.SocketChannel[connected local=/192.168.31.74:50690 remote=contrib1-amers1.platform.refinitiv.com/3.213.90.178:443] -->
<!-- Thu Oct 01 12:47:19 UTC 2020 -->
<!-- rwfMajorVer="14" rwfMinorVer="1" -->
<GENERIC domainType="SYSTEM" streamId="3" containerType="NO_DATA" flags="0x11 (HAS_EXTENDED_HEADER|MESSAGE_COMPLETE)" dataSize="0">
    <extendedHeader data="0200 0000 0002 0000 000F 4240"/>
    <dataBody>
    </dataBody>
</GENERIC>


<!-- Incoming Reactor message -->
<!-- java.nio.channels.SocketChannel[connected local=/192.168.31.74:50690 remote=contrib1-amers1.platform.refinitiv.com/3.213.90.178:443] -->
<!-- Thu Oct 01 12:47:19 UTC 2020 -->
<!-- rwfMajorVer="14" rwfMinorVer="1" -->
<STATUS domainType="MARKET_PRICE" streamId="6" containerType="NO_DATA" flags="0x60 (HAS_STATE|CLEAR_CACHE)" State: Closed/Suspect/Not entitled - text: "Stream not found" dataSize="0">
    <dataBody>
    </dataBody>
</STATUS>


<!-- Outgoing Reactor message -->
<!-- java.nio.channels.SocketChannel[connected local=/192.168.31.74:50690 remote=contrib1-amers1.platform.refinitiv.com/3.213.90.178:443] -->
<!-- Thu Oct 01 12:47:19 UTC 2020 -->
<!-- rwfMajorVer="14" rwfMinorVer="1" -->
<CLOSE domainType="MARKET_PRICE" streamId="6" containerType="NO_DATA" flags="0x00" dataSize="0">
    <dataBody>
    </dataBody>
</CLOSE>


<!-- Incoming Reactor message -->
<!-- java.nio.channels.SocketChannel[connected local=/192.168.31.74:50690 remote=contrib1-amers1.platform.refinitiv.com/3.213.90.178:443] -->
<!-- Thu Oct 01 12:47:19 UTC 2020 -->
<!-- rwfMajorVer="14" rwfMinorVer="1" -->
<STATUS domainType="MARKET_PRICE" streamId="6" containerType="NO_DATA" flags="0x60 (HAS_STATE|CLEAR_CACHE)" State: Closed/Suspect/Not entitled - text: "Message ignored as tunnel stream has been established" dataSize="0">
    <dataBody>
    </dataBody>
</STATUS>


<!-- Outgoing Reactor message -->
<!-- java.nio.channels.SocketChannel[connected local=/192.168.31.74:50690 remote=contrib1-amers1.platform.refinitiv.com/3.213.90.178:443] -->
<!-- Thu Oct 01 12:47:19 UTC 2020 -->
<!-- rwfMajorVer="14" rwfMinorVer="1" -->
<CLOSE domainType="MARKET_PRICE" streamId="6" containerType="NO_DATA" flags="0x00" dataSize="0">
    <dataBody>
    </dataBody>
</CLOSE>

Additional information: the dictionaries were taken from the GitHub examples.

Would appreciate any help.

Best Answer

  • Gurpreet

    Hello @dkamburov,

    I am not sure which sample code you are using, but please download and try this fully functional sample for contributions over the internet. Select EMA tutorial source code and RCC sample.

    The code that you have pasted is quite different from the one I have in the sample. Once you get the reference sample working, you can then import the code snippets into your own application.

    You can get the contribution endpoints from the product support documents.

    Also note that the tunnel stream created to these VIP endpoints can only accept contributions. You will have to subscribe to your instrument from your market data system, or use Eikon desktop to view your contributed data.

Answers

  • Thanks for your answer. I tried the example from your link. The main problem was that I used

    CosAuthentication.CosAuthenticationType.OMM_LOGIN

    Instead of

    CosAuthentication.CosAuthenticationType.NOT_REQUIRED

    Since I had not logged in manually, I never got the login substream that should be used for contribution.
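
The ordering described above (tunnel stream first, then a login substream, then posts on that substream) can be modeled as a small state machine. This is a plain-Java sketch of the sequence only: `ContributionSession`, its states, and its methods are hypothetical names and are not EMA classes, and the handles are placeholder numbers standing in for whatever the real `registerClient` calls return.

```java
/** Hypothetical model of the contribution sequence; not an EMA class. */
public class ContributionSession {
    enum State { CLOSED, TUNNEL_OPEN, LOGGED_IN }

    private State state = State.CLOSED;
    private long tunnelHandle = -1;   // placeholder for the tunnel stream handle
    private long loginSubHandle = -1; // placeholder for the login substream handle

    /** Step 1: the tunnel stream reports Open / Ok. */
    public void onTunnelOpen(long handle) {
        tunnelHandle = handle;
        state = State.TUNNEL_OPEN;
    }

    /** Step 2: the login request sent over the tunnel is accepted. */
    public void onLoginAccepted(long subStreamHandle) {
        if (state != State.TUNNEL_OPEN) {
            throw new IllegalStateException("Login substream needs an open tunnel stream");
        }
        loginSubHandle = subStreamHandle;
        state = State.LOGGED_IN;
    }

    /** Step 3: posts go to the login substream handle, never to the tunnel handle. */
    public long handleForPost() {
        if (state != State.LOGGED_IN) {
            throw new IllegalStateException("No login substream yet; state is " + state);
        }
        return loginSubHandle;
    }

    public static void main(String[] args) {
        ContributionSession session = new ContributionSession();
        session.onTunnelOpen(5L);
        try {
            session.handleForPost(); // mirrors posting right after the tunnel opens
        } catch (IllegalStateException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
        session.onLoginAccepted(6L);
        System.out.println("Post on handle " + session.handleForPost());
    }
}
```

The early `handleForPost()` call corresponds to the original mistake in this thread, where the PostMsg was submitted with the tunnel stream handle itself.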

    With the working example code, the first response is:

    2020-10-02T12:48:54.594 | DEBUG | pool-3-thread-1 | RefinitivConnection-main: Consumer: login-consumer2 AckMsg: AckMsg
        streamId="6"
        domain="MarketPrice Domain"
        ackId="1"
        nackCode="SymbolUnknown"
        text="Symbol unknown"
    AckMsgEnd
     event: com.thomsonreuters.ema.access.OmmEventImpl@4b4ee8cc | MDC:

    After that, all responses look like this:

    2020-10-02T12:48:55.010 | DEBUG | pool-3-thread-1 | RefinitivConnection-main: Consumer: login-consumer2 AckMsg: AckMsg
        streamId="6"
        domain="MarketPrice Domain"
        ackId="2"
        nackCode="DeniedBySource"
        text="RIC has been disabled"
    AckMsgEnd
     event: com.thomsonreuters.ema.access.OmmEventImpl@4b4ee8cc | MDC:
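
The two AckMsg dumps above carry the outcome of each post in `nackCode` and `text`. As a minimal sketch for triaging such log output, the fields can be pulled out with a regular expression. This is plain Java and not part of EMA; the class name `AckLogTriage` is made up, the hint strings are based only on what this thread showed, and treating a dump without `nackCode` as a positive acknowledgement is an assumption.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: extracts key="value" fields from a logged AckMsg dump. */
public class AckLogTriage {
    // Matches fields such as nackCode="SymbolUnknown" in the logged text.
    private static final Pattern FIELD = Pattern.compile("(\\w+)=\"([^\"]*)\"");

    /** Returns the fields of one AckMsg dump in the order they appear. */
    public static Map<String, String> parse(String dump) {
        Map<String, String> fields = new LinkedHashMap<>();
        Matcher m = FIELD.matcher(dump);
        while (m.find()) {
            fields.put(m.group(1), m.group(2));
        }
        return fields;
    }

    /** Assumption: a dump without a nackCode field is a positive acknowledgement. */
    public static boolean isNack(Map<String, String> fields) {
        return fields.containsKey("nackCode");
    }

    /** Hints drawn only from this thread, not from official documentation. */
    public static String hint(Map<String, String> fields) {
        String code = fields.getOrDefault("nackCode", "");
        switch (code) {
            case "SymbolUnknown":
                return "Check that the posted name is a RIC, not a service name.";
            case "DeniedBySource":
                return "Ask the account team which RICs are enabled for contribution.";
            case "":
                return "Post acknowledged.";
            default:
                return "Unrecognized nackCode: " + code;
        }
    }

    public static void main(String[] args) {
        String dump = String.join("\n",
            "AckMsg",
            "    streamId=\"6\"",
            "    domain=\"MarketPrice Domain\"",
            "    ackId=\"1\"",
            "    nackCode=\"SymbolUnknown\"",
            "    text=\"Symbol unknown\"",
            "AckMsgEnd");
        Map<String, String> fields = parse(dump);
        System.out.println("ackId " + fields.get("ackId") + ": " + hint(fields));
    }
}
```

Inside an application it would be more direct to read these values from the AckMsg object delivered to the client callback; parsing the log text is only useful when the log is all that is available.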


  • Hi @dkamburov

    Can you confirm which RIC you are trying to post to? For example, in one of the earlier snippets it appears that you are trying to post to a RIC called DDS_TRCE, which is a service name:

    UpdateMsg nestedUpdateMsg = EmaFactory.createUpdateMsg()
    .streamId(1)
    .name("DDS_TRCE")
    .payload(nestedFieldList);

    You should speak to your Market Data team or Refinitiv account team for a list of valid RICs you can contribute to.

  • Yes, you are right, I was using a service name instead of a RIC:

    .name("DDS_TRCE")

    Thanks for your help. I will contact the Refinitiv account team for valid RICs.

  • Please try the RICs I mentioned in my comment above

    TEST1=LEVL

    TEST2=LEVL

    TEST3=LEVL