network-experiment/bmrshared-web/lib/internal_server.cpp

197 lines
6.8 KiB
C++

// GNU Lesser General Public License v3.0
// Copyright (c) 2025 Bart Beumer <bart@4beumer.nl>
//
// This program is free software; you can redistribute it and/or modify it
// under the terms of the GNU Lesser General Public License v3.0 as published by
// the Free Software Foundation.
//
#include "internal_server.h"
#include <bmrshared/request_handler_interface.hpp>
#include <algorithm>
using bmrshared::web::detail::internal_server;
internal_server::internal_server(boost::asio::io_context& io_context,
const boost::asio::ip::tcp::endpoint& listen_endpoint,
std::chrono::seconds incoming_request_timeout,
uint64_t request_body_limit,
unsigned int max_simultaneous_requests,
request_handler_interface& handler)
: m_io_context(io_context)
, m_endPoint(listen_endpoint)
, m_incoming_request_timeout(incoming_request_timeout)
, m_max_simultaneous_requests(max_simultaneous_requests)
, m_request_body_limit(request_body_limit)
, m_acceptor(io_context)
, m_handler(handler)
, m_sessions()
{
}
internal_server::~internal_server() = default;
void internal_server::run()
{
m_acceptor.open(m_endPoint.protocol());
m_acceptor.set_option(boost::asio::socket_base::reuse_address(true));
m_acceptor.bind(m_endPoint);
m_acceptor.listen(boost::asio::socket_base::max_listen_connections);
async_accept();
}
void internal_server::async_accept()
{
// Start accepting incoming connections asynchronously.
m_acceptor.async_accept(
[weak_server = weak_from_this()](boost::system::error_code ec, boost::asio::ip::tcp::socket socket)
{
auto shared_server = weak_server.lock();
if (shared_server)
{
shared_server->do_accept(ec, std::move(socket));
}
});
}
void internal_server::do_accept(boost::system::error_code ec, boost::asio::ip::tcp::socket socket)
{
if (ec)
{
// Error while accepting incoming connection. Ignore.
// TODO: something needs to be done here.
}
else
{
m_sessions.push_back(std::make_shared<http_session>(std::move(socket)));
session_process_queued();
async_accept();
}
}
void internal_server::session_process_queued()
{
// Send out all processed responses.
for (auto& session : m_sessions)
{
if (session)
{
http_session& s = *session;
auto& our = s.m_outstanding_requests;
while (!our.empty() && our.front() && our.front()->m_response_sender)
{
auto& resp = our.front();
(*resp->m_response_sender)(s.stream);
our.pop();
}
}
}
// Determine how many outstanding requests we have.
std::size_t count_responding = 0;
for(const auto& session : m_sessions)
{
count_responding += session->m_outstanding_requests.size();
}
// Allow for more outstanding requests unless we are already over the maximum.
if (count_responding < m_max_simultaneous_requests)
{
for(auto& session : m_sessions)
{
if (!session || (count_responding > m_max_simultaneous_requests))
{
// do nothing.
}
else if (session->m_state == session_state::queue_reading)
{
http_session& s = *session;
s.m_state = session_state::reading;
s.parser.emplace(); // Create empty parser object.
s.parser->body_limit(m_request_body_limit);
s.stream.expires_after(m_incoming_request_timeout);
auto handler = [weak_server = weak_from_this(), session](boost::beast::error_code ec, std::size_t size)
{
auto shared_server = weak_server.lock();
if (shared_server)
{
shared_server->session_on_read(session, ec, size);
}
};
boost::beast::http::async_read(s.stream, s.buffer, *s.parser, handler);
}
}
}
}
void internal_server::session_on_read(std::shared_ptr<http_session> session,
boost::beast::error_code ec,
std::size_t size)
{
http_session& s = *session;
if (ec == boost::beast::http::error::end_of_stream)
{
// End of stream. Shutdown and remove http session
s.stream.socket().shutdown(boost::asio::ip::tcp::socket::shutdown_send, ec);
session_remove(session);
}
else if (ec)
{
// Any other kind of error, also shutdown and remove http_session
s.stream.socket().shutdown(boost::asio::ip::tcp::socket::shutdown_send, ec);
session_remove(session);
}
else if (boost::beast::websocket::is_upgrade(s.parser->get()))
{
// Received a websocket upgrade. pass socket to request handler, remove session from our administration.
const auto& address = s.stream.socket().remote_endpoint().address();
m_handler.handle_request_websocket_upgrade(address, s.parser->get(), s.stream.release_socket());
session_remove(session);
}
else
{
std::shared_ptr<http_request> req = s.m_outstanding_requests.emplace(std::make_shared<http_request>());
// A regular HTTP request.
auto finalize_response = [session, weak_server = weak_from_this(), req](const response_promise::response_sender& rs)
{
// We have a response, add it to our spot in the queue.
auto shared_server = weak_server.lock();
if (shared_server)
{
// We have a response, add it to our spot in the queue.
req->m_response_sender = rs;
// Queue processing of the queue so we can continue.
shared_server->m_io_context.post([shared_server]{shared_server->session_process_queued();});
}
};
const auto& address = session->stream.socket().remote_endpoint().address();
m_handler.handle_request_http(address, session->parser->get(), response_promise(finalize_response));
s.m_state = session_state::queue_reading;
session_process_queued();
}
}
void internal_server::session_finalize_response(std::shared_ptr<http_session> session)
{
if(session)
{
session->m_state = session_state::queue_reading;
session_process_queued();
}
}
void internal_server::session_remove(std::shared_ptr<http_session> session)
{
m_sessions.erase(std::remove(m_sessions.begin(), m_sessions.end(), session), m_sessions.end());
}