
I have a task running on Airflow that has two steps:

  1. fetches data from an MS SQL Server database as a dataframe;
  2. stores it in a Postgres database.

I’m using the MsSqlHook and PostgresHook to manage the connections.

The fetching works fine, but I haven’t been able to accomplish the second step: I’m trying to use the .to_sql method, but it keeps throwing errors…

Since I’ve already done this with SQLAlchemy in other contexts, I tried that approach here as well, but with no success…

This is my code:

from airflow import DAG
from airflow.decorators import task
from datetime import timedelta
from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook
from airflow.providers.postgres.hooks.postgres import PostgresHook

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': '2024-01-01',
    'email_on_failure': False,
    'email_on_retry': False,
    'retry_delay': timedelta(minutes=5),
}
    
def mssql_to_postgres_transfer(mssql_hook, postgres_hook, sql, table_name, dagrun_id):
    
    df = mssql_hook.get_pandas_df(sql=sql)
    df['dagrun_id'] = dagrun_id
    
    df.to_sql(
        name=table_name,
        con=postgres_hook.get_sqlalchemy_engine(),
        if_exists='replace',
        )
    
def do_the_job():
    mssql_hook = MsSqlHook(mssql_conn_id='mssql_conn_id')
    postgres_hook = PostgresHook(postgres_conn_id='postgres_conn_id')
    
    qry = "SELECT * FROM tbl_dummy"
    
    mssql_to_postgres_transfer(
        mssql_hook=mssql_hook, 
        postgres_hook=postgres_hook, 
        sql=qry, 
        table_name='tbl_dummy_local', 
        dagrun_id=1
        )
    
with DAG(
    'my_dag',
    default_args=default_args,
    catchup=False,
    description='fetch and store',
    max_active_runs=1,
    schedule_interval='5 * * * *'
) as dag:
    
    @task
    def fetch_and_store_task():
        do_the_job()
    
    fetch_and_store_task()

And the output:

[2024-02-07T22:25:08.665+0000] {base.py:83} INFO - Using connection ID 'postgres_conn_id' for task execution.
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/airflow/.local/lib/python3.11/site-packages/pandas/util/_decorators.py", line 333, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/pandas/core/generic.py", line 3081, in to_sql
    return sql.to_sql(
           ^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/pandas/io/sql.py", line 842, in to_sql
    return pandas_sql.to_sql(
           ^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/pandas/io/sql.py", line 2851, in to_sql
    table.create()
  File "/home/airflow/.local/lib/python3.11/site-packages/pandas/io/sql.py", line 984, in create
    if self.exists():
       ^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/pandas/io/sql.py", line 970, in exists
    return self.pd_sql.has_table(self.name, self.schema)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/pandas/io/sql.py", line 2866, in has_table
    return len(self.execute(query, [name]).fetchall()) > 0
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/pandas/io/sql.py", line 2673, in execute
    cur = self.con.cursor()
          ^^^^^^^^^^^^^^^
AttributeError: 'Engine' object has no attribute 'cursor'

As I said, I’ve also tried using SQLAlchemy, like this:

eng = aircerv.get_sqlalchemy_engine()
df.to_sql(name='tbl_dummy_local', con=eng.raw_connection(), if_exists='replace')

But it doesn’t work either!

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.11/site-packages/pandas/io/sql.py", line 2675, in execute
    cur.execute(sql, *args)
psycopg2.errors.UndefinedTable: relation "sqlite_master" does not exist
LINE 5:             sqlite_master
                    ^


The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/airflow/.local/lib/python3.11/site-packages/pandas/util/_decorators.py", line 333, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/pandas/core/generic.py", line 3081, in to_sql
    return sql.to_sql(
           ^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/pandas/io/sql.py", line 842, in to_sql
    return pandas_sql.to_sql(
           ^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/pandas/io/sql.py", line 2851, in to_sql
    table.create()
  File "/home/airflow/.local/lib/python3.11/site-packages/pandas/io/sql.py", line 984, in create
    if self.exists():
       ^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/pandas/io/sql.py", line 970, in exists
    return self.pd_sql.has_table(self.name, self.schema)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/pandas/io/sql.py", line 2866, in has_table
    return len(self.execute(query, [name]).fetchall()) > 0
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/pandas/io/sql.py", line 2687, in execute
    raise ex from exc
pandas.errors.DatabaseError: Execution failed on sql '
        SELECT
            name
        FROM
            sqlite_master
        WHERE
            type IN ('table', 'view')
            AND name=?;
        ': relation "sqlite_master" does not exist
LINE 5:             sqlite_master
                    ^

Both connections are working well, and the dataframe has rows:

>>> postgres_hook.test_connection()
[2024-02-07T22:25:51.755+0000] {base.py:83} INFO - Using connection ID 'aircerv' for task execution.
[2024-02-07T22:25:51.765+0000] {sql.py:450} INFO - Running statement: select 1, parameters: None
[2024-02-07T22:25:51.766+0000] {sql.py:459} INFO - Rows affected: 1
(True, 'Connection successfully tested')

>>> df.shape
(366, 77)

2 Answers


  1. While I don’t use Airflow, I feel your pain! I too was always getting either 'Engine' object has no attribute 'cursor' or relation "sqlite_master" does not exist errors, but was finally able to get my setup working by upgrading to the latest of everything (and starting with a fresh virtualenv):

    (runtime: python-3.12.1)
    pip install psycopg2-binary~=2.9.9 SQLAlchemy~=2.0.25 pandas~=2.2.0

    I spent way too long troubleshooting this, since I’d been using older versions to also keep camelot.read_pdf working. But now that magically still works too (using camelot-py~=0.9.0).
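
    In case it helps, here is a minimal sketch of the kind of call that works for me on those versions (the connection string is a placeholder, and since I don’t use Airflow I’m using a plain SQLAlchemy engine rather than the PostgresHook, so adapt as needed):

    import pandas as pd
    from sqlalchemy import create_engine

    # Placeholder DSN -- replace with your own Postgres credentials.
    engine = create_engine("postgresql+psycopg2://user:password@host:5432/dbname")

    df = pd.DataFrame({"a": [1, 2, 3]})

    # With pandas ~2.2 and SQLAlchemy ~2.0, passing the Engine object directly
    # is recognized correctly, so neither the 'cursor' AttributeError nor the
    # sqlite_master query shows up.
    df.to_sql(name="tbl_dummy_local", con=engine, if_exists="replace", index=False)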

  2. I am getting the same error. I’m confused why the table sqlite_master is being referenced at all; I’m not using or pushing to an SQLite database anywhere in my deployment. It looks like the root cause is that pandas doesn’t identify the SQLAlchemy engine dialect and falls back to its default SQLiteDatabase class. I was able to resolve this by downgrading my pandas version. I’m using Airflow 2.8.0 with:

    psycopg2-binary = "^2.8.6"
    pandas-gbq = "<0.15.0"
    pandas = "1.3.5"
    sqlalchemy = "1.4.51"
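
    For reference, here is a minimal sketch of the sanity check I run from the same environment to confirm which versions the workers actually import and what object the hook hands to pandas (the conn id is a placeholder; falling back to pandas’ SQLiteDatabase class means the engine wasn’t recognized):

    import pandas as pd
    import sqlalchemy
    from airflow.providers.postgres.hooks.postgres import PostgresHook

    # Versions actually imported by the worker -- these should match the pins above.
    print("pandas:", pd.__version__)
    print("sqlalchemy:", sqlalchemy.__version__)

    # Placeholder conn id; get_sqlalchemy_engine() should return a sqlalchemy.engine.Engine.
    hook = PostgresHook(postgres_conn_id="postgres_conn_id")
    print(type(hook.get_sqlalchemy_engine()))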
    