skip to Main Content

I have the following configuration setup.
build.gradle:

plugins {
    id 'java'
    id 'org.springframework.boot' version '3.1.0-SNAPSHOT'
    id 'io.spring.dependency-management' version '1.1.0'
    id 'org.sonarqube' version "4.0.0.2929"
    id "jacoco"
}

group = 'com.realtime'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '17'

configurations {
    compileOnly {
        extendsFrom annotationProcessor
    }
}

jacoco {
    toolVersion = "0.8.9"
}


repositories {
    mavenCentral()
    maven { url 'https://repo.spring.io/milestone' }
    maven { url 'https://repo.spring.io/snapshot' }
}

ext {
    set('snippetsDir', file("build/generated-snippets"))
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-actuator'
    implementation 'org.springframework.boot:spring-boot-starter-batch'
    implementation 'org.springframework.boot:spring-boot-starter-cache'
    implementation 'org.springframework.boot:spring-boot-starter-data-elasticsearch'
    implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
    implementation 'com.fasterxml.jackson:jackson-bom'
//  implementation 'org.springframework.boot:spring-boot-starter-data-ldap'
//  implementation 'org.springframework.boot:spring-boot-starter-data-mongodb'
//  implementation 'org.springframework.boot:spring-boot-starter-data-mongodb-reactive'
//  implementation 'org.springframework.boot:spring-boot-starter-data-redis'
//  implementation 'org.springframework.boot:spring-boot-starter-data-redis-reactive'
    implementation 'org.springframework.boot:spring-boot-starter-data-rest'
    implementation 'org.springframework.boot:spring-boot-starter-graphql'
    implementation 'org.springframework.security:spring-security-oauth2-authorization-server:1.0.2'
//  implementation 'org.springframework.boot:spring-boot-starter-oauth2-client'
//  implementation 'org.springframework.boot:spring-boot-starter-oauth2-resource-server'
    implementation 'org.springframework.boot:spring-boot-starter-security'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.boot:spring-boot-starter-webflux'
    implementation 'org.apache.kafka:kafka-streams'
    implementation 'org.springframework.kafka:spring-kafka'
    implementation 'org.springframework.session:spring-session-data-redis'
    implementation 'io.debezium:debezium-embedded:2.2.0.Final'
    implementation 'io.debezium:debezium-connector-postgres:2.2.0.Final'
    implementation 'org.apache.commons:commons-lang3:3.12.0'
    developmentOnly 'org.springframework.boot:spring-boot-devtools'
    runtimeOnly 'io.micrometer:micrometer-registry-prometheus'
    runtimeOnly 'org.postgresql:postgresql'
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'io.projectreactor:reactor-test'
    testImplementation 'org.springframework.batch:spring-batch-test'
    testImplementation 'org.springframework.graphql:spring-graphql-test'
    testImplementation 'org.springframework.kafka:spring-kafka-test'
    testImplementation 'org.springframework.restdocs:spring-restdocs-mockmvc'
    testImplementation 'org.springframework.security:spring-security-test'
}

tasks.named('test') {
    outputs.dir snippetsDir
    useJUnitPlatform()
    finalizedBy jacocoTestReport
}

jacocoTestReport {
    reports {
        xml.enabled(true)
        html.outputLocation = layout.buildDirectory.dir('jacocoHtml')
    }
}

sonar {
    properties {
        property "sonar.java.coveragePlugin", "jacoco"
        property "sonar.projectKey", "project"
        property "sonar.projectName", "project"
        property "sonar.coverage.jacoco.xmlReportPath", "${buildDir}/reports/jacoco.xml"
    }
}





@Configuration
public class DebeziumConfig {

    @Bean
    public io.debezium.config.Configuration studentConnector() {
        return io.debezium.config.Configuration.create()
                .with("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
                .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
                .with("offset.storage.file.filename", "/home/amith/downloads/test.dat")
                .with("offset.flush.interval.ms", 60000)
                .with("name", "auth")
                .with("database.server.name", "auth")
                .with("database.hostname", "localhost")
                .with("database.port", "5432")
                .with("database.user", "admin")
                .with("database.password", "password")
                .with("database.dbname", "auth")
                .with("table.whitelist", "public.client").build();

    }
}



@Slf4j
@Component
public class CDCListener {

    private final Executor executor = Executors.newSingleThreadExecutor();
    private final DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine;

    public CDCListener(Configuration customerConnectorConfiguration) {

        this.debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
                .using(customerConnectorConfiguration.asProperties())
                .notifying(this::handleChangeEvent)
                .build();

    }

