I’ve been having some issues connecting to a PostgreSQL instance on Google Cloud SQL and wanted to ask for help. I’m not sure whether the solution is to create a connection engine or something of the sort, but here is my issue. My code is as follows:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import relational_db

# Upper-case names (USERNAME, PASSWORD, DB_NAME, TABLE_NAME, FUNCTION, TABLE) are placeholders.
source_config = relational_db.SourceConfiguration(
    drivername='postgresql+pg8000',
    host='localhost',
    port=5432,
    username=USERNAME,
    password=PASSWORD,
    database=DB_NAME,
    create_if_missing=True,
)
table_config = relational_db.TableConfiguration(
    name=TABLE_NAME,
    create_if_missing=False,
    primary_key_columns=["key"],
    create_insert_f=FUNCTION,
)
pipeline_options = PipelineOptions()  # runner, project, etc. omitted here
with beam.Pipeline(options=pipeline_options) as pipeline:
    update_pipe = (
        pipeline
        | 'QueryTable' >> beam.io.ReadFromBigQuery(table=TABLE)
        | 'UPDATE DB' >> relational_db.Write(source_config=source_config, table_config=table_config)
    )
Running the code like this has resulted in the following error:
RuntimeError: sqlalchemy.exc.InterfaceError: (pg8000.exceptions.InterfaceError) Can't create a connection to host localhost and port 5432 (timeout is None and source_address is None).
I’ve read the documentation and some related questions on Stack Overflow, where I’ve seen suggestions such as private IP connectivity or authenticating with the gcloud CLI. But my confusion is this: if I use SQLAlchemy on its own, without trying to use it inside an Apache Beam pipeline, I don’t get the same connection refusal. And one of the arguments explicitly sets the IP type to look for to public:
import sqlalchemy
from google.cloud.sql.connector import Connector, IPTypes

# initialize Python Connector object
connector = Connector()

# Python Connector database connection function
def getconn():
    conn = connector.connect(
        CLOUD_SQL_CONNECTION_NAME,  # Cloud SQL Instance Connection Name
        "pg8000",
        user=USERNAME,
        password=PASSWORD,
        db=DB_NAME,
        ip_type=IPTypes.PUBLIC,  # IPTypes.PRIVATE for private IP
    )
    return conn

# create connection pool with 'creator' argument to our connection object function
pool = sqlalchemy.create_engine(
    "postgresql+pg8000://",
    creator=getconn,
)

# interact with Cloud SQL database using connection pool
with pool.connect() as db_conn:
    result = db_conn.execute(sqlalchemy.text("SELECT * from users_copy LIMIT 10")).fetchall()
So my question is, is there a way to include the connection/engine in the Beam pipeline to avoid the error? Or do I need to change the source config arguments to include the Cloud SQL Instance connection name?
Thanks for the help and for reading my issue.
2 Answers
I have the same question. Were you able to resolve it?
Are you locked into using the beam_nuggets.io library? If yes, then you unfortunately cannot use the Cloud SQL Python Connector.
Taking a look at the beam-nuggets code, it uses SQLAlchemy under the hood, which can be used by the Python Connector; however, it configures the database engine purely using a database URL/URI. The Cloud SQL Python Connector requires leveraging SQLAlchemy’s create_engine method with the creator argument. Beam nuggets would have to add support for the creator argument in order to support the Cloud SQL Python Connector.
An alternative to the Cloud SQL Python Connector would be to try to deploy the Cloud SQL Proxy alongside your Beam application so that your initial code snippet would work when connecting to localhost.
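To make the URL-only point above concrete, here is a minimal sketch of how fields like the ones in SourceConfiguration map onto a SQLAlchemy-style database URL. This is not the actual beam-nuggets code; the helper name build_db_url and the credentials are hypothetical and used only for illustration.

```python
from urllib.parse import quote_plus


def build_db_url(drivername, host, port, username, password, database):
    """Assemble a URL of the form driver://user:password@host:port/database."""
    # Percent-encode the credentials so characters such as '@' or '/'
    # cannot be mistaken for URL delimiters.
    user = quote_plus(username)
    secret = quote_plus(password)
    return f"{drivername}://{user}:{secret}@{host}:{port}/{database}"


# Hypothetical values mirroring the shape of the question's configuration.
url = build_db_url(
    drivername="postgresql+pg8000",
    host="localhost",
    port=5432,
    username="example_user",
    password="p@ss/word",
    database="example_db",
)
print(url)
```

A string like this only carries a host and port, with no place for a Python callable such as getconn. That would explain why the driver tries to open a socket to localhost:5432 directly, and why the original snippet can only succeed if something (for example the Cloud SQL Proxy) is actually listening on that port wherever the pipeline code runs.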