// GNU Lesser General Public License v3.0 // Copyright (c) 2025 Bart Beumer // // This program is free software; you can redistribute it and/or modify it // under the terms of the GNU Lesser General Public License v3.0 as published by // the Free Software Foundation. // #include "internal_server.h" #include #include using bmrshared::web::detail::internal_server; internal_server::internal_server(boost::asio::io_context& io_context, const boost::asio::ip::tcp::endpoint& listen_endpoint, std::chrono::seconds incoming_request_timeout, uint64_t request_body_limit, unsigned int max_simultaneous_requests, request_handler_interface& handler) : m_io_context(io_context) , m_endPoint(listen_endpoint) , m_incoming_request_timeout(incoming_request_timeout) , m_max_simultaneous_requests(max_simultaneous_requests) , m_request_body_limit(request_body_limit) , m_acceptor(io_context) , m_handler(handler) , m_sessions() { } internal_server::~internal_server() = default; void internal_server::run() { m_acceptor.open(m_endPoint.protocol()); m_acceptor.set_option(boost::asio::socket_base::reuse_address(true)); m_acceptor.bind(m_endPoint); m_acceptor.listen(boost::asio::socket_base::max_listen_connections); async_accept(); } void internal_server::async_accept() { // Start accepting incoming connections asynchronously. m_acceptor.async_accept( [weak_server = weak_from_this()](boost::system::error_code ec, boost::asio::ip::tcp::socket socket) { auto shared_server = weak_server.lock(); if (shared_server) { shared_server->do_accept(ec, std::move(socket)); } }); } void internal_server::do_accept(boost::system::error_code ec, boost::asio::ip::tcp::socket socket) { if (ec) { // Error while accepting incoming connection. Ignore. // TODO: something needs to be done here. } else { m_sessions.push_back(std::make_shared(std::move(socket))); session_process_queued(); async_accept(); } } void internal_server::session_process_queued() { // Send out all processed responses. for (auto& session : m_sessions) { if (session) { http_session& s = *session; auto& our = s.m_outstanding_requests; while (!our.empty() && our.front() && our.front()->m_response_sender) { auto& resp = our.front(); (*resp->m_response_sender)(s.stream); our.pop(); } } } // Determine how many outstanding requests we have. std::size_t count_responding = 0; for(const auto& session : m_sessions) { count_responding += session->m_outstanding_requests.size(); } // Allow for more outstanding requests unless we are already over the maximum. if (count_responding < m_max_simultaneous_requests) { for(auto& session : m_sessions) { if (!session || (count_responding > m_max_simultaneous_requests)) { // do nothing. } else if (session->m_state == session_state::queue_reading) { http_session& s = *session; s.m_state = session_state::reading; s.parser.emplace(); // Create empty parser object. s.parser->body_limit(m_request_body_limit); s.stream.expires_after(m_incoming_request_timeout); auto handler = [weak_server = weak_from_this(), session](boost::beast::error_code ec, std::size_t size) { auto shared_server = weak_server.lock(); if (shared_server) { shared_server->session_on_read(session, ec, size); } }; boost::beast::http::async_read(s.stream, s.buffer, *s.parser, handler); } } } } void internal_server::session_on_read(std::shared_ptr session, boost::beast::error_code ec, std::size_t size) { http_session& s = *session; if (ec == boost::beast::http::error::end_of_stream) { // End of stream. Shutdown and remove http session s.stream.socket().shutdown(boost::asio::ip::tcp::socket::shutdown_send, ec); session_remove(session); } else if (ec) { // Any other kind of error, also shutdown and remove http_session s.stream.socket().shutdown(boost::asio::ip::tcp::socket::shutdown_send, ec); session_remove(session); } else if (boost::beast::websocket::is_upgrade(s.parser->get())) { // Received a websocket upgrade. pass socket to request handler, remove session from our administration. const auto& address = s.stream.socket().remote_endpoint().address(); m_handler.handle_request_websocket_upgrade(address, s.parser->get(), s.stream.release_socket()); session_remove(session); } else { std::shared_ptr req = s.m_outstanding_requests.emplace(std::make_shared()); // A regular HTTP request. auto finalize_response = [session, weak_server = weak_from_this(), req](const response_promise::response_sender& rs) { // We have a response, add it to our spot in the queue. auto shared_server = weak_server.lock(); if (shared_server) { // We have a response, add it to our spot in the queue. req->m_response_sender = rs; // Queue processing of the queue so we can continue. boost::asio::post(shared_server->m_io_context,[shared_server]{shared_server->session_process_queued();}); } }; const auto& address = session->stream.socket().remote_endpoint().address(); m_handler.handle_request_http(address, session->parser->get(), response_promise(finalize_response)); s.m_state = session_state::queue_reading; session_process_queued(); } } void internal_server::session_finalize_response(std::shared_ptr session) { if(session) { session->m_state = session_state::queue_reading; session_process_queued(); } } void internal_server::session_remove(std::shared_ptr session) { m_sessions.erase(std::remove(m_sessions.begin(), m_sessions.end(), session), m_sessions.end()); }