
I am trying to write an aggregation query for the complex document below. Ideally, I want to create multiple streams from the given set of activities by identifying the parent-child hierarchy and processing the input document recursively.

Title: Aggregator Query for Identifying Workflow Streams in Hierarchical Document

Body:

I’m trying to write an aggregation query in MongoDB to process a complex document representing a set of activities with parent-child relationships. The goal is to identify multiple workflows (streams) from this data.

Input Document:

[
  {
    "id": "123",
    "activities": [
      {
        "activityId": "A1",
        "children": [
          {
            "activityId": "A2"
          },
          {
            "activityId": "A7"
          }
        ]
      },
      {
        "activityId": "A2",
        "children": [
          {
            "activityId": "A3"
          }
        ]
      },
      {
        "activityId": "A3",
        "children": [
          {
            "activityId": "A4"
          }
        ]
      },
      {
        "activityId": "A4",
        "children": [
          {
            "activityId": "A5"
          }
        ]
      },
      {
        "activityId": "A4",
        "children": [
          {
            "activityId": "A6"
          }
        ]
      },
      {
        "activityId": "A6",
        "children": []
      },
      {
        "activityId": "A5",
        "children": []
      },
      {
        "activityId": "A7",
        "children": []
      },
      {
        "activityId": "B1",
        "children": [
          {
            "activityId": "B2"
          },
          {
            "activityId": "B3"
          }
        ]
      },
      {
        "activityId": "B2",
        "children": []
      },
      {
        "activityId": "B3",
        "children": []
      },
      {
        "activityId": "C1",
        "children": []
      }
    ]
  },
  {
    "id": "897",
    "activities": [
      {
        "activityId": "X1",
        "children": [
          {
            "activityId": "X2"
          },
          {
            "activityId": "X7"
          }
        ]
      },
      {
        "activityId": "X2",
        "children": [
          {
            "activityId": "X3"
          }
        ]
      },
      {
        "activityId": "X3",
        "children": [
          {
            "activityId": "X4"
          }
        ]
      },
      {
        "activityId": "X4",
        "children": [
          {
            "activityId": "X5"
          }
        ]
      },
      {
        "activityId": "X4",
        "children": [
          {
            "activityId": "X6"
          }
        ]
      },
      {
        "activityId": "X6",
        "children": []
      },
      {
        "activityId": "X5",
        "children": []
      },
      {
        "activityId": "X7",
        "children": []
      },
      {
        "activityId": "Y1",
        "children": [
          {
            "activityId": "Y2"
          },
          {
            "activityId": "Y3"
          }
        ]
      },
      {
        "activityId": "Y2",
        "children": []
      },
      {
        "activityId": "Y3",
        "children": []
      },
      {
        "activityId": "Z1",
        "children": []
      }
    ]
  }
]

**Aggregation query:**

db.collection.aggregate([
  {
    "$match": {
      "activities.children": []
    }
  },
  {
    "$graphLookup": {
      "from": "collection",
      "startWith": "$activities.activityId",
      "connectFromField": "activities.activityId",
      "connectToField": "activities.children.activityId",
      "as": "workflowStreams"
    }
  },
  {
    "$set": {
      "workflowStreams": {
        "$setUnion": [
          [
            "$activities.activityId"
          ],
          "$workflowStreams.activities.activityId"
        ]
      }
    }
  },
  {
    "$group": {
      "_id": "$id",
      "workflowStreams": {
        "$push": "$workflowStreams"
      }
    }
  }
])

**Expected response:**

[
  {
    "id": "123",
    "workflowStreams": [
      [
        "A1",
        "A2",
        "A3",
        "A4",
        "A6"
      ],
      [
        "A1",
        "A2",
        "A3",
        "A4",
        "A5"
      ],
      [
        "A1",
        "A7"
      ],
      [
        "B1",
        "B2"
      ],
      [
        "B1",
        "B3"
      ],
      [
        "C1"
      ]
    ]
  },
  {
    "id": "897",
    "workflowStreams": [
      [
        "X1",
        "X2",
        "X3",
        "X4",
        "X6"
      ],
      [
        "X1",
        "X2",
        "X3",
        "X4",
        "X5"
      ],
      [
        "X1",
        "X7"
      ],
      [
        "Y1",
        "Y2"
      ],
      [
        "Y1",
        "Y3"
      ],
      [
        "Z1"
      ]
    ]
  }
]

2 Answers


  1. The most seamless way I can think of to reuse the solution from your previous question is to create a database view that "flattens" the activities entries into individual documents. You can then run the same query as before against that view.

    db.runCommand( { create: "activityView", viewOn: "streams", pipeline: [
        {
            $match: {
                // put your criteria here
                id: "123"
            }
        },
        {
            $unwind: "$activities"
        },
        {
            $replaceRoot: {
                newRoot: "$activities"
              }
        }
    ]} )
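
    To make the effect of the view concrete, here is a minimal plain-JavaScript sketch of what the $match, $unwind and $replaceRoot stages produce. It runs in memory rather than against MongoDB, and the names flattenActivities and sampleDocs are hypothetical, chosen only for this illustration.

    ```javascript
    // Plain-JavaScript analogue of the view's $match -> $unwind -> $replaceRoot stages.
    // `flattenActivities` and `sampleDocs` are hypothetical names used only for illustration.
    function flattenActivities(docs, id) {
      return docs
        .filter((doc) => doc.id === id)     // $match: keep the requested document
        .flatMap((doc) => doc.activities);  // $unwind + $replaceRoot: one row per activity
    }

    const sampleDocs = [
      {
        id: "123",
        activities: [
          { activityId: "B1", children: [{ activityId: "B2" }, { activityId: "B3" }] },
          { activityId: "B2", children: [] },
          { activityId: "B3", children: [] },
        ],
      },
      { id: "897", activities: [{ activityId: "Z1", children: [] }] },
    ];

    console.log(flattenActivities(sampleDocs, "123"));
    ```

    Note that the flattened rows no longer carry the parent document's id, because $replaceRoot keeps only the activity itself. That is why the view filters on a single id first.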
    

    Then, you can reuse the same solution on the view.

    db.activityView.aggregate([
      {
        "$match": {
          "children": []
        }
      },
      {
        "$graphLookup": {
          "from": "activityView", // traverse the flattened view, not the original nested collection
          "startWith": "$activityId",
          "connectFromField": "activityId",
          "connectToField": "children.activityId",
          "as": "workflowStreams"
        }
      },
      {
        "$set": {
          "workflowStreams": {
            "$setUnion": [
              [
                "$activityId"
              ],
              "$workflowStreams.activityId"
            ]
          }
        }
      },
      {
        "$group": {
          "_id": "",
          "workflowStreams": {
            "$push": "$workflowStreams"
          }
        }
      }
    ])
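
    As a rough way to check what this pipeline computes, the sketch below emulates it in memory on flattened rows: it starts from each leaf (empty children) and collects every ancestor reachable through children.activityId. The helper name streamsFromLeaves is hypothetical, and the sketch assumes the hierarchy is acyclic.

    ```javascript
    // Emulates, in memory, what the pipeline computes for each leaf activity:
    // the leaf itself plus every ancestor reachable through children.activityId.
    // `streamsFromLeaves` is a hypothetical helper; it assumes an acyclic hierarchy.
    function streamsFromLeaves(rows) {
      // Index each child id to the ids of the activities that list it as a child.
      const parentsOf = new Map();
      for (const row of rows) {
        for (const child of row.children) {
          if (!parentsOf.has(child.activityId)) parentsOf.set(child.activityId, new Set());
          parentsOf.get(child.activityId).add(row.activityId);
        }
      }
      // Leaves are rows with an empty children array (the pipeline's $match stage).
      const leafIds = [
        ...new Set(rows.filter((r) => r.children.length === 0).map((r) => r.activityId)),
      ];

      return leafIds.map((leafId) => {
        const seen = new Set([leafId]);
        const queue = [leafId];
        while (queue.length > 0) {
          const current = queue.shift();
          for (const parent of parentsOf.get(current) ?? []) {
            if (!seen.has(parent)) {
              seen.add(parent);
              queue.push(parent);
            }
          }
        }
        // Visited order is leaf-first, so reverse it to list the root first.
        return [...seen].reverse();
      });
    }

    const rows = [
      { activityId: "A1", children: [{ activityId: "A2" }, { activityId: "A7" }] },
      { activityId: "A2", children: [{ activityId: "A3" }] },
      { activityId: "A3", children: [] },
      { activityId: "A7", children: [] },
    ];

    console.log(streamsFromLeaves(rows)); // [ [ 'A1', 'A2', 'A3' ], [ 'A1', 'A7' ] ]
    ```

    The sketch orders each stream root-first on purpose. In the pipeline itself, $setUnion does not guarantee the order of the elements it returns, so add an explicit sort if the order of activities within a stream matters.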
    

    I would like to re-emphasize the importance of refactoring your schema, as suggested in the comments. With the current design, you risk exceeding the 16 MB limit on a single document, and you are likely to run into performance problems and unnecessarily complex queries.

  2. This query should produce a response similar to the one expected:

    db.collection.aggregate([ 
      { 
        $unwind: "$activities" 
      }, 
      { 
        $graphLookup: {
          from: "collection",
          startWith: "$activities.activityId",
          connectFromField: "activities.activityId",
          connectToField: "activities.children.activityId",
          as: "workflowStream",
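          // Note: restrictSearchWithMatch uses query-filter syntax and does not evaluate
          // aggregation expressions, so "$id" is compared here as a literal string.
          restrictSearchWithMatch: { "id": "$id" }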
        } 
      }, 
      { 
        $set: {
          workflowStream: {
            $setUnion: [ ["$activities.activityId"], "$workflowStream.activities.activityId" ]
          }
        }
      }, 
      { 
        $group: {
          _id: "$id",
          workflowStreams: {
            $addToSet: "$workflowStream"
          }
        }
      }
    ])
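
    To cross-check the pipeline's output against the expected response, the streams can also be computed client-side. The following is a minimal sketch, not part of the original answer: workflowStreams and docs are hypothetical names, the hierarchy is assumed to be acyclic, and children of entries that share an activityId (such as the two "A4" entries in the question) are merged.

    ```javascript
    // Client-side cross-check: enumerate every root-to-leaf path per document.
    // `workflowStreams` and `docs` are hypothetical names; the hierarchy is assumed acyclic.
    function workflowStreams(activities) {
      // Merge children per activityId, because an id such as "A4" can appear in several entries.
      const childrenOf = new Map();
      for (const { activityId, children } of activities) {
        if (!childrenOf.has(activityId)) childrenOf.set(activityId, []);
        childrenOf.get(activityId).push(...children.map((c) => c.activityId));
      }
      // Roots are activities that no other activity lists as a child.
      const childIds = new Set([...childrenOf.values()].flat());
      const roots = [...childrenOf.keys()].filter((id) => !childIds.has(id));

      const streams = [];
      const walk = (id, path) => {
        const nextPath = [...path, id];
        const children = childrenOf.get(id) ?? [];
        if (children.length === 0) streams.push(nextPath); // reached a leaf
        for (const child of children) walk(child, nextPath);
      };
      roots.forEach((root) => walk(root, []));
      return streams;
    }

    const docs = [
      {
        id: "123",
        activities: [
          { activityId: "A1", children: [{ activityId: "A2" }, { activityId: "A7" }] },
          { activityId: "A2", children: [] },
          { activityId: "A7", children: [] },
          { activityId: "C1", children: [] },
        ],
      },
    ];

    const result = docs.map((doc) => ({
      id: doc.id,
      workflowStreams: workflowStreams(doc.activities),
    }));
    console.log(JSON.stringify(result));
    // [{"id":"123","workflowStreams":[["A1","A2"],["A1","A7"],["C1"]]}]
    ```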
    