skip to Main Content

I use R to process zigbee2mqtt log files, which contain entries like this:

head(data)
[1] "info  2024-03-11 14:08:01: MQTT publish: topic 'zigbee2mqtt/TempBadEG', payload '{"battery":17,"humidity":50.1,"linkquality":105,"temperature":17.25,"voltage":2500}'"                                                                                                                                                                                                                                        
[2] "info  2024-03-11 14:08:04: MQTT publish: topic 'zigbee2mqtt/MotionBadOben', payload '{"battery":20,"battery_low":true,"illuminance":37914,"illuminance_lux":12369,"led_control":"off","linkquality":69,"occupancy":false,"occupancy_timeout":60,"tamper":false,"temperature":17,"update":{"installed_version":262147,"latest_version":262146,"state":"idle"},"voltage":2600}'"        
[3] "info  2024-03-11 14:08:04: MQTT publish: topic 'zigbee2mqtt/SwitchBadOben', payload '{"linkquality":21,"state":"OFF"}'"                                                                                                                                                                                                                                                                                           
[4] "info  2024-03-11 14:08:22: MQTT publish: topic 'zigbee2mqtt/MotionBadEG', payload '{"battery":60,"battery_low":false,"illuminance":31370,"illuminance_lux":1371,"led_control":"fault_only","linkquality":21,"occupancy":false,"occupancy_timeout":60,"tamper":false,"temperature":17.06,"update":{"installed_version":262147,"latest_version":262146,"state":"idle"},"voltage":2800}'"
[5] "info  2024-03-11 14:08:22: MQTT publish: topic 'zigbee2mqtt/SwitchBadEG', payload '{"linkquality":21,"state":"OFF"}'"                                                                                                                                                                                                                                                                                             
[6] "info  2024-03-11 14:08:44: MQTT publish: topic 'zigbee2mqtt/TempKucheUG', payload '{"battery":14,"humidity":54.87,"linkquality":21,"temperature":14.41,"voltage":2500}'"

I have

# Six sample zigbee2mqtt log lines. Each line contains both single and double
# quotes, so raw string literals (R >= 4.0) are used instead of escaping every
# inner double quote.
data <- c(
  r"(info  2024-03-11 14:08:01: MQTT publish: topic 'zigbee2mqtt/TempBadEG', payload '{"battery":17,"humidity":50.1,"linkquality":105,"temperature":17.25,"voltage":2500}')",
  r"(info  2024-03-11 14:08:04: MQTT publish: topic 'zigbee2mqtt/MotionBadOben', payload '{"battery":20,"battery_low":true,"illuminance":37914,"illuminance_lux":12369,"led_control":"off","linkquality":69,"occupancy":false,"occupancy_timeout":60,"tamper":false,"temperature":17,"update":{"installed_version":262147,"latest_version":262146,"state":"idle"},"voltage":2600}')",
  r"(info  2024-03-11 14:08:04: MQTT publish: topic 'zigbee2mqtt/SwitchBadOben', payload '{"linkquality":21,"state":"OFF"}')",
  r"(info  2024-03-11 14:08:22: MQTT publish: topic 'zigbee2mqtt/MotionBadEG', payload '{"battery":60,"battery_low":false,"illuminance":31370,"illuminance_lux":1371,"led_control":"fault_only","linkquality":21,"occupancy":false,"occupancy_timeout":60,"tamper":false,"temperature":17.06,"update":{"installed_version":262147,"latest_version":262146,"state":"idle"},"voltage":2800}')",
  r"(info  2024-03-11 14:08:22: MQTT publish: topic 'zigbee2mqtt/SwitchBadEG', payload '{"linkquality":21,"state":"OFF"}')",
  r"(info  2024-03-11 14:08:44: MQTT publish: topic 'zigbee2mqtt/TempKucheUG', payload '{"battery":14,"humidity":54.87,"linkquality":21,"temperature":14.41,"voltage":2500}')"
)

and I do the following processing :

library(tidyverse)
library(jsonlite)
library(RColorBrewer)
library(ggthemes)
library(furrr)

# Parse the payloads in background R sessions, leaving one core free.
plan(multisession, workers = availableCores() - 1)
log_data.start.timestamp <- Sys.time()

