Compare commits

..

No commits in common. "06cbd98b77615f66c8a568ed77ca25518d76cc0c" and "7c2ac8456e01317c6d763d59121e5b33895d12e9" have entirely different histories.

7 changed files with 107 additions and 123 deletions

View File

@ -14,7 +14,6 @@ RUN apk update && \
libstdc++ \ libstdc++ \
libtool \ libtool \
linux-headers \ linux-headers \
ninja \
m4 \ m4 \
perl \ perl \
python3 \ python3 \

View File

@ -126,18 +126,19 @@ void fractal(
void handle_request_http(const address_type& addr, const request_type& req, bmrshared::web::response_promise promise) override void handle_request_http(const address_type& addr, const request_type& req, bmrshared::web::response_promise promise) override
{ {
std::string target = req.target(); std::string_view target = req.target();
std::cout << "HTTP GET: " << target << std::endl; std::cout << "HTTP GET: " << target << std::endl;
if (req.method() != boost::beast::http::verb::get) if (req.method() != boost::beast::http::verb::get)
{ {
// Other methods are not supported for directory acces // Other methods are not supported for directory acces
// //
auto& bad_request = promise.CreateResponse<boost::beast::http::response<boost::beast::http::string_body>>(boost::beast::http::status::bad_request, req.version()); boost::beast::http::response<boost::beast::http::string_body> bad_request{boost::beast::http::status::bad_request, req.version()};
bad_request.set(boost::beast::http::field::server, BOOST_BEAST_VERSION_STRING); bad_request.set(boost::beast::http::field::server, BOOST_BEAST_VERSION_STRING);
bad_request.set(boost::beast::http::field::content_type, "text/plain"); bad_request.set(boost::beast::http::field::content_type, "text/plain");
bad_request.prepare_payload(); bad_request.prepare_payload();
bad_request.body() = "Bad request type.\nOnly GET and HEAD are expected for this URL."; bad_request.body() = "Bad request type.\nOnly GET and HEAD are expected for this URL.";
bad_request.keep_alive(req.keep_alive()); bad_request.keep_alive(req.keep_alive());
promise.SendResponse(std::move(bad_request));
return; return;
} }
@ -149,12 +150,11 @@ void fractal(
double offset_x = std::stoi(base_match[2]); double offset_x = std::stoi(base_match[2]);
double z = std::stoi(base_match[3]); double z = std::stoi(base_match[3]);
auto renderfn = [offset_y, offset_x, z, req, target, promise]() mutable auto renderfn = [offset_y, offset_x, z, req, promise]() mutable
{ {
std::cout << "HTTP inside render function " << target << std::endl;
constexpr int pixel_width = 256; constexpr int pixel_width = 256;
constexpr int pixel_height = 256; constexpr int pixel_height = 256;
constexpr int max_iterations = 255; constexpr int max_iterations = 100;
boost::gil::gray8_image_t image(pixel_width,pixel_height); boost::gil::gray8_image_t image(pixel_width,pixel_height);
@ -188,14 +188,13 @@ void fractal(
boost::gil::view(image), boost::gil::view(image),
boost::gil::image_write_info<boost::gil::jpeg_tag>(95)); boost::gil::image_write_info<boost::gil::jpeg_tag>(95));
auto& ok = promise.CreateResponse<boost::beast::http::response<boost::beast::http::string_body>>(boost::beast::http::status::ok, req.version()); boost::beast::http::response<boost::beast::http::string_body> ok{boost::beast::http::status::ok, req.version()};
ok.set(boost::beast::http::field::server, BOOST_BEAST_VERSION_STRING); ok.set(boost::beast::http::field::server, BOOST_BEAST_VERSION_STRING);
ok.set(boost::beast::http::field::content_type, "image/jpeg"); ok.set(boost::beast::http::field::content_type, "image/jpeg");
ok.body() = out_buffer.str(); ok.body() = out_buffer.str();
ok.keep_alive(req.keep_alive()); ok.keep_alive(req.keep_alive());
ok.prepare_payload(); ok.prepare_payload();
promise.SendResponse(std::move(ok));
std::cout << " DONE HTTP inside render function " << target << std::endl;
}; };
m_ioc.post(renderfn); m_ioc.post(renderfn);
@ -248,7 +247,7 @@ int main(int argc, char **argv)
}); });
std::vector<std::jthread> threads; std::vector<std::jthread> threads;
while(threads.size() < 4) while(threads.size() < 20)
{ {
threads.emplace_back([&ioc]{ioc.run();}); threads.emplace_back([&ioc]{ioc.run();});
} }

View File

