improve performance by 2x

change to io_service per CPU model
This commit is contained in:
ipkn 2014-08-17 05:35:21 -04:00
parent 79bbdfebfa
commit daa3c82087
4 changed files with 58 additions and 27 deletions

View File

@ -6,7 +6,7 @@
class ExampleLogHandler : public crow::ILogHandler { class ExampleLogHandler : public crow::ILogHandler {
public: public:
void log(string message, crow::LogLevel level) override { void log(string message, crow::LogLevel level) override {
cerr << "ExampleLogHandler -> " << message; // cerr << "ExampleLogHandler -> " << message;
} }
}; };
@ -68,7 +68,8 @@ int main()
return crow::response{os.str()}; return crow::response{os.str()};
}); });
//crow::logger::setLogLevel(LogLevel::INFO); // ignore all log
crow::logger::setLogLevel(crow::LogLevel::CRITICAL);
//crow::logger::setHandler(std::make_shared<ExampleLogHandler>()); //crow::logger::setHandler(std::make_shared<ExampleLogHandler>());
app.port(18080) app.port(18080)

View File

@ -29,19 +29,17 @@ namespace crow
k.first = nullptr; k.first = nullptr;
if (!self) if (!self)
return; return;
self->mutex_.lock();
unsigned int index = (unsigned int)(k.second - self->step_); unsigned int index = (unsigned int)(k.second - self->step_);
if (index < self->dq_.size()) if (index < self->dq_.size())
self->dq_[index].second = nullptr; self->dq_[index].second = nullptr;
self->mutex_.unlock();
} }
key add(std::function<void()> f) key add(std::function<void()> f)
{ {
mutex_.lock();
dq_.emplace_back(std::chrono::steady_clock::now(), std::move(f)); dq_.emplace_back(std::chrono::steady_clock::now(), std::move(f));
int ret = step_+dq_.size()-1; int ret = step_+dq_.size()-1;
mutex_.unlock();
CROW_LOG_DEBUG << "timer add inside: " << this << ' ' << ret ; CROW_LOG_DEBUG << "timer add inside: " << this << ' ' << ret ;
return {this, ret}; return {this, ret};
} }
@ -50,7 +48,7 @@ namespace crow
{ {
if (!io_service_) if (!io_service_)
return; return;
mutex_.lock();
auto now = std::chrono::steady_clock::now(); auto now = std::chrono::steady_clock::now();
while(!dq_.empty()) while(!dq_.empty())
{ {
@ -60,13 +58,12 @@ namespace crow
if (x.second) if (x.second)
{ {
CROW_LOG_DEBUG << "timer call: " << this << ' ' << step_; CROW_LOG_DEBUG << "timer call: " << this << ' ' << step_;
//io_service_->post(std::move(x.second)); // we know that timer handlers are very simple currenty; call here
x.second(); x.second();
} }
dq_.pop_front(); dq_.pop_front();
step_++; step_++;
} }
mutex_.unlock();
} }
void set_io_service(boost::asio::io_service& io_service) void set_io_service(boost::asio::io_service& io_service)
@ -82,7 +79,6 @@ namespace crow
int tick{5}; int tick{5};
boost::asio::io_service* io_service_{}; boost::asio::io_service* io_service_{};
std::deque<std::pair<decltype(std::chrono::steady_clock::now()), std::function<void()>>> dq_; std::deque<std::pair<decltype(std::chrono::steady_clock::now()), std::function<void()>>> dq_;
std::mutex mutex_;
int step_{}; int step_{};
}; };
} }

View File