    private void handleChangeEvent(RecordChangeEvent<SourceRecord> sourceRecordRecordChangeEvent) {
        SourceRecord sourceRecord = sourceRecordRecordChangeEvent.record();

        log.info("Key = '" + sourceRecord.key() + "' value = '" + sourceRecord.value() + "'");

        Struct sourceRecordChangeValue= (Struct) sourceRecord.value();

        if (sourceRecordChangeValue != null) {
            Operation operation = Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));

            if(operation != Operation.READ) {
                String record = operation == Operation.DELETE ? BEFORE : AFTER; // Handling Update & Insert operations.

                Struct struct = (Struct) sourceRecordChangeValue.get(record);
                Map<String, Object> payload = struct.schema().fields().stream()
                        .map(Field::name)
                        .filter(fieldName -> struct.get(fieldName) != null)
                        .map(fieldName -> Pair.of(fieldName, struct.get(fieldName)))
                        .collect(toMap(Pair::getKey, Pair::getValue));

                log.info("Updated Data: {} with Operation: {}", payload, operation.name());
            }
        }
    }

    @PostConstruct
    private void start() {
        this.executor.execute(debeziumEngine);
    }

    @PreDestroy
    private void stop() throws IOException {
        if (this.debeziumEngine != null) {
            this.debeziumEngine.close();
        }
    }
}

NOTE: I also have spring security configured and this particular application is acting as an authorization server using the new Oauth2 authorization server configurations.

version: '3.9'
services:
  postgres:
    image: 'debezium/postgres'
    container_name: postgres
    ports:
      - '5432:5432'
    environment:
      - POSTGRES_DB=auth
      - POSTGRES_USER=admin
      - POSTGRES_PASSWORD=password
package com.realtime.project.controller;

import com.realtime.project.entity.Client;
import com.realtime.project.service.RegisteredClientService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.oauth2.server.authorization.client.RegisteredClient;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Mono;

/**
 * CLIENT CONTROLLER THAT PROVIDES END POINTS USED TO REGISTER A NEW
 * CLIENT OR GET CLIENT DETAILS
 */
@Slf4j
@RestController
@RequestMapping(value = "/client")
public class ClientController {

    @Autowired
    private RegisteredClientService registeredClientService;

    /**
     * USED TO REGISTER A NEW CLIENT
     * @param client
     * @return
     */
    @PostMapping(value = "/add")
    public Mono<Client> registerClient(@RequestBody Client client) {
        log.info("Entered ClientController ---> registerClient ---> Attempting to register new client");
        registeredClientService.save(client);
        log.info("Exited ClientController --> registerClient ---> Client Registered Successfully");
        return Mono.just(client);
    }


    /**
     * USED TO GET CLIENT DETAILS
     * @return
     */
    @GetMapping("/get")
    public Mono<RegisteredClient> getClient(@RequestParam String clientID) {
        log.info("Entered ClientController ---> getClient ---> Fetching Client Details");
        RegisteredClient clientId = registeredClientService.findByClientId(clientID);
        log.info("Exited ClientController ---> getClient ---> Client Details fetched");
        return Mono.just(clientId);

    }
}


When i hit the endpoint using postman i am getting the response and the data has been updated in the database. But when i look at the logs there is nothing displayed regarding the transaction.I have used loggers inside the listener class ideally if it had detected a change then the data should be displayed in the console.

POSTMAN RESPONSE:
enter image description here

CONSOLE LOGS:

8:48:55 pm: Executing ':ProjectApplication.main()'...

Starting Gradle Daemon...
Gradle Daemon started in 391 ms
> Task :compileJava UP-TO-DATE
> Task :processResources UP-TO-DATE
> Task :classes UP-TO-DATE

> Task :ProjectApplication.main()

  .   ____          _            __ _ _
 /\ / ___'_ __ _ _(_)_ __  __ _    