# Turn the raw log lines into one row per message with one column per payload
# field. Payload fields are discovered from the JSON itself, so sensors with
# different schemas can be mixed freely.
log_data <- data %>%
  enframe(name = NULL, value = "line") %>%
  # Backslashes must be doubled inside an R string literal: "\d" is not a
  # valid escape and fails at parse time, "\\d" reaches the regex engine as \d.
  mutate(timestamp = str_extract(line, "\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}"),
         topic = str_extract(line, "(?<=topic ')[^']+(?=')"),
         payload = str_extract(line, "(?<=payload ')[^']+(?=')")) %>%
  mutate(topic = str_remove(topic, "zigbee2mqtt/")) %>%
  # Drop exact duplicate log lines before the (expensive) JSON parsing.
  distinct() %>% 
  mutate(payload = future_map(payload, ~{
    tryCatch({
      json <- fromJSON(.x, flatten = TRUE)
      # Remove unwanted fields
      json$data <- NULL
      json$update <- NULL
      json$type <- NULL
      json$tamper <- NULL
      json$power_on_behavior <- NULL
      json$battery_low <- NULL
      json$occupancy_timeout <- NULL
      as_tibble(json)
    }, error = function(e) {
      # Payload could not be parsed/converted: keep the raw text in a `value`
      # column instead of dropping the row.
      tibble(value = .x)
    })
  })) %>%
  select(-line) %>%
  mutate(timestamp = as.POSIXct(timestamp, format = "%Y-%m-%d %H:%M:%S")) %>%
  # Spread each per-row tibble into columns; fields absent from a payload
  # become NA.
  unnest(payload) %>%
  mutate(date = as.Date(timestamp), time = format(timestamp, "%H:%M:%S")) %>%
  distinct() %>%
  arrange(timestamp)

# Elapsed wall-clock time for the whole step.
Sys.time()-log_data.start.timestamp

and I get

head(log_data)
# A tibble: 6 × 15
  timestamp           topic         battery humidity linkquality temperature voltage illuminance illuminance_lux led_control occupancy state action date       time    
  <dttm>              <chr>           <int>    <dbl>       <int>       <dbl>   <int>       <int>           <int> <chr>       <lgl>     <chr> <chr>  <date>     <chr>   
1 2024-03-11 14:08:01 TempBadEG          17     50.1         105        17.2    2500          NA              NA NA          NA        NA    NA     2024-03-11 14:08:01
2 2024-03-11 14:08:04 MotionBadOben      20     NA            69        17      2600       37914           12369 off         FALSE     NA    NA     2024-03-11 14:08:04
3 2024-03-11 14:08:04 SwitchBadOben      NA     NA            21        NA        NA          NA              NA NA          NA        OFF   NA     2024-03-11 14:08:04
4 2024-03-11 14:08:22 MotionBadEG        60     NA            21        17.1    2800       31370            1371 fault_only  FALSE     NA    NA     2024-03-11 14:08:22
5 2024-03-11 14:08:22 SwitchBadEG        NA     NA            21        NA        NA          NA              NA NA          NA        OFF   NA     2024-03-11 14:08:22
6 2024-03-11 14:08:44 TempKucheUG        14     54.9          21        14.4    2500          NA              NA NA          NA        NA    NA     2024-03-11 14:08:44

From there on I can do my analyses. However, for 5 days of logging (5 MB of input), this step takes 20 seconds despite using furrr. I suspect that the payload processing and JSON flattening lead to this increased processing time.

I cannot imagine how this would scale if I had a year's worth of data. Since the different sensors report different payloads, I need a generic approach that flattens the JSON payload without the variables being specified in advance. This also allows me to add or change sensors that may report different variables.

Is there a way to make this more efficient and faster?

2

