From de92d28f29678abaaf9cc43ef5d8a76b173cd376 Mon Sep 17 00:00:00 2001 From: Luca Schlecker Date: Fri, 19 Nov 2021 18:42:25 +0100 Subject: [PATCH] replace `dumb_timer_queue` with new `task_timer`. fixes #264, lays ground for #273 and #257. Signed-off-by: Luca Schlecker --- include/crow.h | 2 +- include/crow/app.h | 13 ++- include/crow/dumb_timer_queue.h | 83 ------------------- include/crow/http_connection.h | 21 +++-- include/crow/http_server.h | 36 +++------ include/crow/task_timer.h | 138 ++++++++++++++++++++++++++++++++ 6 files changed, 166 insertions(+), 127 deletions(-) delete mode 100644 include/crow/dumb_timer_queue.h create mode 100644 include/crow/task_timer.h diff --git a/include/crow.h b/include/crow.h index 93ca96d1e..1687f9647 100644 --- a/include/crow.h +++ b/include/crow.h @@ -8,7 +8,7 @@ #include "crow/json.h" #include "crow/mustache.h" #include "crow/logging.h" -#include "crow/dumb_timer_queue.h" +#include "crow/task_timer.h" #include "crow/utility.h" #include "crow/common.h" #include "crow/http_request.h" diff --git a/include/crow/app.h b/include/crow/app.h index 49b3471b6..fdafe80f1 100644 --- a/include/crow/app.h +++ b/include/crow/app.h @@ -18,7 +18,7 @@ #include "crow/middleware_context.h" #include "crow/http_request.h" #include "crow/http_server.h" -#include "crow/dumb_timer_queue.h" +#include "crow/task_timer.h" #ifdef CROW_ENABLE_COMPRESSION #include "crow/compression.h" #endif @@ -35,10 +35,6 @@ namespace crow { -#ifdef CROW_MAIN - int detail::dumb_timer_queue::tick = 5; -#endif - #ifdef CROW_ENABLE_SSL using ssl_context_t = boost::asio::ssl::context; #endif @@ -131,7 +127,7 @@ namespace crow ///Set the connection timeout in seconds (default is 5) self_t& timeout(std::uint8_t timeout) { - detail::dumb_timer_queue::tick = timeout; + timeout_ = timeout; return *this; } @@ -284,7 +280,7 @@ namespace crow #ifdef CROW_ENABLE_SSL if (use_ssl_) { - ssl_server_ = std::move(std::unique_ptr(new ssl_server_t(this, bindaddr_, port_, server_name_, &middlewares_, concurrency_, &ssl_context_))); + ssl_server_ = std::move(std::unique_ptr(new ssl_server_t(this, bindaddr_, port_, server_name_, &middlewares_, concurrency_, timeout_, &ssl_context_))); ssl_server_->set_tick_function(tick_interval_, tick_function_); notify_server_start(); ssl_server_->run(); @@ -292,7 +288,7 @@ namespace crow else #endif { - server_ = std::move(std::unique_ptr(new server_t(this, bindaddr_, port_, server_name_, &middlewares_, concurrency_, nullptr))); + server_ = std::move(std::unique_ptr(new server_t(this, bindaddr_, port_, server_name_, &middlewares_, concurrency_, timeout_, nullptr))); server_->set_tick_function(tick_interval_, tick_function_); server_->signal_clear(); for (auto snum : signals_) @@ -424,6 +420,7 @@ namespace crow } private: + std::uint8_t timeout_{5}; uint16_t port_ = 80; uint16_t concurrency_ = 1; bool validated_ = false; diff --git a/include/crow/dumb_timer_queue.h b/include/crow/dumb_timer_queue.h deleted file mode 100644 index 8b48662b5..000000000 --- a/include/crow/dumb_timer_queue.h +++ /dev/null @@ -1,83 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include - -#include "crow/logging.h" - -namespace crow -{ - namespace detail - { - - /// Fast timer queue for fixed tick value. - class dumb_timer_queue - { - public: - static int tick; - using key = std::pair; - - void cancel(key& k) - { - auto self = k.first; - k.first = nullptr; - if (!self) - return; - - unsigned int index = static_cast(k.second - self->step_); - if (index < self->dq_.size()) - self->dq_[index].second = nullptr; - } - - /// Add a function to the queue. - key add(std::function f) - { - dq_.emplace_back(std::chrono::steady_clock::now(), std::move(f)); - int ret = step_+dq_.size()-1; - - CROW_LOG_DEBUG << "timer add inside: " << this << ' ' << ret ; - return {this, ret}; - } - - /// Process the queue: take functions out in time intervals and execute them. - void process() - { - if (!io_service_) - return; - - auto now = std::chrono::steady_clock::now(); - while(!dq_.empty()) - { - auto& x = dq_.front(); - if (now - x.first < std::chrono::seconds(tick)) - break; - if (x.second) - { - CROW_LOG_DEBUG << "timer call: " << this << ' ' << step_; - // we know that timer handlers are very simple currenty; call here - x.second(); - } - dq_.pop_front(); - step_++; - } - } - - void set_io_service(boost::asio::io_service& io_service) - { - io_service_ = &io_service; - } - - dumb_timer_queue() noexcept - { - } - - private: - boost::asio::io_service* io_service_{}; - std::deque>> dq_; - int step_{}; - }; - } -} diff --git a/include/crow/http_connection.h b/include/crow/http_connection.h index 3eac9ab8d..805a1d894 100644 --- a/include/crow/http_connection.h +++ b/include/crow/http_connection.h @@ -14,7 +14,7 @@ #include "crow/http_response.h" #include "crow/logging.h" #include "crow/settings.h" -#include "crow/dumb_timer_queue.h" +#include "crow/task_timer.h" #include "crow/middleware_context.h" #include "crow/socket_adaptors.h" #include "crow/compression.h" @@ -193,7 +193,7 @@ namespace crow const std::string& server_name, std::tuple* middlewares, std::function& get_cached_date_str_f, - detail::dumb_timer_queue& timer_queue, + detail::task_timer& task_timer, typename Adaptor::context* adaptor_ctx_ ) : adaptor_(io_service, adaptor_ctx_), @@ -202,7 +202,7 @@ namespace crow server_name_(server_name), middlewares_(middlewares), get_cached_date_str(get_cached_date_str_f), - timer_queue(timer_queue), + task_timer_(task_timer), res_stream_threshold_(handler->stream_threshold()) { #ifdef CROW_ENABLE_DEBUG @@ -653,15 +653,15 @@ namespace crow void cancel_deadline_timer() { - CROW_LOG_DEBUG << this << " timer cancelled: " << timer_cancel_key_.first << ' ' << timer_cancel_key_.second; - timer_queue.cancel(timer_cancel_key_); + CROW_LOG_DEBUG << this << " timer cancelled: " << &task_timer_ << ' ' << task_id_; + task_timer_.cancel(task_id_); } void start_deadline(/*int timeout = 5*/) { cancel_deadline_timer(); - - timer_cancel_key_ = timer_queue.add([this] + + task_id_ = task_timer_.set_timeout([this] { if (!adaptor_.is_open()) { @@ -670,7 +670,7 @@ namespace crow adaptor_.shutdown_readwrite(); adaptor_.close(); }); - CROW_LOG_DEBUG << this << " timer added: " << timer_cancel_key_.first << ' ' << timer_cancel_key_.second; + CROW_LOG_DEBUG << this << " timer added: " << &task_timer_ << ' ' << task_id_; } private: @@ -692,8 +692,7 @@ namespace crow std::string date_str_; std::string res_body_copy_; - //boost::asio::deadline_timer deadline_; - detail::dumb_timer_queue::key timer_cancel_key_; + detail::task_timer::identifier_type task_id_; bool is_reading{}; bool is_writing{}; @@ -705,7 +704,7 @@ namespace crow detail::context ctx_; std::function& get_cached_date_str; - detail::dumb_timer_queue& timer_queue; + detail::task_timer& task_timer_; size_t res_stream_threshold_; }; diff --git a/include/crow/http_server.h b/include/crow/http_server.h index 28b493771..1f7319ace 100644 --- a/include/crow/http_server.h +++ b/include/crow/http_server.h @@ -16,7 +16,7 @@ #include "crow/version.h" #include "crow/http_connection.h" #include "crow/logging.h" -#include "crow/dumb_timer_queue.h" +#include "crow/task_timer.h" namespace crow { @@ -27,7 +27,7 @@ namespace crow class Server { public: - Server(Handler* handler, std::string bindaddr, uint16_t port, std::string server_name = std::string("Crow/") + VERSION, std::tuple* middlewares = nullptr, uint16_t concurrency = 1, typename Adaptor::context* adaptor_ctx = nullptr) + Server(Handler* handler, std::string bindaddr, uint16_t port, std::string server_name = std::string("Crow/") + VERSION, std::tuple* middlewares = nullptr, uint16_t concurrency = 1, uint8_t timeout = 5, typename Adaptor::context* adaptor_ctx = nullptr) : acceptor_(io_service_, tcp::endpoint(boost::asio::ip::address::from_string(bindaddr), port)), signals_(io_service_, SIGINT, SIGTERM), tick_timer_(io_service_), @@ -37,7 +37,8 @@ namespace crow port_(port), bindaddr_(bindaddr), middlewares_(middlewares), - adaptor_ctx_(adaptor_ctx) + adaptor_ctx_(adaptor_ctx), + timeout_(timeout) { } @@ -64,7 +65,7 @@ namespace crow for(int i = 0; i < concurrency_; i++) io_service_pool_.emplace_back(new boost::asio::io_service()); get_cached_date_str_pool_.resize(concurrency_); - timer_queue_pool_.resize(concurrency_); + task_timer_pool_.resize(concurrency_); std::vector> v; std::atomic init_count(0); @@ -101,23 +102,10 @@ namespace crow return date_str; }; - // initializing timer queue - detail::dumb_timer_queue timer_queue; - timer_queue_pool_[i] = &timer_queue; - - timer_queue.set_io_service(*io_service_pool_[i]); - boost::asio::deadline_timer timer(*io_service_pool_[i]); - timer.expires_from_now(boost::posix_time::seconds(1)); - - std::function handler; - handler = [&](const boost::system::error_code& ec){ - if (ec) - return; - timer_queue.process(); - timer.expires_from_now(boost::posix_time::seconds(1)); - timer.async_wait(handler); - }; - timer.async_wait(handler); + // initializing task timers + detail::task_timer task_timer(*io_service_pool_[i]); + task_timer.set_default_timeout(timeout_); + task_timer_pool_[i] = &task_timer; init_count ++; while(1) @@ -202,8 +190,7 @@ namespace crow asio::io_service& is = pick_io_service(); auto p = new Connection( is, handler_, server_name_, middlewares_, - get_cached_date_str_pool_[roundrobin_index_], *timer_queue_pool_[roundrobin_index_], - adaptor_ctx_); + get_cached_date_str_pool_[roundrobin_index_], *task_timer_pool_[roundrobin_index_], adaptor_ctx_); acceptor_.async_accept(p->socket(), [this, p, &is](boost::system::error_code ec) { @@ -225,7 +212,7 @@ namespace crow private: asio::io_service io_service_; std::vector> io_service_pool_; - std::vector timer_queue_pool_; + std::vector task_timer_pool_; std::vector> get_cached_date_str_pool_; tcp::acceptor acceptor_; boost::asio::signal_set signals_; @@ -233,6 +220,7 @@ namespace crow Handler* handler_; uint16_t concurrency_{1}; + std::uint8_t timeout_; std::string server_name_; uint16_t port_; std::string bindaddr_; diff --git a/include/crow/task_timer.h b/include/crow/task_timer.h new file mode 100644 index 000000000..27ef2b9d8 --- /dev/null +++ b/include/crow/task_timer.h @@ -0,0 +1,138 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include "crow/logging.h" + +namespace crow { +namespace detail { +/** + * @brief A class for scheduling functions to be called after a specific amount + * of ticks. A tick is equal to 1 second. + * + */ +class task_timer +{ + public: + using task_type = std::function; + using identifier_type = size_t; + + private: + using clock_type = std::chrono::steady_clock; + using time_type = clock_type::time_point; + + public: + task_timer(boost::asio::io_service& io_service) + : io_service_(io_service), deadline_timer_(io_service_) + { + deadline_timer_.expires_from_now(boost::posix_time::seconds(1)); + deadline_timer_.async_wait( + std::bind(&task_timer::tick_handler, this, std::placeholders::_1)); + } + + ~task_timer() { deadline_timer_.cancel(); } + + void cancel(identifier_type id) + { + tasks_.erase(id); + CROW_LOG_DEBUG << "timer cancelled: " << this << ' ' << id; + } + + /** + * @brief Schedule the given task to be executed after the default amount of + * ticks. + * + * @return identifier_type Used to cancel the thread. + * It is not bound to this task_timer instance and in some cases could lead to + * undefined behavior if used with other task_timer objects or after the task + * has been successfully executed. + */ + identifier_type set_timeout(const task_type& task) + { + tasks_.insert( + {++highest_id_, + {clock_type::now() + std::chrono::seconds(get_default_timeout()), + task}}); + CROW_LOG_DEBUG << "timer add inside: " << this << ' ' << highest_id_; + return highest_id_; + } + + /** + * @brief Schedule the given task to be executed after the given time. + * + * @param timeout The amount of ticks (seconds) to wait before execution. + * + * @return identifier_type Used to cancel the thread. + * It is not bound to this task_timer instance and in some cases could lead to + * undefined behavior if used with other task_timer objects or after the task + * has been successfully executed. + */ + identifier_type set_timeout(const task_type& task, std::uint8_t timeout) + { + tasks_.insert({++highest_id_, + {clock_type::now() + std::chrono::seconds(timeout), task}}); + CROW_LOG_DEBUG << "timer add inside: " << this << ' ' << highest_id_; + return highest_id_; + } + + /** + * @brief Set the default timeout for this task_timer instance. (Default: 5) + * + * @param timeout The amount of ticks (seconds) to wait before execution. + */ + void set_default_timeout(std::uint8_t timeout) { default_timeout_ = timeout; } + + /** + * @brief Get the default timeout. (Default: 5) + * + * @return std::uint8_t + */ + std::uint8_t get_default_timeout() const { return default_timeout_; } + + private: + void process_tasks() + { + time_type current_time = clock_type::now(); + std::vector finished_tasks; + + for (const auto& task : tasks_) { + if (task.second.first < current_time) { + (task.second.second)(); + finished_tasks.push_back(task.first); + CROW_LOG_DEBUG << "timer call: " << this << ' ' << task.first; + } + } + + for (const auto& task : finished_tasks) tasks_.erase(task); + + // If no task is currently scheduled, reset the issued ids back to 0. + if (tasks_.empty()) highest_id_ = 0; + } + + void tick_handler(const boost::system::error_code& ec) + { + if (ec) return; + + process_tasks(); + + deadline_timer_.expires_from_now(boost::posix_time::seconds(1)); + deadline_timer_.async_wait( + std::bind(&task_timer::tick_handler, this, std::placeholders::_1)); + } + + private: + std::uint8_t default_timeout_{5}; + boost::asio::io_service& io_service_; + boost::asio::deadline_timer deadline_timer_; + std::map> tasks_; + + // A continuosly increasing number to be issued to threads to identify them. + // If no tasks are scheduled, it will be reset to 0. + identifier_type highest_id_{0}; +}; +} // namespace detail +} // namespace crow