
I have a few records in Elasticsearch. I want to group the records by user_id and fetch the latest record of each user, but only if its event_type is 1.

If the event_type of the latest record is not 1, that record should not be fetched. I did this with a MySQL query. Please let me know how I can do the same in Elasticsearch.

[Image: sample rows of the user_events table]

This is the MySQL query and its result:

SELECT * FROM user_events
     WHERE id IN( SELECT max(id) FROM `user_events` group by user_id ) AND event_type=1;
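
To make the intended result concrete, the logic of the MySQL query can be sketched in plain Python. This is only an illustration: the rows in `ROWS` are hypothetical stand-ins (the real table is only visible in the screenshot), and the helper names `latest_per_user` and `latest_with_type` are made up for this sketch.

```python
# Hypothetical sample rows; the real data is only shown as an image in the question.
ROWS = [
    {"id": 1, "user_id": 55, "event_type": 2},
    {"id": 2, "user_id": 55, "event_type": 1},
    {"id": 3, "user_id": 56, "event_type": 1},
    {"id": 4, "user_id": 56, "event_type": 2},
]


def latest_per_user(rows):
    """Mimic the subquery (max(id) ... GROUP BY user_id): highest-id row per user."""
    latest = {}
    for row in rows:
        current = latest.get(row["user_id"])
        if current is None or row["id"] > current["id"]:
            latest[row["user_id"]] = row
    return latest


def latest_with_type(rows, event_type=1):
    """Mimic the outer WHERE: keep a user's latest row only if its event_type matches."""
    kept = [r for r in latest_per_user(rows).values() if r["event_type"] == event_type]
    return sorted(kept, key=lambda r: r["id"])
```

With these made-up rows, `latest_with_type(ROWS)` keeps only the row with id 2 for user 55. User 56 is dropped because its newest row (id 4) has event_type 2, even though an older row of that user has event_type 1.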

[Image: result of the MySQL query]

I need the same output from Elasticsearch aggregations.

Elasticsearch Query:

GET test_analytic_report/_search
{
  "from": 0,
  "size": 0,
  "query": {
    "bool": {
      "must": [
        {
          "range": {
            "event_date": {
              "gte": "2022-10-01",
              "lte": "2023-02-06"
            }
          }
        }
      ]
    }
  },
  "sort": {
    "event_date": {
      "order": "desc"
    }
  },
  "aggs": {
    "group": {
      "terms": {
        "field": "user_id"
      },
      "aggs": {
        "group_docs": {
          "top_hits": {
            "size": 1,
            "_source": ["user_id", "event_date", "event_type"],
            "sort": {
              "user_id": "desc"
            }
          }
        }
      }
    }
  }
}

With the above query, I have two users whose user_id values are 55 and 56. The query also fetches records with other event_type values, but I want only the latest record of each user, and only when its event_type is 1. If a user's last record does not have event_type=1, that user should not appear in the aggregations.

In the table above, the latest record for user_id 56 has event_type 2, so it should not appear in the aggregations.
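
The order of the two steps matters here, which is why simply adding a filter on event_type to the query is not enough. The following sketch contrasts the two orderings on made-up rows. The data and function names are hypothetical and only mirror the situation described for user_id 56.

```python
# Hypothetical events: user 56 has an older event_type=1 row, but its latest row is type 2.
EVENTS = [
    {"id": 10, "user_id": 55, "event_type": 1},
    {"id": 11, "user_id": 56, "event_type": 1},
    {"id": 12, "user_id": 56, "event_type": 2},
]


def newest_by_user(events):
    """Return {user_id: event with the highest id}."""
    newest = {}
    for event in sorted(events, key=lambda e: e["id"]):
        newest[event["user_id"]] = event  # higher ids overwrite lower ones
    return newest


def filter_then_latest(events, event_type=1):
    """Filter on event_type first, then take the newest remaining row per user."""
    matching = [e for e in events if e["event_type"] == event_type]
    return sorted(newest_by_user(matching))


def latest_then_filter(events, event_type=1):
    """Take the newest row per user first, then keep users whose newest row matches."""
    newest = newest_by_user(events)
    return sorted(uid for uid, e in newest.items() if e["event_type"] == event_type)
```

On this data, `filter_then_latest(EVENTS)` still reports user 56 (through its older row with id 11), while `latest_then_filter(EVENTS)` reports only user 55, which is the behaviour asked for.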

I tried, but it is not returning the exact result that I want.

Note: event_date is the current date and time. I inserted the rows in the image above manually, which is why the dates differ.

4 Answers


  1. GET user_events/_search
    {
      "size": 1,
      "query": {
        "term": {
          "event_type": 1
          }
        },
      "sort": [
        {
          "id": {
            "order": "desc"
          }
        }
      ]
    }
    

    Explanation: this search request retrieves the latest event of type 1 from the "user_events" index. The term query keeps only documents with "event_type": 1, the sort orders them by the "id" field in descending order, and "size": 1 returns only the first hit. Note that this yields a single document overall, not one document per user_id.

  2. If your ES version supports it, you can do this with the field collapse feature. Here is an example query:

    {
      "_source": false,
      "query": {
        "bool": {
          "filter": {
            "term": {
              "event_type": 1
            }
          }
        }
      },
      "collapse": {
        "field": "user_id",
        "inner_hits": {
          "name": "the_record",
          "size": 1,
          "sort": [
            {
              "id": "desc"
            }
          ]
        }
      },
      "sort": [
        {
          "id": {
            "order": "desc"
          }
        }
      ]
    }
    

    In the response, the document you want is in inner_hits under the name you gave it, which is the_record in this example. You can increase the size of the inner hits if you want more records in each group, and you can change how they are sorted.

  3. TL;DR

    There are many ways to go about it.

    All of those solutions are approximations of what you could get with SQL.

    My personal favourite is a transform.

    Solution – transform jobs

    Set up

    We create 2 users, each with 2 events.

    PUT 75324839/_bulk
    {"create":{}}
    {"user_id": 1, "type": 2, "date": "2015-01-01T00:00:00.000Z"}
    {"create":{}}
    {"user_id": 1, "type": 1, "date": "2016-01-01T00:00:00.000Z"}
    {"create":{}}
    {"user_id": 2, "type": 1, "date": "2015-01-01T00:00:00.000Z"}
    {"create":{}}
    {"user_id": 2, "type": 2, "date": "2016-01-01T00:00:00.000Z"}
    

    Transform job

    This transform job runs against the index 75324839.
    It finds the latest document for each user_id, based on the value of the date field.

    The results are stored in latest_75324839.

    PUT _transform/75324839
    {
      "source": {
        "index": [
          "75324839"
        ]
      },
      "latest": {
        "unique_key": [
          "user_id"
        ],
        "sort": "date"
      },
      "dest": {
        "index": "latest_75324839"
      }
    }
    

    If you were to query latest_75324839, you would find:

    {
        "hits": [
          {
            "_index": "latest_75324839",
            "_id": "AGvuZWuqqz7c5ytICzX5Z74AAAAAAAAA",
            "_score": 1,
            "_source": {
              "date": "2016-01-01T00:00:00.000Z",
              "user_id": 1,
              "type": 1
            }
          },
          {
            "_index": "latest_75324839",
            "_id": "AA3tqz9zEwuio1D73_EArycAAAAAAAAA",
            "_score": 1,
            "_source": {
              "date": "2016-01-01T00:00:00.000Z",
              "user_id": 2,
              "type": 2
            }
          }
        ]
    }
    

    Get the final results

    To get the number of users whose latest event has type=1,
    a simple search query such as this is enough:

    GET latest_75324839/_search
    {
      "query": {
        "term": {
          "type": {
            "value": 1
          }
        }
      },
      "aggs": {
        "number_of_user": {
          "cardinality": {
            "field": "user_id"
          }
        }
      }
    }
    

    Side notes

    This transform job runs in batch mode, which means it will only run once.

    It is also possible to run it continuously, so that the destination index always holds the latest event for each user_id.

    Here are some examples.
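
As a rough illustration of the continuous mode mentioned in the side notes, the sketch below builds a request body for `PUT _transform/<id>` in Python. The `sync` and `frequency` settings are what turn a transform into a continuous one. Reusing the `date` field as the sync field, and the `1m` / `60s` values, are assumptions made for this example. In a real setup an ingest timestamp is usually the safer sync field, because documents indexed late with an old `date` could otherwise be missed. The helper name is hypothetical.

```python
import json


def continuous_latest_transform(source_index, dest_index, unique_key, sort_field,
                                frequency="1m", delay="60s"):
    """Build a body for PUT _transform/<id> that runs in continuous mode."""
    return {
        "source": {"index": [source_index]},
        "latest": {"unique_key": [unique_key], "sort": sort_field},
        "dest": {"index": dest_index},
        # How often the transform checks the source index for changes.
        "frequency": frequency,
        # The presence of "sync" is what makes the transform continuous.
        # Assumption: the sort field doubles as the sync time field.
        "sync": {"time": {"field": sort_field, "delay": delay}},
    }


body = continuous_latest_transform("75324839", "latest_75324839", "user_id", "date")
print(json.dumps(body, indent=2))
```

Whichever mode is used, creating the transform does not run it. It still has to be started, for example with `POST _transform/75324839/_start`, before the destination index is populated.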

  4. You are looking for an SQL HAVING clause, which would allow you to filter results after grouping. Sadly, there is nothing directly equivalent in Elasticsearch.

    So it is not possible to

    Basically, Elasticsearch is not a database. Any sorting or relation to other documents should be based on scoring, and the score should be calculated independently for each document, distributed across shards.

    But there is a tiny loophole, which might be the solution for your use case. It is based on a top_metrics aggregation followed by a bucket_selector that eliminates the unwanted event types:

    GET test_analytic_report/_search
    {
      "size": 0,
      "aggs": {
        "by_id": {
          "terms": {
            "field": "user_id",
            "size": 100
          },
          "aggs": {
            "tm": {
              "top_metrics": {
                "metrics": {
                  "field": "event_type"
                },
                "sort": [
                  {
                    "id": {
                      "order": "desc"
                    }
                  }
                ]
              }
            },
            "event_type_filter": {
              "bucket_selector": {
                "buckets_path": {
                  "event_type": "tm.event_type"
                },
                "script": "params.event_type == 1"
              }
            }
          }
        }
      }
    }
    

    If you require more fields from the source document you can add them to the top_metrics.
    It is sorted by id now, but you can also use event_date.
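
The two variations mentioned above (extra fields in top_metrics, sorting by event_date instead of id) could be assembled programmatically along these lines. This is a sketch only: the function name and its parameters are hypothetical, and it assumes that every extra field has a type top_metrics can return. It only builds the request body and does not talk to a cluster.

```python
import json


def latest_event_type_query(extra_fields=(), sort_field="event_date",
                            wanted_type=1, max_users=100):
    """Build the terms + top_metrics + bucket_selector body for the search request."""
    # event_type must stay in the metrics, since the bucket_selector reads it.
    metrics = [{"field": "event_type"}] + [{"field": f} for f in extra_fields]
    return {
        "size": 0,
        "aggs": {
            "by_id": {
                "terms": {"field": "user_id", "size": max_users},
                "aggs": {
                    # Metrics of the latest document per user, according to sort_field.
                    "tm": {
                        "top_metrics": {
                            "metrics": metrics,
                            "sort": [{sort_field: {"order": "desc"}}],
                        }
                    },
                    # Drop user buckets whose latest document has another event_type.
                    "event_type_filter": {
                        "bucket_selector": {
                            "buckets_path": {"event_type": "tm.event_type"},
                            "script": f"params.event_type == {int(wanted_type)}",
                        }
                    },
                },
            }
        },
    }


query = latest_event_type_query(extra_fields=["event_date"])
print(json.dumps(query, indent=2))
```

The printed body can be pasted after `GET test_analytic_report/_search`. Keep in mind that the terms aggregation only returns up to `max_users` buckets, so with more users than that a composite aggregation or a larger size would be needed.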
