skip to Main Content

I have a bunch of event data which is identified by uid and timestamp. I have a number of aggregation pipelines which will calculate statistics for each user. These work if I run them one at a time, but I can’t figure out how to run them within a $group.

So this code works:

db.Events.aggregate([
    {"$match":{"uid":"S001"}},
    {"$facet":{
         "var1":[pipeline1...], 
         "var2":[pipeline2...]
    }}
];

However, I want to loop over the users, so I want something more like:

db.Events.aggregate([
    { "$group": {
    "_id": {"uid":"$uid"},
    "timestamp":{"$last":"$timestamp"},
    "data": { "$facet":{
        "var1":[pipeline1...],
        "var2":[pipeline2...]
        }}
    }}
])

But I get a error unknown group operator '$facet'.

What I’m looking for is output that looks like:

{
    uid:"S001",
    timestamp:"time 1",
    data: { ...}
},
{
    uid:"S002",
    timestamp:"time 2",
    data: { ...}
}, 
...

How can I run the $facet inside of the $group.


Update: Here is a more complete example.

First here is some input:

{
    "uid": 1,
    "timestamp": 1000000000,
    "verb": "initialized",
    "object": "game level",
    "data":{},
},
{
    "uid": 1,
    "timestamp": 1000000015,
    "verb": "initialized",
    "object": "game level",
    "data":{},
},
{
    "uid": 1,
    "timestamp": 1000000031,
    "verb": "passed",
    "object": "game level",
    "data":{},
},
{
    "uid": 2,
    "timestamp": 1000000000,
    "verb": "initialized",
    "object": "game level",
    "data":{},
},
{
    "uid": 2,
    "timestamp": 1000000024,
    "verb": "passed",
    "object": "game level",
    "data":{},
}

Here is my target pipeline:

db.collection.aggregate([
    { "$group": {
        "_id": {"uid":"$uid"},
        "uid": {"$first":"$uid"},
        "timestamp":{"$last":"$timestamp"},
        "data": { "$facet":{
            "NumberAttempts": [attempt_filter, attempt_map, attempt_sum],
             "time": [leveltime_filter, leveltime_map, leveltime_reduce]
     }}
]);

where the filter stages are calls to $match, the map stages are calls to $project and the reduce stages are calls to $group to accumulate over the matched events. The complete definitions for the stages in the pipeline can be found in:
https://mongoplayground.net/p/Cae0DkO8o0g

(Don’t get too hung up on the exact definitions, my real example has more different pipelines).

My desired output would be something like:

{
    "data": {
        "NumberAttempts": 2,
        "start_time": 1e+09,
        "total_time": 31
    },
    "timestamp": 1.000000031e+09,
    "uid": 1
},
{
    "data": {
        "NumberAttempts": 1,
        "start_time": 1e+09,
        "total_time": 24
    },
    "timestamp": 1.000000024e+09,
    "uid": 2
}   

Where I have one record for each uid which contains summary statistics. I can do this for one user at a time if I first match on the uid and then run the facet operation. [This is not quite working in the mongoplayground link. The problem is I need a custom aggregator for total_time and I can’t quite figure how to get it to work in the playground.] What I want to do is iterate this code over users, which seems to me would be the $group or $bucket operator.

I am not looking for a clever way to rewrite these particular pipelines using group accumulator functions. I would like my SMEs to be able to specify which events they are interested in, what fields in those events are relevant and select an accumulator, thus building a custom map-reduce chain.

2

Answers


  1. Chosen as BEST ANSWER

    It seems like the most straightforward way is to use a loop in the language I'm using to call Mongo.

    If I use javascript in the Mongo shell, I would get:

    db.collection.distinct("uid").foreach(
      function (u) {
         db.collection.aggregate([
            {"$match": {"uid":u}},
            {"$facet": ...},
            {"$project":...},
            {"$out",...}
         ]);
    });
    

    In my case, it might work a bit better, as data from different users will be complete or incomplete at any given time, so I can run the pipeline for the user when that user's data is complete.


  2. Not sure how much data we are talking about but if it is not millions of rows, you can always make an array and then easily iterate on it:

    db.foo.aggregate([
        { "$group": {
            "_id": "$uid",
            "uid": {"$first":"$uid"},
            "timestamp":{"$last":"$timestamp"},
            "X": {$push: '$$CURRENT'}
        }},
        {$project: {
            "uid": true,
            "timestamp": true,
            "data": {
                "NumberAttempts": {$reduce: {
                         input: "$X",
                         initialValue: 0,
                        in:{$cond: {
                            // Conditions to satisfy NumberAttempts:                  
                            if: {$and: [
                                {$eq:['$$this.verb','initialized']}
                                ,{$eq:['$$this.object','game level']}
                            ]},
    
                            then: {$add:['$$value', 1]} ,
                            else: '$$value'  // else return $$value unchanged         
                        }}
                }},
                'start_time': "$timestamp", // redundant...?                          
                'total_time': {$reduce: {
                         input: "$X",
                         initialValue: 0,
                        in:{$cond: {
    
                            // Conditions to satisfy total_time:                      
                            if: {$and: [
                                {$in:["$$this.verb", ["initialized","resumed",
                                              "suspended","exited","passed"]]},
                                ,{$eq:['$$this.object', 'game level']}
                            ]},
    
                            then: {$add:['$$value', 22]} , // proxy for safe_sum_acc  
                            else: '$$value'
                        }}
                }}
            }
        }}
    
    ]);
    

    I just hacked up the functions to calc NumberAttempts and total_time.

    It is also possible to do this with one reduce loop at the expense of simplicity of working with scalars vs. objects in reduce:

    db.foo.aggregate([
        { "$group": {
            "_id": "$uid",
            "uid": {"$first":"$uid"},
            "timestamp":{"$last":"$timestamp"},
            "X": {$push: '$$CURRENT'}
        }},
        {$project: {
            "uid": true,
            "timestamp": true,
            "data": {$reduce: {
                input: "$X",
                initialValue: {NumberAttempts:0, start_time:'$timestamp',total_time:0},
                in:
    
                //  Remember, in MQL $reduce we cannot not say:                       
                //                                                                    
                //    $$value.fld3 = $$value.fld3 + 7                                 
                //                                                                    
                //  Instead we have functional programming and we overlay/merge       
                //  new objects on top of older ones:                                 
                //                                                                    
                //    $mergeObjects:['$$value', {fld3: {$add['$$value.fld3',7]} } ]   
                //                                                                    
                {$mergeObjects: [ '$$value' , 
    
                  // Conditions to satisfy NumberAttempts:
                  {$cond: {
                    if: {$and: [
                        {$eq:['$$this.verb','initialized']}
                        ,{$eq:['$$this.object','game level']}
                    ]},
                      then: {'NumberAttempts': {$add:['$$value.NumberAttempts', 1]}} ,
                      else: {}  // blank merge (noop)
                  }}
    
                  ,
                  // Conditions to satisfy total_time:              
                  {$cond: {
                    if: {$and: [
                        {$in:["$$this.verb", ["initialized","resumed",
                                      "suspended","exited","passed"]]},
                        {$eq:['$$this.object', 'game level']}
                    ]},
                      then: {'total_time': {$add:['$$value.total_time', 17]}} , // safe_sum_acc proxy
                      else: {}  // blank merge (noop)
                  }}
    
                ]}   // end of parent mergeObjects
            }}
        }}
    ]);
    

    NOTE:

    If each uid only has a few entries, you might want to consider eliminating the grouping on the server side and let the client handle it. It is the same number of records examined on the server with significantly less compute pressure, at the expense of flowing X times more data over the wire.

    UPDATED

    Below is a semi-solution using $facet. It lacks some of the specific logic around total_time but I didn’t quite understand the safe_sum_acc and timecode bits. The key thing is to merge the
    two facet outputs then regroup on the _id:

    db.foo.aggregate([
        { '$facet': {
            'A': [
                { '$match': {'verb':'initialized', 'object':'game level'}},
                { "$group": {
                    "_id": "$uid",
                    "uid": {"$first":"$uid"},
                    "timestamp":{"$last":"$timestamp"} ,
                    "N": {$sum:1}
                }}
            ]
            ,'B': [
                { '$match': {'verb':'initialized', 'object':'game level'}},
                { "$group": {
                    "_id": "$uid",
                    "uid": {"$first":"$uid"},
                    "timestamp":{"$last":"$timestamp"} ,
                    "total_time": {$sum:17} // proxy for safe                         
                }}
            ]
        }}
    
        // Mush A and B together.  It is not guaranteed that the uid                  
        // are in order OR if A and B have the exact same set so we will              
        // use $group to collect same _id.                                            
        // We can use $first for the common attibutes _id and uid but we              
        // don't know the order of N and total_time -- but we know there is           
        // ONLY ONE; thus, we can safely use $sum to pick up the one value.           
        ,{$project: {X: {$concatArrays: ['$A','$B']}}}
        ,{$unwind: '$X'}
        ,{ "$group": {_id: '$X._id', uid:{$first:'$X.uid'}, 'N':{$sum: '$X.N'}, T:{$s
    um: '$X.total_time'} }}
    
        // Reformat; not particularly interesting.                                    
        ,{ "$project": {
            _id: true,
            uid: true,
            data: {
                NumberAttempts: '$N',
                total_time: '$T'
            }
        }}
    
    ]);
    
    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search