
I am working with a Docker setup for Hadoop and Spark using the following repository: docker-hadoop-spark. My Docker Compose YAML configuration is working correctly, and I am able to run the containers without issues.

Here is the YAML configuration:

version: "3"

services:
  namenode:
    image: bde2020/hadoop-namenode:2.0.0-hadoop3.2.1-java8
    container_name: namenode
    restart: always
    ports:
      - 9870:9870
      - 9010:9000
    volumes:
      - hadoop_namenode:/hadoop/dfs/name
    environment:
      - CLUSTER_NAME=test
      - CORE_CONF_fs_defaultFS=hdfs://namenode:9000
    env_file:
      - ./hadoop.env

  datanode:
    image: bde2020/hadoop-datanode:2.0.0-hadoop3.2.1-java8
    container_name: datanode
    restart: always
    volumes:
      - hadoop_datanode:/hadoop/dfs/data
    environment:
      SERVICE_PRECONDITION: "namenode:9870"
      CORE_CONF_fs_defaultFS: hdfs://namenode:9000
    ports:
      - "9864:9864"
    env_file:
      - ./hadoop.env

  resourcemanager:
    image: bde2020/hadoop-resourcemanager:2.0.0-hadoop3.2.1-java8
    container_name: resourcemanager
    restart: always
    environment:
      SERVICE_PRECONDITION: "namenode:9000 namenode:9870 datanode:9864"
    env_file:
      - ./hadoop.env

  nodemanager1:
    image: bde2020/hadoop-nodemanager:2.0.0-hadoop3.2.1-java8
    container_name: nodemanager
    restart: always
    environment:
      SERVICE_PRECONDITION: "namenode:9000 namenode:9870 datanode:9864 resourcemanager:8088"
    env_file:
      - ./hadoop.env

  historyserver:
    image: bde2020/hadoop-historyserver:2.0.0-hadoop3.2.1-java8
    container_name: historyserver
    restart: always
    environment:
      SERVICE_PRECONDITION: "namenode:9000 namenode:9870 datanode:9864 resourcemanager:8088"
    volumes:
      - hadoop_historyserver:/hadoop/yarn/timeline
    env_file:
      - ./hadoop.env

  spark-master:
    image: bde2020/spark-master:3.0.0-hadoop3.2
    container_name: spark-master
    depends_on:
      - namenode
      - datanode
    ports:
      - "8080:8080"
      - "7077:7077"
    environment:
      - INIT_DAEMON_STEP=setup_spark
      - CORE_CONF_fs_defaultFS=hdfs://namenode:9000

  spark-worker-1:
    image: bde2020/spark-worker:3.0.0-hadoop3.2
    container_name: spark-worker-1
    depends_on:
      - spark-master
    ports:
      - "8081:8081"
    environment:
      - "SPARK_MASTER=spark://spark-master:7077"
      - CORE_CONF_fs_defaultFS=hdfs://namenode:9000

  hive-server:
    image: bde2020/hive:2.3.2-postgresql-metastore
    container_name: hive-server
    depends_on:
      - namenode
      - datanode
    env_file:
      - ./hadoop-hive.env
    environment:
      HIVE_CORE_CONF_javax_jdo_option_ConnectionURL: "jdbc:postgresql://hive-metastore/metastore"
      SERVICE_PRECONDITION: "hive-metastore:9083"
    ports:
      - "10000:10000"

  hive-metastore:
    image: bde2020/hive:2.3.2-postgresql-metastore
    container_name: hive-metastore
    env_file:
      - ./hadoop-hive.env
    command: /opt/hive/bin/hive --service metastore
    environment:
      SERVICE_PRECONDITION: "namenode:9870 datanode:9864 hive-metastore-postgresql:5432"
    ports:
      - "9083:9083"

  hive-metastore-postgresql:
    image: bde2020/hive-metastore-postgresql:2.3.0
    container_name: hive-metastore-postgresql

  presto-coordinator:
    image: shawnzhu/prestodb:0.181
    container_name: presto-coordinator
    ports:
      - "8089:8089"

volumes:
  hadoop_namenode:
  hadoop_datanode:
  hadoop_historyserver:

However, I’m having trouble connecting my Spark code to the containers. Specifically, even though I have configured my Spark code to reach HDFS via port 9010, it still seems to go to port 9000. Here is my Spark code, with the errors noted as comments inside the code (a short diagnostic sketch follows it):

import org.apache.spark.sql.SparkSession

object HiveCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("HiveMetastoreCheck")
      .master("spark://spark-master:7077")
      .config("spark.hadoop.fs.defaultFS", "hdfs://namenode:9010")
      .config("hive.metastore.uris", "thrift://hive-metastore:9083")
      .enableHiveSupport()
      .getOrCreate()

      println("Databases in Hive:")
      spark.sql("SHOW DATABASES").show()

    // Works fine. Output:
    //    +---------+
    //    |namespace|
    //    +---------+
    //    |  default|
    //    +---------+

      println("Tables in database 'default':")
      spark.sql("SHOW TABLES IN default").show()

    // Works fine. Output:
    //    +--------+----------+-----------+
    //    |database| tableName|isTemporary|
    //    +--------+----------+-----------+
    //    | default|test_table|      false|
    //    +--------+----------+-----------+

       println("Query a table in Hive:")
       spark.sql("SELECT * FROM default.test_table;").show()
    
    // Error:
    // Call From My-PC/123.456.78.9 to namenode:9000 failed on connection exception: java.net.ConnectException: Connection refused: no further information;
  }
}
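
A minimal diagnostic sketch (standard Spark and Hadoop APIs only) to check which defaultFS the driver actually resolves, and which location the metastore reports for the table:

// Run in the same SparkSession as above.
// Which file system URI does the session actually use?
println(spark.sparkContext.hadoopConfiguration.get("fs.defaultFS"))

// Where does the Hive metastore say the table lives? The Location row most likely
// holds an hdfs://namenode:9000/... URI written by the services inside Docker,
// which would explain why the query goes to port 9000 regardless of fs.defaultFS.
spark.sql("DESCRIBE FORMATTED default.test_table").show(100, truncate = false)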

Here is the output of the connectivity test done inside the spark-worker:

# ping namenode
PING namenode (172.18.0.9): 56 data bytes
64 bytes from 172.18.0.9: seq=0 ttl=64 time=0.280 ms
64 bytes from 172.18.0.9: seq=1 ttl=64 time=0.161 ms
64 bytes from 172.18.0.9: seq=2 ttl=64 time=0.138 ms
64 bytes from 172.18.0.9: seq=3 ttl=64 time=0.195 ms
64 bytes from 172.18.0.9: seq=4 ttl=64 time=0.192 ms
--- namenode ping statistics ---
2 packets transmitted, 2 packets received, 0% packet loss
round-trip min/avg/max = 0.138/0.193/0.280 ms
# nc -zv namenode 9000
namenode (172.18.0.9:9000) open
# ping hive-metastore
PING hive-metastore (172.18.0.6): 56 data bytes
64 bytes from 172.18.0.6: seq=0 ttl=64 time=0.193 ms
64 bytes from 172.18.0.6: seq=1 ttl=64 time=0.161 ms
--- hive-metastore ping statistics ---
2 packets transmitted, 2 packets received, 0% packet loss
round-trip min/avg/max = 0.161/0.177/0.193 ms
# nc -zv hive-metastore 9083
hive-metastore (172.18.0.6:9083) open
# netstat -tuln
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address           Foreign Address         State       
tcp        0      0 127.0.0.11:42043        0.0.0.0:*               LISTEN      
tcp        0      0 ::ffff:172.18.0.12:36585 :::*                    LISTEN      
tcp        0      0 :::8081                 :::*                    LISTEN      
udp        0      0 127.0.0.11:57887        0.0.0.0:*

Update:
I changed the port mapping to 9000:9000 instead of 9010:9000, then added the hostname namenode with IP 127.0.0.1 to the Windows hosts file:

127.0.0.1 namenode

Now, when I run the code, it doesn’t throw an error but continues to run indefinitely:

24/09/14 23:31:14 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20240914213107-0002/2 is now EXITED (Command exited with code 1)
24/09/14 23:31:14 INFO StandaloneSchedulerBackend: Executor app-20240914213107-0002/2 removed: Command exited with code 1
24/09/14 23:31:14 INFO StandaloneAppClient$ClientEndpoint: Executor added: app-20240914213107-0002/3 on worker-20240914210300-123.45.6.75-43893 (123.45.6.75:43893) with 8 core(s)
24/09/14 23:31:14 INFO BlockManagerMaster: Removal of executor 2 requested
24/09/14 23:31:14 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Asked to remove non-existent executor 2
24/09/14 23:31:14 INFO StandaloneSchedulerBackend: Granted executor ID app-20240914213107-0002/3 on hostPort 123.45.6.75:43893 with 8 core(s), 1024.0 MiB RAM
24/09/14 23:31:14 INFO BlockManagerMasterEndpoint: Trying to remove executor 2 from BlockManagerMaster.
24/09/14 23:31:14 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20240914213107-0002/3 is now RUNNING
24/09/14 23:31:15 INFO CodeGenerator: Code generated in 27.9508 ms
24/09/14 23:31:15 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 286.2 KiB, free 1983.0 MiB)
24/09/14 23:31:15 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 24.0 KiB, free 1983.0 MiB)
24/09/14 23:31:15 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on My-PC:55526 (size: 24.0 KiB, free: 1983.3 MiB)
24/09/14 23:31:15 INFO SparkContext: Created broadcast 0 from 
24/09/14 23:31:15 INFO FileInputFormat: Total input paths to process : 1
24/09/14 23:31:15 INFO SparkContext: Starting job: show at HiveTest2.scala:38
24/09/14 23:31:15 INFO DAGScheduler: Got job 0 (show at HiveTest2.scala:38) with 1 output partitions
24/09/14 23:31:15 INFO DAGScheduler: Final stage: ResultStage 0 (show at HiveTest2.scala:38)
24/09/14 23:31:15 INFO DAGScheduler: Parents of final stage: List()
24/09/14 23:31:15 INFO DAGScheduler: Missing parents: List()
24/09/14 23:31:15 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[5] at show at HiveTest2.scala:38), which has no missing parents
24/09/14 23:31:15 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 12.7 KiB, free 1983.0 MiB)
24/09/14 23:31:15 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 6.2 KiB, free 1983.0 MiB)
24/09/14 23:31:15 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on My-PC:55526 (size: 6.2 KiB, free: 1983.3 MiB)
24/09/14 23:31:15 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1200
24/09/14 23:31:15 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[5] at show at HiveTest2.scala:38) (first 15 tasks are for partitions Vector(0))
24/09/14 23:31:15 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
24/09/14 23:31:17 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20240914213107-0002/3 is now EXITED (Command exited with code 1)
24/09/14 23:31:17 INFO StandaloneSchedulerBackend: Executor app-20240914213107-0002/3 removed: Command exited with code 1
24/09/14 23:31:17 INFO BlockManagerMaster: Removal of executor 3 requested
24/09/14 23:31:17 INFO BlockManagerMasterEndpoint: Trying to remove executor 3 from BlockManagerMaster.
24/09/14 23:31:17 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Asked to remove non-existent executor 3
24/09/14 23:31:17 INFO StandaloneAppClient$ClientEndpoint: Executor added: app-20240914213107-0002/4 on worker-20240914210300-123.45.6.75-43893 (123.45.6.75:43893) with 8 core(s)
24/09/14 23:31:17 INFO StandaloneSchedulerBackend: Granted executor ID app-20240914213107-0002/4 on hostPort 123.45.6.75:43893 with 8 core(s), 1024.0 MiB RAM
24/09/14 23:31:17 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20240914213107-0002/4 is now RUNNING
24/09/14 23:31:19 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20240914213107-0002/4 is now EXITED (Command exited with code 1)
24/09/14 23:31:19 INFO StandaloneSchedulerBackend: Executor app-20240914213107-0002/4 removed: Command exited with code 1
24/09/14 23:31:19 INFO BlockManagerMasterEndpoint: Trying to remove executor 4 from BlockManagerMaster.
24/09/14 23:31:19 INFO BlockManagerMaster: Removal of executor 4 requested

and this pattern of executor failures repeats, with the executor index increasing each time.

2 Answers


  1. Now it seems like executors are starting and then failing with an error. Have you tried to increase memory (and cores if available)?

    spark-worker-1:
        image: bde2020/spark-worker:3.0.0-hadoop3.2
        container_name: spark-worker-1
        depends_on:
          - spark-master
        ports:
          - "8081:8081"
        environment:
          - "SPARK_MASTER=spark://spark-master:7077"
          - CORE_CONF_fs_defaultFS=hdfs://namenode:9000
          - SPARK_WORKER_MEMORY=4g
          - SPARK_WORKER_CORES=4  # optional, if cores are available
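
    The request can also be capped from the application side so the executors fit what the worker offers; a minimal sketch using standard Spark properties (the values are only examples, not taken from the question):

    import org.apache.spark.sql.SparkSession

    // Keep the requested executors within the worker's 4g / 4 cores above.
    // spark.executor.memory, spark.executor.cores and spark.cores.max are
    // standard Spark properties; the values here are examples only.
    val spark = SparkSession.builder()
      .appName("HiveMetastoreCheck")
      .master("spark://spark-master:7077")
      .config("spark.executor.memory", "2g")
      .config("spark.executor.cores", "2")
      .config("spark.cores.max", "4")
      .getOrCreate()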
    

    UPD:

    What I have done:

    1. Cloned the repo.
    2. Ran docker compose up with your docker-compose.yml.
    3. Started the Spark shell:
       docker exec -it spark-master bash
       spark/bin/spark-shell --master spark://spark-master:7077

    4. Ran your Spark code:
       scala> object HiveCheck {
            |   def main(args: Array[String]): Unit = {
            |     val spark = SparkSession.builder()
            |       .appName("HiveMetastoreCheck")
            |       .master("spark://spark-master:7077")
            |       .config("spark.hadoop.fs.defaultFS", "hdfs://namenode:9010")
            |       .config("hive.metastore.uris", "thrift://hive-metastore:9083")
            |       .enableHiveSupport()
            |       .getOrCreate()
            | 
            |       println("Databases in Hive:")
            |       spark.sql("SHOW DATABASES").show()
            |
            | 
            |       println("Tables in database 'default':")
            |       spark.sql("SHOW TABLES IN default").show()
            | 
            | 
            |        println("Query a table in Hive:")
            |        spark.sql("SELECT * FROM default.test_table;").show()
            |     
            |   }
            | }
       defined object HiveCheck
    
    5. Created and filled a table (note the warehouse location in the WARN below; see the sketch after this list):
        scala> spark.sql("CREATE TABLE IF NOT EXISTS default.test_table2 (id_studente INT, nome_corso STRING, voto INT, data_esame STRING)")
           24/09/19 10:11:09 WARN HiveMetaStore: Location: file:/spark-warehouse/test_table2 specified for non-external table:test_table2
    
       scala> spark.sql("INSERT INTO default.test_table2 VALUES (1, 'Corso2', 29, '2024-09-06')")
       res23: org.apache.spark.sql.DataFrame = []
    
    6. Queried the table without issues:
    scala> spark.sql("SELECT * FROM default.test_table2;").show()
    +-----------+----------+----+----------+
    |id_studente|nome_corso|voto|data_esame|
    +-----------+----------+----+----------+
    +-----------+----------+----+----------+
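
    On the WARN in step 5: that location comes from spark.sql.warehouse.dir, which defaults to a local spark-warehouse directory, so the managed table ends up on the container's local file system rather than on HDFS. A sketch of pinning the warehouse to HDFS when building the session; the warehouse path is an assumption, not taken from the repo:

    import org.apache.spark.sql.SparkSession

    // Assumes the shell runs inside spark-master, where namenode:9000 and
    // hive-metastore:9083 are reachable directly on the Docker network.
    val spark = SparkSession.builder()
      .appName("HiveMetastoreCheck")
      .master("spark://spark-master:7077")
      .config("hive.metastore.uris", "thrift://hive-metastore:9083")
      .config("spark.sql.warehouse.dir", "hdfs://namenode:9000/user/hive/warehouse")
      .enableHiveSupport()
      .getOrCreate()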
    
  2. I got it working by changing the hostnames in your code to 127.0.0.1, so it looks like this:

        val spark = SparkSession.builder()
          .appName("HiveMetastoreCheck")
          .master("spark://127.0.0.1:7077")
          .config("spark.hadoop.fs.defaultFS", "hdfs://127.0.0.1:9010")
          .config("hive.metastore.uris", "thrift://127.0.0.1:9083")
          .enableHiveSupport()
          .getOrCreate()
    

    Steps I took to make it work:

    1. git clone https://github.com/Marcel-Jan/docker-hadoop-spark .
    2. docker compose up -d
    3. Started the Spark Scala shell
    4. Ran the code below:
    import org.apache.spark.sql.SparkSession
    
    // changed to a one-liner to make it work in the shell
    val spark = SparkSession.builder().appName("HiveMetastoreCheck").master("spark://127.0.0.1:7077").config("spark.hadoop.fs.defaultFS", "hdfs://127.0.0.1:9010").config("hive.metastore.uris", "thrift://127.0.0.1:9083").enableHiveSupport().getOrCreate()
          
    spark.sql("CREATE TABLE IF NOT EXISTS default.test_table (id INT)")
    
    println("Databases in Hive:")
    spark.sql("SHOW DATABASES").show()
    
    println("Tables in database 'default':")
    spark.sql("SHOW TABLES IN default").show()
    
    println("Query a table in Hive:")
    spark.sql("SELECT * FROM default.test_table;").show()
    
    5. Got the result:
    +---------+
    |namespace|
    +---------+
    |  default|
    +---------+
    
    +---------+----------+-----------+
    |namespace| tableName|isTemporary|
    +---------+----------+-----------+
    |  default|test_table|      false|
    +---------+----------+-----------+
    
    +---+
    | id|
    +---+
    +---+
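
    One more note: if the driver runs on the host while the executors run inside the Docker network (the situation in the question's update, where executors keep exiting with code 1), the executors also have to reach back to the driver. A sketch of the two standard driver-address properties involved, assuming Docker Desktop, where host.docker.internal resolves to the host from inside containers:

    import org.apache.spark.sql.SparkSession

    // host.docker.internal is an assumption (Docker Desktop); on other setups use an
    // address of the host that is reachable from the containers.
    val spark = SparkSession.builder()
      .appName("HiveMetastoreCheck")
      .master("spark://127.0.0.1:7077")
      .config("spark.driver.host", "host.docker.internal") // address executors connect back to
      .config("spark.driver.bindAddress", "0.0.0.0")       // bind locally on all interfaces
      .getOrCreate()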
    