#pragma once #include #include #include #ifdef CROW_ENABLE_SSL #include #endif #include #include #include #include #include #include "crow/http_connection.h" #include "crow/logging.h" #include "crow/dumb_timer_queue.h" namespace crow { using namespace boost; using tcp = asio::ip::tcp; template class Server { public: Server(Handler* handler, std::string bindaddr, uint16_t port, std::string server_name = "Crow/0.2", std::tuple* middlewares = nullptr, uint16_t concurrency = 1, typename Adaptor::context* adaptor_ctx = nullptr) : acceptor_(io_service_, tcp::endpoint(boost::asio::ip::address::from_string(bindaddr), port)), signals_(io_service_, SIGINT, SIGTERM), tick_timer_(io_service_), handler_(handler), concurrency_(concurrency == 0 ? 1 : concurrency), server_name_(server_name), port_(port), bindaddr_(bindaddr), middlewares_(middlewares), adaptor_ctx_(adaptor_ctx) { } void set_tick_function(std::chrono::milliseconds d, std::function f) { tick_interval_ = d; tick_function_ = f; } void on_tick() { tick_function_(); tick_timer_.expires_from_now(boost::posix_time::milliseconds(tick_interval_.count())); tick_timer_.async_wait([this](const boost::system::error_code& ec) { if (ec) return; on_tick(); }); } void run() { for(int i = 0; i < concurrency_; i++) io_service_pool_.emplace_back(new boost::asio::io_service()); get_cached_date_str_pool_.resize(concurrency_); timer_queue_pool_.resize(concurrency_); std::vector> v; std::atomic init_count(0); for(uint16_t i = 0; i < concurrency_; i ++) v.push_back( std::async(std::launch::async, [this, i, &init_count]{ // thread local date string get function auto last = std::chrono::steady_clock::now(); std::string date_str; auto update_date_str = [&] { auto last_time_t = time(0); tm my_tm; #if defined(_MSC_VER) or defined(__MINGW32__) gmtime_s(&my_tm, &last_time_t); #else gmtime_r(&last_time_t, &my_tm); #endif date_str.resize(100); size_t date_str_sz = strftime(&date_str[0], 99, "%a, %d %b %Y %H:%M:%S GMT", &my_tm); date_str.resize(date_str_sz); }; update_date_str(); get_cached_date_str_pool_[i] = [&]()->std::string { if (std::chrono::steady_clock::now() - last >= std::chrono::seconds(1)) { last = std::chrono::steady_clock::now(); update_date_str(); } return date_str; }; // initializing timer queue detail::dumb_timer_queue timer_queue; timer_queue_pool_[i] = &timer_queue; timer_queue.set_io_service(*io_service_pool_[i]); boost::asio::deadline_timer timer(*io_service_pool_[i]); timer.expires_from_now(boost::posix_time::seconds(1)); std::function handler; handler = [&](const boost::system::error_code& ec){ if (ec) return; timer_queue.process(); timer.expires_from_now(boost::posix_time::seconds(1)); timer.async_wait(handler); }; timer.async_wait(handler); init_count ++; while(1) { try { if (io_service_pool_[i]->run() == 0) { // when io_service.run returns 0, there are no more works to do. break; } } catch(std::exception& e) { CROW_LOG_ERROR << "Worker Crash: An uncaught exception occurred: " << e.what(); } } })); if (tick_function_ && tick_interval_.count() > 0) { tick_timer_.expires_from_now(boost::posix_time::milliseconds(tick_interval_.count())); tick_timer_.async_wait([this](const boost::system::error_code& ec) { if (ec) return; on_tick(); }); } CROW_LOG_INFO << server_name_ << " server is running at " << bindaddr_ <<":" << port_ << " using " << concurrency_ << " threads"; CROW_LOG_INFO << "Call `app.loglevel(crow::LogLevel::Warning)` to hide Info level logs."; signals_.async_wait( [&](const boost::system::error_code& /*error*/, int /*signal_number*/){ stop(); }); while(concurrency_ != init_count) std::this_thread::yield(); do_accept(); std::thread([this]{ io_service_.run(); CROW_LOG_INFO << "Exiting."; }).join(); } void stop() { io_service_.stop(); for(auto& io_service:io_service_pool_) io_service->stop(); } void signal_clear() { signals_.clear(); } void signal_add(int signal_number) { signals_.add(signal_number); } private: asio::io_service& pick_io_service() { // TODO load balancing roundrobin_index_++; if (roundrobin_index_ >= io_service_pool_.size()) roundrobin_index_ = 0; return *io_service_pool_[roundrobin_index_]; } void do_accept() { asio::io_service& is = pick_io_service(); auto p = new Connection( is, handler_, server_name_, middlewares_, get_cached_date_str_pool_[roundrobin_index_], *timer_queue_pool_[roundrobin_index_], adaptor_ctx_); acceptor_.async_accept(p->socket(), [this, p, &is](boost::system::error_code ec) { if (!ec) { is.post([p] { p->start(); }); } else { delete p; } do_accept(); }); } private: asio::io_service io_service_; std::vector> io_service_pool_; std::vector timer_queue_pool_; std::vector> get_cached_date_str_pool_; tcp::acceptor acceptor_; boost::asio::signal_set signals_; boost::asio::deadline_timer tick_timer_; Handler* handler_; uint16_t concurrency_{1}; std::string server_name_; uint16_t port_; std::string bindaddr_; unsigned int roundrobin_index_{}; std::chrono::milliseconds tick_interval_; std::function tick_function_; std::tuple* middlewares_; #ifdef CROW_ENABLE_SSL bool use_ssl_{false}; boost::asio::ssl::context ssl_context_{boost::asio::ssl::context::sslv23}; #endif typename Adaptor::context* adaptor_ctx_; }; }