
I am trying to read data from a table in a PostgreSQL database and proceed with an ETL project. I have a Docker environment set up using this docker-compose:

version: "3.3"
services:
  spark-master:
    image: docker.io/bitnami/spark:3.3
    ports:
      - "9090:8080"
      - "7077:7077"
    volumes:
       - /opt/spark-apps
       - /opt/spark-data
    environment:
      - SPARK_LOCAL_IP=spark-master
      - SPARK_WORKLOAD=master
  spark-worker-a:
    image: docker.io/bitnami/spark:3.3
    ports:
      - "9091:8080"
      - "7000:7000"
    depends_on:
      - spark-master
    environment:
      - SPARK_MASTER=spark://spark-master:7077
      - SPARK_WORKER_CORES=1
      - SPARK_WORKER_MEMORY=1G
      - SPARK_DRIVER_MEMORY=1G
      - SPARK_EXECUTOR_MEMORY=1G
      - SPARK_WORKLOAD=worker
      - SPARK_LOCAL_IP=spark-worker-a
    volumes:
       - /opt/spark-apps
       - /opt/spark-data
  spark-worker-b:
    image: docker.io/bitnami/spark:3.3
    ports:
      - "9092:8080"
      - "7001:7000"
    depends_on:
      - spark-master
    environment:
      - SPARK_MASTER=spark://spark-master:7077
      - SPARK_WORKER_CORES=1
      - SPARK_WORKER_MEMORY=1G
      - SPARK_DRIVER_MEMORY=1G
      - SPARK_EXECUTOR_MEMORY=1G
      - SPARK_WORKLOAD=worker
      - SPARK_LOCAL_IP=spark-worker-b
    volumes:
        - /opt/spark-apps
        - /opt/spark-data

  postgres:
    container_name: postgres_container
    image: postgres:11.7-alpine
    environment:
      POSTGRES_USER: admin
      POSTGRES_PASSWORD: admin
    volumes:
       - /data/postgres
    ports:
      # host port 4560 maps to 5432 inside the container; other services on the
      # compose network reach it at postgres_container:5432
      - "4560:5432"
    restart: unless-stopped

  # jupyterlab with pyspark
  jupyter-pyspark:
    image: jupyter/pyspark-notebook:latest
    environment:
      JUPYTER_ENABLE_LAB: "yes"
    ports:
      - "9999:8888"
    volumes:
      - /app/data

I was successful in connecting to the DB, but I can't print any data. Here's my code:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("salesETL")
         .config("spark.driver.extraClassPath", "./postgresql-42.5.1.jar")
         .getOrCreate())

df = (spark.read.format("jdbc")
      .option("url", "jdbc:postgresql://postgres_container:5432/postgres")
      .option("dbtable", "sales")
      .option("driver", "org.postgresql.Driver")
      .option("user", "admin")
      .option("password", "admin")
      .load())

df.show(10).toPandas()

With .toPandas() it gives me this error:

    AttributeError Traceback (most recent call last)
    Cell In[7], line 1
    ----> 1 df.show(10).toPandas()

    AttributeError: 'NoneType' object has no attribute 'toPandas'

Without .toPandas(), it prints the columns but no data:

+--------+----------+-----------+-------------+-----------------+-------------+--------------+----------+--------+-----------+
|order_id|order_date|customer_id|customer_name|customer_lastname|customer_city|customer_state|product_id|quantity|order_value|
+--------+----------+-----------+-------------+-----------------+-------------+--------------+----------+--------+-----------+
+--------+----------+-----------+-------------+-----------------+-------------+--------------+----------+--------+-----------+

I am new to PySpark/Spark, so I can't figure out what I am missing. It's my very first project. What could it be?

PS: when I run type(df) it returns pyspark.sql.dataframe.DataFrame

2 Answers


  1. Chosen as BEST ANSWER

    Well, I couldn't figure out why this happened or how to fix it. Instead, I took a workaround: I loaded the data into Python using pandas and then converted the pandas DataFrame to a PySpark DataFrame.

    Here's my code:

    import psycopg2  # not used directly, but it is the driver behind SQLAlchemy's postgresql+psycopg2 dialect
    import pandas as pd
    from pyspark.sql import SparkSession
    from sqlalchemy import create_engine
    
    appName = "salesETL"
    master = "local"
    
    spark = SparkSession.builder.master(master).appName(appName).getOrCreate()
    
    # connect to the Postgres container over the compose network (port 5432 inside the network)
    engine = create_engine(
        "postgresql+psycopg2://admin:admin@postgres_container/postgres?client_encoding=utf8")
    pdf = pd.read_sql('select * from sales.sales', engine)
    
    # Convert the pandas DataFrame to a Spark DataFrame
    df = spark.createDataFrame(pdf)
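
    To sanity-check the conversion, you can inspect the resulting Spark DataFrame (a generic check, nothing specific to this setup):

    # confirm the schema inferred from the pandas frame and that rows came through
    df.printSchema()
    df.show(10)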
    

  2. show() returns None, so you can't chain anything onto it; call the conversion on the DataFrame directly. Also note that for a pyspark.sql.DataFrame the method is toPandas(); the to_pandas() documented at https://spark.apache.org/docs/3.2.0/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.to_pandas.html belongs to the pandas-on-Spark API. So the error should disappear with something like this:

    df.toPandas()
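
    To see why the chained call fails (a minimal illustration using the same df):

    df.show(10)           # prints the rows and returns None
    pdf = df.toPandas()   # returns a pandas.DataFrame you can keep working with
    print(pdf.head(10))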
    

    About the empty result: is there any error? If there is no error, are you sure the table actually contains any records?
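
    For example, a quick check from Spark itself (a sketch that reuses the connection options from the question; Spark's JDBC query option replaces dbtable here) is to push a count down to Postgres:

    # if this prints 0, the table that the unqualified name "sales" resolves to is simply empty
    count_df = (spark.read.format("jdbc")
                .option("url", "jdbc:postgresql://postgres_container:5432/postgres")
                .option("query", "select count(*) as n from sales")
                .option("driver", "org.postgresql.Driver")
                .option("user", "admin")
                .option("password", "admin")
                .load())
    count_df.show()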
