Failed to receive notification on ADH unavailability when connection is active using java reactor ap

I am connecting to reuters using java reactor api for consuming market data, and have supplied two hosts using below piece of code :

 dispatchOptions.maxMessages(1);
connectOptions.reconnectAttemptLimit(trepConnectionConfig.connectMaxRetries()); // attempt to recover forever
connectOptions.reconnectMinDelay(trepConnectionConfig.connectMinDelayInSec()); // 1 second minimum
connectOptions.reconnectMaxDelay(trepConnectionConfig.connectMaxDelayInSec()); // 60 second maximum

this.connectOptions.connectionList().add(connectInfo);

final ConnectOptions primaryConnectOptions = this.connectOptions.connectionList().get(0).connectOptions();
primaryConnectOptions.majorVersion(Codec.majorVersion());
primaryConnectOptions.minorVersion(Codec.minorVersion());
primaryConnectOptions.connectionType(ConnectionTypes.SOCKET);
primaryConnectOptions.unifiedNetworkInfo().address(trepConnectionConfig.host());
primaryConnectOptions.unifiedNetworkInfo().serviceName(trepConnectionConfig.port());
primaryConnectOptions.guaranteedOutputBuffers(trepConnectionConfig.buffers());


if (StringUtils.isNotBlank(trepConnectionConfig.backupHost()) && StringUtils.isNotBlank(trepConnectionConfig.backupPort())) {
this.connectOptions.connectionList().add(backUpConnectInfo);
final ConnectOptions backUpConnectOptions = this.connectOptions.connectionList().get(1).connectOptions();
backUpConnectOptions.majorVersion(Codec.majorVersion());
backUpConnectOptions.minorVersion(Codec.minorVersion());
backUpConnectOptions.connectionType(ConnectionTypes.SOCKET);
backUpConnectOptions.unifiedNetworkInfo().address(trepConnectionConfig.backupHost());
backUpConnectOptions.unifiedNetworkInfo().serviceName(trepConnectionConfig.backupPort());
backUpConnectOptions.guaranteedOutputBuffers(trepConnectionConfig.buffers());
}

1. Connect to reuters - both ADH and ADS are up

2. send valid subscription

3. App is able to receive data

4.While doing test with connectivity team, stop only ADH server - which results in unavailability of IDN_SELECTFEED service.

5. Java consumer APP should have received service unavailability or some kind of notification, however app did not receive any notification.


Could someone please assist here as this is blocking onboarding of application.

Tagged:

