197 lines
6.8 KiB
C++
197 lines
6.8 KiB
C++
// GNU Lesser General Public License v3.0
|
|
// Copyright (c) 2025 Bart Beumer <bart@4beumer.nl>
|
|
//
|
|
// This program is free software; you can redistribute it and/or modify it
|
|
// under the terms of the GNU Lesser General Public License v3.0 as published by
|
|
// the Free Software Foundation.
|
|
//
|
|
#include "internal_server.h"
|
|
#include <bmrshared/request_handler_interface.hpp>
|
|
#include <algorithm>
|
|
|
|
using bmrshared::web::detail::internal_server;
|
|
|
|
|
|
internal_server::internal_server(boost::asio::io_context& io_context,
|
|
const boost::asio::ip::tcp::endpoint& listen_endpoint,
|
|
std::chrono::seconds incoming_request_timeout,
|
|
uint64_t request_body_limit,
|
|
unsigned int max_simultaneous_requests,
|
|
request_handler_interface& handler)
|
|
: m_io_context(io_context)
|
|
, m_endPoint(listen_endpoint)
|
|
, m_incoming_request_timeout(incoming_request_timeout)
|
|
, m_max_simultaneous_requests(max_simultaneous_requests)
|
|
, m_request_body_limit(request_body_limit)
|
|
, m_acceptor(io_context)
|
|
, m_handler(handler)
|
|
, m_sessions()
|
|
{
|
|
}
|
|
|
|
internal_server::~internal_server() = default;
|
|
|
|
void internal_server::run()
|
|
{
|
|
m_acceptor.open(m_endPoint.protocol());
|
|
m_acceptor.set_option(boost::asio::socket_base::reuse_address(true));
|
|
m_acceptor.bind(m_endPoint);
|
|
m_acceptor.listen(boost::asio::socket_base::max_listen_connections);
|
|
|
|
async_accept();
|
|
}
|
|
|
|
void internal_server::async_accept()
|
|
{
|
|
// Start accepting incoming connections asynchronously.
|
|
m_acceptor.async_accept(
|
|
[weak_server = weak_from_this()](boost::system::error_code ec, boost::asio::ip::tcp::socket socket)
|
|
{
|
|
auto shared_server = weak_server.lock();
|
|
if (shared_server)
|
|
{
|
|
shared_server->do_accept(ec, std::move(socket));
|
|
}
|
|
});
|
|
}
|
|
|
|
void internal_server::do_accept(boost::system::error_code ec, boost::asio::ip::tcp::socket socket)
|
|
{
|
|
if (ec)
|
|
{
|
|
// Error while accepting incoming connection. Ignore.
|
|
// TODO: something needs to be done here.
|
|
}
|
|
else
|
|
{
|
|
m_sessions.push_back(std::make_shared<http_session>(std::move(socket)));
|
|
session_process_queued();
|
|
async_accept();
|
|
|
|
}
|
|
}
|
|
|
|
void internal_server::session_process_queued()
|
|
{
|
|
// Send out all processed responses.
|
|
for (auto& session : m_sessions)
|
|
{
|
|
if (session)
|
|
{
|
|
http_session& s = *session;
|
|
|
|
auto& our = s.m_outstanding_requests;
|
|
|
|
while (!our.empty() && our.front() && our.front()->m_response_sender)
|
|
{
|
|
auto& resp = our.front();
|
|
(*resp->m_response_sender)(s.stream);
|
|
our.pop();
|
|
}
|
|
}
|
|
}
|
|
|
|
// Determine how many outstanding requests we have.
|
|
std::size_t count_responding = 0;
|
|
for(const auto& session : m_sessions)
|
|
{
|
|
count_responding += session->m_outstanding_requests.size();
|
|
}
|
|
|
|
// Allow for more outstanding requests unless we are already over the maximum.
|
|
if (count_responding < m_max_simultaneous_requests)
|
|
{
|
|
for(auto& session : m_sessions)
|
|
{
|
|
if (!session || (count_responding > m_max_simultaneous_requests))
|
|
{
|
|
// do nothing.
|
|
}
|
|
else if (session->m_state == session_state::queue_reading)
|
|
{
|
|
http_session& s = *session;
|
|
s.m_state = session_state::reading;
|
|
s.parser.emplace(); // Create empty parser object.
|
|
s.parser->body_limit(m_request_body_limit);
|
|
s.stream.expires_after(m_incoming_request_timeout);
|
|
|
|
auto handler = [weak_server = weak_from_this(), session](boost::beast::error_code ec, std::size_t size)
|
|
{
|
|
auto shared_server = weak_server.lock();
|
|
if (shared_server)
|
|
{
|
|
shared_server->session_on_read(session, ec, size);
|
|
}
|
|
};
|
|
boost::beast::http::async_read(s.stream, s.buffer, *s.parser, handler);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
void internal_server::session_on_read(std::shared_ptr<http_session> session,
|
|
boost::beast::error_code ec,
|
|
std::size_t size)
|
|
{
|
|
http_session& s = *session;
|
|
|
|
if (ec == boost::beast::http::error::end_of_stream)
|
|
{
|
|
// End of stream. Shutdown and remove http session
|
|
s.stream.socket().shutdown(boost::asio::ip::tcp::socket::shutdown_send, ec);
|
|
session_remove(session);
|
|
}
|
|
else if (ec)
|
|
{
|
|
// Any other kind of error, also shutdown and remove http_session
|
|
s.stream.socket().shutdown(boost::asio::ip::tcp::socket::shutdown_send, ec);
|
|
session_remove(session);
|
|
}
|
|
else if (boost::beast::websocket::is_upgrade(s.parser->get()))
|
|
{
|
|
// Received a websocket upgrade. pass socket to request handler, remove session from our administration.
|
|
const auto& address = s.stream.socket().remote_endpoint().address();
|
|
m_handler.handle_request_websocket_upgrade(address, s.parser->get(), s.stream.release_socket());
|
|
session_remove(session);
|
|
}
|
|
else
|
|
{
|
|
std::shared_ptr<http_request> req = s.m_outstanding_requests.emplace(std::make_shared<http_request>());
|
|
|
|
// A regular HTTP request.
|
|
auto finalize_response = [session, weak_server = weak_from_this(), req](const response_promise::response_sender& rs)
|
|
{
|
|
// We have a response, add it to our spot in the queue.
|
|
auto shared_server = weak_server.lock();
|
|
if (shared_server)
|
|
{
|
|
// We have a response, add it to our spot in the queue.
|
|
req->m_response_sender = rs;
|
|
|
|
// Queue processing of the queue so we can continue.
|
|
shared_server->m_io_context.post([shared_server]{shared_server->session_process_queued();});
|
|
}
|
|
};
|
|
|
|
const auto& address = session->stream.socket().remote_endpoint().address();
|
|
m_handler.handle_request_http(address, session->parser->get(), response_promise(finalize_response));
|
|
|
|
s.m_state = session_state::queue_reading;
|
|
session_process_queued();
|
|
}
|
|
}
|
|
|
|
void internal_server::session_finalize_response(std::shared_ptr<http_session> session)
|
|
{
|
|
if(session)
|
|
{
|
|
session->m_state = session_state::queue_reading;
|
|
session_process_queued();
|
|
}
|
|
}
|
|
|
|
void internal_server::session_remove(std::shared_ptr<http_session> session)
|
|
{
|
|
m_sessions.erase(std::remove(m_sessions.begin(), m_sessions.end(), session), m_sessions.end());
|
|
}
|