Multithreaded local server

I'm writing several local servers which share almost the same code in main.cpp. I'd appreciate comments, improvement suggestions and especially notes on potential memory leaks. Thanks!

    #include <algorithm>          //std::count
    #include <condition_variable>
    #include <fstream>            //std::ifstream
    #include <iostream>
    #include <iterator>           //std::istream_iterator
    #include <mutex>
    #include <string>
    #include <thread>
    #include <unordered_map>
    #include <utility>            //std::move
    #include <vector>

    #include "UdsServer.hpp"
    #include "RequestManager.hpp"
    #include "MSutils.hpp" //MS::log()


    void pullRequests();


    const std::string pathToSocket = "/var/run/SomeServer.sock";
    const std::string SERVICE_NAME = "SomeServer";

    RequestManager service; //Does the actual processing of the request


    std::unordered_map<int, std::string> requestQueue;
    std::mutex requestQueue_mutex;
    std::condition_variable processorsThreadSwitch;
    bool gotNewRequests = false;


    int main()
    {
        UdsServer app; //Server listening on a Unix Domain Socket

        try
        {
            app.createServer(pathToSocket);
        }
        catch (const std::string & err)
        {
            MS::log(SERVICE_NAME, "Failed to start the service. Error: " + err, MS::MessageType::FatalException);

            return -1;
        }

        unsigned n_concThreads = std::thread::hardware_concurrency();

        if (!n_concThreads) //if the query failed...
        {
            std::ifstream cpuinfo("/proc/cpuinfo");

            n_concThreads = static_cast<unsigned>(
                std::count(std::istream_iterator<std::string>(cpuinfo),
                           std::istream_iterator<std::string>(),
                           std::string("processor")));

            if (!n_concThreads)
                n_concThreads = 6; // ~number of CPU cores. TODO: make the number of worker processes/threads configurable using a config file
        }

        for (unsigned i = 0; i < n_concThreads; ++i)
        {
            std::thread t (pullRequests);
            t.detach();
        }

        int clientConnection; //Declared outside the condition: a declaration cannot appear inside a parenthesized expression

        while ((clientConnection = app.newConnectionEstablished()) > -1) //Uses accept() internally
        {
            std::string command = app.getMsg (clientConnection); //Uses read() internally

            if (command.empty())
                app.closeConnection(clientConnection);
            else if (command == "SHUTDOWN")
            {
                app.closeConnection(clientConnection);

                return 0;
            }
            else
            {
                { //Anonymous scope just to get rid of the lock before notifying a thread
                    std::lock_guard<std::mutex> writeLock(requestQueue_mutex);

                    requestQueue[clientConnection] = std::move(command);

                    gotNewRequests = true;
                }

                processorsThreadSwitch.notify_one(); //nothing happens here if all threads are busy
            }
        }
    }


    void pullRequests()
    {
        UnixDomainSocket uds;

        std::unique_lock<std::mutex> writeLock(requestQueue_mutex);

        while (true) //Let the thread run "forever"
        {
            while (!gotNewRequests)
                processorsThreadSwitch.wait(writeLock);

            std::unordered_map<int, std::string> queueCopy (std::move(requestQueue));
            requestQueue.clear(); //Put the moved-from map back into a known (empty) state

            gotNewRequests = false;

            writeLock.unlock(); //Don't let the other threads wait when this thread doesn't need to access the shared data any more

            if (queueCopy.empty())
            {
                writeLock.lock(); //wait() above requires the lock to be held again
                continue;
            }
            else if (queueCopy.size() == 1)
            {
                std::string response = service.pullRequests(queueCopy.cbegin()->second);

                if (response.length())
                {
                    auto sendResult = uds.sendMsg(queueCopy.cbegin()->first, response);

                    if (!sendResult.isValid())
                        MS::log(SERVICE_NAME, "Could not send the response for request: " + queueCopy.begin()->second, MS::MessageType::Error);
                }

                if (!uds.closeConnection(queueCopy.begin()->first))
                    MS::log(SERVICE_NAME, "Could not close the connection.", MS::MessageType::Error);
            }
            else //Multiplex: answer identical requests with one call to the service
            {
                std::unordered_map<std::string, std::vector<int>> multiplexedRequests;

                for (auto & request : queueCopy)
                    multiplexedRequests[std::move(request.second)].push_back(request.first);

                for (const auto & request : multiplexedRequests)
                {
                    std::string response = service.pullRequests(request.first);

                    for (auto socket : request.second)
                    {
                        if (response.length())
                        {
                            auto sendResult = uds.sendMsg(socket, response);

                            if (!sendResult.isValid())
                                MS::log(SERVICE_NAME, "Could not send the response for request: " + request.first, MS::MessageType::Error);
                        }

                        //Close the connection even if there is nothing to send, as in the single-request branch
                        if (!uds.closeConnection(socket))
                            MS::log(SERVICE_NAME, "Could not close the connection.", MS::MessageType::Error);
                    }
                }
            }

            writeLock.lock();
        }
    }
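For comparison, here is a minimal sketch of how the hand-off between `main()` and the workers could be wrapped in a small class, so that the workers can be stopped and joined on shutdown instead of being detached. `RequestQueue`, `waitAndTakeAll()`, `stop()` and `runDemo()` are hypothetical names made up for this illustration; they are not part of `UdsServer.hpp` or `RequestManager.hpp`, and the "processing" is reduced to counting requests.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical helper (an assumption, not from the original code):
// a queue keyed by connection descriptor that workers drain in batches.
class RequestQueue {
public:
    using Batch = std::unordered_map<int, std::string>;

    // Called by the accepting thread for every request it has read.
    void push(int connection, std::string command) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            pending_[connection] = std::move(command);
        }   // release the lock before notifying
        wakeUp_.notify_one();
    }

    // Called once on shutdown; wakes every waiting worker.
    void stop() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopped_ = true;
        }
        wakeUp_.notify_all();
    }

    // Blocks until there is work or stop() was called.
    // Returns false only when stopped and nothing is left to process.
    bool waitAndTakeAll(Batch& out) {
        std::unique_lock<std::mutex> lock(mutex_);
        wakeUp_.wait(lock, [this] { return stopped_ || !pending_.empty(); });
        if (pending_.empty())
            return false;
        out.clear();
        out.swap(pending_);   // leaves pending_ empty
        return true;
    }

private:
    std::mutex mutex_;
    std::condition_variable wakeUp_;
    Batch pending_;
    bool stopped_ = false;
};

// Starts workerCount workers, feeds them requestCount dummy requests,
// stops the queue, joins all workers and returns how many requests were handled.
std::size_t runDemo(unsigned workerCount, int requestCount) {
    RequestQueue queue;
    std::mutex handledMutex;
    std::size_t handled = 0;

    std::vector<std::thread> workers;
    for (unsigned i = 0; i < workerCount; ++i) {
        workers.emplace_back([&] {
            RequestQueue::Batch batch;
            while (queue.waitAndTakeAll(batch)) {
                // A real worker would call the request processor here.
                std::lock_guard<std::mutex> lock(handledMutex);
                handled += batch.size();
            }
        });
    }

    for (int connection = 0; connection < requestCount; ++connection)
        queue.push(connection, "PING");

    queue.stop();
    for (auto& worker : workers)
        worker.join();   // every worker has returned before the queue is destroyed
    return handled;
}
```

Calling `runDemo(4, 100)` starts four workers and pushes one hundred requests with distinct connection numbers. Because the predicate passed to `wait()` is checked under the lock, a separate `gotNewRequests` flag is not needed in this variant, and requests still pending when `stop()` is called are drained before the workers exit.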
















Tags: c++, multithreading, server





Asked by Sceptical Jule



