Added functionality to close websocket connections before the app is

terminated.
This is incomplete and needs more work.
This commit is contained in:
The-EDev 2021-11-06 06:06:18 +03:00
parent b5137c52a1
commit fd6de9bc05
No known key found for this signature in database
GPG Key ID: 51C45DC0C413DCD9
5 changed files with 83 additions and 43 deletions

View File

@ -19,8 +19,8 @@ sock.onopen = ()=>{
sock.onerror = (e)=>{ sock.onerror = (e)=>{
console.log('error',e) console.log('error',e)
} }
sock.onclose = ()=>{ sock.onclose = (e)=>{
console.log('close') console.log('close', e)
} }
sock.onmessage = (e)=>{ sock.onmessage = (e)=>{
$("#log").val( $("#log").val(

View File

@ -62,6 +62,9 @@ namespace crow
{ {
} }
std::atomic<int> websocket_count{0};
///Process an Upgrade request ///Process an Upgrade request
/// ///
@ -69,7 +72,7 @@ namespace crow
template <typename Adaptor> template <typename Adaptor>
void handle_upgrade(const request& req, response& res, Adaptor&& adaptor) void handle_upgrade(const request& req, response& res, Adaptor&& adaptor)
{ {
router_.handle_upgrade(req, res, adaptor); router_.handle_upgrade(req, res, adaptor, websocket_count);
} }
///Process the request and generate a response for it ///Process the request and generate a response for it
@ -289,7 +292,6 @@ namespace crow
{ {
server_ = std::move(std::unique_ptr<server_t>(new server_t(this, bindaddr_, port_, server_name_, &middlewares_, concurrency_, nullptr))); server_ = std::move(std::unique_ptr<server_t>(new server_t(this, bindaddr_, port_, server_name_, &middlewares_, concurrency_, nullptr)));
server_->set_tick_function(tick_interval_, tick_function_); server_->set_tick_function(tick_interval_, tick_function_);
server_->signal_clear();
for (auto snum : signals_) for (auto snum : signals_)
{ {
server_->signal_add(snum); server_->signal_add(snum);

View File

@ -29,7 +29,7 @@ namespace crow
public: public:
Server(Handler* handler, std::string bindaddr, uint16_t port, std::string server_name = std::string("Crow/") + VERSION, std::tuple<Middlewares...>* middlewares = nullptr, uint16_t concurrency = 1, typename Adaptor::context* adaptor_ctx = nullptr) Server(Handler* handler, std::string bindaddr, uint16_t port, std::string server_name = std::string("Crow/") + VERSION, std::tuple<Middlewares...>* middlewares = nullptr, uint16_t concurrency = 1, typename Adaptor::context* adaptor_ctx = nullptr)
: acceptor_(io_service_, tcp::endpoint(boost::asio::ip::address::from_string(bindaddr), port)), : acceptor_(io_service_, tcp::endpoint(boost::asio::ip::address::from_string(bindaddr), port)),
signals_(io_service_, SIGINT, SIGTERM), signals_(io_service_),
tick_timer_(io_service_), tick_timer_(io_service_),
handler_(handler), handler_(handler),
concurrency_(concurrency == 0 ? 1 : concurrency), concurrency_(concurrency == 0 ? 1 : concurrency),
@ -169,9 +169,21 @@ namespace crow
void stop() void stop()
{ {
io_service_.stop(); should_close_ = false; //Prevent the acceptor from taking new connections
while (handler_->websocket_count.load(std::memory_order_release) != 0) //Wait for the websockets to close properly
{
}
for(auto& io_service:io_service_pool_) for(auto& io_service:io_service_pool_)
io_service->stop(); {
if (io_service != nullptr)
{
CROW_LOG_INFO << "Closing IO service " << &io_service;
io_service->stop(); //Close all io_services (and HTTP connections)
}
}
CROW_LOG_INFO << "Closing main IO service (" << &io_service_ << ')';
io_service_.stop(); //Close main io_service
} }
void signal_clear() void signal_clear()
@ -201,22 +213,25 @@ namespace crow
is, handler_, server_name_, middlewares_, is, handler_, server_name_, middlewares_,
get_cached_date_str_pool_[roundrobin_index_], *timer_queue_pool_[roundrobin_index_], get_cached_date_str_pool_[roundrobin_index_], *timer_queue_pool_[roundrobin_index_],
adaptor_ctx_); adaptor_ctx_);
acceptor_.async_accept(p->socket(), if (!should_close_)
[this, p, &is](boost::system::error_code ec) {
{ acceptor_.async_accept(p->socket(),
if (!ec) [this, p, &is](boost::system::error_code ec)
{ {
is.post([p] if (!ec)
{ {
p->start(); is.post([p]
}); {
} p->start();
else });
{ }
delete p; else
} {
do_accept(); delete p;
}); }
do_accept();
});
}
} }
private: private:
@ -225,6 +240,7 @@ namespace crow
std::vector<detail::dumb_timer_queue*> timer_queue_pool_; std::vector<detail::dumb_timer_queue*> timer_queue_pool_;
std::vector<std::function<std::string()>> get_cached_date_str_pool_; std::vector<std::function<std::string()>> get_cached_date_str_pool_;
tcp::acceptor acceptor_; tcp::acceptor acceptor_;
bool should_close_ = false;
boost::asio::signal_set signals_; boost::asio::signal_set signals_;
boost::asio::deadline_timer tick_timer_; boost::asio::deadline_timer tick_timer_;

View File

@ -49,7 +49,7 @@ namespace crow
} }
virtual void handle(const request&, response&, const routing_params&) = 0; virtual void handle(const request&, response&, const routing_params&) = 0;
virtual void handle_upgrade(const request&, response& res, SocketAdaptor&&) virtual void handle_upgrade(const request&, response& res, SocketAdaptor&&, std::atomic<int>&)
{ {
res = response(404); res = response(404);
res.end(); res.end();
@ -400,9 +400,9 @@ namespace crow
res.end(); res.end();
} }
void handle_upgrade(const request& req, response&, SocketAdaptor&& adaptor) override void handle_upgrade(const request& req, response&, SocketAdaptor&& adaptor, std::atomic<int>& websocket_count) override
{ {
new crow::websocket::Connection<SocketAdaptor>(req, std::move(adaptor), open_handler_, message_handler_, close_handler_, error_handler_, accept_handler_); new crow::websocket::Connection<SocketAdaptor>(req, std::move(adaptor), websocket_count, open_handler_, message_handler_, close_handler_, error_handler_, accept_handler_);
} }
#ifdef CROW_ENABLE_SSL #ifdef CROW_ENABLE_SSL
void handle_upgrade(const request& req, response&, SSLAdaptor&& adaptor) override void handle_upgrade(const request& req, response&, SSLAdaptor&& adaptor) override
@ -1397,7 +1397,7 @@ namespace crow
//TODO maybe add actual_method //TODO maybe add actual_method
template <typename Adaptor> template <typename Adaptor>
void handle_upgrade(const request& req, response& res, Adaptor&& adaptor) void handle_upgrade(const request& req, response& res, Adaptor&& adaptor, std::atomic<int>& websocket_count)
{ {
if (req.method >= HTTPMethod::InternalMethodCount) if (req.method >= HTTPMethod::InternalMethodCount)
return; return;
@ -1451,7 +1451,7 @@ namespace crow
// any uncaught exceptions become 500s // any uncaught exceptions become 500s
try try
{ {
rules[rule_index]->handle_upgrade(req, res, std::move(adaptor)); rules[rule_index]->handle_upgrade(req, res, std::move(adaptor), websocket_count);
} }
catch(std::exception& e) catch(std::exception& e)
{ {

View File

@ -1,6 +1,7 @@
#pragma once #pragma once
#include <boost/algorithm/string/predicate.hpp> #include <boost/algorithm/string/predicate.hpp>
#include <boost/array.hpp> #include <boost/array.hpp>
#include "crow/logging.h"
#include "crow/socket_adaptors.h" #include "crow/socket_adaptors.h"
#include "crow/http_request.h" #include "crow/http_request.h"
#include "crow/TinySHA1.hpp" #include "crow/TinySHA1.hpp"
@ -56,7 +57,7 @@ namespace crow
// +---------------------------------------------------------------+ // +---------------------------------------------------------------+
/// A websocket connection. /// A websocket connection.
template <typename Adaptor> template <typename Adaptor>
class Connection : public connection class Connection : public connection
{ {
public: public:
@ -65,19 +66,20 @@ namespace crow
/// ///
/// Requires a request with an "Upgrade: websocket" header.<br> /// Requires a request with an "Upgrade: websocket" header.<br>
/// Automatically handles the handshake. /// Automatically handles the handshake.
Connection(const crow::request& req, Adaptor&& adaptor, Connection(const crow::request& req, Adaptor&& adaptor, std::atomic<int>& websocket_count,
std::function<void(crow::websocket::connection&)> open_handler, std::function<void(crow::websocket::connection&)> open_handler,
std::function<void(crow::websocket::connection&, const std::string&, bool)> message_handler, std::function<void(crow::websocket::connection&, const std::string&, bool)> message_handler,
std::function<void(crow::websocket::connection&, const std::string&)> close_handler, std::function<void(crow::websocket::connection&, const std::string&)> close_handler,
std::function<void(crow::websocket::connection&)> error_handler, std::function<void(crow::websocket::connection&)> error_handler,
std::function<bool(const crow::request&)> accept_handler) std::function<bool(const crow::request&)> accept_handler)
: adaptor_(std::move(adaptor)), open_handler_(std::move(open_handler)), message_handler_(std::move(message_handler)), close_handler_(std::move(close_handler)), error_handler_(std::move(error_handler)) : adaptor_(std::move(adaptor)), websocket_count_(websocket_count), open_handler_(std::move(open_handler)), message_handler_(std::move(message_handler)), close_handler_(std::move(close_handler)), error_handler_(std::move(error_handler))
, accept_handler_(std::move(accept_handler)) , accept_handler_(std::move(accept_handler)), signals_(adaptor_.get_io_service(), SIGINT, SIGTERM)
{ {
if (!boost::iequals(req.get_header_value("upgrade"), "websocket")) if (!boost::iequals(req.get_header_value("upgrade"), "websocket"))
{ {
adaptor.close(); adaptor.close();
delete this; delete this;
return; return;
} }
@ -85,19 +87,28 @@ namespace crow
{ {
if (!accept_handler_(req)) if (!accept_handler_(req))
{ {
adaptor.close(); adaptor.close();
delete this; delete this;
return; return;
} }
} }
websocket_count_++;
// Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ== // Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
// Sec-WebSocket-Version: 13 // Sec-WebSocket-Version: 13
std::string magic = req.get_header_value("Sec-WebSocket-Key") + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; std::string magic = req.get_header_value("Sec-WebSocket-Key") + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
sha1::SHA1 s; sha1::SHA1 s;
s.processBytes(magic.data(), magic.size()); s.processBytes(magic.data(), magic.size());
uint8_t digest[20]; uint8_t digest[20];
s.getDigestBytes(digest); s.getDigestBytes(digest);
signals_.async_wait(
[&](const boost::system::error_code& e, int /*signal_number*/){
if (!e){
CROW_LOG_INFO << "quitting " << this;
do_not_destroy_ = true;
close("Quitter");
}
});
start(crow::utility::base64encode((unsigned char*)digest, 20)); start(crow::utility::base64encode((unsigned char*)digest, 20));
} }
@ -588,11 +599,12 @@ namespace crow
if (!is_close_handler_called_) if (!is_close_handler_called_)
if (close_handler_) if (close_handler_)
close_handler_(*this, "uncleanly"); close_handler_(*this, "uncleanly");
if (sending_buffers_.empty() && !is_reading) websocket_count_--;
if (sending_buffers_.empty() && !is_reading && !do_not_destroy_)
delete this; delete this;
} }
private: private:
Adaptor adaptor_; Adaptor adaptor_;
std::vector<std::string> sending_buffers_; std::vector<std::string> sending_buffers_;
std::vector<std::string> write_buffers_; std::vector<std::string> write_buffers_;
@ -615,11 +627,21 @@ namespace crow
bool pong_received_{false}; bool pong_received_{false};
bool is_close_handler_called_{false}; bool is_close_handler_called_{false};
//**WARNING**
//SETTING THIS PREVENTS THE OBJECT FROM BEING DELETED,
//AND WILL ABSOLUTELY CAUSE A MEMORY LEAK!!
//ONLY USE IF THE APPLICATION IS BEING TERMINATED!!
bool do_not_destroy_{false};
//**WARNING**
std::atomic<int>& websocket_count_;
std::function<void(crow::websocket::connection&)> open_handler_; std::function<void(crow::websocket::connection&)> open_handler_;
std::function<void(crow::websocket::connection&, const std::string&, bool)> message_handler_; std::function<void(crow::websocket::connection&, const std::string&, bool)> message_handler_;
std::function<void(crow::websocket::connection&, const std::string&)> close_handler_; std::function<void(crow::websocket::connection&, const std::string&)> close_handler_;
std::function<void(crow::websocket::connection&)> error_handler_; std::function<void(crow::websocket::connection&)> error_handler_;
std::function<bool(const crow::request&)> accept_handler_; std::function<bool(const crow::request&)> accept_handler_;
boost::asio::signal_set signals_;
}; };
} }
} }