I have a pipeline written in Go that I want to execute with the Spark runner; Spark Standalone is installed on my local machine.
- Apache Beam 2.56.0
- Apache Spark 3.2.2
I started the Spark master and worker from the installation directory with these commands:
# for master
./sbin/start-master.sh -h localhost
# for worker
./sbin/start-worker.sh spark://localhost:7077
Then I started the beam_spark3_job_server and mounted /tmp:

docker run -v /tmp:/tmp --net=host apache/beam_spark3_job_server:2.56.0 --spark-master-url=spark://localhost:7077
Now, from the Go project, running

go run main.go --runner PortableRunner --endpoint localhost:8099 --environment_type LOOPBACK

works fine, but the environment_type is set to LOOPBACK.
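For context, main.go follows the usual Beam Go pattern of parsing the runner flags before constructing the pipeline. A minimal sketch of that shape (illustrative only, with a placeholder transform rather than my actual pipeline):

package main

import (
	"context"
	"flag"
	"log"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
)

func main() {
	// Parse the runner flags (--runner, --endpoint, --environment_type, ...)
	// before initializing Beam.
	flag.Parse()
	beam.Init()

	p := beam.NewPipeline()
	s := p.Root()

	// Placeholder transform; the real pipeline is more involved.
	words := beam.Create(s, "hello", "beam", "on", "spark")
	debug.Print(s, words)

	if err := beamx.Run(context.Background(), p); err != nil {
		log.Fatalf("pipeline failed: %v", err)
	}
}

The --runner, --endpoint, and --environment_type flags are registered by the SDK itself, so flag.Parse() picks them up without any extra code.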
But if I remove that flag and run the pipeline again without it (by default environment_type is set to DOCKER):

go run main.go --runner PortableRunner --endpoint localhost:8099

I get this on the console:
java.lang.IllegalStateException: No container running for id xxxxx
This is different from the thread "No container running for id xxxxxx when running apache beam go sdk examples": there, running the job server with docker run -v fixed the problem of the file not being found under /tmp/beam-artifact-staging, and that part already works for me. Nonetheless, the issue persists.
Here are some fragments of the Spark logs:
24/07/02 15:21:37 DEBUG DockerEnvironmentFactory: Creating Docker Container with ID 1-1
24/07/02 15:21:39 DEBUG DockerEnvironmentFactory: Created Docker Container with Container ID 1464722e38b1eba5d50a3f4a7f8036c7ae03d524abcb3f0a5e868a73f5b51fc4
24/07/02 15:21:39 INFO GrpcLoggingService: Beam Fn Logging client connected.
24/07/02 15:21:39 DEBUG : Initializing Go harness: /opt/apache/beam/boot --id=1-1 --provision_endpoint=localhost:39005
24/07/02 15:21:39 DEBUG LocalFileSystem: opening file /tmp/beam-artifact-staging/9b228b83e120b5aa87f4ce34788bacdf1c35d2f05311deb8efb494bfbea0ff0b/1-0:go-/tmp/worker-1-1719926483620669554
24/07/02 15:21:40 WARN GrpcLoggingService: Logging client failed unexpectedly.
org.apache.beam.vendor.grpc.v1p60p1.io.grpc.StatusRuntimeException: CANCELLED: client cancelled
at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Status.asRuntimeException(Status.java:529)
at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:291)
at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Contexts$ContextualizedServerCallListener.onCancel(Contexts.java:96)
at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closedInternal(ServerCallImpl.java:370)
at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:359)
at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed.runInContext(ServerImpl.java:910)
at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
24/07/02 15:21:42 DEBUG AwsRegionProviderChain: Unable to load region from software.amazon.awssdk.regions.providers.InstanceProfileRegionProvider@247a4d99:Unable to contact EC2 metadata service.
24/07/02 15:21:42 DEBUG LocalDiskShuffleMapOutputWriter: Writing shuffle index file for mapId 1 with length 8
24/07/02 15:21:42 DEBUG IndexShuffleBlockResolver: Shuffle index for mapId 1: [0,0,0,0,0,0,0,0]
24/07/02 15:21:42 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 6019 bytes result sent to driver
24/07/02 15:21:42 DEBUG ExecutorMetricsPoller: stageTCMP: (0, 0) -> 1
24/07/02 15:21:44 INFO DockerEnvironmentFactory: Still waiting for startup of environment apache/beam_go_sdk:2.56.0 for worker id 1-1
24/07/02 15:21:44 ERROR DockerEnvironmentFactory: Docker container 1464722e38b1eba5d50a3f4a7f8036c7ae03d524abcb3f0a5e868a73f5b51fc4 logs:
2024/07/02 13:21:39 Provision info:
pipeline_options:{fields:{key:"beam:option:app_name:v1" value:{string_value:"go-job-1-1719926483620667092"}} fields:{key:"beam:option:experiments:v1" value:{list_value:{values:{string_value:"beam_fn_api"}}}} fields:{key:"beam:option:go_options:v1" value:{struct_value:{fields:{key:"options" value:{struct_value:{fields:{key:"endpoint" value:{string_value:"localhost:8099"}} fields:{key:"hookOrder" value:{string_value:"["default_remote_logging"]"}} fields:{key:"hooks" value:{string_value:"{"default_remote_logging":null}"}} fields:{key:"job" value:{string_value:"wordcount"}} fields:{key:"runner" value:{string_value:"spark"}}}}}}}} fields:{key:"beam:option:job_name:v1" value:{string_value:"go0job0101719926483620667092-root-0702132126-ff3f12ba"}} fields:{key:"beam:option:options_id:v1" value:{number_value:2}} fields:{key:"beam:option:parallelism:v1" value:{number_value:-1}} fields:{key:"beam:option:retain_docker_containers:v1" value:{bool_value:false}} fields:{key:"beam:option:runner:v1" value:{null_value:NULL_VALUE}} fields:{key:"beam:option:spark_master:v1" value:{string_value:"spark://localhost:7077"}}} retrieval_token:"go-job-1-1719926483620667092_8d8b0d53-0d18-49dc-908b-a85d0be89cc5" logging_endpoint:{url:"localhost:36449"} artifact_endpoint:{url:"localhost:36373"} control_endpoint:{url:"localhost:43091"} dependencies:{type_urn:"beam:artifact:type:file:v1" type_payload:"nx84x01/tmp/beam-artifact-staging/9b228b83e120b5aa87f4ce34788bacdf1c35d2f05311deb8efb494bfbea0ff0b/1-0:go-/tmp/worker-1-1719926483620669554" role_urn:"beam:artifact:role:go_worker_binary:v1"} runner_capabilities:"beam:protocol:control_response_elements_embedding:v1"
2024/07/02 13:21:40 Downloaded: /tmp/staged/1-worker-1-1719926483620669554 (sha256: 89580cb558dbc92138c20bdb88f8687d7c96386e9f6d0b717b07b68fe9327476, size: 122860883)
24/07/02 15:21:44 ERROR Executor: Exception in task 7.0 in stage 0.0 (TID 7)
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: No container running for id 1464722e38b1eba5d50a3f4a7f8036c7ae03d524abcb3f0a5e868a73f5b51fc4
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2086)
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.get(LocalCache.java:4012)
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4035)
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:5013)
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:5020)
at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:458)
at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:443)
at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:310)
at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38)
at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:207)
at org.apache.beam.runners.spark.translation.SparkExecutableStageFunction.call(SparkExecutableStageFunction.java:142)
at org.apache.beam.runners.spark.translation.SparkExecutableStageFunction.call(SparkExecutableStageFunction.java:81)
at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitions$1(JavaRDDLike.scala:153)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:863)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:863)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalStateException: No container running for id 1464722e38b1eba5d50a3f4a7f8036c7ae03d524abcb3f0a5e868a73f5b51fc4
at org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:137)
at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:259)
at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:232)
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3571)
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2313)
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2190)
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2080)
... 39 more
Suppressed: java.io.IOException: Received exit code 1 for command 'docker kill 1464722e38b1eba5d50a3f4a7f8036c7ae03d524abcb3f0a5e868a73f5b51fc4'. stderr: Error response from daemon: cannot kill container: 1464722e38b1eba5d50a3f4a7f8036c7ae03d524abcb3f0a5e868a73f5b51fc4: container 1464722e38b1eba5d50a3f4a7f8036c7ae03d524abcb3f0a5e868a73f5b51fc4 is not running
at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:255)
at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:181)
at org.apache.beam.runners.fnexecution.environment.DockerCommand.killContainer(DockerCommand.java:161)
at org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:161)
... 45 more
The most interesting lines are:
24/07/02 15:21:44 INFO DockerEnvironmentFactory: Still waiting for startup of environment apache/beam_go_sdk:2.56.0 for worker id 1-1
24/07/02 15:21:44 ERROR DockerEnvironmentFactory: Docker container 1464722e38b1eba5d50a3f4a7f8036c7ae03d524abcb3f0a5e868a73f5b51fc4 logs:
2 Answers
It seems the issue was with how I structured and wrote the Go code. Neither Beam, Spark, nor Docker was the problem in this case.
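For anyone else landing here: with LOOPBACK the SDK harness runs inside the submitting process, while with DOCKER it runs a fresh copy of the compiled binary inside the apache/beam_go_sdk container, so anything that only exists in the submitting process (unresolvable functions, local paths, state set up outside the pipeline) can break there. As a hedged illustration of a structure that generally behaves well with the DOCKER environment (not the code from this project), keep DoFns as named, package-level functions and register them at init time:

package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"strings"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
)

// splitWords is a named, package-level function so the SDK harness
// running inside the beam_go_sdk container can resolve it.
func splitWords(line string, emit func(string)) {
	for _, w := range strings.Fields(line) {
		emit(w)
	}
}

// formatCount turns a KV<string,int> element into a printable line.
func formatCount(w string, c int) string {
	return fmt.Sprintf("%s: %d", w, c)
}

func init() {
	// Register DoFns up front so they are available to the remote harness.
	beam.RegisterFunction(splitWords)
	beam.RegisterFunction(formatCount)
}

func main() {
	flag.Parse()
	beam.Init()

	p := beam.NewPipeline()
	s := p.Root()

	lines := beam.Create(s, "to be or not to be")
	words := beam.ParDo(s, splitWords, lines)
	counts := stats.Count(s, words)
	formatted := beam.ParDo(s, formatCount, counts)
	debug.Print(s, formatted)

	if err := beamx.Run(context.Background(), p); err != nil {
		log.Fatalf("pipeline failed: %v", err)
	}
}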
How did you solve the execution problem? I’m having a similar issue.