I use R to process log files of zigbee2mqtt, which logs entries like this:
head(data)
[1] "info 2024-03-11 14:08:01: MQTT publish: topic 'zigbee2mqtt/TempBadEG', payload '{"battery":17,"humidity":50.1,"linkquality":105,"temperature":17.25,"voltage":2500}'"
[2] "info 2024-03-11 14:08:04: MQTT publish: topic 'zigbee2mqtt/MotionBadOben', payload '{"battery":20,"battery_low":true,"illuminance":37914,"illuminance_lux":12369,"led_control":"off","linkquality":69,"occupancy":false,"occupancy_timeout":60,"tamper":false,"temperature":17,"update":{"installed_version":262147,"latest_version":262146,"state":"idle"},"voltage":2600}'"
[3] "info 2024-03-11 14:08:04: MQTT publish: topic 'zigbee2mqtt/SwitchBadOben', payload '{"linkquality":21,"state":"OFF"}'"
[4] "info 2024-03-11 14:08:22: MQTT publish: topic 'zigbee2mqtt/MotionBadEG', payload '{"battery":60,"battery_low":false,"illuminance":31370,"illuminance_lux":1371,"led_control":"fault_only","linkquality":21,"occupancy":false,"occupancy_timeout":60,"tamper":false,"temperature":17.06,"update":{"installed_version":262147,"latest_version":262146,"state":"idle"},"voltage":2800}'"
[5] "info 2024-03-11 14:08:22: MQTT publish: topic 'zigbee2mqtt/SwitchBadEG', payload '{"linkquality":21,"state":"OFF"}'"
[6] "info 2024-03-11 14:08:44: MQTT publish: topic 'zigbee2mqtt/TempKucheUG', payload '{"battery":14,"humidity":54.87,"linkquality":21,"temperature":14.41,"voltage":2500}'"
I have
data <- c("info 2024-03-11 14:08:01: MQTT publish: topic 'zigbee2mqtt/TempBadEG', payload '{\"battery\":17,\"humidity\":50.1,\"linkquality\":105,\"temperature\":17.25,\"voltage\":2500}'",
"info 2024-03-11 14:08:04: MQTT publish: topic 'zigbee2mqtt/MotionBadOben', payload '{\"battery\":20,\"battery_low\":true,\"illuminance\":37914,\"illuminance_lux\":12369,\"led_control\":\"off\",\"linkquality\":69,\"occupancy\":false,\"occupancy_timeout\":60,\"tamper\":false,\"temperature\":17,\"update\":{\"installed_version\":262147,\"latest_version\":262146,\"state\":\"idle\"},\"voltage\":2600}'",
"info 2024-03-11 14:08:04: MQTT publish: topic 'zigbee2mqtt/SwitchBadOben', payload '{\"linkquality\":21,\"state\":\"OFF\"}'",
"info 2024-03-11 14:08:22: MQTT publish: topic 'zigbee2mqtt/MotionBadEG', payload '{\"battery\":60,\"battery_low\":false,\"illuminance\":31370,\"illuminance_lux\":1371,\"led_control\":\"fault_only\",\"linkquality\":21,\"occupancy\":false,\"occupancy_timeout\":60,\"tamper\":false,\"temperature\":17.06,\"update\":{\"installed_version\":262147,\"latest_version\":262146,\"state\":\"idle\"},\"voltage\":2800}'",
"info 2024-03-11 14:08:22: MQTT publish: topic 'zigbee2mqtt/SwitchBadEG', payload '{\"linkquality\":21,\"state\":\"OFF\"}'",
"info 2024-03-11 14:08:44: MQTT publish: topic 'zigbee2mqtt/TempKucheUG', payload '{\"battery\":14,\"humidity\":54.87,\"linkquality\":21,\"temperature\":14.41,\"voltage\":2500}'")
and I do the following processing:
library(tidyverse)
library(jsonlite)
library(RColorBrewer)
library(ggthemes)
library(furrr)
plan(multisession, workers = availableCores() - 1)
log_data.start.timestamp <- Sys.time()
log_data <- data %>%
enframe(name = NULL, value = "line") %>%
mutate(timestamp = str_extract(line, "\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}"),
topic = str_extract(line, "(?<=topic ')[^']+(?=')"),
payload = str_extract(line, "(?<=payload ')[^']+(?=')")) %>%
mutate(topic = str_remove(topic, "zigbee2mqtt/")) %>%
distinct() %>%
mutate(payload = future_map(payload, ~{
tryCatch({
json <- fromJSON(.x, flatten = TRUE)
# Remove unwanted fields
json$data <- NULL
json$update <- NULL
json$type <- NULL
json$tamper <- NULL
json$power_on_behavior <- NULL
json$battery_low <- NULL
json$occupancy_timeout <- NULL
as_tibble(json)
}, error = function(e) {
tibble(value = .x)
})
})) %>%
select(-line) %>%
mutate(timestamp = as.POSIXct(timestamp, format = "%Y-%m-%d %H:%M:%S")) %>%
unnest(payload) %>%
mutate(date = as.Date(timestamp), time = format(timestamp, "%H:%M:%S")) %>%
distinct() %>%
arrange(timestamp)
Sys.time()-log_data.start.timestamp
and I get
head(log_data)
# A tibble: 6 × 15
timestamp topic battery humidity linkquality temperature voltage illuminance illuminance_lux led_control occupancy state action date time
<dttm> <chr> <int> <dbl> <int> <dbl> <int> <int> <int> <chr> <lgl> <chr> <chr> <date> <chr>
1 2024-03-11 14:08:01 TempBadEG 17 50.1 105 17.2 2500 NA NA NA NA NA NA 2024-03-11 14:08:01
2 2024-03-11 14:08:04 MotionBadOben 20 NA 69 17 2600 37914 12369 off FALSE NA NA 2024-03-11 14:08:04
3 2024-03-11 14:08:04 SwitchBadOben NA NA 21 NA NA NA NA NA NA OFF NA 2024-03-11 14:08:04
4 2024-03-11 14:08:22 MotionBadEG 60 NA 21 17.1 2800 31370 1371 fault_only FALSE NA NA 2024-03-11 14:08:22
5 2024-03-11 14:08:22 SwitchBadEG NA NA 21 NA NA NA NA NA NA OFF NA 2024-03-11 14:08:22
6 2024-03-11 14:08:44 TempKucheUG 14 54.9 21 14.4 2500 NA NA NA NA NA NA 2024-03-11 14:08:44
From there on I can do my analyses. However, for 5 days of logging (5 MB of input), and despite using furrr,
I need 20 seconds for this step. I suspect that the payload processing and JSON flattening cause this long processing time.
I cannot imagine how this would scale up to a year's worth of data. Since the different sensors report different payloads, I need a generic approach that flattens the JSON payload without prespecifying the variables. This also allows adding or changing sensors that may report different variables.
Is there a way to make this more efficient and faster?
2 Answers
I didn't benchmark it, but I'd guess that all the parallel-processing overhead and moving every single (relatively small) JSON string to and from a worker cost more than you'd gain from it. If you need to parallelize, perhaps consider somewhat larger chunks, e.g. all records of a day or at least of an hour.
The following is just a test of how jsonlite::stream_in() (with a default handler) might perform when combining all JSON strings into a single ND-JSON string first. I generated 25,000 lines (~5.3 MB on disk as text) for testing, and it took less than 6 s to process.
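A minimal sketch of that idea, reusing the extraction regexes from the question (this illustrates the approach; it is not the answer's original reprex):

library(dplyr)
library(stringr)
library(jsonlite)

# Extract timestamp, topic and raw JSON payload once for the whole vector,
# keeping the payloads as a plain character column.
log_df <- tibble(line = data) %>%
  mutate(
    timestamp = as.POSIXct(str_extract(line, "\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}")),
    topic     = str_remove(str_extract(line, "(?<=topic ')[^']+(?=')"), "zigbee2mqtt/"),
    payload   = str_extract(line, "(?<=payload ')[^']+(?=')")
  )

# Treat each payload as one line of ND-JSON and parse them all in a single call,
# then flatten nested objects such as "update" into ordinary columns.
# (Assumes every log line carries a payload, as in the example data.)
payload_df <- jsonlite::flatten(stream_in(textConnection(log_df$payload), verbose = FALSE))

log_data <- bind_cols(select(log_df, timestamp, topic), payload_df)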
Created on 2024-03-16 with reprex v2.1.0
Using the CRAN package rjsoncons and JMESPath, I pre-processed your data to focus on 'payload', so that the problem does not carry extraneous information.
I then created a JMESPath query to select the fields from each row of your data. For your case this is just a JSON object.
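For illustration, such a query could look like the following; the field list here is assumed from the example payloads (it is not the answer's original query) and would need to cover whatever fields your sensors report:

# Hypothetical JMESPath multiselect hash built from the fields seen in the
# example payloads; extend it as your sensors change.
query <- paste0(
  "{battery: battery, humidity: humidity, linkquality: linkquality, ",
  "temperature: temperature, voltage: voltage, illuminance: illuminance, ",
  "illuminance_lux: illuminance_lux, led_control: led_control, ",
  "occupancy: occupancy, state: state}"
)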
From rjsoncons I used j_pivot() to query the entire column, coercing the result to a tibble. Instead of coercing a missing value to NA, rjsoncons returns NULL, and consequently each column is a list rather than a vector, so I wrote a little helper function and used mutate() across everything, as sketched below.
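A minimal sketch of those two steps, assuming a tibble log_df with a character column payload (extracted as in the question) and the query defined above; the helper name and the exact j_pivot() call are illustrative, not the answer's original code:

library(dplyr)
library(rjsoncons)

# Replace NULLs (fields missing from a payload) with NA and simplify each
# list column to an ordinary vector; non-list columns pass through unchanged.
null_to_na <- function(x) {
  if (is.list(x)) {
    x[vapply(x, is.null, logical(1))] <- NA
    x <- unlist(x)
  }
  x
}

payload_df <- log_df$payload %>%           # one JSON record per element
  j_pivot(query, as = "tibble") %>%        # query all records in one call
  mutate(across(everything(), null_to_na))

log_data <- bind_cols(select(log_df, timestamp, topic), payload_df)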
This should be fast (looping is at the C level, instead of in R), even though the parser is not as fast as yyjson.
It would be a little faster and more memory efficient to start directly from a JSON file, if that is available.