I’m trying to read files from S3 using Flink 1.18.1. My Dockerfile moves the plugin jar to /opt/flink/plugins/s3-fs-hadoop/,
and I’ve checked that the plugin is in the right folder, but when I run my pod I get:
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
The logs, by the way, do show:
[] - Plugin loader with ID found, reusing it: s3-fs-hadoop
[] - Delegation token receiver s3-hadoop loaded and initialized
When I add the "hadoop-aws" dependency it works, but in my understanding that is redundant and it should work without it.
Is this a classloading problem? How can I resolve it? I’ve read the docs, but it’s still not clear to me.
I’m using:
- Flink 1.18.1
- Scala 2.12.6
My YAML:
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-test
  namespace: mynamespace
  finalizers:
    - flinkdeployments.flink.apache.org/finalizer
spec:
  image: .../flink/test:v1
  flinkVersion: v1_18
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "4"
    taskmanager.memory.managed.fraction: "0.1"
    classloader.resolve-order: parent-first
    metrics.reporters: prom
    metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
    metrics.reporter.prom.port: "9250"
  serviceAccount: flink
  jobManager:
    resource:
      memory: 2048m
      cpu: 1
  taskManager:
    replicas: 1
    resource:
      memory: 4096m
      cpu: 4
  job:
    jarURI: local:///tmp/myjar-0.0.1.jar
    parallelism: 2
    upgradeMode: stateless # stateless or savepoint or last-state
    entryClass: com.org.MyMainClass
    args: [...]
  podTemplate:
    apiVersion: v1
    kind: Pod
    metadata:
      name: flink-test
    spec:
      containers:
        - name: flink-main-container
          securityContext:
            runAsUser: 9999 # UID of a non-root user
            runAsNonRoot: true
          ports:
            - name: metrics
              containerPort: 9250
              protocol: TCP
          volumeMounts:
            - mountPath: /etc/keystore
              name: user-cert
              readOnly: true
            - mountPath: /etc/truststore
              name: ca-cert
              readOnly: true
      volumes:
        - name: user-cert
          secret:
            secretName: the-user
        - name: ca-cert
          secret:
            secretName: cluster-cert
---
apiVersion: monitoring.coreos.com/v1
kind: PodMonitor
metadata:
  name: flink-test
  labels:
    release: prometheus
spec:
  selector:
    matchLabels:
      app: test
  podMetricsEndpoints:
    - port: metrics
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: flink-test-ingress
  annotations:
    nginx.ingress.kubernetes.io/rewrite-target: /$2
spec:
  ingressClassName: ingress-nginx-iec
  rules:
    - host: "hostname"
      http:
        paths:
          - pathType: Prefix
            path: "/flink/flink-test(/|$)(.*)"
            backend:
              service:
                name: flink-test-rest
                port:
                  number: 8081 # Replace with your service port
  tls:
    - hosts:
        - host
      secretName: verysecretname
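For reference, an alternative to copying the plugin jar in the image is letting the official Flink entrypoint install it at container startup via the ENABLE_BUILT_IN_PLUGINS environment variable set on the main container in the podTemplate. This is a sketch, not part of my deployment, and it must be used instead of (not in addition to) the Dockerfile copy:

```yaml
# Sketch: enable the bundled S3 plugin at startup via the entrypoint,
# instead of copying the jar in the Dockerfile (do one or the other).
podTemplate:
  spec:
    containers:
      - name: flink-main-container
        env:
          - name: ENABLE_BUILT_IN_PLUGINS
            value: flink-s3-fs-hadoop-1.18.1.jar
```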
My Dockerfile:
FROM flink:1.18.1-scala_2.12
USER root
RUN mkdir -p /opt/flink/log/ /opt/flink/conf/ /opt/flink/plugins/s3-fs-hadoop && \
    cp /opt/flink/opt/flink-s3-fs-hadoop-1.18.1.jar /opt/flink/plugins/s3-fs-hadoop/ && \
    chown -R flink:flink /opt/flink/ && \
    chmod -R 755 /opt/flink/
# You can choose whatever directory you want
WORKDIR /opt/flink/lib
# Copy your JAR files
COPY ../target/scala-2.12/test-0.0.1.jar /tmp/test-0.0.1.jar
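As a sketch of the other activation mechanism mentioned in the answers below: the official flink images also accept ENABLE_BUILT_IN_PLUGINS as an image-level ENV, so the RUN/cp step above could be replaced entirely (again, use one mechanism, not both):

```dockerfile
FROM flink:1.18.1-scala_2.12
# The entrypoint copies the named bundled plugin into /opt/flink/plugins/
# at startup, so no manual mkdir/cp is needed.
ENV ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.18.1.jar
```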
Assembly settings:
assemblyMergeStrategy := {
  // The "services" case must come before the broader META-INF case,
  // otherwise it is unreachable and service files get discarded.
  case PathList("META-INF", "services", xs @ _*) => MergeStrategy.concat
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case PathList("reference.conf") => MergeStrategy.concat
  case _ => MergeStrategy.first
}
Answers
I figured it out. The problem was with the hadoop-common dependency: I was accessing files through org.apache.hadoop.fs.FileSystem and org.apache.hadoop.fs.Path, and those classes conflicted with the ones loaded by the plugin. Once I switched to Flink's own FileSystem and/or FileSource.forRecordStreamFormat(...), I could read from S3. As for plugin activation: both the Dockerfile approach and the ENABLE_BUILT_IN_PLUGINS environment variable work fine, but they should not be mixed; doing both at once produces an error.

Flink provides built-in support for several of the supported filesystem plugins (AWS S3, Azure, etc.) via the ENABLE_BUILT_IN_PLUGINS environment variable, which ensures the plugin is available at startup. The jar names vary depending on what you need to target, but they typically align with the version of Flink you are running (e.g. $plugin-$flinkVersion.jar).
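A minimal sketch of the working approach, assuming a text file under a hypothetical s3a://my-bucket/my-prefix/ path, with flink-connector-files on the classpath and the s3-fs-hadoop plugin active (bucket and job names are illustrative):

```scala
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.connector.file.src.FileSource
import org.apache.flink.connector.file.src.reader.TextLineInputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.scala._

object S3ReadJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Flink's own Path and FileSource resolve s3a:// through the plugin
    // classloader, so the fat jar needs no hadoop-aws/hadoop-common.
    val source = FileSource
      .forRecordStreamFormat(new TextLineInputFormat(), new Path("s3a://my-bucket/my-prefix/"))
      .build()

    env
      .fromSource(source, WatermarkStrategy.noWatermarks(), "s3-source")
      .print()

    env.execute("read-from-s3")
  }
}
```

The key point is that no org.apache.hadoop.fs classes appear in the job code, so nothing in the user jar shadows what the plugin loads.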