How to handle COMPLETION_EVENT while migrating from the Refinitiv RFA API to the EMA API?

We are currently upgrading our application from the RFA API to the EMA API. We referred to a similar question at this link:

https://community.developers.refinitiv.com/questions/92298/whats-the-equivalent-of-rfas-completion-event-in-r.html

However, we are unable to use the rsslIsFinalState function given in that solution. Please provide a working example that identifies the completion event, or the closing of a stream, in a Consumer (such as the one below) whose client implements OmmConsumerClient.

///*|----------------------------------------------------------------------------------------------------
// *| This source code is provided under the Apache 2.0 license --
// *| and is provided AS IS with no warranty or guarantee of fit for purpose. --
// *| See the project's LICENSE.md for details. --
// *| Copyright (C) 2019 Refinitiv. All rights reserved. --
///*|----------------------------------------------------------------------------------------------------

package com.refinitiv.ema.examples.training.consumer.series100.ex100_MP_Streaming;

import com.refinitiv.ema.access.Msg;
import com.refinitiv.ema.access.AckMsg;
import com.refinitiv.ema.access.GenericMsg;
import com.refinitiv.ema.access.RefreshMsg;
import com.refinitiv.ema.access.ReqMsg;
import com.refinitiv.ema.access.StatusMsg;
import com.refinitiv.ema.access.UpdateMsg;
import com.refinitiv.ema.access.EmaFactory;
import com.refinitiv.ema.access.OmmConsumer;
import com.refinitiv.ema.access.OmmConsumerClient;
import com.refinitiv.ema.access.OmmConsumerConfig;
import com.refinitiv.ema.access.OmmConsumerEvent;
import com.refinitiv.ema.access.OmmException;

class AppClient implements OmmConsumerClient
{
    public void onRefreshMsg(RefreshMsg refreshMsg, OmmConsumerEvent event)
    {
        System.out.println(refreshMsg);
    }

    public void onUpdateMsg(UpdateMsg updateMsg, OmmConsumerEvent event)
    {
        System.out.println(updateMsg);
    }

    public void onStatusMsg(StatusMsg statusMsg, OmmConsumerEvent event)
    {
        System.out.println(statusMsg);
    }

    public void onGenericMsg(GenericMsg genericMsg, OmmConsumerEvent consumerEvent){}
    public void onAckMsg(AckMsg ackMsg, OmmConsumerEvent consumerEvent){}
    public void onAllMsg(Msg msg, OmmConsumerEvent consumerEvent){}
}

public class Consumer
{
    public static void main(String[] args)
    {
        OmmConsumer consumer = null;
        try
        {
            AppClient appClient = new AppClient();
            OmmConsumerConfig config = EmaFactory.createOmmConsumerConfig();
            consumer = EmaFactory.createOmmConsumer(config.host("localhost:14002").username("user"));
            ReqMsg reqMsg = EmaFactory.createReqMsg();
            consumer.registerClient(reqMsg.serviceName("DIRECT_FEED").name("IBM.N"), appClient);
            Thread.sleep(60000); // API calls onRefreshMsg(), onUpdateMsg() and onStatusMsg()
        }
        catch (InterruptedException | OmmException excp)
        {
            System.out.println(excp.getMessage());
        }
        finally
        {
            if (consumer != null) consumer.uninitialize();
        }
    }
}





Best Answer

  • Jirapongse
    Answer ✓

    @dimple.shah

    Thank you for reaching out to us.

    The code looks like this:

        // Additional imports needed by this snippet:
        import com.refinitiv.ema.access.OmmState;
        import com.refinitiv.ema.access.OmmState.StreamState;

        public void onRefreshMsg(RefreshMsg refreshMsg, OmmConsumerEvent event)
        {
            ...
            // A Refresh message always carries a state.
            if (isFinalState(refreshMsg.state())) {
                ...
            }
            ...
        }

        public void onStatusMsg(StatusMsg statusMsg, OmmConsumerEvent event)
        {
            ...
            // The state is optional in a Status message, so check for it first.
            if (statusMsg.hasState()) {
                System.out.println("Item State: " + statusMsg.state());
                if (isFinalState(statusMsg.state())) {
                    ...
                }
            }
            ...
        }

        // Returns true when the stream state indicates that the stream is closed.
        public boolean isFinalState(OmmState state) {
            int streamState = state.streamState();
            return streamState == StreamState.CLOSED
                    || streamState == StreamState.CLOSED_RECOVER
                    || streamState == StreamState.CLOSED_REDIRECTED
                    || streamState == StreamState.NON_STREAMING;
        }

    The state is always available in Refresh messages and is optional in Status messages, which is why the code checks statusMsg.hasState() first.

    You need to check the stream state. If the stream state is CLOSED, CLOSED_RECOVER, CLOSED_REDIRECTED, or NON_STREAMING, the stream has been closed.
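
If the application needs a single notification per item, similar in spirit to RFA's COMPLETION_EVENT, the check above could be wrapped in a small tracker keyed by item handle. The following is only a minimal sketch under stated assumptions: StreamCompletionTracker, track, onState, and CompletionDemo are hypothetical names, not part of EMA, and the integer constants are local stand-ins for OmmState.StreamState (their values are assumed here) so that the sketch compiles without the EMA jar. In a real client you would pass event.handle() and state.streamState() from onRefreshMsg and onStatusMsg, and compare against the EMA constants.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.LongConsumer;

/** Fires a callback once per item handle when its stream reaches a final state. */
final class StreamCompletionTracker {
    // Local stand-ins (assumption) for the OmmState.StreamState constants, so the
    // sketch compiles without the EMA jar. Real code should use the EMA constants.
    static final int OPEN = 1;
    static final int NON_STREAMING = 2;
    static final int CLOSED_RECOVER = 3;
    static final int CLOSED = 4;
    static final int CLOSED_REDIRECTED = 5;

    private final Set<Long> openHandles = new HashSet<>();
    private final LongConsumer onCompletion;

    StreamCompletionTracker(LongConsumer onCompletion) {
        this.onCompletion = onCompletion;
    }

    /** Same check as isFinalState in the answer, applied to the raw int stream state. */
    static boolean isFinalState(int streamState) {
        return streamState == CLOSED
                || streamState == CLOSED_RECOVER
                || streamState == CLOSED_REDIRECTED
                || streamState == NON_STREAMING;
    }

    /** Remember a handle, e.g. the value returned by registerClient(...). */
    synchronized void track(long handle) {
        openHandles.add(handle);
    }

    /**
     * Intended to be called from onRefreshMsg/onStatusMsg with event.handle()
     * and state.streamState(). Returns true if this call completed the stream.
     */
    boolean onState(long handle, int streamState) {
        if (!isFinalState(streamState)) {
            return false;
        }
        boolean wasOpen;
        synchronized (this) {
            // remove() returns false if the handle was unknown or already completed
            wasOpen = openHandles.remove(Long.valueOf(handle));
        }
        if (wasOpen) {
            onCompletion.accept(handle); // the "completion event" for this item
        }
        return wasOpen;
    }
}

class CompletionDemo {
    public static void main(String[] args) {
        List<Long> completed = new ArrayList<>();
        StreamCompletionTracker tracker =
                new StreamCompletionTracker(h -> completed.add(h));

        tracker.track(42L);                                     // handle from registerClient
        tracker.onState(42L, StreamCompletionTracker.OPEN);     // still open, no event
        tracker.onState(42L, StreamCompletionTracker.CLOSED);   // fires the callback
        tracker.onState(42L, StreamCompletionTracker.CLOSED);   // ignored, already completed
        System.out.println("Completed handles: " + completed);
    }
}
```

Removing the handle before invoking the callback is what keeps the notification to one per stream, even if a later message repeats a closed state. For snapshot requests whose refresh may arrive in several parts, it may also be worth checking refreshMsg.complete() before treating NON_STREAMING as final.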

Answers