Best Answer

  • Jirapongse
    Answer ✓

    @rinki121

    When the service is down, the application will receive the group stale update in the source directory update message which indicates all items in this group are suspect (stale). The application can fan out this message to all items in the group to indicate that the items are stale.

    <!-- rwfMajorVer="14" rwfMinorVer="1" -->
    <UPDATE domainType="SOURCE" streamId="2" containerType="MAP" flags="0xC0 (DO_NOT_CACHE|DO_NOT_CONFLATE)" updateType="0" dataSize="116">
        <dataBody>
            <map flags="0x00" countHint="0" keyPrimitiveType="UINT" containerType="FILTER_LIST" >
                <mapEntry flags="0x00" action="UPDATE" key="4594" >
                    <filterList containerType="ELEMENT_LIST" countHint="0" flags="0x00">
                        <filterEntry id="3" action="SET" flags="0x00" containerType="ELEMENT_LIST">
                            <elementList flags="0x08 (HAS_STANDARD_DATA)">
                                <elementEntry name="Group" dataType="BUFFER" data="1"/>
    <elementEntry name="Status" dataType="STATE" State: Open/Suspect/None - text: "A23: Service has gone down. Will recall when service becomes available."/>

                            </elementList>
                        </filterEntry>
                    </filterList>
                </mapEntry>
            </map>
        </dataBody>
    </UPDATE>


    Received Source Directory Update
    DirectoryUpdate: 
        streamId: 2
        filter: 
        Service:
            serviceId: 4594
    [        GroupFilter:
                group: 0000: 00 01                                              ..
                status: State: Open/Suspect/None - text: "A23: Service has gone down. Will recall when service becomes available."
    ]

    However, the item streams are still opened so the application doesn't need to resubscribe to items.

    When the service is up, the application will get a source directly update message indicating that the service is up.

    <!-- rwfMajorVer="14" rwfMinorVer="1" -->
    <UPDATE domainType="SOURCE" streamId="2" containerType="MAP" flags="0x80 (DO_NOT_CONFLATE)" updateType="0" dataSize="291">
        <dataBody>
            <map flags="0x00" countHint="0" keyPrimitiveType="UINT" containerType="FILTER_LIST" >
                <mapEntry flags="0x00" action="UPDATE" key="4594" >
                    <filterList containerType="ELEMENT_LIST" countHint="0" flags="0x00">
                        <filterEntry id="1" action="SET" flags="0x00" containerType="ELEMENT_LIST">
                            <elementList flags="0x08 (HAS_STANDARD_DATA)">
                                <elementEntry name="Name" dataType="ASCII_STRING" data="ELEKTRON_DD"/>
                                <elementEntry name="SupportsQoSRange" dataType="UINT" data="0"/>
                                <elementEntry name="Capabilities" dataType="ARRAY">
                                    <array itemLength="1" primitiveType="UINT">
                                        <arrayEntry data="5"/>
                                        <arrayEntry data="6"/>
                                        ...
                                    </array>
                                </elementEntry>
                                <elementEntry name="QoS" dataType="ARRAY">
                                    <array itemLength="0" primitiveType="QOS">
                                        <arrayEntry Qos: Realtime/TickByTick/Static - timeInfo: 0 - rateInfo: 0/>
                                    </array>
                                </elementEntry>
                                <elementEntry name="DictionariesProvided" dataType="ARRAY">
                                    <array itemLength="0" primitiveType="ASCII_STRING">
                                        <arrayEntry data="RWFFld"/>
                                        <arrayEntry data="RWFEnum"/>
                                    </array>
                                </elementEntry>
                                <elementEntry name="DictionariesUsed" dataType="ARRAY">
                                    <array itemLength="0" primitiveType="ASCII_STRING">
                                        <arrayEntry data="RWFFld"/>
                                        <arrayEntry data="RWFEnum"/>
                                    </array>
                                </elementEntry>
                                <elementEntry name="Vendor" dataType="ASCII_STRING" data="Thomson Reuters"/>
                                <elementEntry name="AcceptingConsumerStatus" dataType="UINT" data="1"/>
                            </elementList>
                        </filterEntry>
                        <filterEntry id="2" action="SET" flags="0x00" containerType="ELEMENT_LIST">
                            <elementList flags="0x08 (HAS_STANDARD_DATA)">
                                <elementEntry name="ServiceState" dataType="UINT" data="1"/>
                                <elementEntry name="AcceptingRequests" dataType="UINT" data="1"/>
                            </elementList>
                        </filterEntry>
                    </filterList>
                </mapEntry>
            </map>
        </dataBody>
    </UPDATE>


    Received Source Directory Update
    DirectoryUpdate: 
        streamId: 2
        filter: 
        Service:
            serviceId: 4594
            InfoFilter:
                serviceName: ELEKTRON_DD
                vendor: Thomson Reuters
                isSource: 0
                supportsQosRange: 0
                supportsOutOfBandSnapshots: 1
                acceptingConsumerStatus: 1
                capabilities: [5, 6, 7, 8, 9, 10, 11, 13, 14, 18, 19, 20, 21, 23, 24, 25, 26, 28, 29, 33, 34, 35, 126]
                dictionariesProvided: [RWFFld, RWFEnum]
                dictionariesUsed: [RWFFld, RWFEnum]
                qos: [Qos: Realtime/TickByTick/Static - timeInfo: 0 - rateInfo: 0]
            StateFilter:
                ServiceState: 1
                AcceptingRequests: 1


    Received serviceName: ELEKTRON_DD

    Then, the application will retrieve refresh messages of subscribed items.

    <!-- rwfMajorVer="14" rwfMinorVer="1" -->
    <REFRESH domainType="MARKET_PRICE" streamId="5" containerType="FIELD_LIST" flags="0x1DA (HAS_PERM_DATA|HAS_MSG_KEY|HAS_SEQ_NUM|REFRESH_COMPLETE|HAS_QOS|CLEAR_CACHE)" groupId="1" seqNum="15966" permData="0311 F252 6C" Qos: Realtime/TickByTick/Static - timeInfo: 0 - rateInfo: 0 State: Open/Ok/None - text: "All is well" dataSize="2093">
        <key flags="0x07 (HAS_SERVICE_ID|HAS_NAME|HAS_NAME_TYPE)" serviceId="4594" name="JPY=" nameType="1"/>
        <dataBody>
            <fieldList flags="0x09 (HAS_FIELD_LIST_INFO|HAS_STANDARD_DATA)" fieldListNum="99" dictionaryId="1">
                <fieldEntry fieldId="1" data="020E"/>
                <fieldEntry fieldId="2" data="99"/>
                <fieldEntry fieldId="3" data="5341 4E54 414E 4445 5220 2020 2048 4B47"/>
                <fieldEntry fieldId="5" data="0407"/>
    ...
    JPY=
    DOMAIN: MARKET_PRICE
    State: Open/Ok/None - text: "All is well"
        1/PROD_PERM: 526
        2/RDNDISPLAY: 153
        3/DSPLY_NAME: SANTANDER    HKG
        5/TIMACT: 04:07:00:000:000:000
        11/NETCHNG_1: -0.19

    The refresh message has the groupId and data state which is Ok.

    For more information about Item Group, please refer to the 13.4 Item Group Use section in the ETA Developers Guide.

    Next, please see the answers to the questions.

    1. When ads service state is changed from UP-->down, Will consumer receives Status message or update messages?

    The consumer will get the source directory update messages that indicate the group and service are down. This is the order of the source directory update messages.

    <!-- rwfMajorVer="14" rwfMinorVer="1" -->
    <UPDATE domainType="SOURCE" streamId="2" containerType="MAP" flags="0xC0 (DO_NOT_CACHE|DO_NOT_CONFLATE)" updateType="0" dataSize="116">
        <dataBody>
            <map flags="0x00" countHint="0" keyPrimitiveType="UINT" containerType="FILTER_LIST" >
                <mapEntry flags="0x00" action="UPDATE" key="4594" >
                    <filterList containerType="ELEMENT_LIST" countHint="0" flags="0x00">
                        <filterEntry id="3" action="SET" flags="0x00" containerType="ELEMENT_LIST">
                            <elementList flags="0x08 (HAS_STANDARD_DATA)">
                                <elementEntry name="Group" dataType="BUFFER" data="1"/>
                                <elementEntry name="Status" dataType="STATE" State: Open/Suspect/None - text: "A23: Service has gone down. Will recall when service becomes available."/>
                            </elementList>
                        </filterEntry>
                    </filterList>
                </mapEntry>
            </map>
        </dataBody>
    </UPDATE>
    <!-- rwfMajorVer="14" rwfMinorVer="1" -->
    <UPDATE domainType="SOURCE" streamId="2" containerType="MAP" flags="0xC0 (DO_NOT_CACHE|DO_NOT_CONFLATE)" updateType="0" dataSize="51">
        <dataBody>
            <map flags="0x00" countHint="0" keyPrimitiveType="UINT" containerType="FILTER_LIST" >
                <mapEntry flags="0x00" action="UPDATE" key="4594" >
                    <filterList containerType="ELEMENT_LIST" countHint="0" flags="0x00">
                        <filterEntry id="3" action="SET" flags="0x00" containerType="ELEMENT_LIST">
                            <elementList flags="0x08 (HAS_STANDARD_DATA)">
                                <elementEntry name="Group" dataType="BUFFER" data="1"/>
                                <elementEntry name="MergedToGroup" dataType="BUFFER" data="0"/>
                            </elementList>
                        </filterEntry>
                    </filterList>
                </mapEntry>
            </map>
        </dataBody>
    </UPDATE>


    <!-- rwfMajorVer="14" rwfMinorVer="1" -->
    <UPDATE domainType="SOURCE" streamId="2" containerType="MAP" flags="0x80 (DO_NOT_CONFLATE)" updateType="0" dataSize="72">
        <dataBody>
            <map flags="0x00" countHint="0" keyPrimitiveType="UINT" containerType="FILTER_LIST" >
                <mapEntry flags="0x00" action="UPDATE" key="4594" >
                    <filterList containerType="ELEMENT_LIST" countHint="0" flags="0x00">
                        <filterEntry id="2" action="SET" flags="0x00" containerType="ELEMENT_LIST">
                            <elementList flags="0x08 (HAS_STANDARD_DATA)">
                                <elementEntry name="ServiceState" dataType="UINT" data="0"/>
                                <elementEntry name="AcceptingRequests" dataType="UINT" data="1"/>
                                <elementEntry name="Status" dataType="STATE" State: Open/Suspect/None - text: ""/>
                            </elementList>
                        </filterEntry>
                    </filterList>
                </mapEntry>
            </map>
        </dataBody>
    </UPDATE>


    2. If we are not using watch list, do we need to re-subscribe for existing subscription when service becomes available again?

    If you use the SingleOpen (1) and AllowSuspectData (1) which is the default behavior, the application doesn't need to re-subscribe to items because the item streams are still open and ADS will recover the items on behalf of the application.

    However, if you use the SingleOpen (0) and AllowSuspectData (0), the item streams will be Closed, Recoverable and the application needs to resubscribe to items. For more information regarding SingleOpen and AllowSuspectData, please refer to the 13.5 Single Open and Allow Suspect Data Behavior section in the ETA Developer Guide.


    3. Will we receive any chanel Up or FD change etc events when service becomes available again?

    In this case, the service is down but the channel remains connected. Therefore, the application will not receive the channel up or FD change.

    3. When ads service state is changed from down ->up, what would be message flow state for directory updates i.e. will we receive refresh again ?

    When the service is up, the application will receive the source directory update message that indicates that the service is up, as mentioned above.

