227 lines
8.2 KiB
C++
227 lines
8.2 KiB
C++
// GNU Lesser General Public License v3.0
|
|
// Copyright (c) 2025 Bart Beumer <bart@4beumer.nl>
|
|
//
|
|
// This program is free software; you can redistribute it and/or modify it
|
|
// under the terms of the GNU Lesser General Public License v3.0 as published by
|
|
// the Free Software Foundation.
|
|
//
|
|
#include "internal_server.h"
|
|
#include <bmrshared/request_handler_interface.hpp>
|
|
#include <algorithm>
|
|
#include <iostream>
|
|
using bmrshared::web::detail::internal_server;
|
|
|
|
|
|
internal_server::internal_server(boost::asio::io_context& io_context,
|
|
const boost::asio::ip::tcp::endpoint& listen_endpoint,
|
|
std::chrono::seconds incoming_request_timeout,
|
|
uint64_t request_body_limit,
|
|
unsigned int max_simultaneous_requests,
|
|
request_handler_interface& handler)
|
|
: m_io_context(io_context)
|
|
, m_strand(io_context)
|
|
, m_endPoint(listen_endpoint)
|
|
, m_incoming_request_timeout(incoming_request_timeout)
|
|
, m_max_simultaneous_requests(max_simultaneous_requests)
|
|
, m_request_body_limit(request_body_limit)
|
|
, m_acceptor(io_context)
|
|
, m_handler(handler)
|
|
, m_sessions()
|
|
{
|
|
}
|
|
|
|
internal_server::~internal_server() = default;
|
|
|
|
void internal_server::run()
|
|
{
|
|
m_acceptor.open(m_endPoint.protocol());
|
|
m_acceptor.set_option(boost::asio::socket_base::reuse_address(true));
|
|
m_acceptor.bind(m_endPoint);
|
|
m_acceptor.listen(boost::asio::socket_base::max_listen_connections);
|
|
|
|
async_accept();
|
|
}
|
|
|
|
void internal_server::async_accept()
|
|
{
|
|
// Start accepting incoming connections asynchronously.
|
|
m_acceptor.async_accept(
|
|
[weak_server = weak_from_this()](boost::system::error_code ec, boost::asio::ip::tcp::socket socket)
|
|
{
|
|
auto shared_server = weak_server.lock();
|
|
if(shared_server)
|
|
{
|
|
boost::asio::post(shared_server->m_strand, [weak_server, ec, s = std::make_shared<boost::asio::ip::tcp::socket>(std::move(socket))]
|
|
{
|
|
auto shared_server = weak_server.lock();
|
|
if (shared_server)
|
|
{
|
|
shared_server->do_accept(ec, std::move(*s));
|
|
}
|
|
});
|
|
}
|
|
});
|
|
}
|
|
|
|
void internal_server::do_accept(boost::system::error_code ec, boost::asio::ip::tcp::socket socket)
|
|
{
|
|
if (ec)
|
|
{
|
|
// Error while accepting incoming connection. Ignore.
|
|
// TODO: something needs to be done here.
|
|
}
|
|
else
|
|
{
|
|
auto sessionptr = std::make_shared<http_session>(std::move(socket));
|
|
m_sessions.push_back(sessionptr);
|
|
}
|
|
async_accept();
|
|
session_process_queued();
|
|
}
|
|
|
|
void internal_server::session_process_queued()
|
|
{
|
|
// Determine how many outstanding requests we have.
|
|
std::size_t count_responding = 0;
|
|
for(const auto& session : m_sessions)
|
|
{
|
|
if (session->m_state == session_state::wait_response)
|
|
{
|
|
count_responding++;
|
|
}
|
|
}
|
|
|
|
// Send out all processed responses.
|
|
for (auto& session : m_sessions)
|
|
{
|
|
if (session)
|
|
{
|
|
http_session& s = *session;
|
|
auto& our = s.m_outstanding_request;
|
|
|
|
if (our && our->m_response_sender)
|
|
{
|
|
(*our->m_response_sender)(s.stream);
|
|
our.reset();
|
|
s.m_state = session_state::queue_reading;
|
|
}
|
|
}
|
|
}
|
|
|
|
// Allow for more outstanding requests unless we are already over the maximum.
|
|
if (count_responding < m_max_simultaneous_requests)
|
|
{
|
|
for(auto& session : m_sessions)
|
|
{
|
|
if (!session)
|
|
{
|
|
// do nothing.
|
|
}
|
|
else if (session->m_state == session_state::queue_reading)
|
|
{
|
|
http_session& s = *session;
|
|
s.m_state = session_state::reading;
|
|
s.parser.emplace(); // Create empty parser object.
|
|
s.parser->body_limit(m_request_body_limit);
|
|
s.stream.expires_after(m_incoming_request_timeout);
|
|
|
|
auto handler = [weak_server = weak_from_this(), session](boost::beast::error_code ec, std::size_t size)
|
|
{
|
|
auto shared_server = weak_server.lock();
|
|
if (shared_server)
|
|
{
|
|
boost::asio::post(shared_server->m_strand, [weak_server, session, ec, size]
|
|
{
|
|
auto shared_server = weak_server.lock();
|
|
if (shared_server)
|
|
{
|
|
shared_server->session_on_read(session, ec, size);
|
|
}
|
|
});
|
|
}
|
|
};
|
|
boost::beast::http::async_read(s.stream, s.buffer, *s.parser, handler);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
void internal_server::session_on_read(std::shared_ptr<http_session> session,
|
|
boost::beast::error_code ec,
|
|
std::size_t size)
|
|
{
|
|
http_session& s = *session;
|
|
|
|
if (ec)
|
|
{
|
|
// Any other kind of error, also shutdown and remove http_session
|
|
s.stream.socket().shutdown(boost::asio::ip::tcp::socket::shutdown_send, ec);
|
|
session_remove(session);
|
|
}
|
|
else if (boost::beast::websocket::is_upgrade(s.parser->get()))
|
|
{
|
|
// Received a websocket upgrade. pass socket to request handler, remove session from our administration.
|
|
const auto& address = s.stream.socket().remote_endpoint().address();
|
|
m_handler.handle_request_websocket_upgrade(address, s.parser->get(), s.stream.release_socket());
|
|
session_remove(session);
|
|
}
|
|
else
|
|
{
|
|
std::shared_ptr<http_request> req = std::make_shared<http_request>();
|
|
s.m_outstanding_request = req;
|
|
|
|
// A regular HTTP request.
|
|
auto finalize_response = [session, weak_server = weak_from_this(), req](const request_response::response_sender& rs)
|
|
{
|
|
// We have a response, add it to our spot in the queue.
|
|
auto shared_server = weak_server.lock();
|
|
if (shared_server)
|
|
{
|
|
// Queue processing of the queue so we can continue.
|
|
boost::asio::post(shared_server->m_strand,
|
|
[shared_server, req, rs]
|
|
{
|
|
req->m_response_sender = rs;
|
|
shared_server->session_process_queued();
|
|
});
|
|
}
|
|
};
|
|
|
|
const auto& address = session->stream.socket().remote_endpoint().address();
|
|
const auto& request = session->parser->get();
|
|
|
|
auto rs = request_response(request, finalize_response);
|
|
auto& internal = rs.create_response<boost::beast::http::response<boost::beast::http::string_body>>(boost::beast::http::status::internal_server_error, request.version());
|
|
internal.set(boost::beast::http::field::server, BOOST_BEAST_VERSION_STRING);
|
|
internal.set(boost::beast::http::field::content_type, "text/plain");
|
|
internal.prepare_payload();
|
|
internal.body() = "Internal server error.";
|
|
internal.keep_alive(request.keep_alive());
|
|
|
|
m_handler.handle_request_http(rs);
|
|
|
|
s.m_state = session_state::wait_response;
|
|
}
|
|
session_process_queued();
|
|
}
|
|
|
|
void internal_server::session_finalize_response(std::shared_ptr<http_session> session)
|
|
{
|
|
if(session)
|
|
{
|
|
session->m_state = session_state::queue_reading;
|
|
session_process_queued();
|
|
}
|
|
}
|
|
|
|
void internal_server::session_remove(std::shared_ptr<http_session> session)
|
|
{
|
|
m_sessions.erase(std::remove_if(m_sessions.begin(),
|
|
m_sessions.end(),
|
|
[session](const std::shared_ptr<http_session>& s)
|
|
{
|
|
return session.get() == s.get();
|
|
}),
|
|
m_sessions.end());
|
|
}
|