I am very new to FastAPI and am struggling to test code that is slightly more complex than the examples in the tutorial. I use the fastapi_cache module with Redis like this:

from fastapi import Depends, FastAPI, Query, Request
from fastapi_cache.backends.redis import CACHE_KEY, RedisCacheBackend
from fastapi_cache import caches, close_caches

app = FastAPI()

def redis_cache():
    return caches.get(CACHE_KEY)    

@app.get('/cache')
async def test(
    cache: RedisCacheBackend = Depends(redis_cache),
    n: int = Query(
        ..., 
        gt=-1
    )
):  
    # code that uses redis cache
    ...

@app.on_event('startup')
async def on_startup() -> None:
    rc = RedisCacheBackend('redis://redis')
    caches.set(CACHE_KEY, rc)

@app.on_event('shutdown')
async def on_shutdown() -> None:
    await close_caches()

test_main.py looks like this:

import pytest
from httpx import AsyncClient
from .main import app

@pytest.mark.asyncio
async def test_cache():
    async with AsyncClient(app=app, base_url="http://test") as ac:
        response = await ac.get("/cache?n=150")

When I run pytest, the cache argument is set to None and the test fails. I think I understand why the code is not working, but how do I fix it so that I can test my caching properly?

2 Answers


  1. The problem is that httpx does not implement the ASGI lifespan protocol, so it never triggers the startup event handlers and the cache is never registered. To run them, you need to use LifespanManager.

    Install: pip install asgi_lifespan

    The code would be like so:

    import pytest
    from asgi_lifespan import LifespanManager
    from httpx import AsyncClient
    from .main import app
    
    
    @pytest.mark.asyncio
    async def test_cache():
        async with LifespanManager(app):
            async with AsyncClient(app=app, base_url="http://localhost") as ac:
                # n is a required query parameter of the /cache endpoint
                response = await ac.get("/cache?n=150")
    

    More info here: https://github.com/encode/httpx/issues/350
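
To make the point of this answer concrete, here is a minimal sketch of the ASGI lifespan exchange using only the standard library. It does not use FastAPI or httpx: `toy_app`, `state`, `call_http` and `run_with_lifespan` are hypothetical names invented for this illustration, and `state["cache"]` stands in for the `caches` registry from the question. It shows that an HTTP request sent without lifespan events sees an uninitialised cache, while the same request sent after `lifespan.startup` sees the initialised one.

```python
import asyncio

# Hypothetical stand-in for the question's `caches` registry.
state = {"cache": None}


async def toy_app(scope, receive, send):
    """A bare ASGI app (not FastAPI) that initialises state during startup."""
    if scope["type"] == "lifespan":
        while True:
            message = await receive()
            if message["type"] == "lifespan.startup":
                state["cache"] = "connected"  # the job of on_startup()
                await send({"type": "lifespan.startup.complete"})
            elif message["type"] == "lifespan.shutdown":
                state["cache"] = None  # the job of on_shutdown()
                await send({"type": "lifespan.shutdown.complete"})
                return
    elif scope["type"] == "http":
        await receive()  # consume the (empty) request body
        body = str(state["cache"]).encode()
        await send({"type": "http.response.start", "status": 200, "headers": []})
        await send({"type": "http.response.body", "body": body})


async def call_http(app):
    """Send one HTTP request straight to the app, with no lifespan events."""
    sent = []

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        sent.append(message)

    await app({"type": "http", "method": "GET", "path": "/cache"}, receive, send)
    return sent[-1]["body"]


async def run_with_lifespan(app):
    """Drive startup, make one request, then drive shutdown."""
    to_app, from_app = asyncio.Queue(), asyncio.Queue()
    task = asyncio.create_task(app({"type": "lifespan"}, to_app.get, from_app.put))
    await to_app.put({"type": "lifespan.startup"})
    assert (await from_app.get())["type"] == "lifespan.startup.complete"
    body = await call_http(app)
    await to_app.put({"type": "lifespan.shutdown"})
    assert (await from_app.get())["type"] == "lifespan.shutdown.complete"
    await task
    return body


async def main():
    without = await call_http(toy_app)
    with_events = await run_with_lifespan(toy_app)
    return without, with_events


without_lifespan, with_lifespan = asyncio.run(main())
print(without_lifespan, with_lifespan)
```

The first request corresponds to the failing test in the question, and the second corresponds to wrapping the client in a lifespan manager.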

  2. If you don't want to add a dependency just for tests, here is a simple implementation for asyncio:

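    import asyncio


    class LifespanWaiter:
        def __init__(self, app):
            self.app = app
            self.startup = asyncio.Future()
            self.shutdown = asyncio.Future()
            self.task = None

        async def send(self, obj):
            # Called by the app; unblocks __aenter__ once startup handlers have run.
            if obj['type'] == 'lifespan.startup.complete':
                self.startup.set_result(None)
            elif obj['type'] == 'lifespan.startup.failed':
                self.startup.set_exception(RuntimeError(obj.get('message', '')))

        async def receive(self):
            # The first call asks the app to start; the next one blocks until __aexit__.
            if not self.startup.done():
                return {'type': 'lifespan.startup'}
            await self.shutdown
            return {'type': 'lifespan.shutdown'}

        async def __aenter__(self):
            # Keep a reference so the task is not garbage-collected while running.
            self.task = asyncio.create_task(
                self.app({"type": "lifespan"}, receive=self.receive, send=self.send)
            )
            await self.startup
            return self

        async def __aexit__(self, exc_type, exc_val, exc_tb):
            self.shutdown.set_result(None)
            # Wait for the app's shutdown handlers to finish.
            await self.task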
    

    I use it like this:

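    from fastapi import FastAPI
    from httpx import AsyncClient
    # With pytest-asyncio in strict mode, use pytest_asyncio.fixture instead.
    from pytest import fixture

    # DATABASES, make_app and apply_migrations are project-specific and not shown here.


    @fixture(scope='session')
    async def app(test_db_name):
        """Create the app instance and apply migrations."""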
        DATABASES["db_options"]["database"] = test_db_name
    
        app = make_app()
    
        @app.on_event("startup")
        async def db_setup():
            async with app.state.db_pool.acquire() as conn:
                await apply_migrations(conn, 'local')
    
        async with LifespanWaiter(app):
            yield app
    
    
    @fixture(scope='session')
    async def client(app: FastAPI):
        async with AsyncClient(app=app, base_url="http://test") as client:
            yield client
    