Answers

  • @rinki121

    I tested with the com.refinitiv.eta.valueadd.examples.consumer example and found that if I stop ADS, the example will get the Source Directory Update that indicates that the services provided by ADH are down, as shown below.

    Received Source Directory Update
    DirectoryUpdate: 
        streamId: 2
        filter: 
        Service:
            serviceId: 4594
    [        GroupFilter:
                group: 0000: 00 01                                              ..
                status: State: Open/Suspect/None - text: "A23: Service has gone down. Will recall when service becomes available."
    ]
    Received Source Directory Update
    DirectoryUpdate: 
        streamId: 2
        filter: 
        Service:
            serviceId: 4594
    [        GroupFilter:
                group: 0000: 00 01                                              ..
                mergedToGroup: 0000: 00 00                                              ..
    ]
    Received Source Directory Update
    DirectoryUpdate: 
        streamId: 2
        filter: 
        Service:
            serviceId: 4594
            StateFilter:
                ServiceState: 0
                AcceptingRequests: 1
                State: Open/Suspect/None - text: "null"

    Please try to run the com.refinitiv.eta.valueadd.examples.consumer example to verify if you see this behavior. You can run the example with the -x option to verify the messages retrieved by the API.

    -c <hostname>:<port> <service name> mp:JPY= -x
  • @Jirapongse What is the exact flow of events if service becomes unavailable ?


    1. When ads service state is changed from UP-->down, Will consumer receives

    Status message or update messages ?

    If both of them are received, what would be exact sequence and what could be value for status

    2. If we are not using watch list, do we need to re-subscribe for existing subscription when service becomes available again ?

    3. Will we receive any chanel Up or FD change etc events when service becomes available again?

    3. When ads service state is changed from down ->up, what would be message flow state for directory updates i.e. will we receive refresh again ?