async function error

Hi team,

I am going to write a for loop that makes several thousand get_data calls, one per investor ID, for a list of stocks. With the synchronous approach, the task takes an extremely long time to finish, so I tried an async approach instead. Is this possible? I tried to implement it as in the code below, but I encountered a runtime error. What is the correct way to do this? Thanks.


import asyncio


#shareheld by each investor
async def get_HVH(id):
    df, err = ek.get_data(
        instruments = Custom_PeersRICs,
        fields = ['TR.SharesHeldValue'],
        parameters = {'legacyInvestorId':str(id),'Scale':Scaling.value}
    )
    return df.iloc[:,1]


async def main():
    tbl=pd.DataFrame(columns = Custom_PeersRICs)
    for i in range(Output_Number.value):
        id=output['investorid'][i]
        hvh=loop.create_task(get_HVH(id))
        print(i)
        tbl_length=len(tbl)
        tbl.loc[tbl_length]=list(hvh)
        await asyncio


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()


Error msg

---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-21-d4e8b324361a> in <module>
     21
     22 loop = asyncio.get_event_loop()
---> 23 loop.run_until_complete(main())
     24 loop.close()

/opt/conda/lib/python3.7/site-packages/nest_asyncio.py in run_until_complete(self, future)
     93 raise RuntimeError(
     94 'Event loop stopped before Future completed.')
---> 95 return f.result()
     96 finally:
     97 events._set_running_loop(old_running_loop)

/opt/conda/lib/python3.7/asyncio/futures.py in result(self)
    176 self.__log_traceback = False
    177 if self._exception is not None:
--> 178 raise self._exception
    179 return self._result
    180

/opt/conda/lib/python3.7/asyncio/tasks.py in __step(***failed resolving arguments***)
    247 # We use the `send` method directly, because coroutines
    248 # don't have `__iter__` and `__next__` methods.
--> 249 result = coro.send(None)
    250 else:
    251 result = coro.throw(exc)

<ipython-input-21-d4e8b324361a> in main()
     17 print(i)
     18 tbl_length=len(tbl)
---> 19 tbl.loc[tbl_length]=list(hvh)
     20 await asyncio
     21

/opt/conda/lib/python3.7/asyncio/futures.py in __await__(self)
    260 yield self # This tells Task to wait for completion.
    261 if not self.done():
--> 262 raise RuntimeError("await wasn't used with future")
    263 return self.result() # May raise too.
    264

RuntimeError: await wasn't used with future
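The error is raised by list(hvh). loop.create_task returns a Task, and calling list() on a Task that has not finished iterates it through Future.__await__, which raises this RuntimeError unless the task is driven by an await expression. (The await asyncio line would fail as well, because a module is not awaitable.) A minimal sketch of the usual pattern, using a hypothetical get_value coroutine that sleeps instead of sending a request: create all the tasks first, then await them together with asyncio.gather, which returns the results in task order. This only overlaps work that is itself awaitable; a blocking call such as ek.get_data inside an async def still runs one request at a time.

```python
import asyncio
import time


async def get_value(investor_id):
    # Hypothetical stand-in for an awaitable request; sleeps instead of doing I/O.
    await asyncio.sleep(0.1)
    return investor_id * 10


async def main(investor_ids):
    # Start one task per id; the tasks run concurrently on the event loop.
    tasks = [asyncio.create_task(get_value(i)) for i in investor_ids]
    # Await the tasks instead of calling list() on a task.
    # gather returns the results in the same order as the tasks.
    return await asyncio.gather(*tasks)


start = time.perf_counter()
results = asyncio.run(main(range(5)))
elapsed = time.perf_counter() - start
print(results)  # [0, 10, 20, 30, 40]
print(f"{elapsed:.2f} s")  # close to 0.1 s rather than 0.5 s, because the sleeps overlap
```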

Best Answer

  • Jirapongse

    @sunny.to

    I am new to asyncio. However, I assume that you would like to send the requests concurrently. After searching on Google, I found a way to turn Python sync functions into async ones.

    The code that I tested is:


    import asyncio
    import functools
    import threading
    import time
    from concurrent.futures import ThreadPoolExecutor

    import eikon as ek

    def force_async(fn):
        '''
        Turns a sync function into an async (awaitable) one by running it in a thread pool.
        The global semaphore `sem` limits how many calls are active at the same time.
        '''
        pool = ThreadPoolExecutor()

        def call_and_release(*args, **kwargs):
            try:
                return fn(*args, **kwargs)
            finally:
                sem.release()  # free the slot even if fn raises

        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            sem.acquire()  # blocks while the maximum number of requests is already active
            future = pool.submit(call_and_release, *args, **kwargs)
            return asyncio.wrap_future(future)  # make it awaitable

        return wrapper

    @force_async
    def get_HVH(ric):
        df, err = ek.get_data(instruments = ric,
                              fields = ['TR.SharesHeldValue'],
                              parameters = {})
        print(df)
        return df.iloc[:,1]

    async def main():
        chain, err = ek.get_data("0#.FTSE",["TR.RIC"])
        print(chain)
        output = chain['Instrument']
        hvh = []
        for i, ric in enumerate(output):
            hvh.append(get_HVH(ric))  # submit the request and keep its future
            print(i)

        # wait until every request has finished
        await asyncio.wait(hvh, return_when=asyncio.ALL_COMPLETED)
        print("All requests completed")

    if __name__ == '__main__':
        start = time.time()
        sem = threading.Semaphore(20)  # at most 20 active requests
        ek.set_app_key("<app key>")
        asyncio.run(main())  # creates, runs and closes the event loop
        end = time.time()
        print(end - start)


    The above code retrieves the constituents of a chain RIC (0#.FTSE) and then concurrently requests TR.SharesHeldValue for each item in that chain. A semaphore controls the number of active requests; the code sets its value to 20, so at most 20 requests are active at a time.

    With the force_async version, retrieving TR.SharesHeldValue for 101 items takes around 51 seconds, whereas calling get_data sequentially takes around 300 seconds. Concurrent requests are much faster than sequential ones.
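A similar limit can also be enforced without blocking the thread that runs the event loop, which is what threading.Semaphore.acquire() inside force_async does. A minimal sketch, not tested against Eikon, assuming a hypothetical blocking_request function in place of ek.get_data: an asyncio.Semaphore caps the calls in flight at 20, loop.run_in_executor runs each call in a worker thread, and asyncio.gather collects the results in input order. The sketch also records the peak number of simultaneous calls and compares the elapsed time with a sequential estimate.

```python
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor

MAX_ACTIVE = 20  # same limit as the semaphore in the answer
LATENCY = 0.05   # simulated seconds per request (assumption)

_lock = threading.Lock()
_active = 0
peak_active = 0


def blocking_request(ric):
    """Hypothetical stand-in for ek.get_data: blocks the calling thread."""
    global _active, peak_active
    with _lock:
        _active += 1
        peak_active = max(peak_active, _active)
    try:
        time.sleep(LATENCY)  # simulated network latency
        return f"{ric}: done"
    finally:
        with _lock:
            _active -= 1


async def fetch(sem, pool, ric):
    # Waiting for a free slot yields to the event loop instead of blocking it.
    async with sem:
        loop = asyncio.get_running_loop()
        # Run the blocking call in a worker thread and await its result.
        return await loop.run_in_executor(pool, blocking_request, ric)


async def main(rics):
    sem = asyncio.Semaphore(MAX_ACTIVE)
    with ThreadPoolExecutor(max_workers=MAX_ACTIVE) as pool:
        # gather returns the results in the same order as rics.
        return await asyncio.gather(*(fetch(sem, pool, ric) for ric in rics))


rics = [f"RIC{i}" for i in range(101)]
start = time.perf_counter()
results = asyncio.run(main(rics))
elapsed = time.perf_counter() - start
print(f"{len(results)} results in {elapsed:.2f} s "
      f"(sequential would take about {len(rics) * LATENCY:.2f} s), "
      f"peak concurrency {peak_active}")
```

In this simulation every fake request takes the same fixed time, so the gain is close to the number of workers; in the measurement reported above, the real gain was about 5.9 times (300 / 51). The thread pool is sized to the same limit so that a call admitted by the semaphore does not wait again in the pool's queue.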

Answers