
I'm currently working on a project where I need a Data Factory pipeline to copy files based on the last run date.

Process breakdown:

  1. Data is ingested into a storage account.
  2. The ingested data lands in directories of the form topic/yyyy/mm/dd. Several files can arrive in a single directory, so the files are partitioned by year, month and day.
  3. The pipeline triggers daily at 4am and filters on the last high-water mark (HWM) date. Once the copy succeeds, a Set Variable activity increases the HWM by 1 (i.e., one day). However, no files are brought over on weekends (this is the problem).
  4. The HWM does not increase if no files are brought over, so the pipeline keeps looping over the same date.
  5. How do I get the pipeline to move on to the next file in that directory structure, given that I use the HWM as the directory path to the files, and dynamically update the HWM only when the copy has completed? Current update logic:
  6. Current HWM lookup and directory path used to copy files:

2 Answers


  1. Instead of adding 1 to the last high-water mark value, you can set the watermark to the current UTC date. That way, even if the pipeline copies nothing on some days, the next run still picks up the data and writes it to the correct destination folder. I reproduced this in my environment; the approach is below.

    • A watermark table is created with an initial watermark value of '1970-01-01'.


    • This table is referenced in the Lookup activity.


    • A Copy data activity is added, and in the source the query is given as
      select * from tab1 where lastmodified > '@{activity('Lookup1').output.firstRow.watermark_value}'


    • In the sink, Blob storage is used. To get a year/month/day folder structure,
      @concat(formatDateTime(utcnow(),'yyyy'),'/',formatDateTime(utcnow(),'MM'),'/',formatDateTime(utcnow(),'dd'))
      is given as the folder path. Note the uppercase 'MM' for the month; lowercase 'mm' would return minutes.


    • The file is copied to the resulting year/month/day path.


    • Once the file is copied, the watermark value is updated with the current UTC date.
      update watermark_table
      set watermark_value = '@{formatDateTime(utcnow(),'yyyy-MM-dd')}'
      where tab_name = 'tab1'


    • When the pipeline is triggered the next day, data is copied starting from the watermark value, and once the file is copied, the watermark is updated to the current UTC date.
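
To make the flow in this answer easier to follow, here is a minimal sketch of the same watermark pattern in plain Python, outside Data Factory. The function names and the row layout are hypothetical and only stand in for the Lookup, Copy data and update steps; it is an illustration of the logic, not the pipeline itself.

```python
from datetime import datetime, timezone


def sink_folder(now: datetime) -> str:
    # Plays the role of the concat/formatDateTime folder path.
    # In strftime, %m is the month (like 'MM' in the pipeline expression).
    return now.strftime("%Y/%m/%d")


def incremental_copy(rows, watermark: datetime, now: datetime):
    """Select rows modified after the watermark, then advance the watermark.

    Returns (rows_to_copy, folder, new_watermark).
    """
    # Stands in for: select * from tab1 where lastmodified > watermark_value
    rows_to_copy = [r for r in rows if r["lastmodified"] > watermark]
    # The answer stores only the date ('yyyy-MM-dd'), i.e. midnight UTC of the run day.
    new_watermark = now.replace(hour=0, minute=0, second=0, microsecond=0)
    return rows_to_copy, sink_folder(now), new_watermark
```

Because the new watermark comes from the run time rather than from "previous value plus one day", a day on which nothing was copied does not leave the watermark stuck.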
  2. After reading the post a couple of times, this is what I understood:

    • You already have watermark logic in place.
    • On weekends, when there are no files in the folder, the current logic does not increment the watermark, and that is what causes the issue.

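    If I understand the ask correctly, use the dayOfWeek() function. Add an If Condition activity and let the current logic execute only when the day of the week is Monday through Friday. Check the numbering for the context you use it in: the data flow dayOfWeek function documented at the link below returns 1 for Sunday through 7 for Saturday, so Monday to Friday is 2 to 6, whereas the pipeline expression dayOfWeek function returns 0 for Sunday through 6 for Saturday, so Monday to Friday is 1 to 5.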

    https://learn.microsoft.com/en-us/azure/data-factory/data-flow-expressions-usage#dayofweek
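
As a rough illustration of the weekday idea, the sketch below shows in plain Python how a high-water mark could be advanced while skipping Saturday and Sunday, and how the directory path could be derived from it. The helper names and the "topic" prefix are hypothetical, and the weekend rule is an assumption taken from the question (no files arrive on weekends); this is not Data Factory code.

```python
from datetime import date, timedelta


def is_weekday(d: date) -> bool:
    # Python's date.weekday() numbers Monday as 0 and Sunday as 6,
    # which differs from both Data Factory numbering schemes.
    return d.weekday() < 5


def next_hwm(current: date) -> date:
    """Advance the high-water mark by one day, skipping Saturday and Sunday."""
    candidate = current + timedelta(days=1)
    while not is_weekday(candidate):
        candidate += timedelta(days=1)
    return candidate


def hwm_folder(topic: str, hwm: date) -> str:
    # Builds the topic/yyyy/mm/dd directory described in the question.
    return f"{topic}/{hwm:%Y/%m/%d}"
```

With this rule, a successful copy for a Friday moves the mark straight to the following Monday, so the pipeline never waits on a weekend directory that will not appear.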
