skip to Main Content

The title says it all. Here is the code snippet.

async with EngineContext(uri=URI) as engine:
    session = async_sessionmaker(bind=engine, expire_on_commit=True)()
    async with session.begin():
        stmt: Select = select(User).order_by(_GenerativeSelect__first=User.login_date.desc()).limit(limit=10)
        result = await session.execute(statement=stmt)

Equivalent to the very simple query,

SELECT * FROM User ORDER BY login_date DESC LIMIT 10;

The query is working fine and I am getting the results as expected. But I want to form a polars dataframe from the result, and without having to hardcode (or externally supply) the column names. I cannot seem to get the correct public APIs of the result object to manage it. When I try result.keys(), I get RMKeyView(['User']), not the column names.

I tried iterating over the results, where each object has the column names as attributes, but there are many other attributes. So the code cannot be dynamic enough to pick up the column names.

So any help on the cleanest way to form a polars Dataframe (preferably lazy) from this result?

Related, when I begin the session, is there a way to explicitly mark the session as read-only in SQL alchemy, to preclude any possibility of data corruption when no write is intended in that session?

2

Answers


  1. Chosen as BEST ANSWER

    After some pain, I got this piece of rather convoluted mess working, but I believe there should be a cleaner way to do it.

    def column_to_series(column: str) -> pl.Series:
        """Convert a column to a series based on DB query results """
        return pl.Series(name=column,
                         values=map(lambda item: getattr(
                             item[0], column), data))
    
    async with EngineContext(uri=URI) as engine:
        session: Any = async_sessionmaker(bind=engine, expire_on_commit=True)()
        async with session.begin():
            stmt: Select = select(User).order_by(
                _GenerativeSelect__first=User.login_date.desc()).limit(limit=n)
            # noinspection PyTypeChecker
            data: List[Tuple[User]] = (await session.execute(
                statement=stmt)).fetchall()
            columns: Iterator[str] = map(lambda item: getattr(item, 'key'),
                                         inspect(subject=User).c)
            res:pl.DataFrame=await pl.LazyFrame(data=map(column_to_series,
                                                columns)).collect_async()      
    

    Van's solution fails because apparently, polars and aiomysql driver do not play well with each other.


  2. # 1. the select query
    sele = select(User).order_by(User.login_date.desc()).limit(10)
    print(sele)
    
    # 2. create a STRING statement from it
    stmt = str(sele.compile(
        dialect=session.bind.dialect,
        compile_kwargs={"literal_binds": True},
    ))
    print(stmt)
    
    # 3. this is the ACTUAL ANSWER part
    import polars as pl
    res = pl.read_database(stmt, session)
    print(res)
    
    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search