Merge pull request #269 from CrowCpp/ws_close

Close websockets when app is terminated
This commit is contained in:
Farook Al-Sammarraie 2022-05-18 14:34:23 +03:00 committed by GitHub
commit 45ced144a7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 82 additions and 31 deletions

View File

@ -19,8 +19,8 @@ sock.onopen = ()=>{
sock.onerror = (e)=>{
console.log('error',e)
}
sock.onclose = ()=>{
console.log('close')
sock.onclose = (e)=>{
console.log('close', e)
}
sock.onmessage = (e)=>{
$("#log").val(

View File

@ -59,6 +59,9 @@ namespace crow
Crow()
{}
std::atomic<int> websocket_count{0};
/// Process an Upgrade request
///
@ -115,6 +118,11 @@ namespace crow
return *this;
}
std::vector<int> signals()
{
return signals_;
}
/// Set the port that Crow will handle requests on
self_t& port(std::uint16_t port)
{
@ -300,7 +308,6 @@ namespace crow
{
server_ = std::move(std::unique_ptr<server_t>(new server_t(this, bindaddr_, port_, server_name_, &middlewares_, concurrency_, timeout_, nullptr)));
server_->set_tick_function(tick_interval_, tick_function_);
server_->signal_clear();
for (auto snum : signals_)
{
server_->signal_add(snum);

View File

@ -161,9 +161,21 @@ namespace crow
void stop()
{
io_service_.stop();
shutting_down_ = true; //Prevent the acceptor from taking new connections
while (handler_->websocket_count.load(std::memory_order_release) != 0) //Wait for the websockets to close properly
{
}
for (auto& io_service : io_service_pool_)
io_service->stop();
{
if (io_service != nullptr)
{
CROW_LOG_INFO << "Closing IO service " << &io_service;
io_service->stop(); //Close all io_services (and HTTP connections)
}
}
CROW_LOG_INFO << "Closing main IO service (" << &io_service_ << ')';
io_service_.stop(); //Close main io_service
}
void signal_clear()
@ -195,33 +207,36 @@ namespace crow
void do_accept()
{
uint16_t service_idx = pick_io_service_idx();
asio::io_service& is = *io_service_pool_[service_idx];
task_queue_length_pool_[service_idx]++;
CROW_LOG_DEBUG << &is << " {" << service_idx << "} queue length: " << task_queue_length_pool_[service_idx];
if (!shutting_down_)
{
uint16_t service_idx = pick_io_service_idx();
asio::io_service& is = *io_service_pool_[service_idx];
task_queue_length_pool_[service_idx]++;
CROW_LOG_DEBUG << &is << " {" << service_idx << "} queue length: " << task_queue_length_pool_[service_idx];
auto p = new Connection<Adaptor, Handler, Middlewares...>(
is, handler_, server_name_, middlewares_,
get_cached_date_str_pool_[service_idx], *task_timer_pool_[service_idx], adaptor_ctx_, task_queue_length_pool_[service_idx]);
auto p = new Connection<Adaptor, Handler, Middlewares...>(
is, handler_, server_name_, middlewares_,
get_cached_date_str_pool_[service_idx], *task_timer_pool_[service_idx], adaptor_ctx_, task_queue_length_pool_[service_idx]);
acceptor_.async_accept(
p->socket(),
[this, p, &is, service_idx](boost::system::error_code ec) {
if (!ec)
{
is.post(
[p] {
p->start();
});
}
else
{
task_queue_length_pool_[service_idx]--;
CROW_LOG_DEBUG << &is << " {" << service_idx << "} queue length: " << task_queue_length_pool_[service_idx];
delete p;
}
do_accept();
});
acceptor_.async_accept(
p->socket(),
[this, p, &is, service_idx](boost::system::error_code ec) {
if (!ec)
{
is.post(
[p] {
p->start();
});
}
else
{
task_queue_length_pool_[service_idx]--;
CROW_LOG_DEBUG << &is << " {" << service_idx << "} queue length: " << task_queue_length_pool_[service_idx];
delete p;
}
do_accept();
});
}
}
private:
@ -230,6 +245,7 @@ namespace crow
std::vector<detail::task_timer*> task_timer_pool_;
std::vector<std::function<std::string()>> get_cached_date_str_pool_;
tcp::acceptor acceptor_;
bool shutting_down_ = false;
boost::asio::signal_set signals_;
boost::asio::deadline_timer tick_timer_;

View File

@ -1,6 +1,7 @@
#pragma once
#include <boost/algorithm/string/predicate.hpp>
#include <boost/array.hpp>
#include "crow/logging.h"
#include "crow/socket_adaptors.h"
#include "crow/http_request.h"
#include "crow/TinySHA1.hpp"
@ -60,6 +61,7 @@ namespace crow
//
/// A websocket connection.
template<typename Adaptor, typename Handler>
class Connection : public connection
{
@ -77,11 +79,13 @@ namespace crow
std::function<bool(const crow::request&)> accept_handler):
adaptor_(std::move(adaptor)),
handler_(handler),
websocket_count_(handler_->websocket_count),
open_handler_(std::move(open_handler)),
message_handler_(std::move(message_handler)),
close_handler_(std::move(close_handler)),
error_handler_(std::move(error_handler)),
accept_handler_(std::move(accept_handler))
accept_handler_(std::move(accept_handler)),
signals_(adaptor_.get_io_service())
{
if (!boost::iequals(req.get_header_value("upgrade"), "websocket"))
{
@ -100,6 +104,11 @@ namespace crow
}
}
signals_.clear();
for (auto snum : handler_->signals())
signals_.add(snum);
// Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
// Sec-WebSocket-Version: 13
std::string magic = req.get_header_value("Sec-WebSocket-Key") + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
@ -107,6 +116,15 @@ namespace crow
s.processBytes(magic.data(), magic.size());
uint8_t digest[20];
s.getDigestBytes(digest);
signals_.async_wait(
[&](const boost::system::error_code& e, int /*signal_number*/) {
if (!e)
{
CROW_LOG_INFO << "Quitting Websocket: " << this;
close("Server Application Terminated");
}
});
start(crow::utility::base64encode((unsigned char*)digest, 20));
}
@ -290,6 +308,7 @@ namespace crow
has_mask_ = false;
#else
close_connection_ = true;
adaptor_.shutdown_readwrite();
adaptor_.close();
if (error_handler_)
error_handler_(*this);
@ -315,6 +334,7 @@ namespace crow
else
{
close_connection_ = true;
adaptor_.shutdown_readwrite();
adaptor_.close();
if (error_handler_)
error_handler_(*this);
@ -352,6 +372,7 @@ namespace crow
else
{
close_connection_ = true;
adaptor_.shutdown_readwrite();
adaptor_.close();
if (error_handler_)
error_handler_(*this);
@ -386,6 +407,7 @@ namespace crow
else
{
close_connection_ = true;
adaptor_.shutdown_readwrite();
adaptor_.close();
if (error_handler_)
error_handler_(*this);
@ -422,6 +444,7 @@ namespace crow
close_connection_ = true;
if (error_handler_)
error_handler_(*this);
adaptor_.shutdown_readwrite();
adaptor_.close();
}
});
@ -460,6 +483,7 @@ namespace crow
close_connection_ = true;
if (error_handler_)
error_handler_(*this);
adaptor_.shutdown_readwrite();
adaptor_.close();
}
});
@ -539,6 +563,7 @@ namespace crow
}
else
{
adaptor_.shutdown_readwrite();
adaptor_.close();
close_connection_ = true;
if (!is_close_handler_called_)
@ -608,6 +633,7 @@ namespace crow
if (!is_close_handler_called_)
if (close_handler_)
close_handler_(*this, "uncleanly");
websocket_count_--;
if (sending_buffers_.empty() && !is_reading)
delete this;
}
@ -636,12 +662,14 @@ namespace crow
bool error_occured_{false};
bool pong_received_{false};
bool is_close_handler_called_{false};
std::atomic<int>& websocket_count_;
std::function<void(crow::websocket::connection&)> open_handler_;
std::function<void(crow::websocket::connection&, const std::string&, bool)> message_handler_;
std::function<void(crow::websocket::connection&, const std::string&)> close_handler_;
std::function<void(crow::websocket::connection&)> error_handler_;
std::function<bool(const crow::request&)> accept_handler_;
boost::asio::signal_set signals_;
};
} // namespace websocket
} // namespace crow