mirror of
https://github.com/CrowCpp/Crow.git
synced 2024-06-07 21:10:44 +00:00
replace dumb_timer_queue
with new task_timer
.
fixes #264, lays ground for #273 and #257. Signed-off-by: Luca Schlecker <luca.schlecker@hotmail.com>
This commit is contained in:
parent
22d3918345
commit
de92d28f29
@ -8,7 +8,7 @@
|
||||
#include "crow/json.h"
|
||||
#include "crow/mustache.h"
|
||||
#include "crow/logging.h"
|
||||
#include "crow/dumb_timer_queue.h"
|
||||
#include "crow/task_timer.h"
|
||||
#include "crow/utility.h"
|
||||
#include "crow/common.h"
|
||||
#include "crow/http_request.h"
|
||||
|
@ -18,7 +18,7 @@
|
||||
#include "crow/middleware_context.h"
|
||||
#include "crow/http_request.h"
|
||||
#include "crow/http_server.h"
|
||||
#include "crow/dumb_timer_queue.h"
|
||||
#include "crow/task_timer.h"
|
||||
#ifdef CROW_ENABLE_COMPRESSION
|
||||
#include "crow/compression.h"
|
||||
#endif
|
||||
@ -35,10 +35,6 @@
|
||||
|
||||
namespace crow
|
||||
{
|
||||
#ifdef CROW_MAIN
|
||||
int detail::dumb_timer_queue::tick = 5;
|
||||
#endif
|
||||
|
||||
#ifdef CROW_ENABLE_SSL
|
||||
using ssl_context_t = boost::asio::ssl::context;
|
||||
#endif
|
||||
@ -131,7 +127,7 @@ namespace crow
|
||||
///Set the connection timeout in seconds (default is 5)
|
||||
self_t& timeout(std::uint8_t timeout)
|
||||
{
|
||||
detail::dumb_timer_queue::tick = timeout;
|
||||
timeout_ = timeout;
|
||||
return *this;
|
||||
}
|
||||
|
||||
@ -284,7 +280,7 @@ namespace crow
|
||||
#ifdef CROW_ENABLE_SSL
|
||||
if (use_ssl_)
|
||||
{
|
||||
ssl_server_ = std::move(std::unique_ptr<ssl_server_t>(new ssl_server_t(this, bindaddr_, port_, server_name_, &middlewares_, concurrency_, &ssl_context_)));
|
||||
ssl_server_ = std::move(std::unique_ptr<ssl_server_t>(new ssl_server_t(this, bindaddr_, port_, server_name_, &middlewares_, concurrency_, timeout_, &ssl_context_)));
|
||||
ssl_server_->set_tick_function(tick_interval_, tick_function_);
|
||||
notify_server_start();
|
||||
ssl_server_->run();
|
||||
@ -292,7 +288,7 @@ namespace crow
|
||||
else
|
||||
#endif
|
||||
{
|
||||
server_ = std::move(std::unique_ptr<server_t>(new server_t(this, bindaddr_, port_, server_name_, &middlewares_, concurrency_, nullptr)));
|
||||
server_ = std::move(std::unique_ptr<server_t>(new server_t(this, bindaddr_, port_, server_name_, &middlewares_, concurrency_, timeout_, nullptr)));
|
||||
server_->set_tick_function(tick_interval_, tick_function_);
|
||||
server_->signal_clear();
|
||||
for (auto snum : signals_)
|
||||
@ -424,6 +420,7 @@ namespace crow
|
||||
}
|
||||
|
||||
private:
|
||||
std::uint8_t timeout_{5};
|
||||
uint16_t port_ = 80;
|
||||
uint16_t concurrency_ = 1;
|
||||
bool validated_ = false;
|
||||
|
@ -1,83 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <boost/asio.hpp>
|
||||
#include <deque>
|
||||
#include <functional>
|
||||
#include <chrono>
|
||||
#include <thread>
|
||||
|
||||
#include "crow/logging.h"
|
||||
|
||||
namespace crow
|
||||
{
|
||||
namespace detail
|
||||
{
|
||||
|
||||
/// Fast timer queue for fixed tick value.
|
||||
class dumb_timer_queue
|
||||
{
|
||||
public:
|
||||
static int tick;
|
||||
using key = std::pair<dumb_timer_queue*, int>;
|
||||
|
||||
void cancel(key& k)
|
||||
{
|
||||
auto self = k.first;
|
||||
k.first = nullptr;
|
||||
if (!self)
|
||||
return;
|
||||
|
||||
unsigned int index = static_cast<unsigned>(k.second - self->step_);
|
||||
if (index < self->dq_.size())
|
||||
self->dq_[index].second = nullptr;
|
||||
}
|
||||
|
||||
/// Add a function to the queue.
|
||||
key add(std::function<void()> f)
|
||||
{
|
||||
dq_.emplace_back(std::chrono::steady_clock::now(), std::move(f));
|
||||
int ret = step_+dq_.size()-1;
|
||||
|
||||
CROW_LOG_DEBUG << "timer add inside: " << this << ' ' << ret ;
|
||||
return {this, ret};
|
||||
}
|
||||
|
||||
/// Process the queue: take functions out in time intervals and execute them.
|
||||
void process()
|
||||
{
|
||||
if (!io_service_)
|
||||
return;
|
||||
|
||||
auto now = std::chrono::steady_clock::now();
|
||||
while(!dq_.empty())
|
||||
{
|
||||
auto& x = dq_.front();
|
||||
if (now - x.first < std::chrono::seconds(tick))
|
||||
break;
|
||||
if (x.second)
|
||||
{
|
||||
CROW_LOG_DEBUG << "timer call: " << this << ' ' << step_;
|
||||
// we know that timer handlers are very simple currenty; call here
|
||||
x.second();
|
||||
}
|
||||
dq_.pop_front();
|
||||
step_++;
|
||||
}
|
||||
}
|
||||
|
||||
void set_io_service(boost::asio::io_service& io_service)
|
||||
{
|
||||
io_service_ = &io_service;
|
||||
}
|
||||
|
||||
dumb_timer_queue() noexcept
|
||||
{
|
||||
}
|
||||
|
||||
private:
|
||||
boost::asio::io_service* io_service_{};
|
||||
std::deque<std::pair<decltype(std::chrono::steady_clock::now()), std::function<void()>>> dq_;
|
||||
int step_{};
|
||||
};
|
||||
}
|
||||
}
|
@ -14,7 +14,7 @@
|
||||
#include "crow/http_response.h"
|
||||
#include "crow/logging.h"
|
||||
#include "crow/settings.h"
|
||||
#include "crow/dumb_timer_queue.h"
|
||||
#include "crow/task_timer.h"
|
||||
#include "crow/middleware_context.h"
|
||||
#include "crow/socket_adaptors.h"
|
||||
#include "crow/compression.h"
|
||||
@ -193,7 +193,7 @@ namespace crow
|
||||
const std::string& server_name,
|
||||
std::tuple<Middlewares...>* middlewares,
|
||||
std::function<std::string()>& get_cached_date_str_f,
|
||||
detail::dumb_timer_queue& timer_queue,
|
||||
detail::task_timer& task_timer,
|
||||
typename Adaptor::context* adaptor_ctx_
|
||||
)
|
||||
: adaptor_(io_service, adaptor_ctx_),
|
||||
@ -202,7 +202,7 @@ namespace crow
|
||||
server_name_(server_name),
|
||||
middlewares_(middlewares),
|
||||
get_cached_date_str(get_cached_date_str_f),
|
||||
timer_queue(timer_queue),
|
||||
task_timer_(task_timer),
|
||||
res_stream_threshold_(handler->stream_threshold())
|
||||
{
|
||||
#ifdef CROW_ENABLE_DEBUG
|
||||
@ -653,15 +653,15 @@ namespace crow
|
||||
|
||||
void cancel_deadline_timer()
|
||||
{
|
||||
CROW_LOG_DEBUG << this << " timer cancelled: " << timer_cancel_key_.first << ' ' << timer_cancel_key_.second;
|
||||
timer_queue.cancel(timer_cancel_key_);
|
||||
CROW_LOG_DEBUG << this << " timer cancelled: " << &task_timer_ << ' ' << task_id_;
|
||||
task_timer_.cancel(task_id_);
|
||||
}
|
||||
|
||||
void start_deadline(/*int timeout = 5*/)
|
||||
{
|
||||
cancel_deadline_timer();
|
||||
|
||||
timer_cancel_key_ = timer_queue.add([this]
|
||||
|
||||
task_id_ = task_timer_.set_timeout([this]
|
||||
{
|
||||
if (!adaptor_.is_open())
|
||||
{
|
||||
@ -670,7 +670,7 @@ namespace crow
|
||||
adaptor_.shutdown_readwrite();
|
||||
adaptor_.close();
|
||||
});
|
||||
CROW_LOG_DEBUG << this << " timer added: " << timer_cancel_key_.first << ' ' << timer_cancel_key_.second;
|
||||
CROW_LOG_DEBUG << this << " timer added: " << &task_timer_ << ' ' << task_id_;
|
||||
}
|
||||
|
||||
private:
|
||||
@ -692,8 +692,7 @@ namespace crow
|
||||
std::string date_str_;
|
||||
std::string res_body_copy_;
|
||||
|
||||
//boost::asio::deadline_timer deadline_;
|
||||
detail::dumb_timer_queue::key timer_cancel_key_;
|
||||
detail::task_timer::identifier_type task_id_;
|
||||
|
||||
bool is_reading{};
|
||||
bool is_writing{};
|
||||
@ -705,7 +704,7 @@ namespace crow
|
||||
detail::context<Middlewares...> ctx_;
|
||||
|
||||
std::function<std::string()>& get_cached_date_str;
|
||||
detail::dumb_timer_queue& timer_queue;
|
||||
detail::task_timer& task_timer_;
|
||||
|
||||
size_t res_stream_threshold_;
|
||||
};
|
||||
|
@ -16,7 +16,7 @@
|
||||
#include "crow/version.h"
|
||||
#include "crow/http_connection.h"
|
||||
#include "crow/logging.h"
|
||||
#include "crow/dumb_timer_queue.h"
|
||||
#include "crow/task_timer.h"
|
||||
|
||||
namespace crow
|
||||
{
|
||||
@ -27,7 +27,7 @@ namespace crow
|
||||
class Server
|
||||
{
|
||||
public:
|
||||
Server(Handler* handler, std::string bindaddr, uint16_t port, std::string server_name = std::string("Crow/") + VERSION, std::tuple<Middlewares...>* middlewares = nullptr, uint16_t concurrency = 1, typename Adaptor::context* adaptor_ctx = nullptr)
|
||||
Server(Handler* handler, std::string bindaddr, uint16_t port, std::string server_name = std::string("Crow/") + VERSION, std::tuple<Middlewares...>* middlewares = nullptr, uint16_t concurrency = 1, uint8_t timeout = 5, typename Adaptor::context* adaptor_ctx = nullptr)
|
||||
: acceptor_(io_service_, tcp::endpoint(boost::asio::ip::address::from_string(bindaddr), port)),
|
||||
signals_(io_service_, SIGINT, SIGTERM),
|
||||
tick_timer_(io_service_),
|
||||
@ -37,7 +37,8 @@ namespace crow
|
||||
port_(port),
|
||||
bindaddr_(bindaddr),
|
||||
middlewares_(middlewares),
|
||||
adaptor_ctx_(adaptor_ctx)
|
||||
adaptor_ctx_(adaptor_ctx),
|
||||
timeout_(timeout)
|
||||
{
|
||||
}
|
||||
|
||||
@ -64,7 +65,7 @@ namespace crow
|
||||
for(int i = 0; i < concurrency_; i++)
|
||||
io_service_pool_.emplace_back(new boost::asio::io_service());
|
||||
get_cached_date_str_pool_.resize(concurrency_);
|
||||
timer_queue_pool_.resize(concurrency_);
|
||||
task_timer_pool_.resize(concurrency_);
|
||||
|
||||
std::vector<std::future<void>> v;
|
||||
std::atomic<int> init_count(0);
|
||||
@ -101,23 +102,10 @@ namespace crow
|
||||
return date_str;
|
||||
};
|
||||
|
||||
// initializing timer queue
|
||||
detail::dumb_timer_queue timer_queue;
|
||||
timer_queue_pool_[i] = &timer_queue;
|
||||
|
||||
timer_queue.set_io_service(*io_service_pool_[i]);
|
||||
boost::asio::deadline_timer timer(*io_service_pool_[i]);
|
||||
timer.expires_from_now(boost::posix_time::seconds(1));
|
||||
|
||||
std::function<void(const boost::system::error_code& ec)> handler;
|
||||
handler = [&](const boost::system::error_code& ec){
|
||||
if (ec)
|
||||
return;
|
||||
timer_queue.process();
|
||||
timer.expires_from_now(boost::posix_time::seconds(1));
|
||||
timer.async_wait(handler);
|
||||
};
|
||||
timer.async_wait(handler);
|
||||
// initializing task timers
|
||||
detail::task_timer task_timer(*io_service_pool_[i]);
|
||||
task_timer.set_default_timeout(timeout_);
|
||||
task_timer_pool_[i] = &task_timer;
|
||||
|
||||
init_count ++;
|
||||
while(1)
|
||||
@ -202,8 +190,7 @@ namespace crow
|
||||
asio::io_service& is = pick_io_service();
|
||||
auto p = new Connection<Adaptor, Handler, Middlewares...>(
|
||||
is, handler_, server_name_, middlewares_,
|
||||
get_cached_date_str_pool_[roundrobin_index_], *timer_queue_pool_[roundrobin_index_],
|
||||
adaptor_ctx_);
|
||||
get_cached_date_str_pool_[roundrobin_index_], *task_timer_pool_[roundrobin_index_], adaptor_ctx_);
|
||||
acceptor_.async_accept(p->socket(),
|
||||
[this, p, &is](boost::system::error_code ec)
|
||||
{
|
||||
@ -225,7 +212,7 @@ namespace crow
|
||||
private:
|
||||
asio::io_service io_service_;
|
||||
std::vector<std::unique_ptr<asio::io_service>> io_service_pool_;
|
||||
std::vector<detail::dumb_timer_queue*> timer_queue_pool_;
|
||||
std::vector<detail::task_timer*> task_timer_pool_;
|
||||
std::vector<std::function<std::string()>> get_cached_date_str_pool_;
|
||||
tcp::acceptor acceptor_;
|
||||
boost::asio::signal_set signals_;
|
||||
@ -233,6 +220,7 @@ namespace crow
|
||||
|
||||
Handler* handler_;
|
||||
uint16_t concurrency_{1};
|
||||
std::uint8_t timeout_;
|
||||
std::string server_name_;
|
||||
uint16_t port_;
|
||||
std::string bindaddr_;
|
||||
|
138
include/crow/task_timer.h
Normal file
138
include/crow/task_timer.h
Normal file
@ -0,0 +1,138 @@
|
||||
#pragma once
|
||||
|
||||
#include <boost/asio.hpp>
|
||||
#include <chrono>
|
||||
#include <functional>
|
||||
#include <map>
|
||||
#include <vector>
|
||||
|
||||
#include "crow/logging.h"
|
||||
|
||||
namespace crow {
|
||||
namespace detail {
|
||||
/**
|
||||
* @brief A class for scheduling functions to be called after a specific amount
|
||||
* of ticks. A tick is equal to 1 second.
|
||||
*
|
||||
*/
|
||||
class task_timer
|
||||
{
|
||||
public:
|
||||
using task_type = std::function<void()>;
|
||||
using identifier_type = size_t;
|
||||
|
||||
private:
|
||||
using clock_type = std::chrono::steady_clock;
|
||||
using time_type = clock_type::time_point;
|
||||
|
||||
public:
|
||||
task_timer(boost::asio::io_service& io_service)
|
||||
: io_service_(io_service), deadline_timer_(io_service_)
|
||||
{
|
||||
deadline_timer_.expires_from_now(boost::posix_time::seconds(1));
|
||||
deadline_timer_.async_wait(
|
||||
std::bind(&task_timer::tick_handler, this, std::placeholders::_1));
|
||||
}
|
||||
|
||||
~task_timer() { deadline_timer_.cancel(); }
|
||||
|
||||
void cancel(identifier_type id)
|
||||
{
|
||||
tasks_.erase(id);
|
||||
CROW_LOG_DEBUG << "timer cancelled: " << this << ' ' << id;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Schedule the given task to be executed after the default amount of
|
||||
* ticks.
|
||||
*
|
||||
* @return identifier_type Used to cancel the thread.
|
||||
* It is not bound to this task_timer instance and in some cases could lead to
|
||||
* undefined behavior if used with other task_timer objects or after the task
|
||||
* has been successfully executed.
|
||||
*/
|
||||
identifier_type set_timeout(const task_type& task)
|
||||
{
|
||||
tasks_.insert(
|
||||
{++highest_id_,
|
||||
{clock_type::now() + std::chrono::seconds(get_default_timeout()),
|
||||
task}});
|
||||
CROW_LOG_DEBUG << "timer add inside: " << this << ' ' << highest_id_;
|
||||
return highest_id_;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Schedule the given task to be executed after the given time.
|
||||
*
|
||||
* @param timeout The amount of ticks (seconds) to wait before execution.
|
||||
*
|
||||
* @return identifier_type Used to cancel the thread.
|
||||
* It is not bound to this task_timer instance and in some cases could lead to
|
||||
* undefined behavior if used with other task_timer objects or after the task
|
||||
* has been successfully executed.
|
||||
*/
|
||||
identifier_type set_timeout(const task_type& task, std::uint8_t timeout)
|
||||
{
|
||||
tasks_.insert({++highest_id_,
|
||||
{clock_type::now() + std::chrono::seconds(timeout), task}});
|
||||
CROW_LOG_DEBUG << "timer add inside: " << this << ' ' << highest_id_;
|
||||
return highest_id_;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Set the default timeout for this task_timer instance. (Default: 5)
|
||||
*
|
||||
* @param timeout The amount of ticks (seconds) to wait before execution.
|
||||
*/
|
||||
void set_default_timeout(std::uint8_t timeout) { default_timeout_ = timeout; }
|
||||
|
||||
/**
|
||||
* @brief Get the default timeout. (Default: 5)
|
||||
*
|
||||
* @return std::uint8_t
|
||||
*/
|
||||
std::uint8_t get_default_timeout() const { return default_timeout_; }
|
||||
|
||||
private:
|
||||
void process_tasks()
|
||||
{
|
||||
time_type current_time = clock_type::now();
|
||||
std::vector<identifier_type> finished_tasks;
|
||||
|
||||
for (const auto& task : tasks_) {
|
||||
if (task.second.first < current_time) {
|
||||
(task.second.second)();
|
||||
finished_tasks.push_back(task.first);
|
||||
CROW_LOG_DEBUG << "timer call: " << this << ' ' << task.first;
|
||||
}
|
||||
}
|
||||
|
||||
for (const auto& task : finished_tasks) tasks_.erase(task);
|
||||
|
||||
// If no task is currently scheduled, reset the issued ids back to 0.
|
||||
if (tasks_.empty()) highest_id_ = 0;
|
||||
}
|
||||
|
||||
void tick_handler(const boost::system::error_code& ec)
|
||||
{
|
||||
if (ec) return;
|
||||
|
||||
process_tasks();
|
||||
|
||||
deadline_timer_.expires_from_now(boost::posix_time::seconds(1));
|
||||
deadline_timer_.async_wait(
|
||||
std::bind(&task_timer::tick_handler, this, std::placeholders::_1));
|
||||
}
|
||||
|
||||
private:
|
||||
std::uint8_t default_timeout_{5};
|
||||
boost::asio::io_service& io_service_;
|
||||
boost::asio::deadline_timer deadline_timer_;
|
||||
std::map<identifier_type, std::pair<time_type, task_type>> tasks_;
|
||||
|
||||
// A continuosly increasing number to be issued to threads to identify them.
|
||||
// If no tasks are scheduled, it will be reset to 0.
|
||||
identifier_type highest_id_{0};
|
||||
};
|
||||
} // namespace detail
|
||||
} // namespace crow
|
Loading…
Reference in New Issue
Block a user