I have the following sample data incoming in a CSV file:

Identifer Key,Name,Address,City,State,ZIP  
WELD-424,Jane Doe,123 Main St,Whereverville,CA,90210  
MOWN-175,John Doe,555 Broadway Ave,New York,NY,10010

The processor flow I came up with so far is:

  1. GetFile
  2. UpdateAttribute to set the avro.schema property with the schema text
  3. PutMongoRecord uses CSVReader to load the records into the database

What would the Avro schema look like for this? Here is my best guess (based on the two fields I care about):

{
   "type" : "record",
   "namespace" : "TheNameSpace",
   "name" : "MySchema",
   "fields" : [
     { "name" : "Identifier Key" , "type" : ["string"]},
     { "name" : "Name" , "type" : ["string", "null"]}
   ]
}

Specifying "Identifier Key" above gives an error because it contains a space. Other fields like "Name" load fine however.
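For reference, the Avro specification requires names to start with a letter or underscore and to contain only letters, digits, and underscores afterwards, which is why a name containing a space is rejected. A minimal sketch of that rule in Python (the helper name `is_valid_avro_name` is made up for illustration and is not part of any Avro or NiFi API):

```python
import re

# Avro names must start with [A-Za-z_] and then contain only [A-Za-z0-9_].
AVRO_NAME = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def is_valid_avro_name(name: str) -> bool:
    """Return True if `name` satisfies the Avro naming rule above."""
    return AVRO_NAME.fullmatch(name) is not None


if __name__ == "__main__":
    # Field names taken from the question, plus the desired target name.
    for field in ["Identifier Key", "Name", "_id"]:
        print(field, is_valid_avro_name(field))
```

This only checks the naming rule; it says nothing about how a particular reader or registry enforces it.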

Some challenges I am facing:

  • How do you rename fields? Does that need to be done in another processor block outside of the ConvertRecord processor and schema ecosystem? This seems like a common scenario because you will want fields to have the same name coming from many different sources.
  • Avro does not like field names with spaces in them (so going from "Identifier Key" -> "_id" is going to be problematic).
  • There does not seem to be a way to rename a field during a read and write operation. I thought the aliases capability would help (for example, going from "Name" -> "fullName").
  • How do you make a single field (i.e. Identifier Key) all lowercase before importing to MongoDB?

I also tried using the ConvertRecord processor to convert from CSV to JSON first so that it can be imported into MongoDB as JSON. The output would need to look something like this (with the identifier key field all lowercase), but the Identifier Key field comes out null after ConvertRecord runs:

{"_id": "weld-424", "fullName": "Jane Doe", "updated": {"$date":"2018-11-01T04:00:00.000Z"}, "created": {"$date":"2018-11-01T04:00:00.000Z"}}
{"_id": "mown-175", "fullName": "John Doe", "updated": {"$date":"2018-11-01T04:00:00.000Z"}, "created": {"$date":"2018-11-01T04:00:00.000Z"}}
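To pin down the intended mapping independently of NiFi, here is a rough plain-Python sketch that turns the sample CSV into documents with `_id` (lowercased) and `fullName`. The function name `to_documents` is hypothetical, and the `updated`/`created` dates are left out because they do not come from the CSV. Note that the sample header is spelled "Identifer Key" while the schema above uses "Identifier Key"; a lookup by exact name would miss if the real file has the first spelling, so it may be worth checking.

```python
import csv
import io
import json

# Sample data copied from the question (header spelling kept as-is).
SAMPLE = """Identifer Key,Name,Address,City,State,ZIP
WELD-424,Jane Doe,123 Main St,Whereverville,CA,90210
MOWN-175,John Doe,555 Broadway Ave,New York,NY,10010
"""


def to_documents(csv_text, id_column="Identifer Key"):
    """Map each CSV row to a dict with a lowercased _id and a fullName."""
    reader = csv.DictReader(io.StringIO(csv_text))
    docs = []
    for row in reader:
        # The lookup is by exact header text, so id_column must match the file.
        docs.append({"_id": row[id_column].lower(), "fullName": row["Name"]})
    return docs


if __name__ == "__main__":
    for doc in to_documents(SAMPLE):
        print(json.dumps(doc))
```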

2 Answers


  1. About Avro: I encourage you to read the Avro specification. It is not a long document and explains how to use Avro.
    About your questions:

    1. How do you rename fields? You can use a Jolt transform processor.
    2. Avro does not like field names with spaces in them.
      You have at least 3 options:

      • Ignore the header fields so that the schema field names are used. Note that you need to remove the header line, and the fields must come in the same order as in the schema.
      • Use Jolt.
      • Change the names before using the schema. You can create your own processors!

    Your other questions have the same answers. In summary, if I were you I would create the schema field names without spaces, then change the flow to this:

    1. Remove the spaces from the header, like here; this way the field names will comply with the Avro spec (as long as they do not start with a digit).
    2. Use Jolt to transform the field names to the database names.
    3. Put your data in the database.
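As a rough illustration of steps 1 and 2 outside NiFi, the sketch below strips disallowed characters from the header line and then renames fields to database names. Everything here is an assumption made for the example: the helper names, the `RENAMES` mapping, and the naive comma split (which ignores quoted commas). It is not a Jolt spec or a NiFi processor.

```python
import re


def sanitize_header(header_line: str) -> str:
    """Strip characters Avro does not allow from each CSV column name."""
    cleaned = []
    # Naive split: assumes header names contain no quoted commas.
    for name in header_line.strip().split(","):
        # Drop spaces and anything else outside [A-Za-z0-9_].
        name = re.sub(r"[^A-Za-z0-9_]", "", name)
        # Avro names cannot start with a digit, so prefix an underscore.
        if name and name[0].isdigit():
            name = "_" + name
        cleaned.append(name)
    return ",".join(cleaned)


# Hypothetical mapping from sanitized header names to database field names.
RENAMES = {"IdentiferKey": "_id", "Name": "fullName"}


def rename_fields(record: dict) -> dict:
    """Rename keys listed in RENAMES; leave all other keys untouched."""
    return {RENAMES.get(key, key): value for key, value in record.items()}


if __name__ == "__main__":
    print(sanitize_header("Identifer Key,Name,Address,City,State,ZIP"))
    print(rename_fields({"IdentiferKey": "WELD-424", "Name": "Jane Doe"}))
```

Inside NiFi the renaming step would be done by the Jolt transform rather than by code like this; the sketch only shows the expected before and after.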

    Hope it helps.

  2. To disable name validation for an Avro schema, define the schema in an AvroSchemaRegistry controller service. Jira NIFI-4612 addresses this issue.

    • Validate Field Names: false

    Once the Avro schema is defined in the AvroSchemaRegistry with this setting, field names containing spaces can be used in the schema.


    To change a field name, use the QueryRecord processor with Record Reader/Writer controller services (configured with the new alias name).

    Add a new property to the QueryRecord processor with this query as its value:

    select "Identifer Key" _id,Name,Address from FLOWFILE
    

    The output of QueryRecord processor will have _id,Name,Address as new field names.
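To see what the aliasing in that query does, here is a small sketch that runs the same statement against an in-memory SQLite table named FLOWFILE. This is only an analogy: QueryRecord does not use SQLite, so quoting and function support may differ there. The second query, which lowercases the key with `lower(...)`, is an assumption to verify against QueryRecord's SQL dialect before relying on it.

```python
import sqlite3

# The query from the answer, unchanged.
RENAME_SQL = 'select "Identifer Key" _id,Name,Address from FLOWFILE'
# Assumed variant that also lowercases the key (verify in QueryRecord).
LOWER_SQL = 'select lower("Identifer Key") _id,Name,Address from FLOWFILE'


def run_query(sql):
    """Run `sql` against a one-row FLOWFILE table; return (columns, rows)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        'CREATE TABLE FLOWFILE ("Identifer Key" TEXT, Name TEXT, Address TEXT)'
    )
    conn.execute(
        "INSERT INTO FLOWFILE VALUES ('WELD-424', 'Jane Doe', '123 Main St')"
    )
    cur = conn.execute(sql)
    columns = [col[0] for col in cur.description]
    rows = cur.fetchall()
    conn.close()
    return columns, rows


if __name__ == "__main__":
    print(run_query(RENAME_SQL))
    print(run_query(LOWER_SQL))
```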
