I am trying to setup a service for interrogating a distant database based on Boost Beast.
Even if the context and the issues are totally different, I have found this question on SO:
HTTP Delay from AWS Instance to Bitmex with Boost Beast and Ubuntu 18
and I have tried to build a service from that example implementation.
However, in that example, the requests are sent within the handshake function
void
REST_on_handshake(beast::error_code ec)
and I need to be able to send requests after the handshake has been done (with a connection still alive). So, if I naively empty the body of REST_on_handshake, then when I send the requests from another function I get the error (which I suppose is expected behavior):
terminate called after throwing an instance of 'boost::wrapexcept<boost::system::system_error>'
what(): end of stream
A simple fix is to send an empty request to the server right after the function
rest_ioc.run();
then send new requests from a function; this seems to work, but the undesirable effect is that at every new request, that initial empty request is resent just before the legitimate new request. I have no idea why.
So, what is the correct way to send requests outside the function REST_on_handshake ?
EDIT:
Below is the original code from the example mentioned above, with the following modifications:
-
The function write_after_handshake sends an incomplete request to establish and maintain connection, as REST_on_handshake is empty.
-
REST_write_limit_order_bulk is (public and) outside REST_on_handshake as obviously in my own application I need to send requests after the initial handshake.
// g++ -std=c++17 -pthread -o http_test.out http_test.cpp -lssl -lcrypto && ./http_test.out //Boost & Beast headers #include <boost/bind.hpp> #include <boost/beast/core.hpp> #include <boost/beast/core.hpp> #include <boost/beast/http.hpp> #include <boost/beast/ssl.hpp> #include <boost/beast/version.hpp> #include <boost/beast/websocket.hpp> #include <boost/beast/websocket/ssl.hpp> #include <boost/asio/strand.hpp> #include <boost/asio/connect.hpp> #include <boost/asio/ip/tcp.hpp> #include <boost/asio/ssl/stream.hpp> #include <boost/optional.hpp> #include <thread> //REST headers #include <sstream> #include <openssl/evp.h> #include <openssl/hmac.h> //Misc. headers #include <iomanip> #include <iostream> #include <string> namespace beast = boost::beast; // from <boost/beast.hpp> namespace http = beast::http; // from <boost/beast/http.hpp> namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp> namespace net = boost::asio; // from <boost/asio.hpp> namespace ssl = boost::asio::ssl; // from <boost/asio/ssl.hpp> using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp> using namespace std; class BitMEX_MM : public std::enable_shared_from_this<BitMEX_MM> { int n_tests = 1; //REST tcp::resolver rest_resolver; beast::ssl_stream<beast::tcp_stream> rest_stream; beast::flat_buffer rest_buffer; http::request<http::string_body> post_req; http::response<http::string_body> post_res; string limit_order_msg; // Timing struct timespec start, end; //MEMBER VARIABLES string apiKey = ""; //FILL IN API KEY string apiSecret = ""; //FILL IN API SEC int apiKeyLen = apiKey.length(); const char* apiKeyCStr = apiKey.c_str(); int apiSecLen = apiSecret.length(); const char* apiSecCStr = apiSecret.c_str(); int expiry_t = 5; //REST FUNCTIONS static size_t WriteCallback(void *contents, size_t size, size_t nmemb, void *userp) { ((string*)userp)->append((char*)contents, size * nmemb); return size * nmemb; } string HMAC_SHA256_hex_POST(string valid_till) { string data = "POST/api/v1/order" + valid_till + limit_order_msg; stringstream ss; unsigned int len; unsigned char out[EVP_MAX_MD_SIZE]; HMAC_CTX *ctx = HMAC_CTX_new(); HMAC_Init_ex(ctx, apiSecCStr, apiSecLen, EVP_sha256(), NULL); HMAC_Update(ctx, (unsigned char*)data.c_str(), data.length()); HMAC_Final(ctx, out, &len); HMAC_CTX_free(ctx); for (int i = 0; i < len; ++i) { ss << std::setw(2) << std::setfill('0') << hex << (unsigned int)out[i]; } return ss.str(); } void REST_on_resolve( beast::error_code ec, tcp::resolver::results_type results) { // Make the connection on the IP address we get from a lookup beast::get_lowest_layer(rest_stream).async_connect( results, beast::bind_front_handler( &BitMEX_MM::REST_on_connect, shared_from_this())); } void REST_on_connect(beast::error_code ec, tcp::resolver::results_type::endpoint_type) { // Perform the SSL handshake rest_stream.async_handshake( ssl::stream_base::client, beast::bind_front_handler( &BitMEX_MM::REST_on_handshake, shared_from_this())); } void REST_on_handshake(beast::error_code ec) { /* limit_order_msg += "{"symbol":"XBTUSD","ordType":"Limit","execInst":"ParticipateDoNotInitiate","clOrdID":"" + to_string(n_tests) + "","side":"Buy","price":10.0" + ","orderQty":2}]}"; REST_write_limit_order_bulk();*/ } public: explicit BitMEX_MM(net::io_context& rest_ioc, ssl::context& rest_ctx) : rest_resolver(net::make_strand(rest_ioc)), rest_stream(net::make_strand(rest_ioc), rest_ctx) { } void run_REST_service() { // Set SNI Hostname (many hosts need this to handshake successfully) if(! SSL_set_tlsext_host_name(rest_stream.native_handle(), "www.bitmex.com")) { beast::error_code ec{static_cast<int>(::ERR_get_error()), net::error::get_ssl_category()}; std::cerr << "ssl err " << ec.message() << "n"; return; } // Set up an HTTP GET request message post_req.version(11); post_req.method(http::verb::post); post_req.target("/api/v1/order"); post_req.set(http::field::host, "www.bitmex.com"); post_req.set(http::field::user_agent, BOOST_BEAST_VERSION_STRING); post_req.set(http::field::accept, "*/*"); post_req.set(http::field::content_type, "application/json"); post_req.set(http::field::connection, "Keep-Alive"); post_req.set("api-key", apiKey); post_req.insert("Content-Length", ""); post_req.insert("api-expires", ""); post_req.insert("api-signature", ""); // Look up the domain name rest_resolver.async_resolve( "www.bitmex.com", "443", beast::bind_front_handler( &BitMEX_MM::REST_on_resolve, shared_from_this())); } void write_after_handshake() { limit_order_msg = ""; // Empty message to establish and maintain connection int valid_till = time(0) + 5; string valid_till_str = to_string(valid_till); post_req.set("api-expires", valid_till_str); post_req.set("api-signature", HMAC_SHA256_hex_POST(valid_till_str)); post_req.set("Content-Length", to_string(limit_order_msg.length())); post_req.body() = limit_order_msg; beast::error_code _ec; std::size_t _bt; http::write(rest_stream, post_req); http::read(rest_stream, rest_buffer, post_res); cout << "request (initial): n" << post_req << endl; cout << "response (initial): n" << post_res << endl; } void REST_write_limit_order_bulk() { int valid_till = time(0) + 5; string valid_till_str = to_string(valid_till); post_req.set("api-expires", valid_till_str); post_req.set("api-signature", HMAC_SHA256_hex_POST(valid_till_str)); post_req.set("Content-Length", to_string(limit_order_msg.length())); post_req.body() = limit_order_msg; clock_gettime(CLOCK_MONOTONIC, &start); http::write(rest_stream, post_req); http::read(rest_stream, rest_buffer, post_res); cout << "request: n" << post_req << endl; cout << "response: n" << post_res << endl; beast::error_code _ec; std::size_t _bt; process_limit_order_bulk_res(_ec, _bt); } void process_limit_order_bulk_res(beast::error_code ec, std::size_t bytes_transferred) { clock_gettime(CLOCK_MONOTONIC, &end); double time_taken; time_taken = (end.tv_sec - start.tv_sec) + ((end.tv_nsec - start.tv_nsec) * 1e-9); cout << "response time: " << time_taken << endl; ++n_tests; if (n_tests <= 5) { limit_order_msg = "{"symbol":"XBTUSD","ordType":"Limit","execInst":"ParticipateDoNotInitiate","side":"Buy","price":10.0,"orderQty":1.0}"; REST_write_limit_order_bulk(); } } }; int main(int argc, char** argv) { net::io_context rest_ioc; ssl::context rest_ctx{ssl::context::tlsv12_client}; auto algo = make_shared<BitMEX_MM>(rest_ioc, rest_ctx); cout << "Running http test." << endl; algo->run_REST_service(); rest_ioc.run(); algo->write_after_handshake(); std::this_thread::sleep_for(std::chrono::milliseconds(30 * 1000)); algo->REST_write_limit_order_bulk(); // Requests sent outside handshake return 0; }
The initial request sent in write_after_handshake is resent every time REST_write_limit_order_bulk is called.
EDIT 2:
Thanks for the time and effort @sehe. However, I am still missing some points:
-
The only use of write_after_handshake() is to send a first (dummy) request, that should not perform anything but to maintain the connection alive for the following actual requests (if I comment it I get an error: terminate called after throwing an instance of ‘boost::wrapexceptboost::system::system_error’ what(): end of stream) as stated above). In the context here, that would be a request with an empty message, or 0 quantity, which returns an error (as expected) from the server. Is there a cleaner way to do that, in the sense a request for just establishing and maintaining the connection ?
-
If I use write_after_handshake(), then all the following requests are correctly sent, but you can see with the calls to cout that the response for the first request in write_after_handshake is always displayed:{"error":{"message":"Invalid orderQty","name":"ValidationError"}}{"orderID":"... OK here...}
and so I understand that the first request is always resent ? This is the main reason of my question, how to avoid that ?
It seems to be the expected behavior: the cache is not cleared. Should I manage something myself here (is a cache too large a problem over time) ?
Anyway, to display only the last response this seems to work:
response_ = {};
- Beyond implementation issues, my initial goal was just the following:
a) Establish and keep alive a connection to a distant server,
b) Inside an infinite loop, when some event occurs, like a user asking for data, then form and send the request to the exchange, with the minimal delay possible (as the connection would be alive). The 30 sec. sleep delay is used to simulate a delay between establishing the connection and waiting to send an actual request for a user.
2
Answers
The biggest issue looks to be that you’re mixing async and sync IO. From the fact that you’re trying to "do stuff" after the
rest_ioc.run()
I’m concluding that you don’t really know how the async code works, and probably don’t need it anyways.Therefore, I’d rewrite to be synchronous.
In reply to
That’s inaccurate. It doesn’t send a request, not even an incomplete one. It just connects the SSL connection.
That’s exactly what you write in
write_after_handshake()
…That clears the request. If you don’t want that, don’t do that.
Working back from the remark:
I can assume that you want the initial message to actually be
{"symbol":"XBTUSD","ordType":"Limit","execInst":"ParticipateDoNotInitiate","side":"Buy","price":10.0,"orderQty":1.0}
like inprocess_limit_order_bulkd_res
. So let me fix it:Complete Example
Many style fixes, C++ over C improvements etc. included.
Live On Compiler Explorer
Which on my system shows (obviously failing because of missing API key & secrets):
I posted one answer that simplified removing the asynchronous IO. This is great for your synchronous use case.
However, it’s not great for detecting server disconnect early. For that, you would need to go the other way:
Just for completeness I went ahead and implemented a
BitMEX_MM
class that has a synchronous interface (write_after_handshake
andwrite_limit_order_bulk
) for lack of better names.The internal implementation is almost fully asynchronous.
The important thing is that read and write are asynchronous operations, meaning that
read
will be signaled with EOF when the server disconnects. When that happens, we do max. 3 reconnect attempts (with increasing delay).If the reconnect fails we propagate the exception to the reply for the currently processed request:
Live On Compiler Explorer
Testing on my system:
The output
That tells us that