
I’m quite new to Apache Beam, and I’m trying to create a new column at index 0 that holds a unique ID constructed by grouping (GroupByKey) on multiple columns. How can I achieve this?

I also want to write the new data to a newline-delimited JSON file (where each line is one unique_id with an array of the objects that belong to that unique_id).

I’ve currently written:

import apache_beam as beam

pipe = beam.Pipeline()

rows = (pipe
        | beam.io.ReadFromText('data.csv')
        | beam.Map(lambda x: x.split(","))
        | beam.Map(print))

pipe.run()

Which basically converts each row into a list of strings.

This post has the sample input data, and its solutions use pandas; how do I achieve the same thing in a Beam pipeline?

Thank you!

2 Answers


  1. Have you tried CombinePerKey like this?

    import apache_beam as beam
    
    p = beam.Pipeline()
    
    test = (
        p
        | beam.Create([(0, "ttt"), (0, "ttt1"), (0, "ttt2"), (1, "xxx"), (1, "xxx2"), (2, "yyy")])
        # the callable receives an iterable of all values for a key
        | beam.CombinePerKey(lambda values: ",".join(values))
        | beam.Map(print)
    )
    
    p.run()
    
  2. Is it important to you to have the unique IDs be integers from 0 to n_groups like in your linked example?

    If not, then I don’t think there’s any need to use a grouping operation here. Consider the following:

    import apache_beam as beam
    
    def make_unique_id(row):
      """
      Example function for extracting a unique ID from the row.
    
      You could pass the composite key through uuid.uuid5 to derive a more standard ID format.
      """
      return ",".join([row[0], row[1]])
    
    pipe = beam.Pipeline()
    
    with_ids = (pipe
                | beam.io.ReadFromText('data.csv')
                | beam.Map(lambda x: x.split(","))
                | beam.Map(lambda x: [make_unique_id(x)] + x)
                | beam.Map(print))
    
    pipe.run()
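    To see what this produces without running a pipeline, here is `make_unique_id` applied to a hypothetical parsed row, with the uuid idea from the docstring sketched via `uuid.uuid5` (the row contents are made up for illustration):

```python
import uuid

def make_unique_id(row):
    # composite key from the first two columns
    return ",".join([row[0], row[1]])

row = ["store_1", "2021-01-01", "42"]  # hypothetical parsed CSV row
composite = make_unique_id(row)        # -> "store_1,2021-01-01"
# uuid.uuid5 derives a deterministic, standard-format ID from the same key
stable_id = str(uuid.uuid5(uuid.NAMESPACE_URL, composite))
print([composite] + row)
```

    Because `uuid.uuid5` is deterministic, every row with the same column values maps to the same ID, so it can still serve as a grouping key later.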
    
    