
Being very new to FastAPI, I am struggling to test code that is slightly more complex than what I saw in the tutorial. I use the fastapi_cache module with Redis like this:

from fastapi import Depends, FastAPI, Query, Request
from fastapi_cache.backends.redis import CACHE_KEY, RedisCacheBackend
from fastapi_cache import caches, close_caches

app = FastAPI()

def redis_cache():
    return caches.get(CACHE_KEY)    

@app.get("/cache")
async def test(
    cache: RedisCacheBackend = Depends(redis_cache),
    n: int = Query(...),  # arguments truncated in the original post
):
    # code that uses redis cache
    ...

@app.on_event("startup")
async def on_startup() -> None:
    rc = RedisCacheBackend('redis://redis')
    caches.set(CACHE_KEY, rc)

@app.on_event("shutdown")
async def on_shutdown() -> None:
    await close_caches()

My test looks like this:

import pytest
from httpx import AsyncClient
from .main import app

@pytest.mark.asyncio
async def test_cache():
    async with AsyncClient(app=app, base_url="http://test") as ac:
        response = await ac.get("/cache?n=150")

When I run pytest, the cache variable is set to None and the test fails. I think I understand why the code is not working, but how do I fix it so I can test my caching properly?



  1. The problem is that httpx's AsyncClient does not implement the ASGI lifespan protocol, so the startup event handlers never run and the cache is never registered. To trigger them, wrap the client in LifespanManager.

    Install: pip install asgi_lifespan

    The code would be like so:

    import pytest
    from asgi_lifespan import LifespanManager
    from httpx import AsyncClient
    from .main import app

    @pytest.mark.asyncio
    async def test_cache():
        async with LifespanManager(app):
            async with AsyncClient(app=app, base_url="http://localhost") as ac:
                response = await ac.get("/cache")
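
To see why the startup handler matters, here is a minimal sketch of the lifespan message exchange using only the standard library. The names toy_app, request, and request_with_lifespan are made up for illustration, and the dictionary only stands in for the Redis backend. This is not how httpx or LifespanManager are implemented internally; it just shows the messages a client either skips or sends.

```python
import asyncio

state = {"cache": None}  # stands in for the backend registered at startup


async def toy_app(scope, receive, send):
    """Hypothetical minimal ASGI app: fills the cache on startup, reads it per request."""
    if scope["type"] == "lifespan":
        while True:
            message = await receive()
            if message["type"] == "lifespan.startup":
                state["cache"] = "connected"
                await send({"type": "lifespan.startup.complete"})
            elif message["type"] == "lifespan.shutdown":
                state["cache"] = None
                await send({"type": "lifespan.shutdown.complete"})
                return
    elif scope["type"] == "http":
        body = str(state["cache"]).encode()
        await send({"type": "http.response.start", "status": 200, "headers": []})
        await send({"type": "http.response.body", "body": body})


async def request(app):
    """Send one bare HTTP scope, as a client without lifespan support would."""
    sent = []

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        sent.append(message)

    await app({"type": "http", "method": "GET", "path": "/cache"}, receive, send)
    return sent[-1]["body"]


async def request_with_lifespan(app):
    """Drive startup, send the request, then drive shutdown."""
    to_app = asyncio.Queue()
    from_app = asyncio.Queue()
    task = asyncio.create_task(app({"type": "lifespan"}, to_app.get, from_app.put))
    await to_app.put({"type": "lifespan.startup"})
    assert (await from_app.get())["type"] == "lifespan.startup.complete"
    body = await request(app)
    await to_app.put({"type": "lifespan.shutdown"})
    assert (await from_app.get())["type"] == "lifespan.shutdown.complete"
    await task
    return body


without_lifespan = asyncio.run(request(toy_app))
with_lifespan = asyncio.run(request_with_lifespan(toy_app))
print(without_lifespan, with_lifespan)
```

The first call sees an empty cache because nothing ever sent lifespan.startup, which matches the None in the question; the second call sees the value set by the startup branch.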


  2. If you don't want to add a dependency just for tests, here is a simple implementation for asyncio:

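    import asyncio

    class LifespanWaiter:
        def __init__(self, app):
            self.app = app
            self.startup = asyncio.Future()
            self.shutdown = asyncio.Future()
            self.task = None
        async def send(self, obj):
            if obj['type'] == 'lifespan.startup.complete':
                self.startup.set_result(None)
        async def receive(self):
            # After startup, block until the test is done, then request shutdown.
            if self.startup.done():
                await self.shutdown
                return {"type": "lifespan.shutdown"}
            return {"type": "lifespan.startup"}
        async def __aenter__(self):
            # Run the app's lifespan handler in the background.
            self.task = asyncio.create_task(self.app({"type": "lifespan"}, receive=self.receive, send=self.send))
            await self.startup
            return self
        async def __aexit__(self, exc_type, exc_val, exc_tb):
            self.shutdown.set_result(None)
            await self.task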

    I use it like this:

    @pytest.fixture
    async def app(test_db_name):
        """Creates app instance and applies migrations."""
        DATABASES["db_options"]["database"] = test_db_name
        app = make_app()

        @app.on_event("startup")
        async def db_setup():
            async with app.state.db_pool.acquire() as conn:
                await apply_migrations(conn, 'local')

        async with LifespanWaiter(app):
            yield app

    @pytest.fixture
    async def client(app: FastAPI):
        async with AsyncClient(app=app, base_url="http://test") as client:
            yield client
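
One caveat with a hand-rolled waiter like LifespanWaiter: as far as I can tell, it only reacts to lifespan.startup.complete, so if the app reports lifespan.startup.failed (say, Redis is unreachable) the startup future never resolves and the fixture hangs. Below is a sketch of a variant that raises instead. FailFastLifespanWaiter, failing_app, healthy_app, and try_start are hypothetical names for illustration, and the toy apps only mimic the lifespan messages; this has not been run against a real FastAPI app.

```python
import asyncio


class FailFastLifespanWaiter:
    """Hypothetical variant of LifespanWaiter that surfaces startup failures."""

    def __init__(self, app):
        self.app = app
        self.startup = None
        self.shutdown = None
        self.task = None

    async def send(self, message):
        if message["type"] == "lifespan.startup.complete":
            self.startup.set_result(None)
        elif message["type"] == "lifespan.startup.failed":
            # Turn the app's failure report into an exception for the caller.
            reason = message.get("message") or "startup failed"
            self.startup.set_exception(RuntimeError(reason))

    async def receive(self):
        if self.startup.done():
            await self.shutdown
            return {"type": "lifespan.shutdown"}
        return {"type": "lifespan.startup"}

    async def __aenter__(self):
        loop = asyncio.get_running_loop()
        self.startup = loop.create_future()
        self.shutdown = loop.create_future()
        self.task = asyncio.create_task(
            self.app({"type": "lifespan"}, self.receive, self.send)
        )
        await self.startup  # raises RuntimeError if the app reported a failure
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        self.shutdown.set_result(None)
        await self.task


async def failing_app(scope, receive, send):
    """Toy ASGI app whose startup always fails."""
    await receive()  # lifespan.startup
    await send({"type": "lifespan.startup.failed", "message": "redis unreachable"})


async def healthy_app(scope, receive, send):
    """Toy ASGI app that starts up and shuts down cleanly."""
    await receive()  # lifespan.startup
    await send({"type": "lifespan.startup.complete"})
    await receive()  # lifespan.shutdown
    await send({"type": "lifespan.shutdown.complete"})


async def try_start(app):
    try:
        async with FailFastLifespanWaiter(app):
            return "started"
    except RuntimeError as exc:
        return f"failed: {exc}"


results = [asyncio.run(try_start(healthy_app)), asyncio.run(try_start(failing_app))]
print(results)
```

Creating the futures in __aenter__ rather than __init__ is a deliberate choice here so they are always bound to the loop that runs the test.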