@ -10,66 +10,31 @@
#include <boost/asio/ip/tcp.hpp> #include <boost/asio/ip/tcp.hpp>
#include <boost/beast.hpp> #include <boost/beast.hpp>
#include <functional> #include <functional>
#include <utility>
namespace bmrshared::web namespace bmrshared::web
{ {
class response_promise final class response_promise final
{ {
private:
class response_writer_interface
{
public:
virtual ~response_writer_interface() = default;
virtual void write_response(boost::beast::tcp_stream&) = 0;
};
template<typename TResponse>
class response_writer : public response_writer_interface
{
public:
template<typename... TArgs>
response_writer(TArgs... args)
: m_response(args...)
{}
~response_writer() override = default;
void write_response(boost::beast::tcp_stream& stream) override
{
boost::beast::http::write(stream, m_response);
}
TResponse& response()
{
return m_response;
}
private:
TResponse m_response;
};
public: public:
using response_sender = std::function<void(boost::beast::tcp_stream& stream)>; using response_sender = std::function<void(boost::beast::tcp_stream& stream)>;
using callback_on_response = std::function<void(response_sender fnSendResponse)>; using callback_on_response = std::function<void(const response_sender& fnSendResponse)>;
response_promise() = delete; response_promise() = delete;
explicit response_promise(callback_on_response cbOnResponse); explicit response_promise(callback_on_response cbOnResponse);
~response_promise(); ~response_promise();
template<typename TResponse, typename... TArgs> template<typename Body, typename Fields>
TResponse& CreateResponse(TArgs&&... args) void SendResponse(boost::beast::http::response<Body, Fields> response)
{ {
auto created = std::make_unique<response_writer<TResponse>>(std::forward<TArgs>(args)...); response_sender responder = [&response](boost::beast::tcp_stream& stream)
auto& resp = created->response(); {
m_response_writer = std::move(created); boost::beast::http::write(stream, response);
return resp; };
m_call_on_response(responder);
} }
private: private:
callback_on_response m_call_on_response; callback_on_response m_call_on_response;
std::shared_ptr<response_writer_interface> m_response_writer;
}; };
} // namespace bmrshared::web } // namespace bmrshared::web

View File

@ -50,12 +50,13 @@ void directory_request_handler::handle_request_http(const address_type& address,
{ {
// Other methods are not supported for directory acces // Other methods are not supported for directory acces
// //
auto& bad_request = promise.CreateResponse<http::response<http::string_body>>(http::status::bad_request, req.version()); http::response<http::string_body> bad_request{http::status::bad_request, req.version()};
bad_request.set(http::field::server, BOOST_BEAST_VERSION_STRING); bad_request.set(http::field::server, BOOST_BEAST_VERSION_STRING);
bad_request.set(http::field::content_type, "text/plain"); bad_request.set(http::field::content_type, "text/plain");
bad_request.prepare_payload(); bad_request.prepare_payload();
bad_request.body() = "Bad request type.\nOnly GET and HEAD are expected for this URL."; bad_request.body() = "Bad request type.\nOnly GET and HEAD are expected for this URL.";
bad_request.keep_alive(req.keep_alive()); bad_request.keep_alive(req.keep_alive());
promise.SendResponse(std::move(bad_request));
return; return;
} }
@ -85,12 +86,13 @@ void directory_request_handler::handle_request_http(const address_type& address,
if (!found_file) if (!found_file)
{ {
auto& not_found = promise.CreateResponse<http::response<http::string_body>>(http::status::not_found, req.version()); http::response<http::string_body> not_found{http::status::not_found, req.version()};
not_found.set(http::field::server, BOOST_BEAST_VERSION_STRING); not_found.set(http::field::server, BOOST_BEAST_VERSION_STRING);
not_found.set(http::field::content_type, "text/plain"); not_found.set(http::field::content_type, "text/plain");
not_found.prepare_payload(); not_found.prepare_payload();
not_found.body() = "File not found."; not_found.body() = "File not found.";
not_found.keep_alive(req.keep_alive()); not_found.keep_alive(req.keep_alive());
promise.SendResponse(std::move(not_found));
return; return;
} }
@ -100,12 +102,13 @@ void directory_request_handler::handle_request_http(const address_type& address,
body.open(found_file->native().c_str(), beast::file_mode::scan, ec); body.open(found_file->native().c_str(), beast::file_mode::scan, ec);
if (ec) if (ec)
{ {
auto& internal = promise.CreateResponse<http::response<http::string_body>>(http::status::internal_server_error, req.version()); http::response<http::string_body> internal{http::status::internal_server_error, req.version()};
internal.set(http::field::server, BOOST_BEAST_VERSION_STRING); internal.set(http::field::server, BOOST_BEAST_VERSION_STRING);
internal.set(http::field::content_type, "text/plain"); internal.set(http::field::content_type, "text/plain");
internal.prepare_payload(); internal.prepare_payload();
internal.body() = "Internal server error."; internal.body() = "Internal server error.";
internal.keep_alive(req.keep_alive()); internal.keep_alive(req.keep_alive());
promise.SendResponse(std::move(internal));
return; return;
} }
@ -116,9 +119,10 @@ void directory_request_handler::handle_request_http(const address_type& address,
if (req.count(http::field::if_modified_since) != 0 && (req[http::field::if_modified_since] == last_modified)) if (req.count(http::field::if_modified_since) != 0 && (req[http::field::if_modified_since] == last_modified))
{ {
auto& head = promise.CreateResponse<http::response<http::string_body>>(http::status::not_modified, req.version()); http::response<http::string_body> head{http::status::not_modified, req.version()};
head.set(http::field::server, BOOST_BEAST_VERSION_STRING); head.set(http::field::server, BOOST_BEAST_VERSION_STRING);
head.keep_alive(req.keep_alive()); head.keep_alive(req.keep_alive());
promise.SendResponse(std::move(head));
return; return;
} }
@ -126,27 +130,29 @@ void directory_request_handler::handle_request_http(const address_type& address,
// Okay, we have a file so let us send the correct response. // Okay, we have a file so let us send the correct response.
if (req.method() == http::verb::head) if (req.method() == http::verb::head)
{ {
auto& head = promise.CreateResponse<http::response<http::string_body>>(http::status::ok, req.version()); http::response<http::string_body> head{http::status::ok, req.version()};
head.set(http::field::server, BOOST_BEAST_VERSION_STRING); head.set(http::field::server, BOOST_BEAST_VERSION_STRING);
head.set(http::field::content_type, mime_type); head.set(http::field::content_type, mime_type);
head.set(http::field::last_modified, last_modified); head.set(http::field::last_modified, last_modified);
head.set(http::field::cache_control, "public, max-age=360"); head.set(http::field::cache_control, "public, max-age=360");
head.set(http::field::content_length, std::to_string(file_size)); head.set(http::field::content_length, std::to_string(file_size));
head.keep_alive(req.keep_alive()); head.keep_alive(req.keep_alive());
promise.SendResponse(std::move(head));
return; return;
} }
else if (req.method() == http::verb::get) else if (req.method() == http::verb::get)
{/* {
auto& head = promise.CreateResponse<http::response<http::file_body>>(std::piecewise_construct, http::response<http::file_body> head{std::piecewise_construct,
std::make_tuple(std::move(body)), std::make_tuple(std::move(body)),
std::make_tuple(http::status::ok, req.version())); std::make_tuple(http::status::ok, req.version())};
head.set(http::field::server, BOOST_BEAST_VERSION_STRING); head.set(http::field::server, BOOST_BEAST_VERSION_STRING);
head.set(http::field::content_type, mime_type); head.set(http::field::content_type, mime_type);
head.set(http::field::last_modified, last_modified); head.set(http::field::last_modified, last_modified);
head.set(http::field::cache_control, "public, max-age=360"); head.set(http::field::cache_control, "public, max-age=360");
head.set(http::field::content_length, std::to_string(file_size)); head.set(http::field::content_length, std::to_string(file_size));
head.keep_alive(req.keep_alive());*/ head.keep_alive(req.keep_alive());
promise.SendResponse(std::move(head));
return; return;
} }
} }

View File

@ -73,24 +73,6 @@ void internal_server::do_accept(boost::system::error_code ec, boost::asio::ip::t
void internal_server::session_process_queued() void internal_server::session_process_queued()
{ {
// Send out all processed responses.
for (auto& session : m_sessions)
{
if (session)
{
http_session& s = *session;
auto& our = s.m_outstanding_requests;
while (!our.empty() && our.front() && our.front()->m_response_sender)
{
auto& resp = our.front();
(*resp->m_response_sender)(s.stream);
our.pop();
}
}
}
// Determine how many outstanding requests we have. // Determine how many outstanding requests we have.
std::size_t count_responding = 0; std::size_t count_responding = 0;
for(const auto& session : m_sessions) for(const auto& session : m_sessions)
@ -103,14 +85,78 @@ void internal_server::session_process_queued()
{ {
for(auto& session : m_sessions) for(auto& session : m_sessions)
{ {
if (!session || (count_responding > m_max_simultaneous_requests)) if (count_responding > m_max_simultaneous_requests)
{ {
// do nothing. break;
} }
else if (session->m_state == session_state::queue_reading) else if (session && (session->m_state == session_state::queue_reading))
{ {
http_session& s = *session; http_session& s = *session;
s.m_state = session_state::reading; s.state = session_state::reading;
s.parser.emplace(); // Create empty parser object.
s.parser->body_limit(m_request_body_limit);
s.stream.expires_after(m_incoming_request_timeout);
auto handler = [weak_server = weak_from_this(), session](boost::beast::error_code ec, std::size_t size)
{
auto shared_server = weak_server.lock();
if (shared_server)
{
shared_server->session_on_read(session, ec, size);
}
};
boost::beast::http::async_read(s.stream, s.buffer, *s.parser, handler);
}
}
}
// First empty out all sessions queued for getting a response as much as we are allowed.
auto iter = m_sessions.begin();
while ((count_responding < m_max_simultaneous_requests) && (iter != m_sessions.end()))
{
iter = std::find_if(
iter,
m_sessions.end(),
[](std::shared_ptr<http_session> session)
{
return session && (session->state == session_state::queue_responding);
});
if (iter != m_sessions.end())
{
auto& session = *iter;
session->state = session_state::responding;
++count_responding;
// Regular http request. Pass to request handler.
auto finalize_response = [session, weak_server = weak_from_this()](const response_promise::response_sender& rs)
{
auto shared_server = weak_server.lock();
if (shared_server)
{
rs(session->stream);
// When we receive the response and have sent it, we proceed with reading
shared_server->session_finalize_response(session);
}
};
const auto& address = session->stream.socket().remote_endpoint().address();
m_handler.handle_request_http(address, session->parser->get(), response_promise(finalize_response));
}
}
if (count_responding < m_max_simultaneous_requests)
{
// Now we can dispatch reading.
for (const auto& session : m_sessions)
{
if (session && (session->state == session_state::queue_reading))
{
http_session& s = *session;
s.state = session_state::reading;
s.parser.emplace(); // Create empty parser object. s.parser.emplace(); // Create empty parser object.
s.parser->body_limit(m_request_body_limit); s.parser->body_limit(m_request_body_limit);
s.stream.expires_after(m_incoming_request_timeout); s.stream.expires_after(m_incoming_request_timeout);
@ -156,27 +202,8 @@ void internal_server::session_on_read(std::shared_ptr<http_session> session,
} }
else else
{ {
std::shared_ptr<http_request> req = s.m_outstanding_requests.emplace(std::make_shared<http_request>()); // Just a regular http request. Queue for handling later.
s.state = session_state::queue_responding;
// A regular HTTP request.
auto finalize_response = [session, weak_server = weak_from_this(), req](const response_promise::response_sender& rs)
{
// We have a response, add it to our spot in the queue.
auto shared_server = weak_server.lock();
if (shared_server)
{
// We have a response, add it to our spot in the queue.
req->m_response_sender = rs;
// Queue processing of the queue so we can continue.
shared_server->m_io_context.post([shared_server]{shared_server->session_process_queued();});
}
};
const auto& address = session->stream.socket().remote_endpoint().address();
m_handler.handle_request_http(address, session->parser->get(), response_promise(finalize_response));
s.m_state = session_state::queue_reading;
session_process_queued(); session_process_queued();
} }
} }
@ -185,7 +212,7 @@ void internal_server::session_finalize_response(std::shared_ptr<http_session> se
{ {
if(session) if(session)
{ {
session->m_state = session_state::queue_reading; session->state = session_state::queue_reading;
session_process_queued(); session_process_queued();
} }
} }

View File

@ -21,11 +21,6 @@ namespace detail {
reading reading
}; };
struct http_request
{
std::optional<response_promise::response_sender> m_response_sender;
};
struct http_session { struct http_session {
explicit http_session(boost::asio::ip::tcp::socket s) explicit http_session(boost::asio::ip::tcp::socket s)
: stream(std::move(s)) : stream(std::move(s))
@ -34,9 +29,12 @@ namespace detail {
boost::beast::flat_buffer buffer; boost::beast::flat_buffer buffer;
boost::optional<boost::beast::http::request_parser<boost::beast::http::string_body>> parser; boost::optional<boost::beast::http::request_parser<boost::beast::http::string_body>> parser;
session_state m_state; session_state m_state;
std::queue<std::shared_ptr<http_request>> m_outstanding_requests; std::queue<> m_outstanding_requests;
}; };
class internal_server final : public std::enable_shared_from_this<internal_server> { class internal_server final : public std::enable_shared_from_this<internal_server> {
public: public:
using address_type = bmrshared::web::server::address_type; using address_type = bmrshared::web::server::address_type;

View File

@ -14,14 +14,4 @@ response_promise::response_promise(callback_on_response cbOnResponse)
{ {
} }
response_promise::~response_promise() response_promise::~response_promise() = default;
{
if (m_response_writer)
{
m_call_on_response(
[r = m_response_writer](boost::beast::tcp_stream& stream)
{
r->write_response(stream);
});
}
}