@ -26,8 +26,8 @@ namespace crow
class Connection class Connection
{ {
public: public:
Connection(tcp::socket&& socket, Handler* handler, const std::string& server_name) Connection(boost::asio::io_service& io_service, Handler* handler, const std::string& server_name)
: socket_(std::move(socket)), : socket_(io_service),
handler_(handler), handler_(handler),
parser_(this), parser_(this),
server_name_(server_name) server_name_(server_name)
@ -48,6 +48,11 @@ namespace crow
#endif #endif
} }
tcp::socket& socket()
{
return socket_;
}
void start() void start()
{ {
//auto self = this->shared_from_this(); //auto self = this->shared_from_this();

View File

@ -1,5 +1,6 @@
#pragma once #pragma once
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/asio.hpp> #include <boost/asio.hpp>
#include <cstdint> #include <cstdint>
#include <atomic> #include <atomic>
@ -23,53 +24,80 @@ namespace crow
public: public:
Server(Handler* handler, uint16_t port, uint16_t concurrency = 1) Server(Handler* handler, uint16_t port, uint16_t concurrency = 1)
: acceptor_(io_service_, tcp::endpoint(asio::ip::address(), port)), : acceptor_(io_service_, tcp::endpoint(asio::ip::address(), port)),
socket_(io_service_),
signals_(io_service_, SIGINT, SIGTERM), signals_(io_service_, SIGINT, SIGTERM),
handler_(handler), handler_(handler),
concurrency_(concurrency), concurrency_(concurrency),
port_(port) port_(port)
{ {
do_accept();
} }
void run() void run()
{ {
if (concurrency_ < 0)
concurrency_ = 1;
for(int i = 0; i < concurrency_; i++)
io_service_pool_.emplace_back(new boost::asio::io_service());
std::vector<std::future<void>> v; std::vector<std::future<void>> v;
for(uint16_t i = 0; i < concurrency_; i ++) for(uint16_t i = 0; i < concurrency_; i ++)
v.push_back( v.push_back(
std::async(std::launch::async, [this]{ std::async(std::launch::async, [this, i]{
auto& timer_queue = detail::dumb_timer_queue::get_current_dumb_timer_queue(); auto& timer_queue = detail::dumb_timer_queue::get_current_dumb_timer_queue();
timer_queue.set_io_service(io_service_); timer_queue.set_io_service(*io_service_pool_[i]);
while(!io_service_.stopped()) boost::asio::deadline_timer timer(*io_service_pool_[i]);
{ timer.expires_from_now(boost::posix_time::seconds(1));
std::function<void(const boost::system::error_code& ec)> handler;
handler = [&](const boost::system::error_code& ec){
if (ec)
return;
timer_queue.process(); timer_queue.process();
io_service_.poll_one(); timer.expires_from_now(boost::posix_time::seconds(1));
} timer.async_wait(handler);
};
timer.async_wait(handler);
io_service_pool_[i]->run();
})); }));
CROW_LOG_INFO << server_name_ << " server is running, local port " << port_; CROW_LOG_INFO << server_name_ << " server is running, local port " << port_;
signals_.async_wait( signals_.async_wait(
[&](const boost::system::error_code& error, int signal_number){ [&](const boost::system::error_code& error, int signal_number){
io_service_.stop(); stop();
}); });
do_accept();
v.push_back(std::async(std::launch::async, [this]{
io_service_.run();
CROW_LOG_INFO << "Exiting.";
}));
} }
void stop() void stop()
{ {
io_service_.stop(); io_service_.stop();
for(auto& io_service:io_service_pool_)
io_service->stop();
} }
private: private:
asio::io_service& pick_io_service()
{
// TODO load balancing
roundrobin_index_++;
if (roundrobin_index_ >= io_service_pool_.size())
roundrobin_index_ = 0;
return *io_service_pool_[roundrobin_index_];
}
void do_accept() void do_accept()
{ {
acceptor_.async_accept(socket_, auto p = new Connection<Handler>(pick_io_service(), handler_, server_name_);
[this](boost::system::error_code ec) acceptor_.async_accept(p->socket(),
[this, p](boost::system::error_code ec)
{ {
if (!ec) if (!ec)
{ {
auto p = new Connection<Handler>(std::move(socket_), handler_, server_name_);
p->start(); p->start();
} }
do_accept(); do_accept();
@ -78,13 +106,14 @@ namespace crow
private: private:
asio::io_service io_service_; asio::io_service io_service_;
std::vector<std::unique_ptr<asio::io_service>> io_service_pool_;
tcp::acceptor acceptor_; tcp::acceptor acceptor_;
tcp::socket socket_;
boost::asio::signal_set signals_; boost::asio::signal_set signals_;
Handler* handler_; Handler* handler_;
uint16_t concurrency_{1}; uint16_t concurrency_{1};
std::string server_name_ = "Crow/0.1"; std::string server_name_ = "Crow/0.1";
uint16_t port_; uint16_t port_;
unsigned int roundrobin_index_{};
}; };
} }