skip to Main Content

I am trying to setup a service for interrogating a distant database based on Boost Beast.
Even if the context and the issues are totally different, I have found this question on SO:
HTTP Delay from AWS Instance to Bitmex with Boost Beast and Ubuntu 18
and I have tried to build a service from that example implementation.
However, in that example, the requests are sent within the handshake function

void
REST_on_handshake(beast::error_code ec)

and I need to be able to send requests after the handshake has been done (with a connection still alive). So, if I naively empty the body of REST_on_handshake, then when I send the requests from another function I get the error (which I suppose is expected behavior):

terminate called after throwing an instance of 'boost::wrapexcept<boost::system::system_error>'
what():  end of stream

A simple fix is to send an empty request to the server right after the function

rest_ioc.run();

then send new requests from a function; this seems to work, but the undesirable effect is that at every new request, that initial empty request is resent just before the legitimate new request. I have no idea why.

So, what is the correct way to send requests outside the function REST_on_handshake ?

EDIT:
Below is the original code from the example mentioned above, with the following modifications:

  1. The function write_after_handshake sends an incomplete request to establish and maintain connection, as REST_on_handshake is empty.

  2. REST_write_limit_order_bulk is (public and) outside REST_on_handshake as obviously in my own application I need to send requests after the initial handshake.

     // g++ -std=c++17 -pthread -o http_test.out http_test.cpp -lssl -lcrypto && ./http_test.out
    
     //Boost & Beast headers
     #include <boost/bind.hpp>
     #include <boost/beast/core.hpp>
     #include <boost/beast/core.hpp>
     #include <boost/beast/http.hpp>
     #include <boost/beast/ssl.hpp>
     #include <boost/beast/version.hpp>
     #include <boost/beast/websocket.hpp>
     #include <boost/beast/websocket/ssl.hpp>
     #include <boost/asio/strand.hpp>
     #include <boost/asio/connect.hpp>
     #include <boost/asio/ip/tcp.hpp>
     #include <boost/asio/ssl/stream.hpp>
     #include <boost/optional.hpp>
    
     #include <thread>
    
     //REST headers
     #include <sstream>
     #include <openssl/evp.h>
     #include <openssl/hmac.h>
    
     //Misc. headers
     #include <iomanip>
     #include <iostream>
     #include <string>
    
     namespace beast     = boost::beast;         // from <boost/beast.hpp>
     namespace http      = beast::http;          // from <boost/beast/http.hpp>
     namespace websocket = beast::websocket;     // from <boost/beast/websocket.hpp>
     namespace net       = boost::asio;          // from <boost/asio.hpp>
     namespace ssl       = boost::asio::ssl;     // from <boost/asio/ssl.hpp>
     using     tcp       = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>
    
     using namespace std;
    
    
     class BitMEX_MM : public std::enable_shared_from_this<BitMEX_MM> {
    
         int n_tests = 1;
    
         //REST
         tcp::resolver rest_resolver;
         beast::ssl_stream<beast::tcp_stream> rest_stream;
         beast::flat_buffer rest_buffer;
    
         http::request<http::string_body>  post_req;
         http::response<http::string_body> post_res;
    
         string limit_order_msg;
    
         // Timing
         struct timespec start, end;
    
         //MEMBER VARIABLES
         string apiKey    = ""; //FILL IN API KEY
         string apiSecret = ""; //FILL IN API SEC
         int    apiKeyLen = apiKey.length();
         const char* apiKeyCStr = apiKey.c_str();
         int    apiSecLen = apiSecret.length();
         const char* apiSecCStr = apiSecret.c_str();
         int    expiry_t  = 5;
    
         //REST FUNCTIONS
         static size_t WriteCallback(void *contents, size_t size, size_t nmemb, void *userp)
         {
             ((string*)userp)->append((char*)contents, size * nmemb);
             return size * nmemb;
         }
    
         string HMAC_SHA256_hex_POST(string valid_till)
         {
             string data = "POST/api/v1/order" + valid_till + limit_order_msg;
    
             stringstream ss;
             unsigned int len;
             unsigned char out[EVP_MAX_MD_SIZE];
             HMAC_CTX *ctx = HMAC_CTX_new();
             HMAC_Init_ex(ctx, apiSecCStr, apiSecLen, EVP_sha256(), NULL);
             HMAC_Update(ctx, (unsigned char*)data.c_str(), data.length());
             HMAC_Final(ctx, out, &len);
             HMAC_CTX_free(ctx);
    
             for (int i = 0; i < len; ++i)
             {
                 ss << std::setw(2) << std::setfill('0') << hex << (unsigned int)out[i];
             }
             return ss.str();
         }
    
         void
         REST_on_resolve(
             beast::error_code ec,
             tcp::resolver::results_type results)
         {
             // Make the connection on the IP address we get from a lookup
             beast::get_lowest_layer(rest_stream).async_connect(
                 results,
                 beast::bind_front_handler(
                     &BitMEX_MM::REST_on_connect,
                     shared_from_this()));
         }
    
         void
         REST_on_connect(beast::error_code ec,
                         tcp::resolver::results_type::endpoint_type)
         {       
             // Perform the SSL handshake
             rest_stream.async_handshake(
                 ssl::stream_base::client,
                 beast::bind_front_handler(
                     &BitMEX_MM::REST_on_handshake,
                     shared_from_this()));
         }
    
         void
         REST_on_handshake(beast::error_code ec)
         {       
     /*        limit_order_msg += "{"symbol":"XBTUSD","ordType":"Limit","execInst":"ParticipateDoNotInitiate","clOrdID":"" + to_string(n_tests) 
                             + "","side":"Buy","price":10.0" 
                             + ","orderQty":2}]}";
             REST_write_limit_order_bulk();*/
         }
    
     public:
    
         explicit
         BitMEX_MM(net::io_context& rest_ioc, ssl::context& rest_ctx)
             : rest_resolver(net::make_strand(rest_ioc)),
             rest_stream(net::make_strand(rest_ioc), rest_ctx)
         { }
    
         void
         run_REST_service()
         {           
             // Set SNI Hostname (many hosts need this to handshake successfully)
             if(! SSL_set_tlsext_host_name(rest_stream.native_handle(), "www.bitmex.com"))
             {
                 beast::error_code ec{static_cast<int>(::ERR_get_error()), net::error::get_ssl_category()};
                 std::cerr << "ssl err " << ec.message() << "n";
                 return;
             }
    
             // Set up an HTTP GET request message
             post_req.version(11);
             post_req.method(http::verb::post);
             post_req.target("/api/v1/order");
             post_req.set(http::field::host, "www.bitmex.com");
             post_req.set(http::field::user_agent, BOOST_BEAST_VERSION_STRING);
             post_req.set(http::field::accept, "*/*");
             post_req.set(http::field::content_type, "application/json");
             post_req.set(http::field::connection, "Keep-Alive");
             post_req.set("api-key", apiKey);
             post_req.insert("Content-Length", "");
             post_req.insert("api-expires", "");
             post_req.insert("api-signature", "");
    
             // Look up the domain name
             rest_resolver.async_resolve(
                 "www.bitmex.com",
                 "443",
                 beast::bind_front_handler(
                     &BitMEX_MM::REST_on_resolve,
                     shared_from_this()));
    
         }
    
         void write_after_handshake()
         {
             limit_order_msg = "";  // Empty message to establish and maintain connection
    
             int valid_till        = time(0) + 5;
             string valid_till_str = to_string(valid_till);
    
             post_req.set("api-expires", valid_till_str);
             post_req.set("api-signature", HMAC_SHA256_hex_POST(valid_till_str));
             post_req.set("Content-Length", to_string(limit_order_msg.length()));
             post_req.body() = limit_order_msg;        
    
             beast::error_code _ec;
             std::size_t       _bt;
    
             http::write(rest_stream, post_req);
             http::read(rest_stream, rest_buffer, post_res);
    
             cout << "request (initial): n" << post_req << endl;
             cout << "response (initial): n" << post_res << endl;        
         }
    
         void REST_write_limit_order_bulk()
         {
             int valid_till        = time(0) + 5;
             string valid_till_str = to_string(valid_till);
    
             post_req.set("api-expires", valid_till_str);
             post_req.set("api-signature", HMAC_SHA256_hex_POST(valid_till_str));
             post_req.set("Content-Length", to_string(limit_order_msg.length()));
             post_req.body() = limit_order_msg;
    
             clock_gettime(CLOCK_MONOTONIC, &start);
    
             http::write(rest_stream, post_req);
             http::read(rest_stream, rest_buffer, post_res);
    
             cout << "request: n" << post_req << endl;
             cout << "response: n" << post_res << endl;
    
             beast::error_code _ec;
             std::size_t       _bt;
             process_limit_order_bulk_res(_ec, _bt);
         }
    
         void process_limit_order_bulk_res(beast::error_code ec,
                                           std::size_t bytes_transferred)
         {
             clock_gettime(CLOCK_MONOTONIC, &end);
             double time_taken;
             time_taken = (end.tv_sec  - start.tv_sec) + ((end.tv_nsec - start.tv_nsec) * 1e-9);
             cout << "response time: " << time_taken << endl;
    
             ++n_tests;
    
             if (n_tests <= 5)
             {
                 limit_order_msg = "{"symbol":"XBTUSD","ordType":"Limit","execInst":"ParticipateDoNotInitiate","side":"Buy","price":10.0,"orderQty":1.0}";
                 REST_write_limit_order_bulk();
             }
         }
     };
    
    
     int main(int argc, char** argv)
     {
    
         net::io_context rest_ioc;
         ssl::context    rest_ctx{ssl::context::tlsv12_client};
    
         auto algo = make_shared<BitMEX_MM>(rest_ioc, rest_ctx);
    
         cout << "Running http test." << endl;
    
         algo->run_REST_service();
    
         rest_ioc.run();
    
         algo->write_after_handshake();
    
         std::this_thread::sleep_for(std::chrono::milliseconds(30 * 1000));
    
         algo->REST_write_limit_order_bulk();  // Requests sent outside handshake
    
         return 0;
     }
    

