
I have an asyncio crawler that visits URLs and collects new URLs from HTML responses. I was inspired by this great tool: https://github.com/aio-libs/aiohttp/blob/master/examples/legacy/crawl.py

Here is a very simplified piece of the workflow, showing how it works:

import asyncio
import aiohttp

class Requester:

    def __init__(self):
        self.sem = asyncio.BoundedSemaphore(1)

    async def fetch(self, url, client):
        async with client.get(url) as response:
            data = (await response.read()).decode('utf-8', 'replace')
            print("URL:", url, " have code:", response.status)
            return response, data

    async def run(self, urls):
        async with aiohttp.ClientSession() as client:
            for url in urls:
                await self.sem.acquire()
                task = asyncio.create_task(self.fetch(url, client))
                task.add_done_callback(lambda t: self.sem.release())

    def http_crawl(self, _urls_list):
        loop = asyncio.get_event_loop()
        crawl_loop = asyncio.ensure_future(self.run(_urls_list))
        loop.run_until_complete(crawl_loop)

r = Requester()
_url_list = ['https://www.google.com','https://images.google.com','https://maps.google.com','https://mail.google.com','https://news.google.com','https://video.google.com','https://books.google.com']
r.http_crawl(_url_list)

What I need now is to add a very slow BeautifulSoup-based function. That function must not block the main loop and should work as a background process. In it I will, for instance, process the HTTP responses.
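
For a sense of what I mean, the parsing I want to offload is roughly of this shape (a minimal sketch only; the real function is much heavier, and the link extraction here is just an illustration):

from bs4 import BeautifulSoup

def parse_links(html):
    # Illustration only: pull href values out of anchor tags.
    # The real BeautifulSoup-based parsing is far slower.
    soup = BeautifulSoup(html, 'html.parser')
    return [a['href'] for a in soup.find_all('a', href=True)]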

I read the Python docs about this and found loop.run_in_executor: https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor
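
As a standalone sketch, the pattern from the docs looks roughly like this (this is essentially the docs example, not my crawler code):

import asyncio
import concurrent.futures

def cpu_bound():
    return sum(i * i for i in range(10 ** 7))

async def main():
    loop = asyncio.get_running_loop()
    # Run the CPU-bound function in a separate process so it does not
    # block the event loop's thread.
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_bound)
        print('custom process pool', result)

if __name__ == '__main__':
    asyncio.run(main())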

I tried to add it to my code, but it does not work as it should (I use cpu_bound only as a demo):

import asyncio
import aiohttp
import concurrent.futures

def cpu_bound():
    return sum(i * i for i in range(10 ** 7))

class Requester:

    def __init__(self):
        self.sem = asyncio.BoundedSemaphore(1)

    async def fetch(self, url, client):
        async with client.get(url) as response:
            data = (await response.read()).decode('utf-8', 'replace')
            print("URL:", url, " have code:", response.status)
            ####### Blocking operation #######
            loop = asyncio.get_running_loop()
            with concurrent.futures.ProcessPoolExecutor() as pool:
                result = await loop.run_in_executor(pool, cpu_bound)
                print('custom process pool', result)
            ##################################
            return response, data

    async def run(self, urls):
        async with aiohttp.ClientSession() as client:
            for url in urls:
                await self.sem.acquire()
                task = asyncio.create_task(self.fetch(url, client))
                task.add_done_callback(lambda t: self.sem.release())

    def http_crawl(self, _urls_list):
        loop = asyncio.get_event_loop()
        crawl_loop = asyncio.ensure_future(self.run(_urls_list))
        loop.run_until_complete(crawl_loop)

r = Requester()
_url_list = ['https://www.google.com','https://images.google.com','https://maps.google.com','https://mail.google.com','https://news.google.com','https://video.google.com','https://books.google.com']
r.http_crawl(_url_list)

For now, it does not work as expected; it blocks the HTTP requests every time:

URL: https://www.google.com  have code: 200
custom process pool 333333283333335000000
URL: https://images.google.com  have code: 200
custom process pool 333333283333335000000
URL: https://maps.google.com  have code: 200
custom process pool 333333283333335000000
URL: https://mail.google.com  have code: 200
custom process pool 333333283333335000000
URL: https://news.google.com  have code: 200
custom process pool 333333283333335000000
URL: https://video.google.com  have code: 200
custom process pool 333333283333335000000

How do I correctly put the task in the background inside the main asyncio process?

Are there best practices for doing this in a simple way, or should I use Redis for task scheduling?
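
To make it concrete, the behaviour I am after is roughly this hypothetical sketch, where the executor call is wrapped in its own task so the response handler returns immediately (the names here are illustrative, not my real code):

import asyncio
import concurrent.futures

def cpu_bound():
    return sum(i * i for i in range(10 ** 7))

# One shared pool instead of a new one per request.
pool = concurrent.futures.ProcessPoolExecutor()

async def process_in_background(data):
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(pool, cpu_bound)
    print('custom process pool', result)

async def handle_response(data):
    # Fire and forget: the fetch path does not wait for the heavy work.
    # (A real crawler would keep references to these tasks and await them
    # before shutting down.)
    asyncio.create_task(process_in_background(data))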

2 Answers


  1. Chosen as BEST ANSWER

    I would also like to post the code that works for me. It uses two independent async queues, and the workers of one of them spawn the high-CPU-consumption work in separate processes:

    import asyncio
    import functools
    import aiohttp
    import concurrent.futures
    
    def cpu_bound(num):
        return sum(i * i for i in range(10 ** num))
    
    class Requester:
    
        def __init__(self):
            self.threads = 3    # number of fetch workers
            self.threads2 = 10  # number of heavy (CPU-bound) workers
            self.pool = concurrent.futures.ProcessPoolExecutor()
    
        async def fetch(self, url):
            try:
                timeout = aiohttp.ClientTimeout(total=10)
                async with self.client.get(url, allow_redirects=False, verify_ssl=False, timeout=timeout) as response:
                    data = (await response.read()).decode('utf-8', 'replace')
                    print("URL:", url, " have code:", response.status)
                    resp_list = {'url': str(response.real_url), 'data': str(data), 'headers': dict(response.headers)}
                    return resp_list
    
            except Exception as err:
                print(err)
                return {}
    
        async def heavy_worker(self, a):
            while True:
                resp_list = await a.get()
                if resp_list:
                    ####### Blocking operation #######
                    try:
                        loop = asyncio.get_running_loop()
                        result = await loop.run_in_executor(self.pool, functools.partial(cpu_bound, num=5))
                        print('wappalazer', result)
                    except Exception as err:
                        print(err)
                    ##################################
                a.task_done()
    
        async def fetch_worker(self, q, a):
            while True:
                url = await q.get()
                resp_list = await self.fetch(url)
                q.task_done()
                await a.put(resp_list)
    
        async def main(self, urls):
            # Create the queues we will use to store our "workload":
            # q holds URLs to fetch, a holds fetched responses for the heavy workers.
            q = asyncio.Queue()
            a = asyncio.Queue()
    
            # Create worker tasks to process the queues concurrently.
            workers_fetch = [asyncio.create_task(self.fetch_worker(q, a)) for _ in range(self.threads)]
            workers_heavy = [asyncio.create_task(self.heavy_worker(a)) for _ in range(self.threads2)]
    
            for url in urls:
                await q.put(url)
    
            # wait for all tasks to be processed
            await q.join()
            await a.join()
    
            # Cancel our worker tasks.
            for worker in workers_fetch:
                worker.cancel()
            await asyncio.gather(*workers_fetch, return_exceptions=True)
            for worker in workers_heavy:
                worker.cancel()
            await asyncio.gather(*workers_heavy, return_exceptions=True)
    
        async def run(self, _urls_list):
            async with aiohttp.ClientSession() as self.client:
                task_for_first_run = asyncio.create_task(self.main(_urls_list))
                await asyncio.sleep(1)
    
                await task_for_first_run
                print("All tasks completed")
    
        def http_crawl(self, _urls_list):
            asyncio.run(self.run(_urls_list))
    
    r = Requester()
    _url_list = ['http://aaaaaaaaaaaaaaaa.aaaaaaaaaaaaaaaaaaa.aa', 'https://www.google.com','https://images.google.com','https://maps.google.com','https://mail.google.com',
                 'https://news.google.com','https://video.google.com','https://books.google.com', 'https://www.google.com',
                 'https://images.google.com','https://maps.google.com','https://mail.google.com','https://news.google.com',
                 'https://video.google.com','https://books.google.com', 'https://www.google.com','https://images.google.com',
                 'https://maps.google.com','https://mail.google.com','https://news.google.com','https://video.google.com',
                 'https://books.google.com', 'https://www.google.com','https://images.google.com','https://maps.google.com',
                 'https://mail.google.com','https://news.google.com','https://video.google.com','https://books.google.com',
                 'https://www.google.com','https://images.google.com','https://maps.google.com','https://mail.google.com',
                 'https://news.google.com','https://video.google.com','https://books.google.com']
    r.http_crawl(_url_list)
    

  2. I believe that since you are setting your BoundedSemaphore to 1, it is only allowing one instance of your task to run at a time.

    You can use the ratelimiter package to limit the number of concurrent requests in a certain amount of time.
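
    For example, here is a minimal sketch of the semaphore side of the fix (the limit of 5 and the simplified run are illustrative assumptions, based on the question's own code): the semaphore is held with "async with" for the duration of each request only, and a larger limit lets several fetches run concurrently.

    import asyncio
    import aiohttp

    class Requester:

        def __init__(self, limit=5):
            # Allow up to `limit` requests in flight instead of 1.
            self.sem = asyncio.BoundedSemaphore(limit)

        async def fetch(self, url, client):
            async with self.sem:  # released automatically when the request finishes
                async with client.get(url) as response:
                    data = (await response.read()).decode('utf-8', 'replace')
                    print("URL:", url, " have code:", response.status)
                    return response, data

        async def run(self, urls):
            async with aiohttp.ClientSession() as client:
                tasks = [asyncio.create_task(self.fetch(url, client)) for url in urls]
                await asyncio.gather(*tasks)

    r = Requester()
    asyncio.run(r.run(['https://www.google.com', 'https://images.google.com']))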
