From aeb0d3c4455db28282c62cd894e084cc28a9d722 Mon Sep 17 00:00:00 2001 From: Bart Beumer Date: Thu, 6 Nov 2025 19:50:29 +0100 Subject: [PATCH] WIP --- .devcontainer/Dockerfile | 1 + applications/http-mandelbrot/src/main.cpp | 7 +- .../include/bmrshared/response_promise.hpp | 7 +- .../lib/directory_request_handler.cpp | 2 +- bmrshared-web/lib/internal_server.cpp | 115 +++++++----------- bmrshared-web/lib/internal_server.h | 10 +- 6 files changed, 61 insertions(+), 81 deletions(-) diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile index 796c970..fc2fd59 100644 --- a/.devcontainer/Dockerfile +++ b/.devcontainer/Dockerfile @@ -14,6 +14,7 @@ RUN apk update && \ libstdc++ \ libtool \ linux-headers \ + ninja \ m4 \ perl \ python3 \ diff --git a/applications/http-mandelbrot/src/main.cpp b/applications/http-mandelbrot/src/main.cpp index b539a98..1b1f9de 100644 --- a/applications/http-mandelbrot/src/main.cpp +++ b/applications/http-mandelbrot/src/main.cpp @@ -126,7 +126,7 @@ void fractal( void handle_request_http(const address_type& addr, const request_type& req, bmrshared::web::response_promise promise) override { - std::string_view target = req.target(); + std::string target = req.target(); std::cout << "HTTP GET: " << target << std::endl; if (req.method() != boost::beast::http::verb::get) { @@ -150,8 +150,9 @@ void fractal( double offset_x = std::stoi(base_match[2]); double z = std::stoi(base_match[3]); - auto renderfn = [offset_y, offset_x, z, req, promise]() mutable + auto renderfn = [offset_y, offset_x, z, req, target, promise]() mutable { + std::cout << "HTTP inside render function " << target << std::endl; constexpr int pixel_width = 256; constexpr int pixel_height = 256; constexpr int max_iterations = 100; @@ -195,6 +196,8 @@ void fractal( ok.keep_alive(req.keep_alive()); ok.prepare_payload(); promise.SendResponse(std::move(ok)); + + std::cout << " DONE HTTP inside render function " << target << std::endl; }; m_ioc.post(renderfn); diff --git a/bmrshared-web/include/bmrshared/response_promise.hpp b/bmrshared-web/include/bmrshared/response_promise.hpp index 079e119..2d239d0 100644 --- a/bmrshared-web/include/bmrshared/response_promise.hpp +++ b/bmrshared-web/include/bmrshared/response_promise.hpp @@ -27,11 +27,12 @@ class response_promise final template void SendResponse(boost::beast::http::response response) { - response_sender responder = [&response](boost::beast::tcp_stream& stream) + response_sender responder = [r = std::move(response)](boost::beast::tcp_stream& stream) { - boost::beast::http::write(stream, response); + boost::beast::http::write(stream, r); }; - m_call_on_response(responder); + + m_call_on_response(std::move(responder)); } private: diff --git a/bmrshared-web/lib/directory_request_handler.cpp b/bmrshared-web/lib/directory_request_handler.cpp index ba4889c..7f405dd 100644 --- a/bmrshared-web/lib/directory_request_handler.cpp +++ b/bmrshared-web/lib/directory_request_handler.cpp @@ -152,7 +152,7 @@ void directory_request_handler::handle_request_http(const address_type& address, head.set(http::field::cache_control, "public, max-age=360"); head.set(http::field::content_length, std::to_string(file_size)); head.keep_alive(req.keep_alive()); - promise.SendResponse(std::move(head)); + //promise.SendResponse(std::move(head)); return; } } diff --git a/bmrshared-web/lib/internal_server.cpp b/bmrshared-web/lib/internal_server.cpp index c0952d8..e96d5e0 100644 --- a/bmrshared-web/lib/internal_server.cpp +++ b/bmrshared-web/lib/internal_server.cpp @@ -73,6 +73,24 @@ void internal_server::do_accept(boost::system::error_code ec, boost::asio::ip::t void internal_server::session_process_queued() { + // Send out all processed responses. + for (auto& session : m_sessions) + { + if (session) + { + http_session& s = *session; + + auto& our = s.m_outstanding_requests; + + while (!our.empty() && our.front() && our.front()->m_response_sender) + { + auto& resp = our.front(); + (*resp->m_response_sender)(s.stream); + our.pop(); + } + } + } + // Determine how many outstanding requests we have. std::size_t count_responding = 0; for(const auto& session : m_sessions) @@ -85,78 +103,14 @@ void internal_server::session_process_queued() { for(auto& session : m_sessions) { - if (count_responding > m_max_simultaneous_requests) + if (!session || (count_responding > m_max_simultaneous_requests)) { - break; + // do nothing. } - else if (session && (session->m_state == session_state::queue_reading)) + else if (session->m_state == session_state::queue_reading) { http_session& s = *session; - s.state = session_state::reading; - s.parser.emplace(); // Create empty parser object. - s.parser->body_limit(m_request_body_limit); - s.stream.expires_after(m_incoming_request_timeout); - - auto handler = [weak_server = weak_from_this(), session](boost::beast::error_code ec, std::size_t size) - { - auto shared_server = weak_server.lock(); - if (shared_server) - { - shared_server->session_on_read(session, ec, size); - } - }; - boost::beast::http::async_read(s.stream, s.buffer, *s.parser, handler); - } - } - } - - - - - // First empty out all sessions queued for getting a response as much as we are allowed. - auto iter = m_sessions.begin(); - while ((count_responding < m_max_simultaneous_requests) && (iter != m_sessions.end())) - { - iter = std::find_if( - iter, - m_sessions.end(), - [](std::shared_ptr session) - { - return session && (session->state == session_state::queue_responding); - }); - - if (iter != m_sessions.end()) - { - auto& session = *iter; - session->state = session_state::responding; - ++count_responding; - - // Regular http request. Pass to request handler. - auto finalize_response = [session, weak_server = weak_from_this()](const response_promise::response_sender& rs) - { - auto shared_server = weak_server.lock(); - if (shared_server) - { - rs(session->stream); - // When we receive the response and have sent it, we proceed with reading - shared_server->session_finalize_response(session); - } - }; - - const auto& address = session->stream.socket().remote_endpoint().address(); - m_handler.handle_request_http(address, session->parser->get(), response_promise(finalize_response)); - } - } - - if (count_responding < m_max_simultaneous_requests) - { - // Now we can dispatch reading. - for (const auto& session : m_sessions) - { - if (session && (session->state == session_state::queue_reading)) - { - http_session& s = *session; - s.state = session_state::reading; + s.m_state = session_state::reading; s.parser.emplace(); // Create empty parser object. s.parser->body_limit(m_request_body_limit); s.stream.expires_after(m_incoming_request_timeout); @@ -202,8 +156,27 @@ void internal_server::session_on_read(std::shared_ptr session, } else { - // Just a regular http request. Queue for handling later. - s.state = session_state::queue_responding; + std::shared_ptr req = s.m_outstanding_requests.emplace(std::make_shared()); + + // A regular HTTP request. + auto finalize_response = [session, weak_server = weak_from_this(), req](const response_promise::response_sender& rs) + { + // We have a response, add it to our spot in the queue. + auto shared_server = weak_server.lock(); + if (shared_server) + { + // We have a response, add it to our spot in the queue. + req->m_response_sender = rs; + + // Queue processing of the queue so we can continue. + shared_server->m_io_context.post([shared_server]{shared_server->session_process_queued();}); + } + }; + + const auto& address = session->stream.socket().remote_endpoint().address(); + m_handler.handle_request_http(address, session->parser->get(), response_promise(finalize_response)); + + s.m_state = session_state::queue_reading; session_process_queued(); } } @@ -212,7 +185,7 @@ void internal_server::session_finalize_response(std::shared_ptr se { if(session) { - session->state = session_state::queue_reading; + session->m_state = session_state::queue_reading; session_process_queued(); } } diff --git a/bmrshared-web/lib/internal_server.h b/bmrshared-web/lib/internal_server.h index 89cdf02..f49ef65 100644 --- a/bmrshared-web/lib/internal_server.h +++ b/bmrshared-web/lib/internal_server.h @@ -21,6 +21,11 @@ namespace detail { reading }; + struct http_request + { + std::optional m_response_sender; + }; + struct http_session { explicit http_session(boost::asio::ip::tcp::socket s) : stream(std::move(s)) @@ -29,12 +34,9 @@ namespace detail { boost::beast::flat_buffer buffer; boost::optional> parser; session_state m_state; - std::queue<> m_outstanding_requests; + std::queue> m_outstanding_requests; }; - - - class internal_server final : public std::enable_shared_from_this { public: using address_type = bmrshared::web::server::address_type;