The initial request sent in write_after_handshake is resent every time REST_write_limit_order_bulk is called.

EDIT 2:

Thanks for the time and effort @sehe. However, I am still missing some points:

  1. The only use of write_after_handshake() is to send a first (dummy) request, that should not perform anything but to maintain the connection alive for the following actual requests (if I comment it I get an error: terminate called after throwing an instance of ‘boost::wrapexceptboost::system::system_error’ what(): end of stream) as stated above). In the context here, that would be a request with an empty message, or 0 quantity, which returns an error (as expected) from the server. Is there a cleaner way to do that, in the sense a request for just establishing and maintaining the connection ?

  2. If I use write_after_handshake(), then all the following requests are correctly sent, but you can see with the calls to cout that the response for the first request in write_after_handshake is always displayed:

     {"error":{"message":"Invalid orderQty","name":"ValidationError"}}{"orderID":"... OK here...}
    

and so I understand that the first request is always resent ? This is the main reason of my question, how to avoid that ?

It seems to be the expected behavior: the cache is not cleared. Should I manage something myself here (is a cache too large a problem over time) ?
Anyway, to display only the last response this seems to work:

response_ = {};
  1. Beyond implementation issues, my initial goal was just the following:
    a) Establish and keep alive a connection to a distant server,
    b) Inside an infinite loop, when some event occurs, like a user asking for data, then form and send the request to the exchange, with the minimal delay possible (as the connection would be alive). The 30 sec. sleep delay is used to simulate a delay between establishing the connection and waiting to send an actual request for a user.

