Multithreaded local server
How can I get through very long and very dry, but also very useful technical documents when learning a new tool?
What Brexit proposals are on the table in the indicative votes on the 27th of March 2019?
Purchasing a ticket for someone else in another country?
How did Doctor Strange see the winning outcome in Avengers: Infinity War?
Gears on left are inverse to gears on right?
Sequence of Tenses: Translating the subjunctive
Two monoidal structures and copowering
How to check is there any negative term in a large list?
What does "I’d sit this one out, Cap," imply or mean in the context?
Closest Prime Number
Fastening aluminum fascia to wooden subfascia
Proof of work - lottery approach
Unreliable Magic - Is it worth it?
Is there a good way to store credentials outside of a password manager?
Failed to fetch jessie backports repository
Large drywall patch supports
Applicability of Single Responsibility Principle
How to Reset Passwords on Multiple Websites Easily?
India just shot down a satellite from the ground. At what altitude range is the resulting debris field?
Tiptoe or tiphoof? Adjusting words to better fit fantasy races
How to safely derail a train during transit?
How does the UK government determine the size of a mandate?
What is the best translation for "slot" in the context of multiplayer video games?
Why escape if the_content isnt?
Multithreaded local server
$begingroup$
I'm writing several local servers which have almost the same code in main.cpp. I'd appreciate comments, improvement suggestions and especially notes on potential memory leaks. Thanks!
#include <algorithm>
#include <condition_variable>
#include <fstream>
#include <iostream>
#include <iterator>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>
#include "UdsServer.hpp"
#include "RequestManager.hpp"
#include "MSutils.hpp" //MS::log()
// Worker-thread entry point; main() starts one detached std::thread per worker on it.
void pullRequests();
// Filesystem path handed to UdsServer::createServer() in main().
const std::string pathToSocket = "/var/run/SomeServer.sock";
// Passed as the first argument of every MS::log() call in this file.
const std::string SERVICE_NAME = "SomeServer";
RequestManager service; //Does the actual processing of the request
// Pending requests: key is the value returned by app.newConnectionEstablished(),
// mapped value is the command text read from that connection.
std::unordered_map<int, std::string> requestQueue;
// Held by main() while inserting into requestQueue / setting gotNewRequests,
// and by the workers while waiting and while taking the queue contents.
std::mutex requestQueue_mutex;
// main() calls notify_one() on it after queueing a request; workers wait() on it.
std::condition_variable processorsThreadSwitch;
// Set to true by main() together with an insertion; reset to false by the worker
// that takes over the queue contents. Accessed under requestQueue_mutex.
bool gotNewRequests = false;
int main()
UdsServer app; //Server listening on a Unix Domain Socket
try
app.createServer(pathToSocket);
catch (const std::string & err)
MS::log(SERVICE_NAME, "Failed to start the service. Error: " + err, MS::MessageType::FatalException);
return -1;
unsigned n_concThreads = std::thread::hardware_concurrency();
if (!n_concThreads) //if the query failed...
std::ifstream cpuinfo("/proc/cpuinfo");
n_concThreads = std::count(std::istream_iterator<std::string>(cpuinfo),
std::istream_iterator<std::string>(),
std::string("processor"));
if (!n_concThreads)
n_concThreads = 6; // ~number of CPU cores. TODO: make the number of worker processes/threads configurable using a config file
for (int i = 0; i < n_concThreads; ++i)
std::thread t (pullRequests);
t.detach();
while ((int clientConnection = app.newConnectionEstablished()) > -1) //Uses accept() internally
std::string command = app.getMsg (clientConnection); //Uses read() internally
if (command.empty())
app.closeConnection(clientConnection);
else if (command == "SHUTDOWN")
app.closeConnection(clientConnection);
return 0;
else
//Anonymous scope just to get rid of the lock before notifying a thread
std::lock_guard<std::mutex> writeLock(requestQueue_mutex);
requestQueue[clientConnection] = std::move(command);
gotNewRequests = true;
processorsThreadSwitch.notify_one(); //nothing happens here if all threads are busy
void pullRequests()
UnixDomainSocket uds;
std::unique_lock<std::mutex> writeLock(requestQueue_mutex);
while (true) //Let the thread run "forever"
while (!gotNewRequests)
processorsThreadSwitch.wait(writeLock);
std::unordered_map<int, std::string> queueCopy (std::move(requestQueue));
requestQueue.clear();
gotNewRequests = false;
writeLock.unlock(); //Don't let the other threads wait when this threads doesn't need to access the shared data any more
if (queueCopy.empty())
continue;
else if (queueCopy.size() == 1)
std::string response = service.pullRequests(queueCopy.cbegin()->second);
if (response.length())
auto sendResult = uds.sendMsg(queueCopy.cbegin()->first, response);
if (!sendResult.isValid())
MS::log(SERVICE_NAME, "Could not send the response for request: " + queueCopy.begin()->second, MS::MessageType::Error);
if (!uds.closeConnection(queueCopy.begin()->first))
MS::log(SERVICE_NAME, "Could not close the connection.", MS::MessageType::Error);
else //Multiplex
std::unordered_map<std::string, std::vector<int>> multiplexedRequests;
for (auto & request : queueCopy)
multiplexedRequests[std::move(request.second)].push_back(request.first);
for (const auto & request : multiplexedRequests)
std::string response = service.pullRequests(request.first);
if (response.length())
for (auto socket : request.second)
auto sendResult = uds.sendMsg(socket, response);
if (!sendResult.isValid())
MS::log(SERVICE_NAME, "Could not send the response for request: " + request.first, MS::MessageType::Error);
if (!uds.closeConnection(socket))
MS::log(SERVICE_NAME, "Could not close the connection.", MS::MessageType::Error);
writeLock.lock();
c++ multithreading server
New contributor
$endgroup$
add a comment |
$begingroup$
I'm writing several local servers which have almost the same code in main.cpp. I'd appreciate comments, improvement suggestions and especially notes on potential memory leaks. Thanks!
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <unordered_map>
#include "UdsServer.hpp"
#include "RequestManager.hpp"
#include "MSutils.hpp" //MS::log()
// Worker-thread entry point; main() starts one detached std::thread per worker on it.
void pullRequests();
// Filesystem path handed to UdsServer::createServer() in main().
const std::string pathToSocket = "/var/run/SomeServer.sock";
// Passed as the first argument of every MS::log() call in this file.
const std::string SERVICE_NAME = "SomeServer";
RequestManager service; //Does the actual processing of the request
// Pending requests: key is the value returned by app.newConnectionEstablished(),
// mapped value is the command text read from that connection.
std::unordered_map<int, std::string> requestQueue;
// Held by main() while inserting into requestQueue / setting gotNewRequests,
// and by the workers while waiting and while taking the queue contents.
std::mutex requestQueue_mutex;
// main() calls notify_one() on it after queueing a request; workers wait() on it.
std::condition_variable processorsThreadSwitch;
// Set to true by main() together with an insertion; reset to false by the worker
// that takes over the queue contents. Accessed under requestQueue_mutex.
bool gotNewRequests = false;
int main()
UdsServer app; //Server listening on a Unix Domain Socket
try
app.createServer(pathToSocket);
catch (const std::string & err)
MS::log(SERVICE_NAME, "Failed to start the service. Error: " + err, MS::MessageType::FatalException);
return -1;
unsigned n_concThreads = std::thread::hardware_concurrency();
if (!n_concThreads) //if the query failed...
std::ifstream cpuinfo("/proc/cpuinfo");
n_concThreads = std::count(std::istream_iterator<std::string>(cpuinfo),
std::istream_iterator<std::string>(),
std::string("processor"));
if (!n_concThreads)
n_concThreads = 6; // ~number of CPU cores. TODO: make the number of worker processes/threads configurable using a config file
for (int i = 0; i < n_concThreads; ++i)
std::thread t (pullRequests);
t.detach();
while ((int clientConnection = app.newConnectionEstablished()) > -1) //Uses accept() internally
std::string command = app.getMsg (clientConnection); //Uses read() internally
if (command.empty())
app.closeConnection(clientConnection);
else if (command == "SHUTDOWN")
app.closeConnection(clientConnection);
return 0;
else
//Anonymous scope just to get rid of the lock before notifying a thread
std::lock_guard<std::mutex> writeLock(requestQueue_mutex);
requestQueue[clientConnection] = std::move(command);
gotNewRequests = true;
processorsThreadSwitch.notify_one(); //nothing happens here if all threads are busy
void pullRequests()
UnixDomainSocket uds;
std::unique_lock<std::mutex> writeLock(requestQueue_mutex);
while (true) //Let the thread run "forever"
while (!gotNewRequests)
processorsThreadSwitch.wait(writeLock);
std::unordered_map<int, std::string> queueCopy (std::move(requestQueue));
requestQueue.clear();
gotNewRequests = false;
writeLock.unlock(); //Don't let the other threads wait when this threads doesn't need to access the shared data any more
if (queueCopy.empty())
continue;
else if (queueCopy.size() == 1)
std::string response = service.pullRequests(queueCopy.cbegin()->second);
if (response.length())
auto sendResult = uds.sendMsg(queueCopy.cbegin()->first, response);
if (!sendResult.isValid())
MS::log(SERVICE_NAME, "Could not send the response for request: " + queueCopy.begin()->second, MS::MessageType::Error);
if (!uds.closeConnection(queueCopy.begin()->first))
MS::log(SERVICE_NAME, "Could not close the connection.", MS::MessageType::Error);
else //Multiplex
std::unordered_map<std::string, std::vector<int>> multiplexedRequests;
for (auto & request : queueCopy)
multiplexedRequests[std::move(request.second)].push_back(request.first);
for (const auto & request : multiplexedRequests)
std::string response = service.pullRequests(request.first);
if (response.length())
for (auto socket : request.second)
auto sendResult = uds.sendMsg(socket, response);
if (!sendResult.isValid())
MS::log(SERVICE_NAME, "Could not send the response for request: " + request.first, MS::MessageType::Error);
if (!uds.closeConnection(socket))
MS::log(SERVICE_NAME, "Could not close the connection.", MS::MessageType::Error);
writeLock.lock();
c++ multithreading server
New contributor
$endgroup$
add a comment |
$begingroup$
I'm writing several local servers which have almost the same code in main.cpp. I'd appreciate comments, improvement suggestions and especially notes on potential memory leaks. Thanks!
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <unordered_map>
#include "UdsServer.hpp"
#include "RequestManager.hpp"
#include "MSutils.hpp" //MS::log()
// Worker-thread entry point; main() starts one detached std::thread per worker on it.
void pullRequests();
// Filesystem path handed to UdsServer::createServer() in main().
const std::string pathToSocket = "/var/run/SomeServer.sock";
// Passed as the first argument of every MS::log() call in this file.
const std::string SERVICE_NAME = "SomeServer";
RequestManager service; //Does the actual processing of the request
// Pending requests: key is the value returned by app.newConnectionEstablished(),
// mapped value is the command text read from that connection.
std::unordered_map<int, std::string> requestQueue;
// Held by main() while inserting into requestQueue / setting gotNewRequests,
// and by the workers while waiting and while taking the queue contents.
std::mutex requestQueue_mutex;
// main() calls notify_one() on it after queueing a request; workers wait() on it.
std::condition_variable processorsThreadSwitch;
// Set to true by main() together with an insertion; reset to false by the worker
// that takes over the queue contents. Accessed under requestQueue_mutex.
bool gotNewRequests = false;
int main()
UdsServer app; //Server listening on a Unix Domain Socket
try
app.createServer(pathToSocket);
catch (const std::string & err)
MS::log(SERVICE_NAME, "Failed to start the service. Error: " + err, MS::MessageType::FatalException);
return -1;
unsigned n_concThreads = std::thread::hardware_concurrency();
if (!n_concThreads) //if the query failed...
std::ifstream cpuinfo("/proc/cpuinfo");
n_concThreads = std::count(std::istream_iterator<std::string>(cpuinfo),
std::istream_iterator<std::string>(),
std::string("processor"));
if (!n_concThreads)
n_concThreads = 6; // ~number of CPU cores. TODO: make the number of worker processes/threads configurable using a config file
for (int i = 0; i < n_concThreads; ++i)
std::thread t (pullRequests);
t.detach();
while ((int clientConnection = app.newConnectionEstablished()) > -1) //Uses accept() internally
std::string command = app.getMsg (clientConnection); //Uses read() internally
if (command.empty())
app.closeConnection(clientConnection);
else if (command == "SHUTDOWN")
app.closeConnection(clientConnection);
return 0;
else
//Anonymous scope just to get rid of the lock before notifying a thread
std::lock_guard<std::mutex> writeLock(requestQueue_mutex);
requestQueue[clientConnection] = std::move(command);
gotNewRequests = true;
processorsThreadSwitch.notify_one(); //nothing happens here if all threads are busy
void pullRequests()
UnixDomainSocket uds;
std::unique_lock<std::mutex> writeLock(requestQueue_mutex);
while (true) //Let the thread run "forever"
while (!gotNewRequests)
processorsThreadSwitch.wait(writeLock);
std::unordered_map<int, std::string> queueCopy (std::move(requestQueue));
requestQueue.clear();
gotNewRequests = false;
writeLock.unlock(); //Don't let the other threads wait when this threads doesn't need to access the shared data any more
if (queueCopy.empty())
continue;
else if (queueCopy.size() == 1)
std::string response = service.pullRequests(queueCopy.cbegin()->second);
if (response.length())
auto sendResult = uds.sendMsg(queueCopy.cbegin()->first, response);
if (!sendResult.isValid())
MS::log(SERVICE_NAME, "Could not send the response for request: " + queueCopy.begin()->second, MS::MessageType::Error);
if (!uds.closeConnection(queueCopy.begin()->first))
MS::log(SERVICE_NAME, "Could not close the connection.", MS::MessageType::Error);
else //Multiplex
std::unordered_map<std::string, std::vector<int>> multiplexedRequests;
for (auto & request : queueCopy)
multiplexedRequests[std::move(request.second)].push_back(request.first);
for (const auto & request : multiplexedRequests)
std::string response = service.pullRequests(request.first);
if (response.length())
for (auto socket : request.second)
auto sendResult = uds.sendMsg(socket, response);
if (!sendResult.isValid())
MS::log(SERVICE_NAME, "Could not send the response for request: " + request.first, MS::MessageType::Error);
if (!uds.closeConnection(socket))
MS::log(SERVICE_NAME, "Could not close the connection.", MS::MessageType::Error);
writeLock.lock();
c++ multithreading server
New contributor
$endgroup$
I'm writing several local servers which have almost the same code in main.cpp. I'd appreciate comments, improvement suggestions and especially notes on potential memory leaks. Thanks!
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <unordered_map>
#include "UdsServer.hpp"
#include "RequestManager.hpp"
#include "MSutils.hpp" //MS::log()
// Worker-thread entry point; main() starts one detached std::thread per worker on it.
void pullRequests();
// Filesystem path handed to UdsServer::createServer() in main().
const std::string pathToSocket = "/var/run/SomeServer.sock";
// Passed as the first argument of every MS::log() call in this file.
const std::string SERVICE_NAME = "SomeServer";
RequestManager service; //Does the actual processing of the request
// Pending requests: key is the value returned by app.newConnectionEstablished(),
// mapped value is the command text read from that connection.
std::unordered_map<int, std::string> requestQueue;
// Held by main() while inserting into requestQueue / setting gotNewRequests,
// and by the workers while waiting and while taking the queue contents.
std::mutex requestQueue_mutex;
// main() calls notify_one() on it after queueing a request; workers wait() on it.
std::condition_variable processorsThreadSwitch;
// Set to true by main() together with an insertion; reset to false by the worker
// that takes over the queue contents. Accessed under requestQueue_mutex.
bool gotNewRequests = false;
int main()
UdsServer app; //Server listening on a Unix Domain Socket
try
app.createServer(pathToSocket);
catch (const std::string & err)
MS::log(SERVICE_NAME, "Failed to start the service. Error: " + err, MS::MessageType::FatalException);
return -1;
unsigned n_concThreads = std::thread::hardware_concurrency();
if (!n_concThreads) //if the query failed...
std::ifstream cpuinfo("/proc/cpuinfo");
n_concThreads = std::count(std::istream_iterator<std::string>(cpuinfo),
std::istream_iterator<std::string>(),
std::string("processor"));
if (!n_concThreads)
n_concThreads = 6; // ~number of CPU cores. TODO: make the number of worker processes/threads configurable using a config file
for (int i = 0; i < n_concThreads; ++i)
std::thread t (pullRequests);
t.detach();
while ((int clientConnection = app.newConnectionEstablished()) > -1) //Uses accept() internally
std::string command = app.getMsg (clientConnection); //Uses read() internally
if (command.empty())
app.closeConnection(clientConnection);
else if (command == "SHUTDOWN")
app.closeConnection(clientConnection);
return 0;
else
//Anonymous scope just to get rid of the lock before notifying a thread
std::lock_guard<std::mutex> writeLock(requestQueue_mutex);
requestQueue[clientConnection] = std::move(command);
gotNewRequests = true;
processorsThreadSwitch.notify_one(); //nothing happens here if all threads are busy
void pullRequests()
UnixDomainSocket uds;
std::unique_lock<std::mutex> writeLock(requestQueue_mutex);
while (true) //Let the thread run "forever"
while (!gotNewRequests)
processorsThreadSwitch.wait(writeLock);
std::unordered_map<int, std::string> queueCopy (std::move(requestQueue));
requestQueue.clear();
gotNewRequests = false;
writeLock.unlock(); //Don't let the other threads wait when this threads doesn't need to access the shared data any more
if (queueCopy.empty())
continue;
else if (queueCopy.size() == 1)
std::string response = service.pullRequests(queueCopy.cbegin()->second);
if (response.length())
auto sendResult = uds.sendMsg(queueCopy.cbegin()->first, response);
if (!sendResult.isValid())
MS::log(SERVICE_NAME, "Could not send the response for request: " + queueCopy.begin()->second, MS::MessageType::Error);
if (!uds.closeConnection(queueCopy.begin()->first))
MS::log(SERVICE_NAME, "Could not close the connection.", MS::MessageType::Error);
else //Multiplex
std::unordered_map<std::string, std::vector<int>> multiplexedRequests;
for (auto & request : queueCopy)
multiplexedRequests[std::move(request.second)].push_back(request.first);
for (const auto & request : multiplexedRequests)
std::string response = service.pullRequests(request.first);
if (response.length())
for (auto socket : request.second)
auto sendResult = uds.sendMsg(socket, response);
if (!sendResult.isValid())
MS::log(SERVICE_NAME, "Could not send the response for request: " + request.first, MS::MessageType::Error);
if (!uds.closeConnection(socket))
MS::log(SERVICE_NAME, "Could not close the connection.", MS::MessageType::Error);
writeLock.lock();
c++ multithreading server
c++ multithreading server
New contributor
New contributor
New contributor
asked 47 secs ago
Sceptical JuleSceptical Jule
101
101
New contributor
New contributor
add a comment |
add a comment |
0
active
oldest
votes
Your Answer
StackExchange.ifUsing("editor", function ()
return StackExchange.using("mathjaxEditing", function ()
StackExchange.MarkdownEditor.creationCallbacks.add(function (editor, postfix)
StackExchange.mathjaxEditing.prepareWmdForMathJax(editor, postfix, [["\$", "\$"]]);
);
);
, "mathjax-editing");
StackExchange.ifUsing("editor", function ()
StackExchange.using("externalEditor", function ()
StackExchange.using("snippets", function ()
StackExchange.snippets.init();
);
);
, "code-snippets");
StackExchange.ready(function()
var channelOptions =
tags: "".split(" "),
id: "196"
;
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function()
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled)
StackExchange.using("snippets", function()
createEditor();
);
else
createEditor();
);
function createEditor()
StackExchange.prepareEditor(
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: false,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: null,
bindNavPrevention: true,
postfix: "",
imageUploader:
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
,
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
);
);
Sceptical Jule is a new contributor. Be nice, and check out our Code of Conduct.
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
var $window = $(window),
onScroll = function(e)
var $elem = $('.new-login-left'),
docViewTop = $window.scrollTop(),
docViewBottom = docViewTop + $window.height(),
elemTop = $elem.offset().top,
elemBottom = elemTop + $elem.height();
if ((docViewTop elemBottom))
StackExchange.using('gps', function() StackExchange.gps.track('embedded_signup_form.view', location: 'question_page' ); );
$window.unbind('scroll', onScroll);
;
$window.on('scroll', onScroll);
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f216379%2fmultithreaded-local-server%23new-answer', 'question_page');
);
Post as a guest
Required, but never shown
0
active
oldest
votes
0
active
oldest
votes
active
oldest
votes
active
oldest
votes
Sceptical Jule is a new contributor. Be nice, and check out our Code of Conduct.
Sceptical Jule is a new contributor. Be nice, and check out our Code of Conduct.
Sceptical Jule is a new contributor. Be nice, and check out our Code of Conduct.
Sceptical Jule is a new contributor. Be nice, and check out our Code of Conduct.
Thanks for contributing an answer to Code Review Stack Exchange!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
Use MathJax to format equations. MathJax reference.
To learn more, see our tips on writing great answers.
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
var $window = $(window),
onScroll = function(e)
var $elem = $('.new-login-left'),
docViewTop = $window.scrollTop(),
docViewBottom = docViewTop + $window.height(),
elemTop = $elem.offset().top,
elemBottom = elemTop + $elem.height();
if ((docViewTop elemBottom))
StackExchange.using('gps', function() StackExchange.gps.track('embedded_signup_form.view', location: 'question_page' ); );
$window.unbind('scroll', onScroll);
;
$window.on('scroll', onScroll);
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f216379%2fmultithreaded-local-server%23new-answer', 'question_page');
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
var $window = $(window),
onScroll = function(e)
var $elem = $('.new-login-left'),
docViewTop = $window.scrollTop(),
docViewBottom = docViewTop + $window.height(),
elemTop = $elem.offset().top,
elemBottom = elemTop + $elem.height();
if ((docViewTop elemBottom))
StackExchange.using('gps', function() StackExchange.gps.track('embedded_signup_form.view', location: 'question_page' ); );
$window.unbind('scroll', onScroll);
;
$window.on('scroll', onScroll);
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
var $window = $(window),
onScroll = function(e)
var $elem = $('.new-login-left'),
docViewTop = $window.scrollTop(),
docViewBottom = docViewTop + $window.height(),
elemTop = $elem.offset().top,
elemBottom = elemTop + $elem.height();
if ((docViewTop elemBottom))
StackExchange.using('gps', function() StackExchange.gps.track('embedded_signup_form.view', location: 'question_page' ); );
$window.unbind('scroll', onScroll);
;
$window.on('scroll', onScroll);
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
var $window = $(window),
onScroll = function(e)
var $elem = $('.new-login-left'),
docViewTop = $window.scrollTop(),
docViewBottom = docViewTop + $window.height(),
elemTop = $elem.offset().top,
elemBottom = elemTop + $elem.height();
if ((docViewTop elemBottom))
StackExchange.using('gps', function() StackExchange.gps.track('embedded_signup_form.view', location: 'question_page' ); );
$window.unbind('scroll', onScroll);
;
$window.on('scroll', onScroll);
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown