skip to Main Content

With the task in mind – do a HGETALL on all keys matching a pattern (potentially millions).

I get keys on one connection and execute HGETALL concurrently on another connection. This still does not solve the HGETALL round trip latency, which I would like to completely get rid of.

What I want to do is push HGETALL requests with an in-flight window like there is no tomorrow.

I know I can do a single request with multiple HGETALL-s in it*, but then I still need to wait for a response and once in a while pay that latency.
*- although I still yet to figure out how to shape the response type for such request so it is not statically sized.

Is there a better way?

I am using coroutines syntax, so the code now would look like:

auto request = redis::request{};
request.push("HGETALL", key);
auto response = redis::response<std::vector<std::string>>{};
co_await conn->async_exec(request, response, asio::deferred);

Thanks!

2

Answers


  1. Looking at things, you can probably get everything you want from redis::generic_response. It’s gonna be some work, but if you know your target application and the commands are all the same you will probably be able to make it work without too much effort:

    redis::request request;
    for (auto const& key : keys)
        request.push("HGETALL", key);
    request.push("QUIT");
    
    redis::generic_response response;
    co_await conn->async_exec(request, response, asio::deferred);
    
    auto const& gen = response.value();
    fmt::print("generic: {}n", gen);
    

    It might be advantageous for the interpretation side to batch in a transaction (I’m not a Redis expert, so I have no idea whether this hurts server performance):

    redis::request request;
    request.push("MULTI");
    for (auto const& key : keys)
        request.push("HGETALL", key);
    request.push("EXEC");
    request.push("QUIT");
    

    Using simple formatters to display the structure of the responses:

    Live On Coliru

    #include <boost/asio.hpp>
    #include <boost/redis/src.hpp>
    #include <fmt/ostream.h>
    #include <fmt/ranges.h>
    namespace redis = boost::redis;
    namespace asio  = boost::asio;
    
    template <> struct fmt::formatter<redis::resp3::type> : fmt::ostream_formatter {};
    template <> struct fmt::formatter<redis::resp3::node> : fmt::formatter<std::string> {
        template <typename FormatContext> auto format(redis::resp3::node const& node, FormatContext& ctx) const {
            return format_to(ctx.out(), "({}@{}, {}, {})", node.data_type, node.depth, node.value,
                             node.aggregate_size);
        }
    };
    
    asio::awaitable<void> use_redis(std::vector<std::string> keys) {
        auto conn = std::make_shared<redis::connection>(co_await asio::this_coro::executor);
        redis::config cfg;
        conn->async_run(cfg, {}, asio::consign(asio::detached, conn));
    
        {
            redis::request request;
            for (auto const& key : keys)
                request.push("HGETALL", key);
            request.push("QUIT");
    
            redis::generic_response response;
            co_await conn->async_exec(request, response, asio::deferred);
    
            fmt::print("generic: {}n", response.value());
        }
    
        conn->cancel();
        conn->async_run(cfg, {}, asio::consign(asio::detached, conn));
    
        {
            redis::request request;
            request.push("MULTI");
            for (auto const& key : keys)
                request.push("HGETALL", key);
            request.push("EXEC");
            request.push("QUIT");
    
            redis::generic_response response;
            co_await conn->async_exec(request, response, asio::deferred);
    
            fmt::print("multi: {}n", response.value());
        }
    
        conn->cancel();
    }
    
    int main() {
        asio::io_context ioc(1);
        co_spawn(ioc, use_redis({"foo", "bar"}), asio::detached);
        ioc.run();
    }
    

    With a local demo:

    HSET foo bar baz
    HSET bar qux quuz
    
    $ redis-cli 
    127.0.0.1:6379> HGETALL foo
    1) "bar"
    2) "baz"
    127.0.0.1:6379> HGETALL bar
    1) "qux"
    2) "quuuz"
    127.0.0.1:6379> 
    
    
    $ ./build/sotest 
    generic: [(map@0, , 1), (blob_string@1, bar, 1), (blob_string@1, baz, 1), (map@0, , 1), (blob_string@1, qux, 1), (blob_string@1, quuuz, 1), (simple_string@0, OK, 1)]
    multi: [(simple_string@0, OK, 1), (simple_string@0, QUEUED, 1), (simple_string@0, QUEUED, 1), (array@0, , 2), (map@1, , 1), (blob_string@2, bar, 1), (blob_string@2, baz, 1), (map@1, , 1), (blob_string@2, qux, 1), (blob_string@2, quuuz, 1), (simple_string@0, OK, 1)]
    
    Login or Signup to reply.
  2. I couldn’t find a solution using boost::redis::generic_response. Hence I used boost::redis::response itself to read responses for pipelined requests. I used parameterization of tuples as shown here: Parameterize of tuple with repeated type. My code looks like the following:

    template<typename T, typename Seq>
    struct expander;
    
    template<typename T, std::size_t... Is>
    struct expander<T, std::index_sequence<Is...>> {
        template<typename E, std::size_t>
        using elem = E;
    
        using type = boost::redis::response<elem<T, Is>...>;
    };
    
    template <size_t N, class Type>
    struct my_tuple
    {
       using type = typename expander<Type, std::make_index_sequence<N>>::type;
    };
    

    The following is a snippet from a google test case demonstrating pipelining and getting response using boost::redis::response

    {
      request req;
      response<boost::redis::ignore_t> resp;
      req.push("HSET", "testkey1", "name", "abc");
      conn->async_exec(req, resp, yield[ec]);
      ASSERT_EQ((bool)ec, false);
    }
    {
      request req;
      response<boost::redis::ignore_t> resp;
      req.push("HSET", "testkey2", "name", "def");
      conn->async_exec(req, resp, yield[ec]);
      ASSERT_EQ((bool)ec, false);
    }
    {
      std::vector<std::string> fields;
      fields.push_back("name");
      request req;
      req.push_range("HMGET", "testkey1", fields);
      req.push_range("HMGET", "testkey2", fields);
          
      ASSERT_EQ(req.get_commands(), 2);
      my_tuple<2, std::vector<std::string>>::type resp;
      conn->async_exec(req, resp, yield[ec]);
      ASSERT_EQ((bool)ec, false);
    
      std::cout << "HMGET: " << std::get<0>(resp).value()[0] << std::endl;
      std::cout << "HMGET: " << std::get<1>(resp).value()[0] << std::endl;
     }
    
    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search