// GNU Lesser General Public License v3.0 // Copyright (c) 2025 Bart Beumer // // This program is free software; you can redistribute it and/or modify it // under the terms of the GNU Lesser General Public License v3.0 as published by // the Free Software Foundation. // #include "internal_server.h" #include #include using bmrshared::web::detail::internal_server; internal_server::internal_server(boost::asio::io_context& io_context, const boost::asio::ip::tcp::endpoint& listen_endpoint, std::chrono::seconds incoming_request_timeout, uint64_t request_body_limit, unsigned int max_simultaneous_requests, request_handler_interface& handler) : m_io_context(io_context) , m_endPoint(listen_endpoint) , m_incoming_request_timeout(incoming_request_timeout) , m_max_simultaneous_requests(max_simultaneous_requests) , m_request_body_limit(request_body_limit) , m_acceptor(io_context) , m_handler(handler) , m_sessions() { } internal_server::~internal_server() = default; void internal_server::run() { m_acceptor.open(m_endPoint.protocol()); m_acceptor.set_option(boost::asio::socket_base::reuse_address(true)); m_acceptor.bind(m_endPoint); m_acceptor.listen(boost::asio::socket_base::max_listen_connections); async_accept(); } void internal_server::async_accept() { // Start accepting incoming connections asynchronously. m_acceptor.async_accept( [weak_server = weak_from_this()](boost::system::error_code ec, boost::asio::ip::tcp::socket socket) { auto shared_server = weak_server.lock(); if (shared_server) { shared_server->do_accept(ec, std::move(socket)); } }); } void internal_server::do_accept(boost::system::error_code ec, boost::asio::ip::tcp::socket socket) { if (ec) { // Error while accepting incoming connection. Ignore. // TODO: something needs to be done here. } else { m_sessions.push_back(std::make_shared(std::move(socket))); session_process_queued(); async_accept(); } } void internal_server::session_process_queued() { std::size_t count_responding = std::count_if(m_sessions.begin(), m_sessions.end(), [](const std::shared_ptr& session) { return session && (session->state == session_state::responding); }); // First empty out all sessions queued for getting a response as much as we are allowed. auto iter = m_sessions.begin(); while ((count_responding < m_max_simultaneous_requests) && (iter != m_sessions.end())) { iter = std::find_if( iter, m_sessions.end(), [](std::shared_ptr session) { return session && (session->state == session_state::queue_responding); }); if (iter != m_sessions.end()) { auto& session = *iter; session->state = session_state::responding; ++count_responding; // Regular http request. Pass to request handler. auto finalize_response = [session, weak_server = weak_from_this()](const response_promise::response_sender& rs) { auto shared_server = weak_server.lock(); if (shared_server) { rs(session->stream); // When we receive the response and have sent it, we proceed with reading shared_server->session_finalize_response(session); } }; const auto& address = session->stream.socket().remote_endpoint().address(); m_handler.handle_request_http(address, session->parser->get(), response_promise(finalize_response)); } } if (count_responding < m_max_simultaneous_requests) { // Now we can dispatch reading. for (const auto& session : m_sessions) { if (session && (session->state == session_state::queue_reading)) { http_session& s = *session; s.state = session_state::reading; s.parser.emplace(); // Create empty parser object. s.parser->body_limit(m_request_body_limit); s.stream.expires_after(m_incoming_request_timeout); auto handler = [weak_server = weak_from_this(), session](boost::beast::error_code ec, std::size_t size) { auto shared_server = weak_server.lock(); if (shared_server) { shared_server->session_on_read(session, ec, size); } }; boost::beast::http::async_read(s.stream, s.buffer, *s.parser, handler); } } } } void internal_server::session_on_read(std::shared_ptr session, boost::beast::error_code ec, std::size_t size) { http_session& s = *session; if (ec == boost::beast::http::error::end_of_stream) { // End of stream. Shutdown and remove http session s.stream.socket().shutdown(boost::asio::ip::tcp::socket::shutdown_send, ec); session_remove(session); } else if (ec) { // Any other kind of error, also shutdown and remove http_session s.stream.socket().shutdown(boost::asio::ip::tcp::socket::shutdown_send, ec); session_remove(session); } else if (boost::beast::websocket::is_upgrade(s.parser->get())) { // Received a websocket upgrade. pass socket to request handler, remove session from our administration. const auto& address = s.stream.socket().remote_endpoint().address(); m_handler.handle_request_websocket_upgrade(address, s.parser->get(), s.stream.release_socket()); session_remove(session); } else { // Just a regular http request. Queue for handling later. s.state = session_state::queue_responding; session_process_queued(); } } void internal_server::session_finalize_response(std::shared_ptr session) { if(session) { session->state = session_state::queue_reading; session_process_queued(); } } void internal_server::session_remove(std::shared_ptr session) { m_sessions.erase(std::remove(m_sessions.begin(), m_sessions.end(), session), m_sessions.end()); }