
I’m trying to read files on S3 using Flink 1.18.1. In my Dockerfile I move the plugin jar to /opt/flink/plugins/s3-fs-hadoop/. I checked that the plugin is in the right folder, but when I run my pod I get:

Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found

The logs, by the way, do contain:

[] - Plugin loader with ID found, reusing it: s3-fs-hadoop
[] - Delegation token receiver s3-hadoop loaded and initialized

When I add "hadoop-aws" dependency it works but in my understanding it is redundancy and should work without it ?

Is it a classloading problem? How can I resolve it? I read the docs, but it's still not clear to me.

I’m using:

  • Flink 1.18.1
  • Scala 2.12.6

My YAML:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-test
  namespace: mynamespace
  finalizers:
    - flinkdeployments.flink.apache.org/finalizer
spec:
  image: .../flink/test:v1
  flinkVersion: v1_18
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "4"
    taskmanager.memory.managed.fraction: "0.1"
    classloader.resolve-order: parent-first
    metrics.reporters: prom
    metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
    metrics.reporter.prom.port: "9250"
  serviceAccount: flink
  jobManager:
    resource:
      memory: 2048m
      cpu: 1
  taskManager:
    replicas: 1
    resource:
      memory: 4096m
      cpu: 4
  job:
    jarURI: local:///tmp/myjar-0.0.1.jar
    parallelism: 2
    upgradeMode: stateless  # stateless or savepoint or last-state
    entryClass: com.org.MyMainClass
    args: [...]
  podTemplate:
    apiVersion: v1
    kind: Pod
    metadata:
      name: flink-test
    spec:
      containers:
        - name: flink-main-container
          securityContext:
            runAsUser: 9999 # UID of a non-root user
            runAsNonRoot: true
          ports:
            - name: metrics
              containerPort: 9250
              protocol: TCP
          volumeMounts:
            - mountPath: /etc/keystore
              name: user-cert
              readOnly: true
            - mountPath: /etc/truststore
              name: ca-cert
              readOnly: true
      volumes:
        - name: user-cert
          secret:
            secretName: the-user
        - name: ca-cert
          secret:
            secretName: cluster-cert

---
apiVersion: monitoring.coreos.com/v1
kind: PodMonitor
metadata:
  name: flink-test
  labels:
    release: prometheus
spec:
  selector:
    matchLabels:
      app: test
  podMetricsEndpoints:
    - port: metrics

---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: flink-test-ingress
  annotations:
    nginx.ingress.kubernetes.io/rewrite-target: /$2
spec:
  ingressClassName: ingress-nginx-iec
  rules:
    - host: "hostname"
      http:
        paths:
          - pathType: Prefix
            path: "/flink/flink-test(/|$)(.*)"
            backend:
              service:
                name: flink-test-rest
                port:
                  number: 8081 # Replace with your service port
  tls:
    - hosts:
        - host
      secretName: verysecretname

My Dockerfile:

FROM flink:1.18.1-scala_2.12

USER root

RUN mkdir -p /opt/flink/log/ /opt/flink/conf/ /opt/flink/plugins/s3-fs-hadoop && \
    cp /opt/flink/opt/flink-s3-fs-hadoop-1.18.1.jar /opt/flink/plugins/s3-fs-hadoop/ && \
    chown -R flink:flink /opt/flink/ && \
    chmod -R 755 /opt/flink/

# You can choose whatever directory you want
WORKDIR /opt/flink/lib

# Copy your JAR files
COPY ../target/scala-2.12/test-0.0.1.jar /tmp/test-0.0.1.jar

Assembly settings:

  assemblyMergeStrategy := {
    case PathList("META-INF", xs@_*) => MergeStrategy.discard
    case PathList("META-INF", "services", xs@_*) => MergeStrategy.concat
    case PathList("reference.conf") => MergeStrategy.concat
    case _ => MergeStrategy.first
  }

2 Answers


  1. Chosen as BEST ANSWER

    I figured it out. My problem was with the hadoop-common dependency: I was accessing files through org.apache.hadoop.fs.FileSystem and org.apache.hadoop.fs.Path, which conflicted with the classes loaded by the plugin. After switching to Flink's own FileSystem and/or FileSource.forRecordStreamFormat(...), I can read from S3.
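
    Roughly, the FileSource approach looks like the sketch below. It is a minimal example, not my exact job: the object name, bucket and prefix are placeholders, and it assumes flink-connector-files is on the classpath.

    import org.apache.flink.api.common.eventtime.WatermarkStrategy
    import org.apache.flink.connector.file.src.FileSource
    import org.apache.flink.connector.file.src.reader.TextLineInputFormat
    import org.apache.flink.core.fs.Path
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

    object S3ReadJob {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        // Flink's own Path/FileSource resolve the s3:// scheme through the
        // plugin-loaded filesystem, so the job never touches
        // org.apache.hadoop.fs.* classes directly.
        val source = FileSource
          .forRecordStreamFormat(new TextLineInputFormat(), new Path("s3://my-bucket/some/prefix/"))
          .build()

        env
          .fromSource(source, WatermarkStrategy.noWatermarks[String](), "s3-text-source")
          .print()

        env.execute("read-from-s3")
      }
    }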

    As for plugin activation: both the Dockerfile approach and ENABLE_BUILT_IN_PLUGINS work fine, but they should not be mixed; if they are, you get the following error:

    Caused by: java.lang.IllegalStateException: Delegation token provider with service name s3-hadoop has multiple implementations
    

  2. Flink provides built-in support for several filesystem plugins (AWS S3, Azure, etc.) via the ENABLE_BUILT_IN_PLUGINS environment variable, as shown below, to make the plugin available:

    containers:
      - name: flink-main-container
        ...
        env:
          - name: ENABLE_BUILT_IN_PLUGINS
            value: flink-s3-fs-hadoop-1.18.1.jar
    

    The exact jar name varies depending on which plugin you need, but it typically follows the Flink version you are running (e.g. $plugin-$flinkVersion.jar).