2

Answers


  1. The biggest issue looks to be that you’re mixing async and sync IO. From the fact that you’re trying to "do stuff" after the rest_ioc.run() I’m concluding that you don’t really know how the async code works, and probably don’t need it anyways.

    Therefore, I’d rewrite to be synchronous.

    // Look up the domain name
    tcp::resolver resolver(stream.get_executor());
    beast::get_lowest_layer(stream)
        .connect(resolver.resolve("www.bitmex.com", "443"));
    // Perform the SSL handshake
    stream.handshake(ssl::stream_base::client);
    
    write_after_handshake();
    std::this_thread::sleep_for(30'000ms); // or just 30s...
    while (n_tests++ < 5) {
        write_limit_order_bulk();
    }
    

    In reply to

    1. The function write_after_handshake sends an incomplete request to establish and maintain connection, as REST_on_handshake is empty.

    That’s inaccurate. It doesn’t send a request, not even an incomplete one. It just connects the SSL connection.

    but the undesirable effect is that at every new request, that initial empty request is resent just before the legitimate new request. I have no idea why.

    That’s exactly what you write in write_after_handshake()

    limit_order_msg =
        ""; // Empty message to establish and maintain connection
    

    That clears the request. If you don’t want that, don’t do that.

    Working back from the remark:

    The initial request sent in write_after_handshake is resent every time REST_write_limit_order_bulk is called.

    I can assume that you want the initial message to actually be {"symbol":"XBTUSD","ordType":"Limit","execInst":"ParticipateDoNotInitiate","side":"Buy","price":10.0,"orderQty":1.0} like in process_limit_order_bulkd_res. So let me fix it:

    limit_order_msg =
        R"({"symbol":"XBTUSD","ordType":"Limit","execInst":"ParticipateDoNotInitiate","side":"Buy","price":10.0,"orderQty":1.0})";
    

    Complete Example

    Many style fixes, C++ over C improvements etc. included.

    Live On Compiler Explorer

    // Boost & Beast headers
    #include <boost/asio.hpp>
    #include <boost/beast.hpp>
    #include <boost/beast/ssl.hpp>
    #include <boost/beast/version.hpp>
    
     // REST headers
    #include <iomanip>
    #include <iostream>
    #include <openssl/evp.h>
    #include <openssl/hmac.h>
    #include <sstream>
    
    namespace beast = boost::beast;     // from <boost/beast.hpp>
    namespace http  = beast::http;      // from <boost/beast/http.hpp>
    namespace net   = boost::asio;      // from <boost/asio.hpp>
    namespace ssl   = boost::asio::ssl; // from <boost/asio/ssl.hpp>
    
    using namespace std::chrono_literals;
    using tcp = boost::asio::ip::tcp;
    using beast::error_code;
    
    struct BitMEX_MM {
      private:
        beast::ssl_stream<beast::tcp_stream> stream_;
    
        beast::flat_buffer                buffer_;
        http::request<http::string_body>  request_;
        http::response<http::string_body> response_;
    
        std::string const apiKey_    = ""; // FILL IN API KEY
        std::string const apiSecret_ = ""; // FILL IN API SEC
        int const         expiry_t_  = 5;
        int               n_tests_   = 0;
    
        void sign_request() {
            std::ostringstream buf;
            buf << request_.method() << request_.target()
                << request_.at("api-expires") << request_.body();
            auto const data = buf.str();
    
            std::cout << "DEBUG: " << data << "n";
    
            ::HMAC_CTX* ctx = HMAC_CTX_new();
            ::HMAC_Init_ex(ctx, apiSecret_.data(), apiSecret_.length(),
                           EVP_sha256(), nullptr);
            ::HMAC_Update(ctx, (unsigned char*)data.c_str(), data.length());
    
            unsigned int  len = 0;
            unsigned char out[EVP_MAX_MD_SIZE]{};
            ::HMAC_Final(ctx, out, &len);
            ::HMAC_CTX_free(ctx);
    
            std::stringstream signature;
            for (unsigned i = 0; i < len; ++i)
                signature << std::setw(2) << std::setfill('0') << std::hex << (unsigned int)out[i];
            request_.set("api-signature", signature.str());
        }
    
        void perform_request(std::string const& request_body) {
            request_.set("api-expires", std::to_string(time(0) + expiry_t_));
            request_.body() = request_body;
            request_.prepare_payload();
            sign_request();
    
            auto start = std::chrono::steady_clock::now();
    
            http::write(stream_, request_);
            http::read(stream_, buffer_, response_);
    
            double time_taken = (std::chrono::steady_clock::now() - start)/1.0s;
    
            std::cout << " ------- request: n" << request_ << std::endl;
            std::cout << " ------- response: n" << response_ << std::endl;
            std::cout << " ------- response time: " << time_taken << std::endl;
        }
    
        void write_after_handshake() {
            perform_request(R"({"symbol":"XBTUSD","ordType":"Limit","execInst":"ParticipateDoNotInitiate","side":"Buy","price":10.0,"orderQty":1.0})");
        }
    
        void write_limit_order_bulk() {
            perform_request(
                R"({"symbol":"XBTUSD","ordType":"Limit","execInst":"ParticipateDoNotInitiate","clOrdID":")" +
                std::to_string(n_tests_) +
                R"(","side":"Buy","price":10.0,"orderQty":2}]})");
        }
      public:
        explicit BitMEX_MM(net::io_context& ioc, ssl::context& ssl_ctx)
            : stream_(make_strand(ioc.get_executor()),
                      ssl_ctx) // NOTE: strand not really required for sync
        {}
    
        void run() {
            // Set SNI Hostname (many hosts need this to handshake successfully)
            if (!::SSL_set_tlsext_host_name(stream_.native_handle(),
                                            "www.bitmex.com")) {
                throw boost::system::system_error(
                    static_cast<int>(::ERR_get_error()),
                    net::error::get_ssl_category());
            }
    
            // Set up an HTTP POST request message
            request_.version(11);
            request_.method(http::verb::post);
            request_.target("/api/v1/order");
            request_.set(http::field::host, "www.bitmex.com");
            request_.set(http::field::user_agent, BOOST_BEAST_VERSION_STRING);
            request_.set(http::field::accept, "*/*");
            request_.set(http::field::content_type, "application/json");
            request_.set(http::field::connection, "Keep-Alive");
            request_.set("api-key", apiKey_);
            request_.insert("api-expires", "");
            request_.insert("api-signature", "");
    
            // Look up the domain name
            tcp::resolver resolver(stream_.get_executor());
            beast::get_lowest_layer(stream_)
                .connect(resolver.resolve("www.bitmex.com", "443"));
            // Perform the SSL handshake
            stream_.handshake(ssl::stream_base::client);
    
            write_after_handshake();
            std::this_thread::sleep_for(30'000ms); // or just 30s...
            while (n_tests_++ < 5) {
                write_limit_order_bulk();
            }
        }
    };
    
    int main() {
        try {
            net::io_context ioc;
            ssl::context    ctx{ssl::context::tlsv12_client};
    
            BitMEX_MM algo(ioc, ctx);
    
            std::cout << "Running http test." << std::endl;
            algo.run();
    
        } catch (boost::system::system_error const& se) {
            error_code ec = se.code();
            std::cerr << "Error: " << se.code().message();
            if (ec.has_location())
                std::cerr << " (from " << se.code().location() << ")";
            std::cerr << std::endl;
        }
    }
    

    Which on my system shows (obviously failing because of missing API key & secrets): enter image description here

    Login or Signup to reply.
  2. I posted one answer that simplified removing the asynchronous IO. This is great for your synchronous use case.

    However, it’s not great for detecting server disconnect early. For that, you would need to go the other way:

    "The "best practice" approach would always keep an async read pending to detect disconnects early. This requires you to go back to doing all IO asynchronously."

    Just for completeness I went ahead and implemented a BitMEX_MM class that has a synchronous interface (write_after_handshake and write_limit_order_bulk) for lack of better names.

    BitMEX_MM algo(make_strand(ioc), ctx);
    
    debug() << "Running http test." << std::endl;
    
    //debug() << algo.write_after_handshake().res << std::endl;
    
    for (unsigned id = 1; id <= 5; ++id) {
        auto delay = id * 20s; // * 0.5s;
        debug() << "Sleeping " << (delay / 1s) << " seconds" << std::endl;
        std::this_thread::sleep_for(delay);
    
        auto [req, time_taken, res] = algo.write_limit_order_bulk(id);
    
        debug() << " ------- response time: " << time_taken << std::endl;
        debug() << " ------- request: " << req.body() << std::endl;
        debug() << " ------- response: " << res.result_int() << " " << res.reason() << " " << res.body() << std::endl;
    }
    

    The internal implementation is almost fully asynchronous.

    The only thing I didn’t make asynchronous for brevity is the do_connect operation which would easily become twice the amount of code.

    The important thing is that read and write are asynchronous operations, meaning that read will be signaled with EOF when the server disconnects. When that happens, we do max. 3 reconnect attempts (with increasing delay).

    If the reconnect fails we propagate the exception to the reply for the currently processed request:

    if (ec && ec != net::error::operation_aborted)
        return do_propagate_error(do_connect(ec));
    

    Live On Compiler Explorer

    #include <boost/asio.hpp>
    #include <boost/beast.hpp>
    #include <boost/beast/ssl.hpp>
    
    #include <deque>
    #include <iomanip>
    #include <iostream>
    #include <openssl/hmac.h>
    #include <sstream>
    
    namespace beast = boost::beast; // from <boost/beast.hpp>
    namespace http  = beast::http;  // from <boost/beast/http.hpp>
    namespace net   = boost::asio;  // from <boost/asio.hpp>
    namespace ssl   = net::ssl;     // from <boost/asio/ssl.hpp>
    
    using namespace std::chrono_literals;
    using tcp = net::ip::tcp;
    using beast::error_code;
    
    static inline auto& debug() {
        static auto const s_program_start = std::chrono::steady_clock::now();
        auto cur = (std::chrono::steady_clock::now() - s_program_start) / 1.s;
        return std::cerr << std::fixed << std::setprecision(3) << std::setw(7)
                        << cur << "s - ";
    }
    
    struct BitMEX_MM {
    private:
        using stream_t     = net::ssl::stream<tcp::socket>;
        using request_msg  = http::request<http::string_body>;
        using response_msg = http::response<http::string_body>;
        using clock        = std::chrono::steady_clock;
    
        ssl::context&             ssl_ctx_;
        net::any_io_executor      executor_;
        boost::optional<stream_t> stream_;
        beast::flat_buffer        buffer_;
    
        struct reply_t {
            request_msg  request;
            double       time_taken;
            response_msg response;
        };
    
        using promise_t = std::promise<reply_t>;
        using future_t  = std::future<reply_t>;
    
        struct queued_t {
            request_msg       request;
            clock::time_point start;
            promise_t         promise_;
        };
    
        std::deque<queued_t> queue_;
        response_msg         incoming_;
    
        std::string const apiKey_    = ""; // FILL IN API KEY
        std::string const apiSecret_ = ""; // FILL IN API SEC
        int const         expiry_t_  = 5;
    
        std::string const host_    = "www.bitmex.com";
        std::string const service_ = "https";
        std::string const route_   = "/api/v1/order";
    
        void sign_request(request_msg& request_) {
            std::ostringstream buf;
            buf << request_.method() << request_.target()
                << request_.at("api-expires") << request_.body();
            auto const data = buf.str();
    
            ::HMAC_CTX* ctx = HMAC_CTX_new();
            ::HMAC_Init_ex(ctx, apiSecret_.data(), apiSecret_.length(),
                        EVP_sha256(), nullptr);
            ::HMAC_Update(ctx, (unsigned char*)data.c_str(), data.length());
    
            unsigned int  len = 0;
            unsigned char out[EVP_MAX_MD_SIZE]{};
            ::HMAC_Final(ctx, out, &len);
            ::HMAC_CTX_free(ctx);
    
            std::stringstream signature;
            for (unsigned i = 0; i < len; ++i)
                signature << std::setw(2) << std::setfill('0') << std::hex
                        << (unsigned int)out[i];
            request_.set("api-signature", signature.str());
        }
    
        void do_read() { // on the strand
            http::async_read(
                *stream_, buffer_, incoming_,
                beast::bind_front_handler(&BitMEX_MM::on_response, this));
        }
    
        void on_response(error_code ec, size_t n) { // on the strand
            debug() << "Received: " << n << " (" << ec.message() << ")n";
            auto& top = queue_.front();
    
            if (ec && ec != net::error::operation_aborted)
                return do_propagate_error(do_connect(ec));
    
            double time_taken = (clock::now() - top.start) / 1.0s;
    
            top.promise_.set_value({
                std::move(top.request),
                time_taken,
                std::move(incoming_),
            });
    
            do_read(); // incoming_ is free for the next read
    
            queue_.pop_front();
    
            if (!queue_.empty())
                do_send();
        }
    
        void do_perform_post(std::promise<reply_t> promise,
                            std::string const&    request_body) { // on the strand
            // Set up an HTTP POST request message
            request_msg req(http::verb::post, route_, 11);
            req.set(http::field::host, host_);
            req.set(http::field::user_agent, BOOST_BEAST_VERSION_STRING);
            req.set(http::field::accept, "*/*");
            req.set(http::field::content_type, "application/json");
            req.set(http::field::connection, "Keep-Alive");
            req.set("api-key", apiKey_);
            req.set("api-expires", std::to_string(time(0) + expiry_t_));
            req.body() = request_body;
            req.prepare_payload();
            sign_request(req);
    
            queue_.push_back(queued_t{std::move(req),
                                    std::chrono::steady_clock::now(),
                                    std::move(promise)});
    
            if (queue_.size() == 1)
                do_send();
        }
    
        void do_propagate_error(error_code ec) { // on the strand
            if (!ec.failed())
                return;
    
            debug() << "Unrecoverable (" << ec.message() << ")" << std::endl;
            if (queue_.empty())
                throw boost::system::system_error(ec);
    
            queue_.front().promise_.set_exception(
                std::make_exception_ptr(boost::system::system_error(ec)));
        }
    
        void do_send() { // on the strand
            if (queue_.empty())
                return;
            auto& top = queue_.front();
    
            http::async_write(
                *stream_, top.request, [this](error_code ec, size_t n) {
                    debug() << "Written: " << n << " (" << ec.message() << ")n";
    
                    if (ec && ec != net::error::operation_aborted)
                        return do_propagate_error(do_connect(ec));
                });
        }
    
        // TODO make async
        static constexpr int max_tries = 3;
        error_code do_connect(error_code ec = {}, int tries = max_tries) { // on the strand
            if (tries <= 0)
                return ec;
    
            auto nth = max_tries - tries;
            if (nth > 0) { // first try with no delay
                std::this_thread::sleep_for(500ms * nth);
            }
    
            if (ec)
                debug() << "Reconnecting (" << ec.message() << ") #" << tries << std::endl;
            else
                debug() << "Connecting" << std::endl;
    
            ec.clear();
    
            if (stream_->next_layer().is_open()) {
                if (!ec) {
                    stream_->shutdown(ec);
                    debug() << "Shutdown: " << ec.message() << std::endl;
                }
                if (!ec) {
                    stream_->next_layer().close(ec);
                    debug() << "Close: " << ec.message() << std::endl;
                }
            }
            buffer_.clear(); // IMPORTANT
            stream_.emplace(executor_, ssl_ctx_);
    
            auto& sock = stream_->next_layer();
            tcp::resolver resolver(executor_);
            if (!ec)
                net::connect(sock, resolver.resolve(host_, service_, ec), ec);
            if (!ec)
                sock.set_option(tcp::no_delay(true), ec);
    
            auto ep = sock.remote_endpoint(ec);
            debug() << "Connected to " << ep << " (" << ec.message() << ")n";
    
            // Set SNI Hostname (many hosts need this to handshake successfully)
            if (!::SSL_set_tlsext_host_name(stream_->native_handle(), host_.c_str()))
                throw boost::system::system_error(
                    static_cast<int>(::ERR_get_error()),
                    net::error::get_ssl_category());
    
            stream_->handshake(ssl::stream_base::client, ec);
            if (ec)
                return do_connect(ec, tries - 1);
    
            do_read();
            do_send(); // and also start a write if requests queued
    
            return ec;
        }
    
    public:
        explicit BitMEX_MM(net::any_io_executor ex, ssl::context& ssl_ctx)
            : ssl_ctx_(ssl_ctx)
            , executor_(ex)
            , stream_(boost::in_place_init, executor_, ssl_ctx_) //
        {
            do_connect();
        }
    
        void stop() {
            return net::post(executor_, std::packaged_task<void()>{[this] {
                                stream_->next_layer().cancel();
                            }})
                .get();
        }
    
        future_t perform_post(std::string body) {
            std::promise<reply_t> prom;
            auto fut = prom.get_future();
            net::post(executor_,
                    [this, b = std::move(body), p = std::move(prom)]() mutable {
                        return do_perform_post(std::move(p), std::move(b));
                    });
            return fut;
        }
    
        reply_t write_after_handshake() {
            return perform_post(
                    R"({"symbol":"XBTUSD","ordType":"Limit","execInst":"ParticipateDoNotInitiate","side":"Buy","price":10.0,"orderQty":1.0})")
                .get();
        }
    
        reply_t write_limit_order_bulk(unsigned test_id) {
            return perform_post(
                    R"({"symbol":"XBTUSD","ordType":"Limit","execInst":"ParticipateDoNotInitiate","clOrdID":")" +
                    std::to_string(test_id) +
                    R"(","side":"Buy","price":10.0,"orderQty":2}]})")
                .get();
        }
    };
    
    int main() {
        try {
            net::thread_pool ioc{/*1*/};
            ssl::context     ctx{ssl::context::tlsv12_client};
    
            BitMEX_MM algo(make_strand(ioc), ctx);
    
            debug() << "Running http test." << std::endl;
    
            //debug() << algo.write_after_handshake().response << std::endl;
    
            for (unsigned id = 1; id <= 5; ++id) {
                auto delay = id * 20s; // * 0.5s;
                debug() << "Sleeping " << (delay / 1s) << " seconds" << std::endl;
                std::this_thread::sleep_for(delay);
    
                auto [req, time_taken, res] = algo.write_limit_order_bulk(id);
    
                debug() << " ------- response time: " << time_taken << std::endl;
                // debug() << " ------- request: " << req << std::endl;
                // debug() << " ------- response: " << res << std::endl;
                debug() << " ------- request: " << req.body() << std::endl;
                debug() << " ------- response: " << res.result_int() << " " << res.reason() << " " << res.body() << std::endl;
            }
    
            algo.stop();
            ioc.join();
        } catch (boost::system::system_error const& se) {
            error_code ec = se.code();
            if (ec.has_location()) {
                debug() << "Error: " << ec.message() << " (from " << ec.location() << ")" << std::endl;
            } else {
                debug() << "Error: " << ec.message() << std::endl;
            }
        }
    }
    

    Testing on my system:

    enter image description here

    The output

      0.000s - Connecting
      0.019s - Connected to 172.64.146.66:443 (Success)
      0.035s - Running http test.
      0.035s - Sleeping 20 seconds
     15.024s - Received: 0 (end of stream)
     15.024s - Reconnecting (end of stream) #3
     15.024s - Shutdown: Success
     15.025s - Close: Success
     15.043s - Connected to 104.18.41.190:443 (Success)
     20.035s - Written: 419 (Success)
     20.081s - Received: 1163 (Success)
     20.081s -  ------- response time: 0.046
     20.081s -  ------- request: {"symbol":"XBTUSD","ordType":"Limit","execInst":"ParticipateDoNotInitiate","clOrdID":"1","side":"Buy","price":10.0,"orderQty":2}]}
     20.081s -  ------- response: 400 Bad Request {"error":{"message":"Missing API key.","name":"HTTPError"}}
     20.081s - Sleeping 40 seconds
     60.082s - Written: 419 (Success)
     60.125s - Received: 1163 (Success)
     60.125s -  ------- response time: 0.044
     60.125s -  ------- request: {"symbol":"XBTUSD","ordType":"Limit","execInst":"ParticipateDoNotInitiate","clOrdID":"2","side":"Buy","price":10.0,"orderQty":2}]}
     60.125s -  ------- response: 400 Bad Request {"error":{"message":"Missing API key.","name":"HTTPError"}}
     60.125s - Sleeping 60 seconds
    120.126s - Written: 419 (Success)
    120.170s - Received: 1163 (Success)
    120.170s -  ------- response time: 0.044
    120.170s -  ------- request: {"symbol":"XBTUSD","ordType":"Limit","execInst":"ParticipateDoNotInitiate","clOrdID":"3","side":"Buy","price":10.0,"orderQty":2}]}
    120.170s -  ------- response: 400 Bad Request {"error":{"message":"Missing API key.","name":"HTTPError"}}
    120.170s - Sleeping 80 seconds
    200.170s - Written: 419 (Success)
    200.252s - Received: 1163 (Success)
    200.252s -  ------- response time: 0.082
    200.252s -  ------- request: {"symbol":"XBTUSD","ordType":"Limit","execInst":"ParticipateDoNotInitiate","clOrdID":"4","side":"Buy","price":10.0,"orderQty":2}]}
    200.252s -  ------- response: 400 Bad Request {"error":{"message":"Missing API key.","name":"HTTPError"}}
    200.252s - Sleeping 100 seconds
    300.253s - Written: 419 (Success)
    300.300s - Received: 1163 (Success)
    300.300s -  ------- response time: 0.047
    300.300s -  ------- request: {"symbol":"XBTUSD","ordType":"Limit","execInst":"ParticipateDoNotInitiate","clOrdID":"5","side":"Buy","price":10.0,"orderQty":2}]}
    300.300s -  ------- response: 400 Bad Request {"error":{"message":"Missing API key.","name":"HTTPError"}}
    300.300s - Received: 0 (Operation canceled)
    

    That tells us that

    • bitmex.com drops a connection when the first request isn’t received within 15s of connection (see spontaneous reconnect (end of stream) at 15.024s)
    • the documented keep-alive time limit of 90 is not strictly adhered to ("Sleeping 100s" in the log is not followed by a disconnect)
    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search