skip to Main Content

redshift_connector is defined to be aligned with https://peps.python.org/pep-0249/#id24 but I can’t, after calling procedure, retrieve data into a dataframe.

Instead I’m getting 'mycursor' value. How to overcome this?

fetch*() methods don’t allow passing argument that will allow getting into data in mycursor.

I also tried RECORDS type but no luck.

Procedure’s body:

--CREATE TABLE reporting.tbl(a int, b int);
--INSERT INTO reporting.tblVALUES(1, 4);

CREATE OR REPLACE PROCEDURE reporting.procedure(param IN integer, rs_out INOUT refcursor)
LANGUAGE plpgsql
AS $$

BEGIN
  OPEN rs_out FOR SELECT a FROM reporting.tbl;
END;
$$;

Python code:

import redshift_connector

conn = redshift_connector.connect(
     host='xyz.xyz.region.redshift.amazonaws.com',
     database='db',
     port=5439,
     user="user",
     password='p@@s'
)
  

cursor = conn.cursor()

cursor.execute("BEGIN;") 
res = cursor.callproc("reporting.procedure", parameters=[1, 'mycursor'])
res = cursor.fetchall()
cursor.execute("COMMIT;")

#returns (['mycursor'],)
print(res)

2

Answers


  1. Chosen as BEST ANSWER

    I modified Bill's solution by:

    Adding context manager class:

    import redshift_connector
    
    class ConnectionError(Exception):
        pass
    
    class CredentialsError(Exception):
        pass
    
    class SQLError(Exception):
        pass
    
    class RedshiftClientData():
        def __init__(self, config: dict) -> None:
            self.configuration = config
    
        def __enter__(self) -> 'cursor':
            try:
                self.conn = redshift_connector.connect(**self.configuration)
                self.cursor = self.conn.cursor()
                return self.cursor
    
            except redshift_connector.InterfaceError as err:
                raise ConnectionError(err)
    
            except redshift_connector.ProgrammingError as err:
                raise CredentialsError(err)
    
            except Exception as ex:
                template = "An exception of type {0} occured in class RedshiftClient. Arguments:n{1!r}"
                message = template.format(type(ex).__name__, ex.args)
                print(message)
    
    
        def __exit__(self, exc_type, exc_value, exc_trace) -> None:
            self.conn.commit()
            self.cursor.close()
            self.conn.close()
            if exc_type is redshift_connector.ProgrammingError:
                raise SQLError(exc_value)
            elif exc_type:
                raise exc_type(exc_value)
    

    Below tested and I confirm that data is fetched without any temp tables.

    dbconfig = {
        "host": os.environ["redshift_host"],
        "database": os.environ["redshift_database"],
        "port": int(os.environ["redshift_port"]),
        "user": os.environ["redshift_username"],
        "password": os.environ["redshift_password"],
    }
    
    logging.info("Connecting to Redshift...")
    with RedshiftClientData(dbconfig) as cursor:
    
        logging.info("Excel file prep...")
    
        #on the fly process
        cursor.execute("CALL reporting.procedure('mycursor');")
        cursor.execute("FETCH ALL FROM mycursor;")
    
        result = pd.DataFrame = cursor.fetch_dataframe()
    
        print(result)
    

  2. I think you are trying to define 2 cursors and only one is allowed. "conn.cursor()" creates a cursor with name defined by redshift_connector. "OPEN rs_out FOR SELECT a FROM reporting.tbl;" in your procedure opens a second cursor with the name mycursor. The "cursor.fetchall()" is trying to fetch from the first cursor (and possibly erroring). No command is fetching from mycursor.

    I don’t believe there is a way to get "cursor.fetchall()" to point to a different cursor name so I think you need to run the SQL commands (CALL, FETCH, etc) directly.

    Something like this:

    import redshift_connector
    
    conn = redshift_connector.connect(
         host='xyz.xyz.region.redshift.amazonaws.com',
         database='db',
         port=5439,
         user="user",
         password='p@@s'
    )
    
    conn.run("BEGIN;") 
    res = conn.run("CALL reporting.procedure(1, 'mycursor')")
    res = conn.run("FETCH ALL FROM mycursor;")
    conn.run("COMMIT;")
    
    print(res)
    

    Be aware that if you are on a single node Redshift cluster FETCH ALL isn’t allowed and you will need to use FETCH FORWARD instead.

    Above untested and off the cuff.

    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search