Answers


  1. I didn’t benchmark it, but I’d guess all the parallel processing overhead and moving every single (relatively small) JSON to and from a worker costs more than you’d gain from it. If you need to parallelize, perhaps consider somewhat larger chunks, i.e. all daily or at least all hourly records.

    The following is just a test of how jsonlite::stream_in() (with a default handler) might perform when all JSON strings are first combined into a single NDJSON string.

    Generated 25000 lines (~5.3MB on disk as text) for testing, took less than 6s to process.

    library(dplyr)
    library(tidyr)
    
    # Sample log lines, recycled to 25000 entries for benchmarking. Raw string
    # literals (R >= 4.0) are used because each line contains both single and
    # double quotes.
    data_raw <-
      c(r"(info  2024-03-11 14:08:01: MQTT publish: topic 'zigbee2mqtt/TempBadEG', payload '{"battery":17,"humidity":50.1,"linkquality":105,"temperature":17.25,"voltage":2500}')",
        r"(info  2024-03-11 14:08:04: MQTT publish: topic 'zigbee2mqtt/MotionBadOben', payload '{"battery":20,"battery_low":true,"illuminance":37914,"illuminance_lux":12369,"led_control":"off","linkquality":69,"occupancy":false,"occupancy_timeout":60,"tamper":false,"temperature":17,"update":{"installed_version":262147,"latest_version":262146,"state":"idle"},"voltage":2600}')",
        r"(info  2024-03-11 14:08:04: MQTT publish: topic 'zigbee2mqtt/SwitchBadOben', payload '{"linkquality":21,"state":"OFF"}')",
        r"(info  2024-03-11 14:08:22: MQTT publish: topic 'zigbee2mqtt/MotionBadEG', payload '{"battery":60,"battery_low":false,"illuminance":31370,"illuminance_lux":1371,"led_control":"fault_only","linkquality":21,"occupancy":false,"occupancy_timeout":60,"tamper":false,"temperature":17.06,"update":{"installed_version":262147,"latest_version":262146,"state":"idle"},"voltage":2800}')",
        r"(info  2024-03-11 14:08:22: MQTT publish: topic 'zigbee2mqtt/SwitchBadEG', payload '{"linkquality":21,"state":"OFF"}')",
        r"(info  2024-03-11 14:08:44: MQTT publish: topic 'zigbee2mqtt/TempKucheUG', payload '{"battery":14,"humidity":54.87,"linkquality":21,"temperature":14.41,"voltage":2500}')") |>
      rep(length.out = 25000)
      
    glimpse(data_raw)
    #>  chr [1:25000] "info  2024-03-11 14:08:01: MQTT publish: topic 'zigbee2mqtt/TempBadEG', payload '{"battery":17,"humidity":5"| __truncated__ ...
    
    tictoc::tic()
    # Split every log line into timestamp, topic and raw JSON payload in one
    # pass. Unnamed patterns are matched and dropped; named ones become columns.
    data <-
      data_raw |>
      tibble(line = _) |>
      # "\\s" (doubled backslash) is required: "\s" is not a valid escape in
      # an R string literal and fails at parse time.
      separate_wider_regex(line, patterns = c("^info\\s+", 
                                              ts = ".*", ": MQTT publish: topic '", 
                                              topic = "[^']+", "',[^']+'",  
                                              payload = "[^']+", "'$")) |>
      mutate(ts = lubridate::ymd_hms(ts))
    
    data
    #> # A tibble: 25,000 × 3
    #>    ts                  topic                     payload                        
    #>    <dttm>              <chr>                     <chr>                          
    #>  1 2024-03-11 14:08:01 zigbee2mqtt/TempBadEG     "{"battery":17,"humidity":…
    #>  2 2024-03-11 14:08:04 zigbee2mqtt/MotionBadOben "{"battery":20,"battery_low…
    #>  3 2024-03-11 14:08:04 zigbee2mqtt/SwitchBadOben "{"linkquality":21,"state"…
    #>  4 2024-03-11 14:08:22 zigbee2mqtt/MotionBadEG   "{"battery":60,"battery_low…
    #>  5 2024-03-11 14:08:22 zigbee2mqtt/SwitchBadEG   "{"linkquality":21,"state"…
    #>  6 2024-03-11 14:08:44 zigbee2mqtt/TempKucheUG   "{"battery":14,"humidity":…
    #>  7 2024-03-11 14:08:01 zigbee2mqtt/TempBadEG     "{"battery":17,"humidity":…
    #>  8 2024-03-11 14:08:04 zigbee2mqtt/MotionBadOben "{"battery":20,"battery_low…
    #>  9 2024-03-11 14:08:04 zigbee2mqtt/SwitchBadOben "{"linkquality":21,"state"…
    #> 10 2024-03-11 14:08:22 zigbee2mqtt/MotionBadEG   "{"battery":60,"battery_low…
    #> # ℹ 24,990 more rows
    
    # Join all payloads with real newlines so that stream_in() receives one
    # JSON document per line (NDJSON) and parses them in a single call. The
    # parsed columns are then bound back onto the timestamp/topic columns by
    # row position. The nested `update` object is widened into `update.*`
    # columns.
    bind_cols(select(data, -payload),
              data$payload |>
                paste0(collapse = "\n") |>
                textConnection() |>
                jsonlite::stream_in() |>
                unnest_wider(update, names_sep = ".")
    )
    #>  Found 500 records... Found 1000 records... Found 1500 records... Found 2000 records... Found 2500 records... Found 3000 records... Found 3500 records... Found 4000 records... Found 4500 records... Found 5000 records... Found 5500 records... Found 6000 records... Found 6500 records... Found 7000 records... Found 7500 records... Found 8000 records... Found 8500 records... Found 9000 records... Found 9500 records... Found 10000 records... Found 10500 records... Found 11000 records... Found 11500 records... Found 12000 records... Found 12500 records... Found 13000 records... Found 13500 records... Found 14000 records... Found 14500 records... Found 15000 records... Found 15500 records... Found 16000 records... Found 16500 records... Found 17000 records... Found 17500 records... Found 18000 records... Found 18500 records... Found 19000 records... Found 19500 records... Found 20000 records... Found 20500 records... Found 21000 records... Found 21500 records... Found 22000 records... Found 22500 records... Found 23000 records... Found 23500 records... Found 24000 records... Found 24500 records... Found 25000 records... Imported 25000 records. Simplifying...
    #> # A tibble: 25,000 × 18
    #>    ts                  topic    battery humidity linkquality temperature voltage
    #>    <dttm>              <chr>      <int>    <dbl>       <int>       <dbl>   <int>
    #>  1 2024-03-11 14:08:01 zigbee2…      17     50.1         105        17.2    2500
    #>  2 2024-03-11 14:08:04 zigbee2…      20     NA            69        17      2600
    #>  3 2024-03-11 14:08:04 zigbee2…      NA     NA            21        NA        NA
    #>  4 2024-03-11 14:08:22 zigbee2…      60     NA            21        17.1    2800
    #>  5 2024-03-11 14:08:22 zigbee2…      NA     NA            21        NA        NA
    #>  6 2024-03-11 14:08:44 zigbee2…      14     54.9          21        14.4    2500
    #>  7 2024-03-11 14:08:01 zigbee2…      17     50.1         105        17.2    2500
    #>  8 2024-03-11 14:08:04 zigbee2…      20     NA            69        17      2600
    #>  9 2024-03-11 14:08:04 zigbee2…      NA     NA            21        NA        NA
    #> 10 2024-03-11 14:08:22 zigbee2…      60     NA            21        17.1    2800
    #> # ℹ 24,990 more rows
    #> # ℹ 11 more variables: battery_low <lgl>, illuminance <int>,
    #> #   illuminance_lux <int>, led_control <chr>, occupancy <lgl>,
    #> #   occupancy_timeout <int>, tamper <lgl>, update.installed_version <int>,
    #> #   update.latest_version <int>, update.state <chr>, state <chr>
    tictoc::toc()
    #> 5.88 sec elapsed
    

    Created on 2024-03-16 with reprex v2.1.0


    System details:

    benchmarkme::get_cpu() |> unlist() |> tibble::enframe()
    #> # A tibble: 3 × 2
    #>   name        value                                   
    #>   <chr>       <chr>                                   
    #> 1 vendor_id   GenuineIntel                            
    #> 2 model_name  Intel(R) Core(TM) i5-8250U CPU @ 1.60GHz
    #> 3 no_of_cores 8
    benchmarkme::get_ram()
    #> 17 GB
    
    sessioninfo::session_info()
    #> ─ Session info ───────────────────────────────────────────────────────────────
    #>  setting  value
    #>  version  R version 4.3.3 (2024-02-29 ucrt)
    #>  os       Windows 10 x64 (build 19045)
    #>  system   x86_64, mingw32
    #>  ui       RTerm
    #>  language (EN)
    #>  collate  Estonian_Estonia.utf8
    #>  ctype    Estonian_Estonia.utf8
    #>  tz       Europe/Tallinn
    #>  date     2024-03-17
    #>  pandoc   3.1.11 @ C:/Program Files/RStudio/resources/app/bin/quarto/bin/tools/ (via rmarkdown)
    #> 
    #> ─ Packages ───────────────────────────────────────────────────────────────────
    #>  ! package         * version date (UTC) lib source
    #>  P benchmarkme       1.0.8   2022-06-12 [?] CRAN (R 4.3.3)
    #>  P benchmarkmeData   1.0.4   2020-04-23 [?] CRAN (R 4.3.3)
    #>  P cli               3.6.2   2023-12-11 [?] RSPM (R 4.3.0)
    #>  P codetools         0.2-19  2023-02-01 [3] CRAN (R 4.3.3)
    #>  P digest            0.6.34  2024-01-11 [?] RSPM (R 4.3.2)
    #>  P doParallel        1.0.17  2022-02-07 [?] CRAN (R 4.3.3)
    #>  P dplyr           * 1.1.4   2023-11-17 [?] RSPM (R 4.3.0)
    #>  P evaluate          0.23    2023-11-01 [?] RSPM (R 4.3.0)
    #>  P fansi             1.0.6   2023-12-08 [?] CRAN (R 4.3.2)
    #>  P fastmap           1.1.1   2023-02-24 [?] CRAN (R 4.3.1)
    #>  P foreach           1.5.2   2022-02-02 [?] CRAN (R 4.3.3)
    #>  P fs                1.6.3   2023-07-20 [?] CRAN (R 4.3.1)
    #>  P generics          0.1.3   2022-07-05 [?] CRAN (R 4.3.1)
    #>  P glue              1.7.0   2024-01-09 [?] CRAN (R 4.3.2)
    #>  P htmltools         0.5.7   2023-11-03 [?] RSPM (R 4.3.0)
    #>  P httr              1.4.7   2023-08-15 [?] CRAN (R 4.3.1)
    #>  P iterators         1.0.14  2022-02-05 [?] CRAN (R 4.3.3)
    #>  P jsonlite          1.8.8   2023-12-04 [?] RSPM (R 4.3.0)
    #>  P knitr             1.45    2023-10-30 [?] RSPM (R 4.3.0)
    #>  P lattice           0.22-5  2023-10-24 [3] CRAN (R 4.3.3)
    #>    lifecycle         1.0.4   2023-11-07 [1] CRAN (R 4.3.1)
    #>  P lubridate         1.9.3   2023-09-27 [?] CRAN (R 4.3.1)
    #>  P magrittr          2.0.3   2022-03-30 [?] CRAN (R 4.3.1)
    #>  P Matrix            1.6-5   2024-01-11 [?] CRAN (R 4.3.2)
    #>  P pillar            1.9.0   2023-03-22 [?] CRAN (R 4.3.1)
    #>  P pkgconfig         2.0.3   2019-09-22 [?] CRAN (R 4.3.1)
    #>  P purrr             1.0.2   2023-08-10 [?] CRAN (R 4.3.1)
    #>  P R.cache           0.16.0  2022-07-21 [?] CRAN (R 4.3.1)
    #>  P R.methodsS3       1.8.2   2022-06-13 [?] CRAN (R 4.3.0)
    #>  P R.oo              1.26.0  2024-01-24 [?] CRAN (R 4.3.2)
    #>  P R.utils           2.12.3  2023-11-18 [?] RSPM (R 4.3.0)
    #>  P R6                2.5.1   2021-08-19 [?] CRAN (R 4.3.1)
    #>  P reprex            2.1.0   2024-01-11 [?] RSPM (R 4.3.2)
    #>  P rlang             1.1.3   2024-01-10 [?] CRAN (R 4.3.2)
    #>  P rmarkdown         2.26    2024-03-05 [?] CRAN (R 4.3.3)
    #>  P rstudioapi        0.15.0  2023-07-07 [?] CRAN (R 4.3.1)
    #>  P sessioninfo       1.2.2   2021-12-06 [?] CRAN (R 4.3.2)
    #>  P stringi           1.8.3   2023-12-11 [?] RSPM (R 4.3.0)
    #>  P stringr           1.5.1   2023-11-14 [?] RSPM (R 4.3.0)
    #>  P styler            1.10.2  2023-08-29 [?] CRAN (R 4.3.1)
    #>  P tibble            3.2.1   2023-03-20 [?] CRAN (R 4.3.1)
    #>  P tictoc            1.2     2023-04-23 [?] CRAN (R 4.3.1)
    #>  P tidyr           * 1.3.1   2024-01-24 [?] CRAN (R 4.3.2)
    #>  P tidyselect        1.2.0   2022-10-10 [?] CRAN (R 4.3.1)
    #>  P timechange        0.3.0   2024-01-18 [?] CRAN (R 4.3.2)
    #>    utf8              1.2.4   2023-10-22 [1] CRAN (R 4.3.2)
    #>  P vctrs             0.6.5   2023-12-01 [?] RSPM (R 4.3.0)
    #>  P withr             3.0.0   2024-01-16 [?] CRAN (R 4.3.2)
    #>  P xfun              0.42    2024-02-08 [?] RSPM (R 4.3.0)
    #>  P yaml              2.3.8   2023-12-11 [?] RSPM (R 4.3.0)
    
    Login or Signup to reply.
  2. Using the CRAN package rjsoncons and JMESpath, I pre-processed your data and focused on ‘payload’ so that the problem does not have extraneous information.

    # One row per log line, plus the JSON text found between the single
    # quotes that follow "payload ".
    json <- mutate(
        enframe(data, name = NULL, value = "line"),
        payload = str_extract(line, "(?<=payload ')[^']+(?=')")
    )
    

    I then created a JMESpath query to select the fields from each row of your data. For your case this is just a JSON object

    # JMESPath multiselect hash: for each payload, build an object with these
    # eleven keys, each taken from the payload field of the same name.
    query <- '{
        battery: battery, humidity: humidity, linkquality: linkquality,
        temperature:temperature, voltage:voltage, illuminance: illuminance,
        illuminance_lux: illuminance_lux, led_control: led_control,
        occupancy: occupancy, state: state, action: action
    }'
    

    And from rjsoncons I used j_pivot() to query the entire column, coercing the result to a tibble

    # Run the query over the whole payload column and collect the result as
    # a tibble.
    tbl <- j_pivot(pull(json, payload), query, as = "tibble")
    

    Instead of coercing a missing value to NA, rjsoncons returns NULL, and consequently each column is a list rather than vector

    > tbl |> select(battery)
    # A tibble: 6 × 1
      battery  
      <list>   
    1 <int [1]>
    2 <int [1]>
    3 <NULL>   
    4 <int [1]>
    5 <NULL>   
    6 <int [1]>
    

    so I wrote a little helper function and used mutate() across everything

    # Flatten a list column into an atomic vector, substituting NA for every
    # zero-length element (e.g. NULL) so that positions are preserved.
    unlist_null_to_NA <- function(x) {
        unlist(replace(x, lengths(x) == 0, NA))
    }
    
    # Apply the helper to every list column of the pivoted table.
    mutate(tbl, across(everything(), unlist_null_to_NA))
    

    Thus

    # Whole pipeline in one expression: payload column -> JMESPath pivot ->
    # list columns flattened to vectors with NA for missing fields.
    mutate(
        j_pivot(pull(json, payload), query, as = "tibble"),
        across(everything(), unlist_null_to_NA)
    )
    ## # A tibble: 6 × 11
    ##   battery humidity linkquality temperature voltage illuminance illuminance_lux
    ##     <int>    <dbl>       <int>       <dbl>   <int>       <int>           <int>
    ## 1      17     50.1         105        17.2    2500          NA              NA
    ## 2      20     NA            69        17      2600       37914           12369
    ## 3      NA     NA            21        NA        NA          NA              NA
    ## 4      60     NA            21        17.1    2800       31370            1371
    ## 5      NA     NA            21        NA        NA          NA              NA
    ## 6      14     54.9          21        14.4    2500          NA              NA
    ## # ℹ 4 more variables: led_control <chr>, occupancy <lgl>, state <chr>,
    ## #   action <lgl>
    

    This should be fast (looping is at the C level, instead of in R), even though the parser is not as fast as yyjson.

    It would be a little faster and more memory efficient to start directly from a JSON file, if that is available.

    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search