Compare commits
No commits in common. "06cbd98b77615f66c8a568ed77ca25518d76cc0c" and "7c2ac8456e01317c6d763d59121e5b33895d12e9" have entirely different histories.
06cbd98b77
...
7c2ac8456e
|
|
@ -14,7 +14,6 @@ RUN apk update && \
|
|||
libstdc++ \
|
||||
libtool \
|
||||
linux-headers \
|
||||
ninja \
|
||||
m4 \
|
||||
perl \
|
||||
python3 \
|
||||
|
|
|
|||
|
|
@ -126,18 +126,19 @@ void fractal(
|
|||
|
||||
void handle_request_http(const address_type& addr, const request_type& req, bmrshared::web::response_promise promise) override
|
||||
{
|
||||
std::string target = req.target();
|
||||
std::string_view target = req.target();
|
||||
std::cout << "HTTP GET: " << target << std::endl;
|
||||
if (req.method() != boost::beast::http::verb::get)
|
||||
{
|
||||
// Other methods are not supported for directory acces
|
||||
//
|
||||
auto& bad_request = promise.CreateResponse<boost::beast::http::response<boost::beast::http::string_body>>(boost::beast::http::status::bad_request, req.version());
|
||||
boost::beast::http::response<boost::beast::http::string_body> bad_request{boost::beast::http::status::bad_request, req.version()};
|
||||
bad_request.set(boost::beast::http::field::server, BOOST_BEAST_VERSION_STRING);
|
||||
bad_request.set(boost::beast::http::field::content_type, "text/plain");
|
||||
bad_request.prepare_payload();
|
||||
bad_request.body() = "Bad request type.\nOnly GET and HEAD are expected for this URL.";
|
||||
bad_request.keep_alive(req.keep_alive());
|
||||
promise.SendResponse(std::move(bad_request));
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
@ -149,12 +150,11 @@ void fractal(
|
|||
double offset_x = std::stoi(base_match[2]);
|
||||
double z = std::stoi(base_match[3]);
|
||||
|
||||
auto renderfn = [offset_y, offset_x, z, req, target, promise]() mutable
|
||||
auto renderfn = [offset_y, offset_x, z, req, promise]() mutable
|
||||
{
|
||||
std::cout << "HTTP inside render function " << target << std::endl;
|
||||
constexpr int pixel_width = 256;
|
||||
constexpr int pixel_height = 256;
|
||||
constexpr int max_iterations = 255;
|
||||
constexpr int max_iterations = 100;
|
||||
|
||||
|
||||
boost::gil::gray8_image_t image(pixel_width,pixel_height);
|
||||
|
|
@ -188,14 +188,13 @@ void fractal(
|
|||
boost::gil::view(image),
|
||||
boost::gil::image_write_info<boost::gil::jpeg_tag>(95));
|
||||
|
||||
auto& ok = promise.CreateResponse<boost::beast::http::response<boost::beast::http::string_body>>(boost::beast::http::status::ok, req.version());
|
||||
boost::beast::http::response<boost::beast::http::string_body> ok{boost::beast::http::status::ok, req.version()};
|
||||
ok.set(boost::beast::http::field::server, BOOST_BEAST_VERSION_STRING);
|
||||
ok.set(boost::beast::http::field::content_type, "image/jpeg");
|
||||
ok.body() = out_buffer.str();
|
||||
ok.keep_alive(req.keep_alive());
|
||||
ok.prepare_payload();
|
||||
|
||||
std::cout << " DONE HTTP inside render function " << target << std::endl;
|
||||
promise.SendResponse(std::move(ok));
|
||||
};
|
||||
|
||||
m_ioc.post(renderfn);
|
||||
|
|
@ -248,7 +247,7 @@ int main(int argc, char **argv)
|
|||
});
|
||||
|
||||
std::vector<std::jthread> threads;
|
||||
while(threads.size() < 4)
|
||||
while(threads.size() < 20)
|
||||
{
|
||||
threads.emplace_back([&ioc]{ioc.run();});
|
||||
}
|
||||
|
|
|
|||
|
|
@ -10,66 +10,31 @@
|
|||
#include <boost/asio/ip/tcp.hpp>
|
||||
#include <boost/beast.hpp>
|
||||
#include <functional>
|
||||
#include <utility>
|
||||
|
||||
namespace bmrshared::web
|
||||
{
|
||||
|
||||
class response_promise final
|
||||
{
|
||||
private:
|
||||
class response_writer_interface
|
||||
{
|
||||
public:
|
||||
virtual ~response_writer_interface() = default;
|
||||
virtual void write_response(boost::beast::tcp_stream&) = 0;
|
||||
};
|
||||
|
||||
template<typename TResponse>
|
||||
class response_writer : public response_writer_interface
|
||||
{
|
||||
public:
|
||||
template<typename... TArgs>
|
||||
response_writer(TArgs... args)
|
||||
: m_response(args...)
|
||||
{}
|
||||
|
||||
~response_writer() override = default;
|
||||
|
||||
void write_response(boost::beast::tcp_stream& stream) override
|
||||
{
|
||||
boost::beast::http::write(stream, m_response);
|
||||
}
|
||||
|
||||
TResponse& response()
|
||||
{
|
||||
return m_response;
|
||||
}
|
||||
|
||||
private:
|
||||
TResponse m_response;
|
||||
};
|
||||
|
||||
|
||||
public:
|
||||
using response_sender = std::function<void(boost::beast::tcp_stream& stream)>;
|
||||
using callback_on_response = std::function<void(response_sender fnSendResponse)>;
|
||||
using callback_on_response = std::function<void(const response_sender& fnSendResponse)>;
|
||||
|
||||
response_promise() = delete;
|
||||
explicit response_promise(callback_on_response cbOnResponse);
|
||||
~response_promise();
|
||||
|
||||
template<typename TResponse, typename... TArgs>
|
||||
TResponse& CreateResponse(TArgs&&... args)
|
||||
template<typename Body, typename Fields>
|
||||
void SendResponse(boost::beast::http::response<Body, Fields> response)
|
||||
{
|
||||
auto created = std::make_unique<response_writer<TResponse>>(std::forward<TArgs>(args)...);
|
||||
auto& resp = created->response();
|
||||
m_response_writer = std::move(created);
|
||||
return resp;
|
||||
response_sender responder = [&response](boost::beast::tcp_stream& stream)
|
||||
{
|
||||
boost::beast::http::write(stream, response);
|
||||
};
|
||||
m_call_on_response(responder);
|
||||
}
|
||||
|
||||
private:
|
||||
callback_on_response m_call_on_response;
|
||||
std::shared_ptr<response_writer_interface> m_response_writer;
|
||||
};
|
||||
} // namespace bmrshared::web
|
||||
|
|
|
|||
|
|
@ -50,12 +50,13 @@ void directory_request_handler::handle_request_http(const address_type& address,
|
|||
{
|
||||
// Other methods are not supported for directory acces
|
||||
//
|
||||
auto& bad_request = promise.CreateResponse<http::response<http::string_body>>(http::status::bad_request, req.version());
|
||||
http::response<http::string_body> bad_request{http::status::bad_request, req.version()};
|
||||
bad_request.set(http::field::server, BOOST_BEAST_VERSION_STRING);
|
||||
bad_request.set(http::field::content_type, "text/plain");
|
||||
bad_request.prepare_payload();
|
||||
bad_request.body() = "Bad request type.\nOnly GET and HEAD are expected for this URL.";
|
||||
bad_request.keep_alive(req.keep_alive());
|
||||
promise.SendResponse(std::move(bad_request));
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
@ -85,12 +86,13 @@ void directory_request_handler::handle_request_http(const address_type& address,
|
|||
|
||||
if (!found_file)
|
||||
{
|
||||
auto& not_found = promise.CreateResponse<http::response<http::string_body>>(http::status::not_found, req.version());
|
||||
http::response<http::string_body> not_found{http::status::not_found, req.version()};
|
||||
not_found.set(http::field::server, BOOST_BEAST_VERSION_STRING);
|
||||
not_found.set(http::field::content_type, "text/plain");
|
||||
not_found.prepare_payload();
|
||||
not_found.body() = "File not found.";
|
||||
not_found.keep_alive(req.keep_alive());
|
||||
promise.SendResponse(std::move(not_found));
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
@ -100,12 +102,13 @@ void directory_request_handler::handle_request_http(const address_type& address,
|
|||
body.open(found_file->native().c_str(), beast::file_mode::scan, ec);
|
||||
if (ec)
|
||||
{
|
||||
auto& internal = promise.CreateResponse<http::response<http::string_body>>(http::status::internal_server_error, req.version());
|
||||
http::response<http::string_body> internal{http::status::internal_server_error, req.version()};
|
||||
internal.set(http::field::server, BOOST_BEAST_VERSION_STRING);
|
||||
internal.set(http::field::content_type, "text/plain");
|
||||
internal.prepare_payload();
|
||||
internal.body() = "Internal server error.";
|
||||
internal.keep_alive(req.keep_alive());
|
||||
promise.SendResponse(std::move(internal));
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
@ -116,9 +119,10 @@ void directory_request_handler::handle_request_http(const address_type& address,
|
|||
|
||||
if (req.count(http::field::if_modified_since) != 0 && (req[http::field::if_modified_since] == last_modified))
|
||||
{
|
||||
auto& head = promise.CreateResponse<http::response<http::string_body>>(http::status::not_modified, req.version());
|
||||
http::response<http::string_body> head{http::status::not_modified, req.version()};
|
||||
head.set(http::field::server, BOOST_BEAST_VERSION_STRING);
|
||||
head.keep_alive(req.keep_alive());
|
||||
promise.SendResponse(std::move(head));
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
@ -126,27 +130,29 @@ void directory_request_handler::handle_request_http(const address_type& address,
|
|||
// Okay, we have a file so let us send the correct response.
|
||||
if (req.method() == http::verb::head)
|
||||
{
|
||||
auto& head = promise.CreateResponse<http::response<http::string_body>>(http::status::ok, req.version());
|
||||
http::response<http::string_body> head{http::status::ok, req.version()};
|
||||
head.set(http::field::server, BOOST_BEAST_VERSION_STRING);
|
||||
head.set(http::field::content_type, mime_type);
|
||||
head.set(http::field::last_modified, last_modified);
|
||||
head.set(http::field::cache_control, "public, max-age=360");
|
||||
head.set(http::field::content_length, std::to_string(file_size));
|
||||
head.keep_alive(req.keep_alive());
|
||||
promise.SendResponse(std::move(head));
|
||||
return;
|
||||
}
|
||||
else if (req.method() == http::verb::get)
|
||||
{/*
|
||||
auto& head = promise.CreateResponse<http::response<http::file_body>>(std::piecewise_construct,
|
||||
{
|
||||
http::response<http::file_body> head{std::piecewise_construct,
|
||||
std::make_tuple(std::move(body)),
|
||||
std::make_tuple(http::status::ok, req.version()));
|
||||
std::make_tuple(http::status::ok, req.version())};
|
||||
|
||||
head.set(http::field::server, BOOST_BEAST_VERSION_STRING);
|
||||
head.set(http::field::content_type, mime_type);
|
||||
head.set(http::field::last_modified, last_modified);
|
||||
head.set(http::field::cache_control, "public, max-age=360");
|
||||
head.set(http::field::content_length, std::to_string(file_size));
|
||||
head.keep_alive(req.keep_alive());*/
|
||||
head.keep_alive(req.keep_alive());
|
||||
promise.SendResponse(std::move(head));
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -73,24 +73,6 @@ void internal_server::do_accept(boost::system::error_code ec, boost::asio::ip::t
|
|||
|
||||
void internal_server::session_process_queued()
|
||||
{
|
||||
// Send out all processed responses.
|
||||
for (auto& session : m_sessions)
|
||||
{
|
||||
if (session)
|
||||
{
|
||||
http_session& s = *session;
|
||||
|
||||
auto& our = s.m_outstanding_requests;
|
||||
|
||||
while (!our.empty() && our.front() && our.front()->m_response_sender)
|
||||
{
|
||||
auto& resp = our.front();
|
||||
(*resp->m_response_sender)(s.stream);
|
||||
our.pop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Determine how many outstanding requests we have.
|
||||
std::size_t count_responding = 0;
|
||||
for(const auto& session : m_sessions)
|
||||
|
|
@ -103,14 +85,78 @@ void internal_server::session_process_queued()
|
|||
{
|
||||
for(auto& session : m_sessions)
|
||||
{
|
||||
if (!session || (count_responding > m_max_simultaneous_requests))
|
||||
if (count_responding > m_max_simultaneous_requests)
|
||||
{
|
||||
// do nothing.
|
||||
break;
|
||||
}
|
||||
else if (session->m_state == session_state::queue_reading)
|
||||
else if (session && (session->m_state == session_state::queue_reading))
|
||||
{
|
||||
http_session& s = *session;
|
||||
s.m_state = session_state::reading;
|
||||
s.state = session_state::reading;
|
||||
s.parser.emplace(); // Create empty parser object.
|
||||
s.parser->body_limit(m_request_body_limit);
|
||||
s.stream.expires_after(m_incoming_request_timeout);
|
||||
|
||||
auto handler = [weak_server = weak_from_this(), session](boost::beast::error_code ec, std::size_t size)
|
||||
{
|
||||
auto shared_server = weak_server.lock();
|
||||
if (shared_server)
|
||||
{
|
||||
shared_server->session_on_read(session, ec, size);
|
||||
}
|
||||
};
|
||||
boost::beast::http::async_read(s.stream, s.buffer, *s.parser, handler);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
// First empty out all sessions queued for getting a response as much as we are allowed.
|
||||
auto iter = m_sessions.begin();
|
||||
while ((count_responding < m_max_simultaneous_requests) && (iter != m_sessions.end()))
|
||||
{
|
||||
iter = std::find_if(
|
||||
iter,
|
||||
m_sessions.end(),
|
||||
[](std::shared_ptr<http_session> session)
|
||||
{
|
||||
return session && (session->state == session_state::queue_responding);
|
||||
});
|
||||
|
||||
if (iter != m_sessions.end())
|
||||
{
|
||||
auto& session = *iter;
|
||||
session->state = session_state::responding;
|
||||
++count_responding;
|
||||
|
||||
// Regular http request. Pass to request handler.
|
||||
auto finalize_response = [session, weak_server = weak_from_this()](const response_promise::response_sender& rs)
|
||||
{
|
||||
auto shared_server = weak_server.lock();
|
||||
if (shared_server)
|
||||
{
|
||||
rs(session->stream);
|
||||
// When we receive the response and have sent it, we proceed with reading
|
||||
shared_server->session_finalize_response(session);
|
||||
}
|
||||
};
|
||||
|
||||
const auto& address = session->stream.socket().remote_endpoint().address();
|
||||
m_handler.handle_request_http(address, session->parser->get(), response_promise(finalize_response));
|
||||
}
|
||||
}
|
||||
|
||||
if (count_responding < m_max_simultaneous_requests)
|
||||
{
|
||||
// Now we can dispatch reading.
|
||||
for (const auto& session : m_sessions)
|
||||
{
|
||||
if (session && (session->state == session_state::queue_reading))
|
||||
{
|
||||
http_session& s = *session;
|
||||
s.state = session_state::reading;
|
||||
s.parser.emplace(); // Create empty parser object.
|
||||
s.parser->body_limit(m_request_body_limit);
|
||||
s.stream.expires_after(m_incoming_request_timeout);
|
||||
|
|
@ -156,27 +202,8 @@ void internal_server::session_on_read(std::shared_ptr<http_session> session,
|
|||
}
|
||||
else
|
||||
{
|
||||
std::shared_ptr<http_request> req = s.m_outstanding_requests.emplace(std::make_shared<http_request>());
|
||||
|
||||
// A regular HTTP request.
|
||||
auto finalize_response = [session, weak_server = weak_from_this(), req](const response_promise::response_sender& rs)
|
||||
{
|
||||
// We have a response, add it to our spot in the queue.
|
||||
auto shared_server = weak_server.lock();
|
||||
if (shared_server)
|
||||
{
|
||||
// We have a response, add it to our spot in the queue.
|
||||
req->m_response_sender = rs;
|
||||
|
||||
// Queue processing of the queue so we can continue.
|
||||
shared_server->m_io_context.post([shared_server]{shared_server->session_process_queued();});
|
||||
}
|
||||
};
|
||||
|
||||
const auto& address = session->stream.socket().remote_endpoint().address();
|
||||
m_handler.handle_request_http(address, session->parser->get(), response_promise(finalize_response));
|
||||
|
||||
s.m_state = session_state::queue_reading;
|
||||
// Just a regular http request. Queue for handling later.
|
||||
s.state = session_state::queue_responding;
|
||||
session_process_queued();
|
||||
}
|
||||
}
|
||||
|
|
@ -185,7 +212,7 @@ void internal_server::session_finalize_response(std::shared_ptr<http_session> se
|
|||
{
|
||||
if(session)
|
||||
{
|
||||
session->m_state = session_state::queue_reading;
|
||||
session->state = session_state::queue_reading;
|
||||
session_process_queued();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,11 +21,6 @@ namespace detail {
|
|||
reading
|
||||
};
|
||||
|
||||
struct http_request
|
||||
{
|
||||
std::optional<response_promise::response_sender> m_response_sender;
|
||||
};
|
||||
|
||||
struct http_session {
|
||||
explicit http_session(boost::asio::ip::tcp::socket s)
|
||||
: stream(std::move(s))
|
||||
|
|
@ -34,9 +29,12 @@ namespace detail {
|
|||
boost::beast::flat_buffer buffer;
|
||||
boost::optional<boost::beast::http::request_parser<boost::beast::http::string_body>> parser;
|
||||
session_state m_state;
|
||||
std::queue<std::shared_ptr<http_request>> m_outstanding_requests;
|
||||
std::queue<> m_outstanding_requests;
|
||||
};
|
||||
|
||||
|
||||
|
||||
|
||||
class internal_server final : public std::enable_shared_from_this<internal_server> {
|
||||
public:
|
||||
using address_type = bmrshared::web::server::address_type;
|
||||
|
|
|
|||
|
|
@ -14,14 +14,4 @@ response_promise::response_promise(callback_on_response cbOnResponse)
|
|||
{
|
||||
}
|
||||
|
||||
response_promise::~response_promise()
|
||||
{
|
||||
if (m_response_writer)
|
||||
{
|
||||
m_call_on_response(
|
||||
[r = m_response_writer](boost::beast::tcp_stream& stream)
|
||||
{
|
||||
r->write_response(stream);
|
||||
});
|
||||
}
|
||||
}
|
||||
response_promise::~response_promise() = default;
|
||||
|
|
|
|||
Loading…
Reference in New Issue