( ( )___ | '_ | '_| | '_ / _` |    
 \/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |___, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::       (v3.1.0-SNAPSHOT)

2023-04-26T20:49:00.411+05:30  INFO 116064 --- [  restartedMain] c.realtime.project.ProjectApplication    : Starting ProjectApplication using Java 20 with PID 116064 (/home/amith/Downloads/project/build/classes/java/main started by amith in /home/amith/Downloads/project)
2023-04-26T20:49:00.414+05:30  INFO 116064 --- [  restartedMain] c.realtime.project.ProjectApplication    : No active profile set, falling back to 1 default profile: "default"
2023-04-26T20:49:00.441+05:30  INFO 116064 --- [  restartedMain] .e.DevToolsPropertyDefaultsPostProcessor : Devtools property defaults active! Set 'spring.devtools.add-properties' to 'false' to disable
2023-04-26T20:49:00.441+05:30  INFO 116064 --- [  restartedMain] .e.DevToolsPropertyDefaultsPostProcessor : For additional web related logging consider setting the 'logging.level.web' property to 'DEBUG'
2023-04-26T20:49:01.182+05:30  INFO 116064 --- [  restartedMain] .s.d.r.c.RepositoryConfigurationDelegate : Multiple Spring Data modules found, entering strict repository configuration mode
2023-04-26T20:49:01.184+05:30  INFO 116064 --- [  restartedMain] .s.d.r.c.RepositoryConfigurationDelegate : Bootstrapping Spring Data Elasticsearch repositories in DEFAULT mode.
2023-04-26T20:49:01.202+05:30  INFO 116064 --- [  restartedMain] .RepositoryConfigurationExtensionSupport : Spring Data Elasticsearch - Could not safely identify store assignment for repository candidate interface com.realtime.project.repository.ClientRepository; If you want this repository to be a Elasticsearch repository, consider annotating your entities with one of these annotations: org.springframework.data.elasticsearch.annotations.Document (preferred), or consider extending one of the following types with your repository: org.springframework.data.elasticsearch.repository.ElasticsearchRepository, org.springframework.data.elasticsearch.repository.ElasticsearchRepository
2023-04-26T20:49:01.203+05:30  INFO 116064 --- [  restartedMain] .RepositoryConfigurationExtensionSupport : Spring Data Elasticsearch - Could not safely identify store assignment for repository candidate interface com.realtime.project.repository.UserRepository; If you want this repository to be a Elasticsearch repository, consider annotating your entities with one of these annotations: org.springframework.data.elasticsearch.annotations.Document (preferred), or consider extending one of the following types with your repository: org.springframework.data.elasticsearch.repository.ElasticsearchRepository, org.springframework.data.elasticsearch.repository.ElasticsearchRepository
2023-04-26T20:49:01.203+05:30  INFO 116064 --- [  restartedMain] .s.d.r.c.RepositoryConfigurationDelegate : Finished Spring Data repository scanning in 16 ms. Found 0 Elasticsearch repository interfaces.
2023-04-26T20:49:01.206+05:30  INFO 116064 --- [  restartedMain] .s.d.r.c.RepositoryConfigurationDelegate : Multiple Spring Data modules found, entering strict repository configuration mode
2023-04-26T20:49:01.206+05:30  INFO 116064 --- [  restartedMain] .s.d.r.c.RepositoryConfigurationDelegate : Bootstrapping Spring Data Reactive Elasticsearch repositories in DEFAULT mode.
2023-04-26T20:49:01.208+05:30  INFO 116064 --- [  restartedMain] .RepositoryConfigurationExtensionSupport : Spring Data Reactive Elasticsearch - Could not safely identify store assignment for repository candidate interface com.realtime.project.repository.ClientRepository; If you want this repository to be a Reactive Elasticsearch repository, consider annotating your entities with one of these annotations: org.springframework.data.elasticsearch.annotations.Document (preferred), or consider extending one of the following types with your repository: org.springframework.data.elasticsearch.repository.ReactiveElasticsearchRepository
2023-04-26T20:49:01.208+05:30  INFO 116064 --- [  restartedMain] .RepositoryConfigurationExtensionSupport : Spring Data Reactive Elasticsearch - Could not safely identify store assignment for repository candidate interface com.realtime.project.repository.UserRepository; If you want this repository to be a Reactive Elasticsearch repository, consider annotating your entities with one of these annotations: org.springframework.data.elasticsearch.annotations.Document (preferred), or consider extending one of the following types with your repository: org.springframework.data.elasticsearch.repository.ReactiveElasticsearchRepository
2023-04-26T20:49:01.208+05:30  INFO 116064 --- [  restartedMain] .s.d.r.c.RepositoryConfigurationDelegate : Finished Spring Data repository scanning in 2 ms. Found 0 Reactive Elasticsearch repository interfaces.
2023-04-26T20:49:01.213+05:30  INFO 116064 --- [  restartedMain] .s.d.r.c.RepositoryConfigurationDelegate : Multiple Spring Data modules found, entering strict repository configuration mode
2023-04-26T20:49:01.213+05:30  INFO 116064 --- [  restartedMain] .s.d.r.c.RepositoryConfigurationDelegate : Bootstrapping Spring Data JPA repositories in DEFAULT mode.
2023-04-26T20:49:01.265+05:30  INFO 116064 --- [  restartedMain] .s.d.r.c.RepositoryConfigurationDelegate : Finished Spring Data repository scanning in 50 ms. Found 2 JPA repository interfaces.
2023-04-26T20:49:01.712+05:30  INFO 116064 --- [  restartedMain] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port(s): 8081 (http)
2023-04-26T20:49:01.719+05:30  INFO 116064 --- [  restartedMain] o.apache.catalina.core.StandardService   : Starting service [Tomcat]
2023-04-26T20:49:01.720+05:30  INFO 116064 --- [  restartedMain] o.apache.catalina.core.StandardEngine    : Starting Servlet engine: [Apache Tomcat/10.1.8]
2023-04-26T20:49:01.752+05:30  INFO 116064 --- [  restartedMain] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext
2023-04-26T20:49:01.754+05:30  INFO 116064 --- [  restartedMain] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 1312 ms
Picked up _JAVA_OPTIONS: -Dawt.useSystemAAFontSettings=on -Dswing.aatext=true
2023-04-26T20:49:01.892+05:30  INFO 116064 --- [  restartedMain] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Starting...
2023-04-26T20:49:01.949+05:30  INFO 116064 --- [  restartedMain] com.zaxxer.hikari.pool.HikariPool        : HikariPool-1 - Added connection org.postgresql.jdbc.PgConnection@477c8d49
2023-04-26T20:49:01.950+05:30  INFO 116064 --- [  restartedMain] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Start completed.
2023-04-26T20:49:01.999+05:30  INFO 116064 --- [  restartedMain] o.hibernate.jpa.internal.util.LogHelper  : HHH000204: Processing PersistenceUnitInfo [name: default]
2023-04-26T20:49:02.024+05:30  INFO 116064 --- [  restartedMain] org.hibernate.Version                    : HHH000412: Hibernate ORM core version 6.2.1.Final
2023-04-26T20:49:02.025+05:30  INFO 116064 --- [  restartedMain] org.hibernate.cfg.Environment            : HHH000406: Using bytecode reflection optimizer
2023-04-26T20:49:02.087+05:30  INFO 116064 --- [  restartedMain] o.h.b.i.BytecodeProviderInitiator        : HHH000021: Bytecode provider name : bytebuddy
2023-04-26T20:49:02.140+05:30  INFO 116064 --- [  restartedMain] o.s.o.j.p.SpringPersistenceUnitInfo      : No LoadTimeWeaver setup: ignoring JPA class transformer
2023-04-26T20:49:02.171+05:30  WARN 116064 --- [  restartedMain] org.hibernate.dialect.Dialect            : HHH000511: The 9.6.0 version for [org.hibernate.dialect.PostgreSQLDialect] is no longer supported, hence certain features may not work properly. The minimum supported version is 10.0.0. Check the community dialects project for available legacy versions.
2023-04-26T20:49:02.173+05:30  INFO 116064 --- [  restartedMain] SQL dialect                              : HHH000400: Using dialect: org.hibernate.dialect.PostgreSQLDialect, version: org.hibernate.engine.jdbc.env.internal.JdbcEnvironmentInitiator$DialectResolutionInfoImpl@37b473de
2023-04-26T20:49:02.303+05:30  INFO 116064 --- [  restartedMain] o.h.b.i.BytecodeProviderInitiator        : HHH000021: Bytecode provider name : bytebuddy
2023-04-26T20:49:02.623+05:30  INFO 116064 --- [  restartedMain] o.h.e.t.j.p.i.JtaPlatformInitiator       : HHH000490: Using JtaPlatform implementation: [org.hibernate.engine.transaction.jta.platform.internal.NoJtaPlatform]
2023-04-26T20:49:03.253+05:30  INFO 116064 --- [  restartedMain] o.h.t.s.i.e.GenerationTargetToDatabase   : HHH000476: Executing script '[injected ScriptSourceInputNonExistentImpl script]'
2023-04-26T20:49:03.254+05:30  INFO 116064 --- [  restartedMain] j.LocalContainerEntityManagerFactoryBean : Initialized JPA EntityManagerFactory for persistence unit 'default'
2023-04-26T20:49:03.487+05:30  INFO 116064 --- [  restartedMain] o.a.k.connect.json.JsonConverterConfig   : JsonConverterConfig values: 
    converter.type = key
    decimal.format = BASE64
    schemas.cache.size = 1000
    schemas.enable = false

2023-04-26T20:49:03.489+05:30  INFO 116064 --- [  restartedMain] o.a.k.connect.json.JsonConverterConfig   : JsonConverterConfig values: 
    converter.type = value
    decimal.format = BASE64
    schemas.cache.size = 1000
    schemas.enable = false

2023-04-26T20:49:03.493+05:30  INFO 116064 --- [  restartedMain] i.d.e.EmbeddedEngine$EmbeddedConfig      : EmbeddedConfig values: 
    access.control.allow.methods = 
    access.control.allow.origin = 
    admin.listeners = null
    auto.include.jmx.reporter = true
    bootstrap.servers = [localhost:9092]
    client.dns.lookup = use_all_dns_ips
    config.providers = []
    connector.client.config.override.policy = All
    header.converter = class org.apache.kafka.connect.storage.SimpleHeaderConverter
    key.converter = class org.apache.kafka.connect.json.JsonConverter
    listeners = [http://:8083]
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    offset.flush.interval.ms = 60000
    offset.flush.timeout.ms = 5000
    offset.storage.file.filename = /home/amith/downloads/test.dat
    offset.storage.partitions = null
    offset.storage.replication.factor = null
    offset.storage.topic = 
    plugin.path = null
    response.http.headers.config = 
    rest.advertised.host.name = null
    rest.advertised.listener = null
    rest.advertised.port = null
    rest.extension.classes = []
    ssl.cipher.suites = null
    ssl.client.auth = none
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    task.shutdown.graceful.timeout.ms = 5000
    topic.creation.enable = true
    topic.tracking.allow.reset = true
    topic.tracking.enable = true
    value.converter = class org.apache.kafka.connect.json.JsonConverter

2023-04-26T20:49:03.493+05:30  WARN 116064 --- [  restartedMain] o.a.kafka.connect.runtime.WorkerConfig   : Variables cannot be used in the 'plugin.path' property, since the property is used by plugin scanning before the config providers that replace the variables are initialized. The raw value 'null' was used for plugin scanning, as opposed to the transformed value 'null', and this may cause unexpected results.
2023-04-26T20:49:03.515+05:30  WARN 116064 --- [  restartedMain] JpaBaseConfiguration$JpaWebConfiguration : spring.jpa.open-in-view is enabled by default. Therefore, database queries may be performed during view rendering. Explicitly configure spring.jpa.open-in-view to disable this warning
2023-04-26T20:49:03.518+05:30 ERROR 116064 --- [pool-2-thread-1] io.debezium.embedded.EmbeddedEngine      : Connector configuration is not valid. The 'topic.prefix' value is invalid: A value is required
2023-04-26T20:49:03.796+05:30  INFO 116064 --- [  restartedMain] o.s.s.web.DefaultSecurityFilterChain     : Will secure org.springframework.security.oauth2.server.authorization.config.annotation.web.configurers.OAuth2AuthorizationServerConfigurer$$Lambda$1565/0x0000000801924498@423bf83e with [org.springframework.security.web.session.DisableEncodeUrlFilter@2402df0e, org.springframework.security.web.context.request.async.WebAsyncManagerIntegrationFilter@2d4755ed, org.springframework.security.web.context.SecurityContextHolderFilter@32538bbd, org.springframework.security.oauth2.server.authorization.config.annotation.web.configurers.AuthorizationServerContextFilter@5817f9a2, org.springframework.security.web.header.HeaderWriterFilter@19eafaea, org.springframework.security.web.csrf.CsrfFilter@234835fa, org.springframework.security.web.authentication.logout.LogoutFilter@5105fdb6, org.springframework.security.oauth2.server.authorization.web.OAuth2AuthorizationServerMetadataEndpointFilter@70902048, org.springframework.security.oauth2.server.authorization.web.OAuth2AuthorizationEndpointFilter@685357af, org.springframework.security.oauth2.server.authorization.web.NimbusJwkSetEndpointFilter@3c1b5897, org.springframework.security.oauth2.server.authorization.web.OAuth2ClientAuthenticationFilter@32cd4573, org.springframework.security.web.savedrequest.RequestCacheAwareFilter@6f1f9b35, org.springframework.security.web.servletapi.SecurityContextHolderAwareRequestFilter@b9ab673, org.springframework.security.web.authentication.AnonymousAuthenticationFilter@37e1cbca, org.springframework.security.web.access.ExceptionTranslationFilter@51a1d67d, org.springframework.security.web.access.intercept.AuthorizationFilter@58e88f32, org.springframework.security.oauth2.server.authorization.web.OAuth2TokenEndpointFilter@77565f3b, org.springframework.security.oauth2.server.authorization.web.OAuth2TokenIntrospectionEndpointFilter@6406f08f, org.springframework.security.oauth2.server.authorization.web.OAuth2TokenRevocationEndpointFilter@5b35331]
2023-04-26T20:49:04.163+05:30  INFO 116064 --- [  restartedMain] o.s.b.a.e.web.EndpointLinksResolver      : Exposing 3 endpoint(s) beneath base path '/actuator'
2023-04-26T20:49:04.176+05:30  INFO 116064 --- [  restartedMain] o.s.s.web.DefaultSecurityFilterChain     : Will secure any request with [org.springframework.security.web.session.DisableEncodeUrlFilter@1b3adf3b, org.springframework.security.web.context.request.async.WebAsyncManagerIntegrationFilter@ce72c7, org.springframework.security.web.context.SecurityContextHolderFilter@f5bed41, org.springframework.security.web.header.HeaderWriterFilter@58d961f9, org.springframework.security.web.csrf.CsrfFilter@7a90e1f1, org.springframework.security.web.authentication.logout.LogoutFilter@38e0de3a, org.springframework.security.web.authentication.UsernamePasswordAuthenticationFilter@13e101ca, org.springframework.security.web.authentication.ui.DefaultLoginPageGeneratingFilter@52d3f23a, org.springframework.security.web.authentication.ui.DefaultLogoutPageGeneratingFilter@5db94450, org.springframework.security.web.savedrequest.RequestCacheAwareFilter@72b91a32, org.springframework.security.web.servletapi.SecurityContextHolderAwareRequestFilter@41168548, org.springframework.security.web.authentication.AnonymousAuthenticationFilter@462b5b02, org.springframework.security.web.access.ExceptionTranslationFilter@1bf50d20, org.springframework.security.web.access.intercept.AuthorizationFilter@a65916]
2023-04-26T20:49:04.184+05:30  INFO 116064 --- [  restartedMain] o.s.s.web.DefaultSecurityFilterChain     : Will secure any request with [org.springframework.security.web.session.DisableEncodeUrlFilter@2a226d50, org.springframework.security.web.context.request.async.WebAsyncManagerIntegrationFilter@5c6fba6c, org.springframework.security.web.context.SecurityContextHolderFilter@3b079a27, org.springframework.security.web.header.HeaderWriterFilter@135fb9fd, org.springframework.security.web.csrf.CsrfFilter@646c1ed, org.springframework.security.web.authentication.logout.LogoutFilter@39ded7e7, org.springframework.security.web.authentication.UsernamePasswordAuthenticationFilter@650ceff9, org.springframework.security.web.authentication.ui.DefaultLoginPageGeneratingFilter@46e1d0fe, org.springframework.security.web.authentication.ui.DefaultLogoutPageGeneratingFilter@1069097a, org.springframework.security.web.savedrequest.RequestCacheAwareFilter@b7e6d03, org.springframework.security.web.servletapi.SecurityContextHolderAwareRequestFilter@281f1451, org.springframework.security.web.authentication.AnonymousAuthenticationFilter@12185fa9, org.springframework.security.web.access.ExceptionTranslationFilter@13561297, org.springframework.security.web.access.intercept.AuthorizationFilter@219daaf9]
2023-04-26T20:49:04.561+05:30  INFO 116064 --- [  restartedMain] o.s.b.d.a.OptionalLiveReloadServer       : LiveReload server is running on port 35729
2023-04-26T20:49:04.598+05:30  INFO 116064 --- [  restartedMain] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8081 (http) with context path ''
2023-04-26T20:49:04.615+05:30  INFO 116064 --- [  restartedMain] c.realtime.project.ProjectApplication    : Started ProjectApplication in 4.414 seconds (process running for 4.735)
2023-04-26T20:49:04.672+05:30  INFO 116064 --- [  restartedMain] o.s.b.a.b.JobLauncherApplicationRunner   : Running default command line with: []
2023-04-26T20:49:54.511+05:30  INFO 116064 --- [nio-8081-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
2023-04-26T20:49:54.511+05:30  INFO 116064 --- [nio-8081-exec-1] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2023-04-26T20:49:54.513+05:30  INFO 116064 --- [nio-8081-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 2 ms
2023-04-26T20:49:54.575+05:30  INFO 116064 --- [nio-8081-exec-1] c.r.project.controller.ClientController  : Entered ClientController ---> registerClient ---> Attempting to register new client
2023-04-26T20:49:54.575+05:30  INFO 116064 --- [nio-8081-exec-1] c.r.p.service.RegisteredClientService    : Entered RegisteredClientService ---> save() --> Attempting to persist client information
2023-04-26T20:49:54.630+05:30  INFO 116064 --- [nio-8081-exec-1] c.r.p.service.RegisteredClientService    : Exited RegisteredClientService ---> save() ---> Successfully persisted data
2023-04-26T20:49:54.689+05:30  INFO 116064 --- [nio-8081-exec-1] c.r.project.controller.ClientController  : Exited ClientController --> registerClient ---> Client Registered Successfully

2

Answers


  1. Chosen as BEST ANSWER

    It was due to the version of debezium packages in my build.gradle file. I downgraded the version to

        implementation 'io.debezium:debezium-embedded:1.4.2.Final'
        implementation 'io.debezium:debezium-api:1.4.2.Final'
        implementation 'io.debezium:debezium-connector-postgres:1.4.2.Final'
    

    and everything is working now. I think postgres:15-alpine image has some missmatch with 2.2.0.Final version of debezium.

    LOGS:

    5:46:09 pm: Executing ':ProjectApplication.main()'...
    
    > Task :compileJava UP-TO-DATE
    > Task :processResources
    > Task :classes
    
    > Task :ProjectApplication.main()
    
      .   ____          _            __ _ _
     /\ / ___'_ __ _ _(_)_ __  __ _    
    ( ( )___ | '_ | '_| | '_ / _` |    
     \/  ___)| |_)| | | | | || (_| |  ) ) ) )
      '  |____| .__|_| |_|_| |___, | / / / /
     =========|_|==============|___/=/_/_/_/
     :: Spring Boot ::       (v3.1.0-SNAPSHOT)
    
    2023-04-27T17:46:17.155+05:30  INFO 84524 --- [nio-8081-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
    2023-04-27T17:46:17.155+05:30  INFO 84524 --- [nio-8081-exec-1] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
    2023-04-27T17:46:17.156+05:30  INFO 84524 --- [nio-8081-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 1 ms
    2023-04-27T17:46:17.212+05:30  INFO 84524 --- [nio-8081-exec-1] c.r.project.controller.ClientController  : Entered ClientController ---> registerClient ---> Attempting to register new client
    2023-04-27T17:46:17.219+05:30  INFO 84524 --- [nio-8081-exec-1] c.r.p.service.RegisteredClientService    : Entered RegisteredClientService ---> save() --> Attempting to persist client information
    2023-04-27T17:46:17.282+05:30  INFO 84524 --- [nio-8081-exec-1] c.r.p.service.RegisteredClientService    : Exited RegisteredClientService ---> save() ---> Successfully persisted data
    Hibernate: select c1_0.id,a1_0.client_id,a1_0.id,a1_0.value,c1_0.client_id,c1_0.client_id_issued_at,c1_0.client_name,c1_0.client_secret,c1_0.client_secret_expires_at,c1_0.redirect_uris,c1_0.scopes from client c1_0 left join grant_type a1_0 on c1_0.id=a1_0.client_id where c1_0.id=?
    Hibernate: select nextval('grant_type_seq')
    Hibernate: select nextval('grant_type_seq')
    Hibernate: select nextval('authentication_method_seq')
    Hibernate: insert into client (client_id,client_id_issued_at,client_name,client_secret,client_secret_expires_at,redirect_uris,scopes,id) values (?,?,?,?,?,?,?,?)
    Hibernate: insert into grant_type (client_id,value,id) values (?,?,?)
    Hibernate: insert into grant_type (client_id,value,id) values (?,?,?)
    Hibernate: insert into grant_type (client_id,value,id) values (?,?,?)
    Hibernate: insert into authentication_method (client_id,value,id) values (?,?,?)
    2023-04-27T17:46:17.367+05:30  INFO 84524 --- [nio-8081-exec-1] c.r.project.controller.ClientController  : Exited ClientController --> registerClient ---> Client Registered Successfully
    2023-04-27T17:46:18.050+05:30  INFO 84524 --- [pool-2-thread-1] c.realtime.project.listener.CDCListener  : Key = 'Struct{id=600f4335-97a5-488a-a0de-f21d5981ea6d_2023-04-27T17:35:30.545563915}' value = 'Struct{after=Struct{id=600f4335-97a5-488a-a0de-f21d5981ea6d_2023-04-27T17:35:30.545563915,client_id=articles-clients,client_id_issued_at=2023-04-21T09:11:09.628975Z,client_name=articles-clients,client_secret=$2a$10$Ikga7RKl3F5Qc9mlVPrzHuhuAzespJvmnc9xomv1IrrTaayXzj8by,client_secret_expires_at=2023-05-01T09:11:09.628977Z,redirect_uris=http://localhost:8081/api,scopes=openid},source=Struct{version=1.4.2.Final,connector=postgresql,name=customer-mysql-db-server,ts_ms=1682597777343,db=auth,schema=public,table=client,txId=817,lsn=28407976},op=c,ts_ms=1682597777791}'
    2023-04-27T17:46:18.051+05:30  INFO 84524 --- [pool-2-thread-1] c.realtime.project.listener.CDCListener  : Updated Data: {client_secret_expires_at=2023-05-01T09:11:09.628977Z, id=600f4335-97a5-488a-a0de-f21d5981ea6d_2023-04-27T17:35:30.545563915, client_id_issued_at=2023-04-21T09:11:09.628975Z, client_secret=$2a$10$Ikga7RKl3F5Qc9mlVPrzHuhuAzespJvmnc9xomv1IrrTaayXzj8by, redirect_uris=http://localhost:8081/api, scopes=openid, client_name=articles-clients, client_id=articles-clients} with Operation: CREATE
    2023-04-27T17:46:18.052+05:30  INFO 84524 --- [pool-2-thread-1] c.realtime.project.listener.CDCListener  : Key = 'Struct{id=1}' value = 'Struct{after=Struct{id=1,value=REFRESH_TOKEN,client_id=600f4335-97a5-488a-a0de-f21d5981ea6d_2023-04-27T17:35:30.545563915},source=Struct{version=1.4.2.Final,connector=postgresql,name=customer-mysql-db-server,ts_ms=1682597777343,db=auth,schema=public,table=grant_type,txId=817,lsn=28408472},op=c,ts_ms=1682597777794}'
    2023-04-27T17:46:18.052+05:30  INFO 84524 --- [pool-2-thread-1] c.realtime.project.listener.CDCListener  : Updated Data: {id=1, value=REFRESH_TOKEN, client_id=600f4335-97a5-488a-a0de-f21d5981ea6d_2023-04-27T17:35:30.545563915} with Operation: CREATE
    2023-04-27T17:46:18.052+05:30  INFO 84524 --- [pool-2-thread-1] c.realtime.project.listener.CDCListener  : Key = 'Struct{id=2}' value = 'Struct{after=Struct{id=2,value=CLIENT_CREDENTIALS,client_id=600f4335-97a5-488a-a0de-f21d5981ea6d_2023-04-27T17:35:30.545563915},source=Struct{version=1.4.2.Final,connector=postgresql,name=customer-mysql-db-server,ts_ms=1682597777343,db=auth,schema=public,table=grant_type,txId=817,lsn=28408832},op=c,ts_ms=1682597777795}'
    2023-04-27T17:46:18.052+05:30  INFO 84524 --- [pool-2-thread-1] c.realtime.project.listener.CDCListener  : Updated Data: {id=2, value=CLIENT_CREDENTIALS, client_id=600f4335-97a5-488a-a0de-f21d5981ea6d_2023-04-27T17:35:30.545563915} with Operation: CREATE
    2023-04-27T17:46:18.052+05:30  INFO 84524 --- [pool-2-thread-1] c.realtime.project.listener.CDCListener  : Key = 'Struct{id=3}' value = 'Struct{after=Struct{id=3,value=AUTHORIZATION_CODE,client_id=600f4335-97a5-488a-a0de-f21d5981ea6d_2023-04-27T17:35:30.545563915},source=Struct{version=1.4.2.Final,connector=postgresql,name=customer-mysql-db-server,ts_ms=1682597777343,db=auth,schema=public,table=grant_type,txId=817,lsn=28409048},op=c,ts_ms=1682597777795}'
    2023-04-27T17:46:18.052+05:30  INFO 84524 --- [pool-2-thread-1] c.realtime.project.listener.CDCListener  : Updated Data: {id=3, value=AUTHORIZATION_CODE, client_id=600f4335-97a5-488a-a0de-f21d5981ea6d_2023-04-27T17:35:30.545563915} with Operation: CREATE
    2023-04-27T17:46:18.052+05:30  INFO 84524 --- [pool-2-thread-1] c.realtime.project.listener.CDCListener  : Key = 'Struct{id=1}' value = 'Struct{after=Struct{id=1,value=CLIENT_SECRET_BASIC,client_id=600f4335-97a5-488a-a0de-f21d5981ea6d_2023-04-27T17:35:30.545563915},source=Struct{version=1.4.2.Final,connector=postgresql,name=customer-mysql-db-server,ts_ms=1682597777343,db=auth,schema=public,table=authentication_method,txId=817,lsn=28409264},op=c,ts_ms=1682597777795}'
    2023-04-27T17:46:18.052+05:30  INFO 84524 --- [pool-2-thread-1] c.realtime.project.listener.CDCListener  : Updated Data: {id=1, value=CLIENT_SECRET_BASIC, client_id=600f4335-97a5-488a-a0de-f21d5981ea6d_2023-04-27T17:35:30.545563915} with Operation: CREATE
    
    

  2. It is due to debezium version>2

    The solution is very simple as mentioned in your console logs:

    The ‘topic.prefix’ value is invalid: A value is required.

    just add topic.prefix with any meaningful name in your configuration and it will work.

    @Bean
    public io.debezium.config.Configuration studentConnector() {
        return io.debezium.config.Configuration.create()
                .with("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
                .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
                .with("offset.storage.file.filename", "/home/amith/downloads/test.dat")
                .with("offset.flush.interval.ms", 60000)
                .with("name", "auth")
                .with("topic.prefix", "my_prefix_topic")
                .with("database.server.name", "auth")
                .with("database.hostname", "localhost")
                .with("database.port", "5432")
                .with("database.user", "admin")
                .with("database.password", "password")
                .with("database.dbname", "auth")
                .with("table.whitelist", "public.client").build();
    }
    

    see:- https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-topic-prefix

    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search